/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.functions.python.PythonFunctionKind;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAggregateBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.assigners.CountSlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.CountTumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SessionWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream exec node that translates a group window aggregation whose aggregate calls are Python
 * functions into a {@link OneInputTransformation}.
 *
 * <p>Two runtime operators are supported and are instantiated reflectively by class name: one
 * used when at least one aggregate call is a {@link PythonFunctionKind#GENERAL} Python aggregate,
 * and an Arrow-based one used otherwise.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBase {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);

    /** Fully qualified name of the operator used when no general Python UDAF is present. */
    private static final String ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
            "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream.StreamArrowPythonGroupWindowAggregateFunctionOperator";

    /** Fully qualified name of the operator used when a general Python UDAF is present. */
    private static final String GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
            "org.apache.flink.table.runtime.operators.python.aggregate.PythonStreamGroupWindowAggregateOperator";

    public static final String FIELD_NAME_WINDOW = "window";
    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";

    /** Indices of the grouping fields; also used to build the state key selector. */
    @JsonProperty("grouping")
    private final int[] grouping;

    /** Aggregate calls evaluated by the Python operator. */
    @JsonProperty("aggCalls")
    private final AggregateCall[] aggCalls;

    /** Logical window definition (tumbling, sliding or session). */
    @JsonProperty(FIELD_NAME_WINDOW)
    @JsonSerialize(using = LogicalWindowJsonSerializer.class)
    @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
    private final LogicalWindow window;

    /** Window properties handed to the runtime operator's constructor. */
    @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
    private final PlannerNamedWindowProperty[] namedWindowProperties;

    /** Retraction flag applied to every aggregate call when building the aggregate info list. */
    @JsonProperty("needRetraction")
    private final boolean needRetraction;

    /** Flag forwarded to the general Python operator's constructor. */
    @JsonProperty("generateUpdateBefore")
    private final boolean generateUpdateBefore;

    /**
     * Creates a node with a freshly generated node id and a single input property.
     *
     * @param grouping indices of the grouping fields
     * @param aggCalls aggregate calls to evaluate
     * @param window logical window definition
     * @param namedWindowProperties window properties handed to the runtime operator
     * @param generateUpdateBefore flag forwarded to the general Python operator
     * @param needRetraction retraction flag used for all aggregate calls
     * @param inputProperty property of the single input
     * @param outputType output row type of this node
     * @param description description of this node
     */
    public StreamExecPythonGroupWindowAggregate(
            int[] grouping,
            AggregateCall[] aggCalls,
            LogicalWindow window,
            PlannerNamedWindowProperty[] namedWindowProperties,
            boolean generateUpdateBefore,
            boolean needRetraction,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                grouping,
                aggCalls,
                window,
                namedWindowProperties,
                generateUpdateBefore,
                needRetraction,
                getNewNodeId(),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creates a node from all of its properties; this is the constructor Jackson uses.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not contain exactly one
     *     element
     * @throws NullPointerException if {@code grouping}, {@code aggCalls}, {@code window} or {@code
     *     namedWindowProperties} is {@code null}
     */
    @JsonCreator
    public StreamExecPythonGroupWindowAggregate(
            @JsonProperty("grouping") int[] grouping,
            @JsonProperty("aggCalls") AggregateCall[] aggCalls,
            @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window,
            @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
                    PlannerNamedWindowProperty[] namedWindowProperties,
            @JsonProperty("generateUpdateBefore") boolean generateUpdateBefore,
            @JsonProperty("needRetraction") boolean needRetraction,
            @JsonProperty("id") int id,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 1);
        this.grouping = Preconditions.checkNotNull(grouping);
        this.aggCalls = Preconditions.checkNotNull(aggCalls);
        this.window = Preconditions.checkNotNull(window);
        this.namedWindowProperties = Preconditions.checkNotNull(namedWindowProperties);
        this.generateUpdateBefore = generateUpdateBefore;
        this.needRetraction = needRetraction;
    }

    /**
     * Translates this node into a keyed one-input transformation running a Python window
     * aggregate operator.
     *
     * @param planner planner providing the table config, execution environment and rel builder
     * @return the transformation, keyed by the grouping fields
     * @throws TableException if the window is defined on a rowtime attribute that cannot be found
     *     in the input, or if the window type is unsupported
     * @throws UnsupportedOperationException if the window / time attribute combination is not
     *     supported
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        final TableConfig tableConfig = planner.getTableConfig();
        if (isCountWindow()
                && grouping.length > 0
                && tableConfig.getMinIdleStateRetentionTime() < 0L) {
            LOGGER.warn("No state retention interval configured for a query which accumulates state. Please provide a query configuration with valid retention interval to prevent excessive state size. You may specify a retention time of 0 to not clean up the state.");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);
        // The edge only exposes Transformation<?>; the operators built below consume RowData.
        @SuppressWarnings("unchecked")
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();
        final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();

        final int inputTimeFieldIndex = resolveInputTimeFieldIndex(planner, inputRowType);
        final ZoneId shiftTimeZone =
                TimeWindowUtil.getShiftTimeZone(
                        window.timeAttribute().getOutputDataType().getLogicalType(), tableConfig);

        final Tuple2<WindowAssigner<?>, Trigger<?>> assignerAndTrigger =
                generateWindowAssignerAndTrigger();
        final WindowAssigner<?> windowAssigner = assignerAndTrigger.f0;
        final Trigger<?> trigger = assignerAndTrigger.f1;

        final Configuration config =
                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), tableConfig);
        final boolean isGeneralPythonUDAF =
                Arrays.stream(aggCalls)
                        .anyMatch(
                                call ->
                                        PythonUtil.isPythonAggregate(
                                                call, PythonFunctionKind.GENERAL));
        final WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(tableConfig, window);

        final OneInputTransformation<RowData, RowData> transform;
        if (isGeneralPythonUDAF) {
            // Every aggregate call shares the node-level retraction flag.
            final boolean[] aggCallNeedRetractions = new boolean[aggCalls.length];
            Arrays.fill(aggCallNeedRetractions, needRetraction);
            final AggregateInfoList aggInfoList =
                    AggregateUtil.transformToStreamAggregateInfoList(
                            inputRowType,
                            JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                            aggCallNeedRetractions,
                            needRetraction,
                            true,
                            true);
            transform =
                    createGeneralPythonStreamWindowGroupOneInputTransformation(
                            inputTransform,
                            inputRowType,
                            outputRowType,
                            inputTimeFieldIndex,
                            windowAssigner,
                            aggInfoList,
                            emitStrategy.getAllowLateness(),
                            config,
                            shiftTimeZone);
        } else {
            transform =
                    createPandasPythonStreamWindowGroupOneInputTransformation(
                            inputTransform,
                            inputRowType,
                            outputRowType,
                            inputTimeFieldIndex,
                            windowAssigner,
                            trigger,
                            emitStrategy.getAllowLateness(),
                            config,
                            shiftTimeZone);
        }

        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }

        // Key the operator state by the grouping fields.
        final RowDataKeySelector selector =
                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
        transform.setStateKeySelector(selector);
        transform.setStateKeyType(selector.getProducedType());
        return transform;
    }

    /**
     * Returns whether the window is a tumbling or sliding window whose size is a row interval.
     * Session windows and any other window type yield {@code false}.
     */
    private boolean isCountWindow() {
        if (window instanceof TumblingGroupWindow) {
            return AggregateUtil.hasRowIntervalType(((TumblingGroupWindow) window).size());
        }
        if (window instanceof SlidingGroupWindow) {
            return AggregateUtil.hasRowIntervalType(((SlidingGroupWindow) window).size());
        }
        return false;
    }

    /**
     * Resolves the index of the window's time attribute in the input row, or {@code -1} when the
     * window is not defined on a rowtime attribute.
     *
     * @throws TableException if the window uses a rowtime attribute that is not found in the input
     */
    private int resolveInputTimeFieldIndex(PlannerBase planner, RowType inputRowType) {
        if (!AggregateUtil.isRowtimeAttribute(window.timeAttribute())) {
            return -1;
        }
        final int index =
                AggregateUtil.timeFieldIndex(
                        FlinkTypeFactory.INSTANCE().buildRelNodeRowType(inputRowType),
                        planner.getRelBuilder(),
                        window.timeAttribute());
        if (index < 0) {
            throw new TableException("Group window must defined on a time attribute, but the time attribute can't be found.\nThis should never happen. Please file an issue.");
        }
        return index;
    }

    /**
     * Builds the window assigner and the trigger matching the logical window and the kind of its
     * time attribute.
     *
     * @return a tuple of (window assigner, trigger)
     * @throws UnsupportedOperationException for row-interval windows that are not on a proctime
     *     attribute, and for session windows on an attribute that is neither proctime nor rowtime
     * @throws TableException if the window is not a tumbling, sliding or session window
     */
    private Tuple2<WindowAssigner<?>, Trigger<?>> generateWindowAssignerAndTrigger() {
        final WindowAssigner<?> windowAssigner;
        final Trigger<?> trigger;

        if (window instanceof TumblingGroupWindow) {
            final TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
            final FieldReferenceExpression timeField = tumblingWindow.timeField();
            final ValueLiteralExpression size = tumblingWindow.size();
            if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                windowAssigner =
                        TumblingWindowAssigner.of(AggregateUtil.toDuration(size))
                                .withProcessingTime();
                trigger = ProcessingTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                windowAssigner =
                        TumblingWindowAssigner.of(AggregateUtil.toDuration(size)).withEventTime();
                trigger = EventTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasRowIntervalType(size)) {
                windowAssigner = CountTumblingWindowAssigner.of((long) AggregateUtil.toLong(size));
                trigger = ElementTriggers.count((long) AggregateUtil.toLong(size));
            } else {
                throw new UnsupportedOperationException("Event-time grouping windows on row intervals are currently not supported.");
            }
        } else if (window instanceof SlidingGroupWindow) {
            final SlidingGroupWindow slidingWindow = (SlidingGroupWindow) window;
            final FieldReferenceExpression timeField = slidingWindow.timeField();
            final ValueLiteralExpression size = slidingWindow.size();
            final ValueLiteralExpression slide = slidingWindow.slide();
            if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                windowAssigner =
                        SlidingWindowAssigner.of(
                                        AggregateUtil.toDuration(size),
                                        AggregateUtil.toDuration(slide))
                                .withProcessingTime();
                trigger = ProcessingTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                // No explicit time-domain call here; the assigner is used as created by of().
                windowAssigner =
                        SlidingWindowAssigner.of(
                                AggregateUtil.toDuration(size), AggregateUtil.toDuration(slide));
                trigger = EventTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasRowIntervalType(size)) {
                windowAssigner =
                        CountSlidingWindowAssigner.of(
                                (long) AggregateUtil.toLong(size),
                                (long) AggregateUtil.toLong(slide));
                trigger = ElementTriggers.count((long) AggregateUtil.toLong(size));
            } else {
                throw new UnsupportedOperationException("Event-time grouping windows on row intervals are currently not supported.");
            }
        } else if (window instanceof SessionGroupWindow) {
            final SessionGroupWindow sessionWindow = (SessionGroupWindow) window;
            final FieldReferenceExpression timeField = sessionWindow.timeField();
            final ValueLiteralExpression gap = sessionWindow.gap();
            if (AggregateUtil.isProctimeAttribute(timeField)) {
                // NOTE(review): unlike the tumbling/sliding proctime branches, this does not
                // call withProcessingTime(); only the trigger differs from the rowtime branch.
                // Confirm this is intended.
                windowAssigner = SessionWindowAssigner.withGap(AggregateUtil.toDuration(gap));
                trigger = ProcessingTimeTriggers.afterEndOfWindow();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)) {
                windowAssigner = SessionWindowAssigner.withGap(AggregateUtil.toDuration(gap));
                trigger = EventTimeTriggers.afterEndOfWindow();
            } else {
                throw new UnsupportedOperationException("This should not happen.");
            }
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

        return Tuple2.of(windowAssigner, trigger);
    }

    /**
     * Creates the transformation running the Arrow-based Python window aggregate operator.
     *
     * <p>The transformation reuses the parallelism of {@code inputTransform}.
     */
    private OneInputTransformation<RowData, RowData>
            createPandasPythonStreamWindowGroupOneInputTransformation(
                    Transformation<RowData> inputTransform,
                    RowType inputRowType,
                    RowType outputRowType,
                    int inputTimeFieldIndex,
                    WindowAssigner<?> windowAssigner,
                    Trigger<?> trigger,
                    long allowance,
                    Configuration config,
                    ZoneId shiftTimeZone) {
        final Tuple2<int[], PythonFunctionInfo[]> aggInfos =
                CommonPythonUtil.extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls);
        final int[] pythonUdafInputOffsets = aggInfos.f0;
        final PythonFunctionInfo[] pythonFunctionInfos = aggInfos.f1;
        final OneInputStreamOperator<RowData, RowData> pythonOperator =
                getPandasPythonStreamGroupWindowAggregateFunctionOperator(
                        config,
                        inputRowType,
                        outputRowType,
                        windowAssigner,
                        trigger,
                        allowance,
                        inputTimeFieldIndex,
                        pythonUdafInputOffsets,
                        pythonFunctionInfos,
                        shiftTimeZone);
        return new OneInputTransformation<>(
                inputTransform,
                getDescription(),
                pythonOperator,
                InternalTypeInfo.of(outputRowType),
                inputTransform.getParallelism());
    }

    /**
     * Creates the transformation running the general Python window aggregate operator.
     *
     * <p>The transformation reuses the parallelism of {@code inputTransform}.
     */
    private OneInputTransformation<RowData, RowData>
            createGeneralPythonStreamWindowGroupOneInputTransformation(
                    Transformation<RowData> inputTransform,
                    RowType inputRowType,
                    RowType outputRowType,
                    int inputTimeFieldIndex,
                    WindowAssigner<?> windowAssigner,
                    AggregateInfoList aggInfoList,
                    long allowance,
                    Configuration config,
                    ZoneId shiftTimeZone) {
        final int inputCountIndex = aggInfoList.getIndexOfCountStar();
        final boolean countStarInserted = aggInfoList.countStarInserted();
        final Tuple2<PythonAggregateFunctionInfo[], DataViewUtils.DataViewSpec[][]>
                aggInfosAndDataViewSpecs =
                        CommonPythonUtil.extractPythonAggregateFunctionInfos(aggInfoList, aggCalls);
        final PythonAggregateFunctionInfo[] pythonFunctionInfos = aggInfosAndDataViewSpecs.f0;
        final DataViewUtils.DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
        final OneInputStreamOperator<RowData, RowData> pythonOperator =
                getGeneralPythonStreamGroupWindowAggregateFunctionOperator(
                        config,
                        inputRowType,
                        outputRowType,
                        windowAssigner,
                        pythonFunctionInfos,
                        dataViewSpecs,
                        inputTimeFieldIndex,
                        inputCountIndex,
                        generateUpdateBefore,
                        countStarInserted,
                        allowance,
                        shiftTimeZone);
        return new OneInputTransformation<>(
                inputTransform,
                getDescription(),
                pythonOperator,
                InternalTypeInfo.of(outputRowType),
                inputTransform.getParallelism());
    }

    /**
     * Reflectively instantiates the Arrow-based Python window aggregate operator.
     *
     * @throws TableException if the constructor cannot be found or invoked
     */
    @SuppressWarnings("unchecked")
    private OneInputStreamOperator<RowData, RowData>
            getPandasPythonStreamGroupWindowAggregateFunctionOperator(
                    Configuration config,
                    RowType inputRowType,
                    RowType outputRowType,
                    WindowAssigner<?> windowAssigner,
                    Trigger<?> trigger,
                    long allowance,
                    int inputTimeFieldIndex,
                    int[] udafInputOffsets,
                    PythonFunctionInfo[] pythonFunctionInfos,
                    ZoneId shiftTimeZone) {
        final Class<?> clazz =
                CommonPythonUtil.loadClass(
                        ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
        try {
            final Constructor<?> ctor =
                    clazz.getConstructor(
                            Configuration.class,
                            PythonFunctionInfo[].class,
                            RowType.class,
                            RowType.class,
                            Integer.TYPE,
                            WindowAssigner.class,
                            Trigger.class,
                            Long.TYPE,
                            PlannerNamedWindowProperty[].class,
                            int[].class,
                            int[].class,
                            ZoneId.class);
            // Explicit Object[] keeps the array-typed arguments from being read as the varargs.
            return (OneInputStreamOperator<RowData, RowData>)
                    ctor.newInstance(
                            new Object[] {
                                config,
                                pythonFunctionInfos,
                                inputRowType,
                                outputRowType,
                                inputTimeFieldIndex,
                                windowAssigner,
                                trigger,
                                allowance,
                                namedWindowProperties,
                                grouping,
                                udafInputOffsets,
                                shiftTimeZone
                            });
        } catch (IllegalAccessException
                | InstantiationException
                | NoSuchMethodException
                | InvocationTargetException e) {
            throw new TableException("Python StreamArrowPythonGroupWindowAggregateFunctionOperator constructed failed.", e);
        }
    }

    /**
     * Reflectively instantiates the general Python window aggregate operator.
     *
     * @throws TableException if the constructor cannot be found or invoked
     */
    @SuppressWarnings("unchecked")
    private OneInputStreamOperator<RowData, RowData>
            getGeneralPythonStreamGroupWindowAggregateFunctionOperator(
                    Configuration config,
                    RowType inputType,
                    RowType outputType,
                    WindowAssigner<?> windowAssigner,
                    PythonAggregateFunctionInfo[] aggregateFunctions,
                    DataViewUtils.DataViewSpec[][] dataViewSpecs,
                    int inputTimeFieldIndex,
                    int indexOfCountStar,
                    boolean generateUpdateBefore,
                    boolean countStarInserted,
                    long allowance,
                    ZoneId shiftTimeZone) {
        final Class<?> clazz =
                CommonPythonUtil.loadClass(
                        GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
        try {
            final Constructor<?> ctor =
                    clazz.getConstructor(
                            Configuration.class,
                            RowType.class,
                            RowType.class,
                            PythonAggregateFunctionInfo[].class,
                            DataViewUtils.DataViewSpec[][].class,
                            int[].class,
                            Integer.TYPE,
                            Boolean.TYPE,
                            Boolean.TYPE,
                            Integer.TYPE,
                            WindowAssigner.class,
                            LogicalWindow.class,
                            Long.TYPE,
                            PlannerNamedWindowProperty[].class,
                            ZoneId.class);
            // Explicit Object[] keeps the array-typed arguments from being read as the varargs.
            return (OneInputStreamOperator<RowData, RowData>)
                    ctor.newInstance(
                            new Object[] {
                                config,
                                inputType,
                                outputType,
                                aggregateFunctions,
                                dataViewSpecs,
                                grouping,
                                indexOfCountStar,
                                generateUpdateBefore,
                                countStarInserted,
                                inputTimeFieldIndex,
                                windowAssigner,
                                window,
                                allowance,
                                namedWindowProperties,
                                shiftTimeZone
                            });
        } catch (IllegalAccessException
                | InstantiationException
                | NoSuchMethodException
                | InvocationTargetException e) {
            throw new TableException("Python PythonStreamGroupWindowAggregateOperator constructed failed.", e);
        }
    }
}

