/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Calc;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexProgram;
import org.apache.calcite.rex.RexProgramBuilder;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.mapping.Mapping;
import org.apache.calcite.util.mapping.Mappings;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef;
import org.apache.flink.table.planner.typeutils.RowTypeUtils;
import org.apache.flink.util.Preconditions;

/**
 * Planner rule that moves a {@link StreamPhysicalWatermarkAssigner} from above a {@link
 * StreamPhysicalChangelogNormalize} (optionally with a {@link StreamPhysicalCalc} in between) to
 * below the {@link StreamPhysicalExchange} that feeds the normalize node.
 *
 * <p>Two operand shapes are matched, one per rule instance:
 *
 * <ul>
 *   <li>{@link #WITH_CALC}: WatermarkAssigner - Calc - ChangelogNormalize - Exchange
 *   <li>{@link #WITHOUT_CALC}: WatermarkAssigner - ChangelogNormalize - Exchange
 * </ul>
 */
public class WatermarkAssignerChangelogNormalizeTransposeRule
        extends RelRule<WatermarkAssignerChangelogNormalizeTransposeRule.Config> {

    /** Rule instance matching the operand tree that contains a Calc. */
    public static final RelOptRule WITH_CALC =
            Config.EMPTY
                    .withDescription("WatermarkAssignerChangelogNormalizeTransposeRuleWithCalc")
                    .as(Config.class)
                    .withCalc()
                    .toRule();

    /** Rule instance matching the operand tree without a Calc. */
    public static final RelOptRule WITHOUT_CALC =
            Config.EMPTY
                    .withDescription("WatermarkAssignerChangelogNormalizeTransposeRuleWithoutCalc")
                    .as(Config.class)
                    .withoutCalc()
                    .toRule();

    /**
     * Creates the rule from the given configuration.
     *
     * @param config rule configuration carrying the operand tree and description
     */
    public WatermarkAssignerChangelogNormalizeTransposeRule(Config config) {
        super(config);
    }

    /**
     * Rewrites the matched tree so that the watermark assigner sits below the exchange.
     *
     * @param call the rule call holding the matched nodes
     * @throws IllegalStateException if the node below the watermark assigner is neither a Calc
     *     nor a ChangelogNormalize
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        final StreamPhysicalWatermarkAssigner watermark = call.rel(0);
        final RelNode node = call.rel(1);
        final RelNode newTree;
        if (node instanceof StreamPhysicalCalc) {
            // Matched: WatermarkAssigner - Calc - ChangelogNormalize - Exchange
            final StreamPhysicalCalc calc = call.rel(1);
            final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(2);
            final StreamPhysicalExchange exchange = call.rel(3);
            final Mappings.TargetMapping calcMapping = buildMapping(calc.getProgram());
            final RelDistribution exchangeDistribution = exchange.getDistribution();
            final RelDistribution newExchangeDistribution =
                    exchangeDistribution.apply(calcMapping);
            // The shuffle keys survive the Calc when the mapped distribution keeps both its
            // type and its number of keys.
            final boolean shuffleKeysAreKeptByCalc =
                    newExchangeDistribution.getType() == exchangeDistribution.getType()
                            && newExchangeDistribution.getKeys().size()
                                    == exchangeDistribution.getKeys().size();
            if (shuffleKeysAreKeptByCalc) {
                // Calc and WatermarkAssigner can be pushed down unchanged.
                newTree =
                        pushDownOriginalWatermarkAndCalc(
                                watermark,
                                calc,
                                changelogNormalize,
                                exchange,
                                newExchangeDistribution);
            } else {
                // Some shuffle keys are projected out by the Calc, so the pushed-down nodes
                // have to be transformed first.
                newTree =
                        pushDownTransformedWatermarkAndCalc(
                                watermark,
                                calc,
                                changelogNormalize,
                                exchange,
                                exchangeDistribution.getKeys(),
                                calcMapping);
            }
        } else if (node instanceof StreamPhysicalChangelogNormalize) {
            // Matched: WatermarkAssigner - ChangelogNormalize - Exchange
            final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(1);
            final StreamPhysicalExchange exchange = call.rel(2);
            newTree =
                    buildTreeInOrder(
                            exchange.getInput(),
                            // The pushed-down WatermarkAssigner gets the default distribution.
                            Tuple2.of(
                                    watermark,
                                    watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
                            Tuple2.of(exchange, exchange.getTraitSet()),
                            Tuple2.of(changelogNormalize, changelogNormalize.getTraitSet()));
        } else {
            throw new IllegalStateException(
                    this.getClass().getName()
                            + " matches a wrong relation tree: "
                            + RelOptUtil.toString(watermark));
        }
        call.transformTo(newTree);
    }

    /**
     * Pushes the unmodified Calc and WatermarkAssigner below the Exchange. The resulting order,
     * bottom-up, is: exchange input, Calc, WatermarkAssigner, Exchange, ChangelogNormalize.
     *
     * @param newExchangeDistribution distribution of the original Exchange mapped through the
     *     Calc; applied to the new Exchange and ChangelogNormalize
     * @return the root of the rebuilt tree (the new ChangelogNormalize)
     */
    private RelNode pushDownOriginalWatermarkAndCalc(
            StreamPhysicalWatermarkAssigner watermark,
            StreamPhysicalCalc calc,
            StreamPhysicalChangelogNormalize changelogNormalize,
            StreamPhysicalExchange exchange,
            RelDistribution newExchangeDistribution) {
        return buildTreeInOrder(
                exchange.getInput(),
                // Calc and WatermarkAssigner below the Exchange get the default distribution.
                Tuple2.of(calc, calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
                Tuple2.of(watermark, watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
                // Exchange and ChangelogNormalize use the distribution mapped through the Calc.
                Tuple2.of(exchange, exchange.getTraitSet().plus(newExchangeDistribution)),
                Tuple2.of(
                        changelogNormalize,
                        changelogNormalize.getTraitSet().plus(newExchangeDistribution)));
    }

    /**
     * Handles the case where the Calc drops at least one shuffle key of the Exchange.
     *
     * <p>A new program containing all original projections plus the dropped shuffle keys is
     * built. If that program is a permutation, only a transformed WatermarkAssigner is pushed
     * down; otherwise the new program is pushed down as a Calc together with the
     * WatermarkAssigner, and a top Calc restores the original output fields.
     *
     * @param completeShuffleKeys shuffle keys of the original Exchange (input field indexes)
     * @param calcMapping input-to-output field mapping of the original Calc
     * @return the root of the rebuilt tree
     */
    private RelNode pushDownTransformedWatermarkAndCalc(
            StreamPhysicalWatermarkAssigner watermark,
            StreamPhysicalCalc calc,
            StreamPhysicalChangelogNormalize changelogNormalize,
            StreamPhysicalExchange exchange,
            List<Integer> completeShuffleKeys,
            Mappings.TargetMapping calcMapping) {
        // Collect the shuffle keys that have no target in the Calc output.
        final List<Integer> projectedOutShuffleKeys = new ArrayList<>();
        for (Integer key : completeShuffleKeys) {
            final int targetIdx = calcMapping.getTargetOpt(key);
            if (targetIdx < 0) {
                projectedOutShuffleKeys.add(key);
            }
        }
        final RexBuilder rexBuilder = calc.getCluster().getRexBuilder();
        final RexProgram newPushDownProgram =
                createTransformedProgramWithAllShuffleKeys(
                        calc.getProgram(), projectedOutShuffleKeys, rexBuilder);
        if (newPushDownProgram.isPermutation()) {
            return pushDownTransformedWatermark(
                    watermark, calc, changelogNormalize, exchange, calcMapping, rexBuilder);
        }
        return pushDownTransformedWatermarkAndCalc(
                newPushDownProgram, watermark, exchange, changelogNormalize, calc);
    }

    /**
     * Builds a program over the same input row type that keeps every named projection and the
     * condition of {@code program} and appends one input reference per dropped shuffle key.
     *
     * @param program the original Calc program
     * @param projectsOutShuffleKeys input field indexes of shuffle keys missing from the output
     * @param rexBuilder builder used to create the new program
     * @return the extended program
     */
    private RexProgram createTransformedProgramWithAllShuffleKeys(
            RexProgram program, List<Integer> projectsOutShuffleKeys, RexBuilder rexBuilder) {
        final RelDataType oldInputRowType = program.getInputRowType();
        final List<String> visitedProjectNames = new ArrayList<>();
        final RexProgramBuilder newProgramBuilder =
                new RexProgramBuilder(oldInputRowType, rexBuilder);
        program.getNamedProjects()
                .forEach(
                        pair -> {
                            newProgramBuilder.addProject(
                                    program.expandLocalRef(pair.left), pair.right);
                            visitedProjectNames.add(pair.right);
                        });
        final List<RelDataTypeField> oldFieldList = oldInputRowType.getFieldList();
        for (Integer projectsOutShuffleKey : projectsOutShuffleKeys) {
            final RelDataTypeField oldField = oldFieldList.get(projectsOutShuffleKey);
            final String oldFieldName = oldField.getName();
            // Derive the output name from the input field name and the names used so far.
            final String newProjectName =
                    RowTypeUtils.getUniqueName(oldFieldName, visitedProjectNames);
            newProgramBuilder.addProject(
                    new RexInputRef(projectsOutShuffleKey, oldField.getType()), newProjectName);
            visitedProjectNames.add(newProjectName);
        }
        if (program.getCondition() != null) {
            newProgramBuilder.addCondition(program.expandLocalRef(program.getCondition()));
        }
        return newProgramBuilder.getProgram();
    }

    /**
     * Pushes down a Calc running {@code newPushDownProgram} together with the WatermarkAssigner
     * and adds a top Calc that projects the first {@code calc.getRowType().getFieldCount()}
     * fields of the new ChangelogNormalize, i.e. drops the shuffle keys appended by the
     * pushed-down program.
     *
     * @param newPushDownProgram program containing all original projections plus shuffle keys
     * @return the top Calc of the rebuilt tree
     */
    private RelNode pushDownTransformedWatermarkAndCalc(
            RexProgram newPushDownProgram,
            StreamPhysicalWatermarkAssigner watermark,
            StreamPhysicalExchange exchange,
            StreamPhysicalChangelogNormalize changelogNormalize,
            StreamPhysicalCalc calc) {
        final Calc pushDownCalc =
                calc.copy(
                        calc.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
                        exchange.getInput(),
                        newPushDownProgram);
        final Mappings.TargetMapping mappingOfPushDownCalc = buildMapping(newPushDownProgram);
        final RelDistribution newDistribution =
                exchange.getDistribution().apply(mappingOfPushDownCalc);
        final RelNode newChangelogNormalize =
                buildTreeInOrder(
                        pushDownCalc,
                        Tuple2.of(
                                watermark,
                                watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT())),
                        // Exchange and ChangelogNormalize use the distribution mapped through
                        // the pushed-down Calc.
                        Tuple2.of(exchange, exchange.getTraitSet().plus(newDistribution)),
                        Tuple2.of(
                                changelogNormalize,
                                changelogNormalize.getTraitSet().plus(newDistribution)));
        final List<String> newInputFieldNames = newChangelogNormalize.getRowType().getFieldNames();
        final RexProgramBuilder topProgramBuilder =
                new RexProgramBuilder(
                        newChangelogNormalize.getRowType(),
                        changelogNormalize.getCluster().getRexBuilder());
        // Only the leading fields (the original Calc output) are kept by the top Calc.
        final int originalFieldCount = calc.getRowType().getFieldCount();
        for (int fieldIdx = 0; fieldIdx < originalFieldCount; fieldIdx++) {
            topProgramBuilder.addProject(
                    RexInputRef.of(fieldIdx, newChangelogNormalize.getRowType()),
                    newInputFieldNames.get(fieldIdx));
        }
        final RexProgram topProgram = topProgramBuilder.getProgram();
        return calc.copy(calc.getTraitSet(), newChangelogNormalize, topProgram);
    }

    /**
     * Pushes only the WatermarkAssigner below the Exchange and keeps the Calc on top. The
     * rowtime field index and the watermark expression are translated through the inverse of
     * {@code calcMapping} so that they refer to the Calc's input fields.
     *
     * @param calcMapping input-to-output field mapping of the original Calc
     * @param rexBuilder builder used to create the rewritten Calc program
     * @return the Calc on top of the rebuilt tree
     */
    private RelNode pushDownTransformedWatermark(
            StreamPhysicalWatermarkAssigner watermark,
            StreamPhysicalCalc calc,
            StreamPhysicalChangelogNormalize changelogNormalize,
            StreamPhysicalExchange exchange,
            Mappings.TargetMapping calcMapping,
            RexBuilder rexBuilder) {
        final Mappings.TargetMapping inversedMapping = calcMapping.inverse();
        final int newRowTimeFieldIndex =
                inversedMapping.getTargetOpt(watermark.rowtimeFieldIndex());
        RexNode newWatermarkExpr = watermark.watermarkExpr();
        if (watermark.watermarkExpr() != null) {
            newWatermarkExpr = RexUtil.apply(inversedMapping, watermark.watermarkExpr());
        }
        final RelNode newWatermark =
                watermark.copy(
                        watermark.getTraitSet().plus(FlinkRelDistribution.DEFAULT()),
                        exchange.getInput(),
                        newRowTimeFieldIndex,
                        newWatermarkExpr);
        final RelNode newChangelogNormalize =
                buildTreeInOrder(
                        newWatermark,
                        Tuple2.of(exchange, exchange.getTraitSet()),
                        Tuple2.of(changelogNormalize, changelogNormalize.getTraitSet()));
        final RexProgram oldProgram = calc.getProgram();
        final RexProgramBuilder programBuilder =
                new RexProgramBuilder(newChangelogNormalize.getRowType(), rexBuilder);
        // Re-create references to the rowtime field from the new input row type.
        // NOTE(review): presumably needed because the rowtime field's type changes once the
        // watermark assigner sits below the Calc - confirm.
        final Function<RexNode, RexNode> rexShuttle =
                e ->
                        e.accept(
                                new RexShuttle() {
                                    @Override
                                    public RexNode visitInputRef(RexInputRef inputRef) {
                                        if (inputRef.getIndex() == newRowTimeFieldIndex) {
                                            return RexInputRef.of(
                                                    newRowTimeFieldIndex,
                                                    newChangelogNormalize.getRowType());
                                        }
                                        return inputRef;
                                    }
                                });
        oldProgram
                .getNamedProjects()
                .forEach(
                        pair ->
                                programBuilder.addProject(
                                        rexShuttle.apply(oldProgram.expandLocalRef(pair.left)),
                                        pair.right));
        if (oldProgram.getCondition() != null) {
            programBuilder.addCondition(
                    rexShuttle.apply(oldProgram.expandLocalRef(oldProgram.getCondition())));
        }
        final RexProgram newProgram = programBuilder.getProgram();
        return calc.copy(calc.getTraitSet(), newChangelogNormalize, newProgram);
    }

    /**
     * Builds the input-to-output field mapping of a program. Only projections that expand to a
     * plain {@link RexInputRef} contribute; if one input field is projected several times, the
     * last output position wins.
     *
     * @param program the program to inspect
     * @return mapping from input field index to output field index
     */
    private Mappings.TargetMapping buildMapping(RexProgram program) {
        final HashMap<Integer, Integer> mapInToOutPos = new HashMap<>();
        final List<RexLocalRef> projects = program.getProjectList();
        for (int idx = 0; idx < projects.size(); idx++) {
            final RexNode rexNode = program.expandLocalRef(projects.get(idx));
            if (rexNode instanceof RexInputRef) {
                mapInToOutPos.put(((RexInputRef) rexNode).getIndex(), idx);
            }
        }
        return Mappings.target(
                mapInToOutPos,
                program.getInputRowType().getFieldCount(),
                program.getOutputRowType().getFieldCount());
    }

    /**
     * Stacks copies of the given nodes on top of {@code leafNode}, first element lowest. Each
     * node is copied with the trait set paired with it and the previously built node as its
     * single input. An Exchange is additionally given the distribution trait found in its new
     * trait set.
     *
     * @param leafNode input of the lowest copied node
     * @param nodeAndTraits nodes to copy, bottom-up, each with the trait set for the copy; at
     *     least one is required
     * @return the topmost copied node
     * @throws IllegalArgumentException if {@code nodeAndTraits} is empty
     */
    @SafeVarargs
    private final RelNode buildTreeInOrder(
            RelNode leafNode, Tuple2<RelNode, RelTraitSet>... nodeAndTraits) {
        Preconditions.checkArgument(nodeAndTraits.length >= 1);
        RelNode inputNode = leafNode;
        RelNode currentNode = null;
        for (Tuple2<RelNode, RelTraitSet> nodeAndTrait : nodeAndTraits) {
            currentNode = nodeAndTrait.f0;
            if (currentNode instanceof StreamPhysicalExchange) {
                currentNode =
                        ((StreamPhysicalExchange) currentNode)
                                .copy(
                                        nodeAndTrait.f1,
                                        inputNode,
                                        nodeAndTrait.f1.getTrait(
                                                FlinkRelDistributionTraitDef.INSTANCE()));
            } else {
                currentNode =
                        currentNode.copy(nodeAndTrait.f1, Collections.singletonList(inputNode));
            }
            inputNode = currentNode;
        }
        return currentNode;
    }

    /** Rule configuration providing the two supported operand trees. */
    public interface Config extends RelRule.Config {

        @Override
        default WatermarkAssignerChangelogNormalizeTransposeRule toRule() {
            return new WatermarkAssignerChangelogNormalizeTransposeRule(this);
        }

        /** Matches WatermarkAssigner - Calc - ChangelogNormalize - Exchange. */
        default Config withCalc() {
            return withOperandSupplier(
                            b0 ->
                                    b0.operand(StreamPhysicalWatermarkAssigner.class)
                                            .oneInput(
                                                    b1 ->
                                                            b1.operand(StreamPhysicalCalc.class)
                                                                    .oneInput(
                                                                            b2 ->
                                                                                    b2.operand(
                                                                                                    StreamPhysicalChangelogNormalize
                                                                                                            .class)
                                                                                            .oneInput(
                                                                                                    b3 ->
                                                                                                            b3.operand(
                                                                                                                            StreamPhysicalExchange
                                                                                                                                    .class)
                                                                                                                    .anyInputs()))))
                    .as(Config.class);
        }

        /** Matches WatermarkAssigner - ChangelogNormalize - Exchange. */
        default Config withoutCalc() {
            return withOperandSupplier(
                            b0 ->
                                    b0.operand(StreamPhysicalWatermarkAssigner.class)
                                            .oneInput(
                                                    b1 ->
                                                            b1.operand(
                                                                            StreamPhysicalChangelogNormalize
                                                                                    .class)
                                                                    .oneInput(
                                                                            b2 ->
                                                                                    b2.operand(
                                                                                                    StreamPhysicalExchange
                                                                                                            .class)
                                                                                            .anyInputs())))
                    .as(Config.class);
        }
    }
}

