package org.apache.flink.table.runtime;

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedProcessFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.functions.ProcessFunction;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/KeyedProcessOperator.class */
public class KeyedProcessOperator<K, IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT>, Triggerable<Object, VoidNamespace> {
    private static final long serialVersionUID = 1;
    private GeneratedProcessFunction funcCode;
    private ProcessFunction<IN, OUT> function;
    private transient StreamRecordCollector<OUT> collector;
    private transient KeyedProcessOperator<K, IN, OUT>.ContextImpl context;
    private transient KeyedProcessOperator<K, IN, OUT>.OnTimerContextImpl onTimerContext;
    private transient boolean functionsClosed = false;

    /* loaded from: input_file:org/apache/flink/table/runtime/KeyedProcessOperator$ContextImpl.class */
    private class ContextImpl extends ProcessFunction.Context {
        private final TimerService timerService;

        ContextImpl(TimerService timerService) {
            this.timerService = (TimerService) Preconditions.checkNotNull(timerService);
        }

        @Override // org.apache.flink.table.runtime.functions.ProcessFunction.Context
        public TimerService timerService() {
            return this.timerService;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/KeyedProcessOperator$OnTimerContextImpl.class */
    private class OnTimerContextImpl extends ProcessFunction.OnTimerContext {
        private final TimerService timerService;
        private TimeDomain timeDomain;

        OnTimerContextImpl(TimerService timerService) {
            this.timerService = (TimerService) Preconditions.checkNotNull(timerService);
        }

        @Override // org.apache.flink.table.runtime.functions.ProcessFunction.Context
        public TimerService timerService() {
            return this.timerService;
        }

        @Override // org.apache.flink.table.runtime.functions.ProcessFunction.OnTimerContext
        public TimeDomain timeDomain() {
            Preconditions.checkState(this.timeDomain != null);
            return this.timeDomain;
        }
    }

    public KeyedProcessOperator(GeneratedProcessFunction generatedProcessFunction) {
        this.funcCode = generatedProcessFunction;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public KeyedProcessOperator(ProcessFunction<IN, OUT> processFunction) {
        this.function = processFunction;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        if (this.funcCode != null) {
            this.function = (ProcessFunction) CodeGenUtils.compile(getContainingTask().getUserCodeClassLoader(), this.funcCode.name(), this.funcCode.code()).newInstance();
        }
        this.function.open(new ExecutionContextImpl(this, getRuntimeContext()));
        this.collector = new StreamRecordCollector<>(this.output);
        SimpleTimerService simpleTimerService = new SimpleTimerService(getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this));
        this.context = new ContextImpl(simpleTimerService);
        this.onTimerContext = new OnTimerContextImpl(simpleTimerService);
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        if (this.function instanceof CheckpointListener) {
            this.function.notifyCheckpointComplete(j);
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        StreamingFunctionUtils.snapshotFunctionState(stateSnapshotContext, getOperatorStateBackend(), this.function);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        StreamingFunctionUtils.restoreFunctionState(stateInitializationContext, this.function);
    }

    public void close() throws Exception {
        super.close();
        this.functionsClosed = true;
        this.function.close();
    }

    public void dispose() throws Exception {
        super.dispose();
        if (this.functionsClosed) {
            return;
        }
        this.functionsClosed = true;
        this.function.close();
    }

    public void onEventTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = TimeDomain.EVENT_TIME;
        this.function.onTimer(internalTimer.getTimestamp(), this.onTimerContext, this.collector);
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = null;
    }

    public void onProcessingTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = TimeDomain.PROCESSING_TIME;
        this.function.onTimer(internalTimer.getTimestamp(), this.onTimerContext, this.collector);
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = null;
    }

    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.function.processElement(streamRecord.getValue(), this.context, this.collector);
    }

    public void endInput() throws Exception {
        this.function.endInput(this.collector);
    }
}
