/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedProcessFunction;
import org.apache.flink.table.runtime.functions.ExecutionContextImpl;
import org.apache.flink.table.runtime.functions.ProcessFunction;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Preconditions;

/**
 * One-input stream operator that drives a table-runtime {@link ProcessFunction}.
 *
 * <p>The function is supplied either as a ready instance or as a
 * {@link GeneratedProcessFunction}; in the latter case its code is compiled and
 * instantiated in {@link #open()}. The operator registers itself as the
 * {@link Triggerable} of an internal timer service named {@code "user-timers"} and
 * forwards fired timers to {@link ProcessFunction#onTimer}.
 *
 * @param <K> key type; declared on the operator but not referenced inside this class
 * @param <IN> type of the incoming record values
 * @param <OUT> type of the emitted values
 */
public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<Object, VoidNamespace> {

    private static final long serialVersionUID = 1L;

    /** Generated function code; {@code null} when a function instance was passed in directly. */
    private GeneratedProcessFunction funcCode;

    /**
     * The function being driven. On the generated-code path this stays {@code null}
     * until {@link #open()} has compiled and instantiated it.
     */
    private ProcessFunction<IN, OUT> function;

    private transient StreamRecordCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;

    /** Set as soon as closing the function has been attempted, so {@link #dispose()} does not retry. */
    private transient boolean functionsClosed = false;

    /**
     * Creates an operator whose function is compiled from generated code in {@link #open()}.
     *
     * @param funcCode name and source of the generated process function
     */
    public KeyedProcessOperator(GeneratedProcessFunction funcCode) {
        this.funcCode = funcCode;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Creates an operator around an already instantiated function.
     *
     * @param function the process function to drive
     */
    public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
        this.function = function;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Instantiates the generated function if necessary, opens the function and
     * creates the collector, the timer service and the two contexts handed to it.
     *
     * @throws Exception if compilation, instantiation or opening the function fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        if (this.funcCode != null) {
            // Generated-code path: compile against the task's user-code class loader.
            Class<?> functionClass = CodeGenUtils.compile(
                    this.getContainingTask().getUserCodeClassLoader(),
                    this.funcCode.name(),
                    this.funcCode.code());
            // The generated class is expected to be a ProcessFunction<IN, OUT>; the
            // type arguments cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            ProcessFunction<IN, OUT> compiled = (ProcessFunction<IN, OUT>) functionClass.newInstance();
            this.function = compiled;
        }
        this.function.open(new ExecutionContextImpl(this, (RuntimeContext) this.getRuntimeContext()));
        this.collector = new StreamRecordCollector<>(this.output);

        InternalTimerService<VoidNamespace> internalTimerService =
                this.getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        // Both contexts share the same timer service.
        this.context = new ContextImpl(timerService);
        this.onTimerContext = new OnTimerContextImpl(timerService);
    }

    /**
     * Forwards the notification to the function when it is a {@link CheckpointListener}.
     *
     * @param checkpointId id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.function instanceof CheckpointListener) {
            ((CheckpointListener) this.function).notifyCheckpointComplete(checkpointId);
        }
    }

    /** Snapshots operator state, then delegates function state to {@link StreamingFunctionUtils}. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, this.getOperatorStateBackend(), this.function);
    }

    /** Initializes operator state, then delegates function state restore to {@link StreamingFunctionUtils}. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, this.function);
    }

    /** Closes the operator and then the function. */
    @Override
    public void close() throws Exception {
        super.close();
        // Flag first: if closing the function throws, dispose() must not call it again.
        this.functionsClosed = true;
        this.closeFunctionIfPresent();
    }

    /** Disposes the operator and closes the function unless {@link #close()} already attempted it. */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            this.closeFunctionIfPresent();
        }
    }

    /** Fires the function's timer callback with {@link TimeDomain#EVENT_TIME}. */
    @Override
    public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
        this.fireTimer(TimeDomain.EVENT_TIME, timer);
    }

    /** Fires the function's timer callback with {@link TimeDomain#PROCESSING_TIME}. */
    @Override
    public void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
        this.fireTimer(TimeDomain.PROCESSING_TIME, timer);
    }

    /** Passes the record's value to the function together with the context and the collector. */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.function.processElement(element.getValue(), this.context, this.collector);
    }

    /**
     * Tells the function that the input has ended, handing it the collector.
     */
    // NOTE(review): presumably declared by OneInputStreamOperator in this code base;
    // confirm and then annotate with @Override.
    public void endInput() throws Exception {
        this.function.endInput(this.collector);
    }

    /**
     * Closes the function if one exists. The function is still {@code null} when the
     * operator was built from generated code and {@link #open()} did not get as far
     * as instantiating it; cleanup must not raise a NullPointerException then.
     */
    private void closeFunctionIfPresent() throws Exception {
        if (this.function != null) {
            this.function.close();
        }
    }

    /**
     * Sets the timer's key as current key and invokes {@code onTimer} with the given
     * time domain visible through the timer context.
     *
     * @param domain time domain the timer fired in
     * @param timer the fired timer
     */
    private void fireTimer(TimeDomain domain, InternalTimer<Object, VoidNamespace> timer) throws Exception {
        this.setCurrentKey(timer.getKey());
        this.onTimerContext.timeDomain = domain;
        try {
            this.function.onTimer(timer.getTimestamp(), this.onTimerContext, this.collector);
        } finally {
            // The domain is only meaningful during the callback; reset it even on failure.
            this.onTimerContext.timeDomain = null;
        }
    }

    /** Context passed to {@code onTimer}; exposes the timer service and the firing time domain. */
    private static final class OnTimerContextImpl
            extends ProcessFunction.OnTimerContext {
        private final TimerService timerService;

        /** Non-null only while a timer callback is running. */
        private TimeDomain timeDomain;

        OnTimerContextImpl(TimerService timerService) {
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        @Override
        public TimerService timerService() {
            return this.timerService;
        }

        /**
         * @return the time domain of the timer currently firing
         * @throws IllegalStateException if no time domain is set
         */
        @Override
        public TimeDomain timeDomain() {
            Preconditions.checkState(this.timeDomain != null);
            return this.timeDomain;
        }
    }

    /** Context passed to {@code processElement}; exposes the timer service. */
    private static final class ContextImpl
            extends ProcessFunction.Context {
        private final TimerService timerService;

        ContextImpl(TimerService timerService) {
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        @Override
        public TimerService timerService() {
            return this.timerService;
        }
    }
}

