/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceV2;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
import org.apache.flink.streaming.runtime.tasks.InputSelector;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskConfigSnapshot;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OperatorChain
implements StreamStatusMaintainer,
InputSelector {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    /** Task that owns this chain; also registered as parent task on each result writer. */
    private final AbstractInvokable containingTask;
    /** Proxy of every operator in the chain, keyed by node id. Assigned at the end of the constructor. */
    private final Map<Integer, AbstractStreamOperatorProxy<?>> allOperators;
    /** Unwrapped operators in the order produced by {@code getTopologySortedOperators}. */
    private final Deque<StreamOperator<?>> allOperatorsTopologySorted;
    /** One network output per out-edge of the chain, in the order of the chain's out-edges. */
    private final RecordWriterOutput<?>[] streamOutputs;
    /** Output collector created for each head operator, keyed by head node id. */
    private final Map<Integer, WatermarkGaugeExposingOutput<StreamRecord<?>>> chainEntryPoints = new HashMap();
    /** Unwrapped head operators, keyed by head node id. */
    private final Map<Integer, StreamOperator> headOperators = new HashMap<Integer, StreamOperator>();
    /** Proxies of those head operators that are a StreamSource or StreamSourceV2, keyed by head node id. */
    private final Map<Integer, AbstractStreamOperatorProxy<?>> sourceHeadOperators = new HashMap();
    /** Configuration snapshot obtained from the containing task. */
    private final StreamTaskConfigSnapshot streamTaskConfig;
    /** Status last set through toggleStreamStatus; starts as ACTIVE. */
    private StreamStatus streamStatus = StreamStatus.ACTIVE;

    /**
     * Builds the operator chain for the given task.
     *
     * <p>Steps, in order: create one {@link RecordWriterOutput} per out-edge of the chain; for every
     * head node id recursively create its output collector and all chained successors, wrap the head
     * operator in a proxy and set it up; register the chain's in-edges on their target proxies; and
     * finally compute the topologically sorted operator list.
     *
     * <p>If anything fails before the chain is complete, every network output created so far is closed.
     *
     * <p>NOTE(review): this source was recovered with a decompiler, which warned that the original
     * try/catch layout may have been altered ("Removed try catching itself") - verify against the
     * upstream source before relying on the exact exception flow.
     *
     * @param containingTask task that owns the chain
     * @param streamRecordWriters record writers, indexed like the chain's out-edges
     */
    public OperatorChain(StreamTask containingTask, List<StreamRecordWriter<StreamRecord<?>>> streamRecordWriters) {
        this.containingTask = containingTask;
        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        this.streamTaskConfig = containingTask.getStreamTaskConfig();
        Map<Integer, StreamConfig> chainedConfigs = this.streamTaskConfig.getChainedNodeConfigs();
        List<StreamEdge> outEdgesInOrder = this.streamTaskConfig.getOutStreamEdgesOfChain();
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outEdgesInOrder.size()];
        boolean success = false;
        try {
            // Network outputs first: chained operators created below look them up by edge.
            for (int i = 0; i < outEdgesInOrder.size(); ++i) {
                StreamEdge outEdge = outEdgesInOrder.get(i);
                RecordWriterOutput<?> streamOutput = this.createStreamOutput(streamRecordWriters.get(i), outEdge, chainedConfigs.get(outEdge.getSourceId()), containingTask.getEnvironment(), i);
                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }
            Map<Integer, AbstractStreamOperatorProxy<?>> allOps = new HashMap<>(chainedConfigs.size());
            List<Integer> headIds = this.streamTaskConfig.getChainedHeadNodeIds();
            Preconditions.checkNotNull(headIds);
            if (!chainedConfigs.isEmpty()) {
                for (int headId : headIds) {
                    if (allOps.containsKey(headId)) {
                        // Already built as a chained successor of another head; only record it as a head.
                        this.headOperators.put(headId, allOps.get(headId).getOperator());
                        continue;
                    }
                    Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> outputAndSuccessors = this.createOutputCollector(containingTask, chainedConfigs.get(headId), chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
                    WatermarkGaugeExposingOutput output = outputAndSuccessors.f0;
                    // Kept raw: the signature of AbstractStreamOperatorProxy.proxy is declared outside this view.
                    List successors = outputAndSuccessors.f1;
                    StreamOperator originalHeadOperator = chainedConfigs.get(headId).getStreamOperator(userCodeClassloader);
                    if (originalHeadOperator == null) {
                        continue;
                    }
                    AbstractStreamOperatorProxy headOperator = AbstractStreamOperatorProxy.proxy(originalHeadOperator, successors);
                    headOperator.setup(containingTask, chainedConfigs.get(headId), output);
                    headOperator.getMetricGroup().gauge("currentOutputWatermark", output.getWatermarkGauge());
                    this.headOperators.put(headId, originalHeadOperator);
                    if (originalHeadOperator instanceof StreamSource || originalHeadOperator instanceof StreamSourceV2) {
                        this.sourceHeadOperators.put(headId, headOperator);
                    }
                    allOps.put(headId, headOperator);
                    this.chainEntryPoints.put(headId, output);
                }
            }
            if (!allOps.isEmpty()) {
                // Edges entering the chain from outside count as pending inputs of their target operator.
                for (StreamEdge inEdge : this.streamTaskConfig.getInStreamEdgesOfChain()) {
                    allOps.get(inEdge.getTargetId()).addInputEdge(inEdge);
                }
            }
            Map<Integer, StreamOperator<?>> originalOperators = new HashMap<>();
            for (Map.Entry<Integer, AbstractStreamOperatorProxy<?>> entry : allOps.entrySet()) {
                originalOperators.put(entry.getKey(), entry.getValue().getOperator());
            }
            this.allOperatorsTopologySorted = OperatorChain.getTopologySortedOperators(headIds, userCodeClassloader, originalOperators, chainedConfigs);
            this.allOperators = allOps;
            success = true;
        }
        finally {
            if (!success) {
                // Construction failed part-way: release whatever outputs were already created.
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output == null) continue;
                    output.close();
                }
            }
        }
    }

    /** Returns the stream status currently held by this chain. */
    @Override
    public StreamStatus getStreamStatus() {
        return streamStatus;
    }

    /**
     * Switches the chain to the given status and emits it on every network output.
     * Does nothing when the status is equal to the current one.
     *
     * @param status the new stream status
     */
    @Override
    public void toggleStreamStatus(StreamStatus status) {
        if (status.equals(streamStatus)) {
            return;
        }
        streamStatus = status;
        for (int i = 0; i < streamOutputs.length; i++) {
            streamOutputs[i].emitStreamStatus(status);
        }
    }

    /**
     * Broadcasts a checkpoint barrier built from the given values on every network output.
     *
     * @param id checkpoint id carried by the barrier
     * @param timestamp checkpoint timestamp carried by the barrier
     * @param checkpointOptions options carried by the barrier
     * @throws IOException if broadcasting fails, or if the thread is interrupted while broadcasting
     *         (the interruption is attached as the cause and the interrupt flag is restored)
     */
    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
                streamOutput.broadcastEvent((AbstractEvent)barrier);
            }
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack and preserve the cause.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while broadcasting checkpoint barrier", e);
        }
    }

    /**
     * Broadcasts a checkpoint cancellation marker for the given checkpoint on every network output.
     *
     * @param id id of the checkpoint being cancelled
     * @throws IOException if broadcasting fails, or if the thread is interrupted while broadcasting
     *         (the interruption is attached as the cause and the interrupt flag is restored)
     */
    public void broadcastCheckpointCancelMarker(long id) throws IOException {
        try {
            CancelCheckpointMarker marker = new CancelCheckpointMarker(id);
            for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
                streamOutput.broadcastEvent((AbstractEvent)marker);
            }
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack and preserve the cause.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while broadcasting checkpoint cancellation", e);
        }
    }

    /**
     * Invokes {@code prepareSnapshotPreBarrier} on every operator, in the order given by
     * {@link #getAllOperatorsTopologySorted()}.
     *
     * @param checkpointId id of the checkpoint about to be taken
     * @throws Exception whatever an operator throws; remaining operators are then not called
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        Iterator<StreamOperator<?>> operators = getAllOperatorsTopologySorted().iterator();
        while (operators.hasNext()) {
            operators.next().prepareSnapshotPreBarrier(checkpointId);
        }
    }

    /** Returns the chain's network outputs; this is the internal array, not a copy. */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return streamOutputs;
    }

    /** Returns the operators of the chain in topological order; this is the internal deque, not a copy. */
    public Deque<StreamOperator<?>> getAllOperatorsTopologySorted() {
        return allOperatorsTopologySorted;
    }

    /** Returns a new array holding the output collector of every head operator. */
    public Output<StreamRecord<?>>[] getChainEntryPoints() {
        Output[] emptyTemplate = new Output[0];
        return chainEntryPoints.values().toArray(emptyTemplate);
    }

    /**
     * Flushes every network output returned by {@link #getStreamOutputs()}.
     *
     * @throws IOException if flushing an output fails; later outputs are then not flushed
     */
    public void flushOutputs() throws IOException {
        RecordWriterOutput<?>[] outputs = getStreamOutputs();
        for (int i = 0; i < outputs.length; i++) {
            outputs[i].flush();
        }
    }

    /** Closes every network output of the chain. */
    public void releaseOutputs() {
        for (int i = 0; i < streamOutputs.length; i++) {
            streamOutputs[i].close();
        }
    }

    /** Returns a new array holding all head operators of the chain. */
    public StreamOperator[] getHeadOperators() {
        StreamOperator[] emptyTemplate = new StreamOperator[0];
        return headOperators.values().toArray(emptyTemplate);
    }

    /**
     * Looks up a head operator by its node id.
     *
     * @param headNodeId node id of the head operator
     * @return the head operator, or {@code null} if no head is registered under that id
     */
    public StreamOperator getHeadOperator(int headNodeId) {
        return headOperators.get(headNodeId);
    }

    /**
     * Looks up the proxy wrapping the operator with the given node id.
     *
     * @param nodeId node id of the operator
     * @return the proxy, or {@code null} if the chain has no operator with that id
     */
    public AbstractStreamOperatorProxy getOperatorProxy(int nodeId) {
        return allOperators.get(nodeId);
    }

    /** Returns the number of operators in the chain, or 0 when the operator map is {@code null}. */
    public int getChainLength() {
        if (allOperators == null) {
            return 0;
        }
        return allOperators.size();
    }

    /**
     * Registers the listener on every two-input operator proxy of the chain; other proxies are skipped.
     *
     * @param listener listener to notify when a two-input operator changes its input selection
     */
    @Override
    public void registerSelectionChangedListener(InputSelector.SelectionChangedListener listener) {
        for (AbstractStreamOperatorProxy<?> proxy : allOperators.values()) {
            if (proxy instanceof TwoInputStreamOperatorProxy) {
                ((TwoInputStreamOperatorProxy)proxy).registerSelectionChangedListener(listener);
            }
        }
    }

    /**
     * Collects the inputs that are currently selected: first the in-edges of the chain whose target
     * proxy reports them as selected, then the source head operators that report themselves selected.
     *
     * @return a new list of selected inputs, possibly empty
     */
    @Override
    public List<InputSelector.InputSelection> getNextSelectedInputs() {
        // A memo of per-edge answers is only allocated for chains with more than 16 operators.
        Map<StreamEdge, Boolean> visited = null;
        int operatorCount = allOperators.size();
        if (operatorCount > 16) {
            visited = new HashMap<StreamEdge, Boolean>(operatorCount);
        }
        List<InputSelector.InputSelection> selectedInputs = new ArrayList<InputSelector.InputSelection>();
        for (StreamEdge inEdge : streamTaskConfig.getInStreamEdgesOfChain()) {
            AbstractStreamOperatorProxy<?> target = allOperators.get(inEdge.getTargetId());
            if (target.isSelected(inEdge, visited)) {
                selectedInputs.add(InputSelector.EdgeInputSelection.create(inEdge));
            }
        }
        for (Map.Entry<Integer, AbstractStreamOperatorProxy<?>> sourceEntry : sourceHeadOperators.entrySet()) {
            // Sources have no input edge, hence the null edge argument.
            if (sourceEntry.getValue().isSelected(null, visited)) {
                selectedInputs.add(InputSelector.SourceInputSelection.create(sourceEntry.getKey()));
            }
        }
        return selectedInputs;
    }

    /**
     * Creates the output an operator writes to, together with the list of its chained successors.
     *
     * <p>Non-chained out-edges are served by the already created network outputs; for every chained
     * out-edge the successor operator is created (recursively) via {@code createChainedOperator}.
     * Without output selectors a single output is returned as-is and several outputs are wrapped in
     * a broadcasting collector; with selectors a directed output is used. The copying variants are
     * chosen when object reuse is enabled in the execution config.
     *
     * @param containingTask task that owns the chain
     * @param operatorConfig configuration of the operator whose output is created
     * @param chainedConfigs configurations of all chained nodes, keyed by node id
     * @param userCodeClassloader class loader used to read the configurations
     * @param streamOutputs network outputs keyed by out-edge
     * @param allOperators registry of operator proxies created so far; extended while recursing
     * @return the output (f0) and the successor proxies paired with their connecting edge (f1)
     */
    private <T> Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> createOutputCollector(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, Map<Integer, AbstractStreamOperatorProxy<?>> allOperators) {
        List<Tuple2<WatermarkGaugeExposingOutput, StreamEdge>> allOutputs = new ArrayList<>(4);
        List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> allSuccessors = new ArrayList<>(4);
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<?> output = streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2<WatermarkGaugeExposingOutput, StreamEdge>(output, outputEdge));
        }
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
            WatermarkGaugeExposingOutput output = this.createChainedOperator(containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge);
            allOutputs.add(new Tuple2<WatermarkGaugeExposingOutput, StreamEdge>(output, outputEdge));
            // createChainedOperator has registered the successor's proxy by now, so the lookup succeeds.
            allSuccessors.add(new Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>(allOperators.get(outputId), outputEdge));
        }
        List<?> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
        boolean objectReuse = containingTask.getExecutionConfig().isObjectReuseEnabled();
        if (selectors == null || selectors.isEmpty()) {
            if (allOutputs.size() == 1) {
                // Exactly one target: no wrapper needed.
                WatermarkGaugeExposingOutput single = allOutputs.get(0).f0;
                return Tuple2.of(single, allSuccessors);
            }
            Output[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); ++i) {
                asArray[i] = allOutputs.get(i).f0;
            }
            WatermarkGaugeExposingOutput broadcasting = objectReuse ? new CopyingBroadcastingOutputCollector(asArray, this) : new BroadcastingOutputCollector(asArray, this);
            return Tuple2.of(broadcasting, allSuccessors);
        }
        WatermarkGaugeExposingOutput directed = objectReuse ? new CopyingDirectedOutput(selectors, allOutputs) : new DirectedOutput(selectors, allOutputs);
        return Tuple2.of(directed, allSuccessors);
    }

    /*
     * WARNING - void declaration
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, Map<Integer, AbstractStreamOperatorProxy<?>> allOperators, StreamEdge inputEdge) {
        void var9_16;
        AbstractStreamOperatorProxy chainedOperator = allOperators.get(inputEdge.getTargetId());
        if (chainedOperator == null) {
            Tuple2<WatermarkGaugeExposingOutput, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>>> tuple2 = this.createOutputCollector(containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
            WatermarkGaugeExposingOutput chainedOperatorOutput = (WatermarkGaugeExposingOutput)tuple2.f0;
            List successors = (List)tuple2.f1;
            chainedOperator = AbstractStreamOperatorProxy.proxy(operatorConfig.getStreamOperator(userCodeClassloader), successors);
            chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);
            allOperators.put(inputEdge.getTargetId(), chainedOperator);
            chainedOperator.getMetricGroup().gauge("currentOutputWatermark", () -> chainedOperatorOutput.getWatermarkGauge().getValue());
        }
        chainedOperator.addInputEdge(inputEdge);
        if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
            if (chainedOperator instanceof OneInputStreamOperator) {
                ChainingWithOneInputStreamOperatorOutput chainingWithOneInputStreamOperatorOutput = new ChainingWithOneInputStreamOperatorOutput((OneInputStreamOperator)chainedOperator.getOperator(), this, inputEdge);
            } else {
                if (!(chainedOperator instanceof TwoInputStreamOperator)) throw new RuntimeException("Unexpected operator type " + chainedOperator.getOperator());
                if (inputEdge.getTypeNumber() == 1) {
                    ChainingWithFirstInputOfTwoInputStreamOperatorOutput chainingWithFirstInputOfTwoInputStreamOperatorOutput = new ChainingWithFirstInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator)((Object)chainedOperator), this, inputEdge);
                } else {
                    if (inputEdge.getTypeNumber() != 2) throw new RuntimeException("Unexpected type number of edge " + inputEdge);
                    ChainingWithSecondInputOfTwoInputStreamOperatorOutput chainingWithSecondInputOfTwoInputStreamOperatorOutput = new ChainingWithSecondInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator)((Object)chainedOperator), this, inputEdge);
                }
            }
        } else {
            TypeSerializer inSerializer = inputEdge.getTypeNumber() == 2 ? operatorConfig.getTypeSerializerIn2(userCodeClassloader) : operatorConfig.getTypeSerializerIn1(userCodeClassloader);
            if (chainedOperator instanceof OneInputStreamOperator) {
                CopyingChainingWithOneInputStreamOperatorOutput copyingChainingWithOneInputStreamOperatorOutput = new CopyingChainingWithOneInputStreamOperatorOutput((OneInputStreamOperator)chainedOperator.getOperator(), inSerializer, inputEdge, this);
            } else {
                if (!(chainedOperator instanceof TwoInputStreamOperator)) throw new RuntimeException("Unexpected operator type " + chainedOperator.getOperator());
                if (inputEdge.getTypeNumber() == 1) {
                    CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput copyingChainingWithFirstInputOfTwoInputStreamOperatorOutput = new CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator)((Object)chainedOperator), inSerializer, inputEdge, this);
                } else {
                    if (inputEdge.getTypeNumber() != 2) throw new RuntimeException("Unexpected type number of edge " + inputEdge);
                    CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput copyingChainingWithSecondInputOfTwoInputStreamOperatorOutput = new CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput((TwoInputStreamOperator)((Object)chainedOperator), inSerializer, inputEdge, this);
                }
            }
        }
        chainedOperator.getMetricGroup().gauge("currentInputWatermark", () -> var9_16.getWatermarkGauge().getValue());
        return var9_16;
    }

    /**
     * Creates the network output for one out-edge of the chain.
     *
     * <p>The element serializer is taken from the upstream operator's configuration: the side-output
     * serializer when the edge carries an output tag, the regular output serializer otherwise. It is
     * installed on the environment's writer at {@code outputIndex}, which also gets the containing
     * task as parent task.
     *
     * @param streamRecordWriter record writer backing the output
     * @param edge out-edge served by the output
     * @param upStreamConfig configuration of the operator at the source of the edge
     * @param taskEnvironment environment providing the class loader and result writers
     * @param outputIndex index of the result writer belonging to this edge
     * @return the new network output
     */
    private RecordWriterOutput<?> createStreamOutput(StreamRecordWriter<StreamRecord<?>> streamRecordWriter, StreamEdge edge, StreamConfig upStreamConfig, Environment taskEnvironment, int outputIndex) {
        OutputTag sideOutputTag = edge.getOutputTag();
        TypeSerializer outSerializer;
        if (sideOutputTag != null) {
            outSerializer = upStreamConfig.getTypeSerializerSideOut(sideOutputTag, taskEnvironment.getUserClassLoader());
        } else {
            outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
        }
        StreamElementSerializer outRecordSerializer = new StreamElementSerializer(outSerializer);
        taskEnvironment.getWriter(outputIndex).setTypeSerializer(outRecordSerializer);
        taskEnvironment.getWriter(outputIndex).setParentTask(containingTask);
        return new RecordWriterOutput(streamRecordWriter, sideOutputTag, this);
    }

    /**
     * Orders the operators of a chain topologically along the chained edges.
     *
     * <p>The number of chained inputs of every node is counted first. Traversal starts from the head
     * ids that have no chained input; an operator is emitted when it is taken from the work queue,
     * and a successor is queued once all of its chained inputs have been emitted.
     *
     * @param headIds node ids of the chain heads
     * @param userCodeClassloader class loader used to read the chained outputs from the configs
     * @param allOperators operators keyed by node id; may be {@code null} or empty
     * @param chainedConfigs configurations of all chained nodes, keyed by node id
     * @return the operators in topological order; an empty deque if {@code allOperators} is
     *         {@code null} or empty
     * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) when no head
     *         without chained inputs exists
     */
    static Deque<StreamOperator<?>> getTopologySortedOperators(List<Integer> headIds, ClassLoader userCodeClassloader, Map<Integer, ? extends StreamOperator<?>> allOperators, Map<Integer, StreamConfig> chainedConfigs) {
        if (allOperators == null || allOperators.isEmpty()) {
            return new ArrayDeque<>();
        }
        // Number of not-yet-emitted chained inputs per node id.
        Map<Integer, Integer> operatorInputs = new HashMap<>();
        for (StreamConfig streamConfig : chainedConfigs.values()) {
            for (StreamEdge edgeInChain : streamConfig.getChainedOutputs(userCodeClassloader)) {
                operatorInputs.merge(edgeInChain.getTargetId(), 1, Integer::sum);
            }
        }
        Deque<Integer> toTraverse = new ArrayDeque<>();
        for (int headId : headIds) {
            if (operatorInputs.getOrDefault(headId, 0) == 0) {
                toTraverse.add(headId);
            }
        }
        Preconditions.checkState(!toTraverse.isEmpty());
        Deque<StreamOperator<?>> topologySortedOperators = new ArrayDeque<>();
        while (!toTraverse.isEmpty()) {
            int currentOperatorId = toTraverse.poll();
            topologySortedOperators.add(allOperators.get(currentOperatorId));
            for (StreamEdge edge : chainedConfigs.get(currentOperatorId).getChainedOutputs(userCodeClassloader)) {
                int targetOperatorId = edge.getTargetId();
                int inputCountsLeft = operatorInputs.get(targetOperatorId) - 1;
                operatorInputs.put(targetOperatorId, inputCountsLeft);
                if (inputCountsLeft == 0) {
                    toTraverse.add(targetOperatorId);
                }
            }
        }
        return topologySortedOperators;
    }

    /**
     * Proxy for a source operator at the head of a chain. It has no input edges and never receives
     * elements, watermarks or latency markers; ending its input only ends the input of its successors.
     */
    private static class SourceV2StreamOperatorProxy<OUT>
    extends AbstractStreamOperatorProxy<OUT>
    implements OneInputStreamOperator<OUT, OUT> {
        SourceV2StreamOperatorProxy(StreamOperator<OUT> operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(operator, successors);
        }

        /** Always rejects the edge: this proxy does not accept input edges. */
        @Override
        public void addInputEdge(StreamEdge inputEdge) {
            throw new UnsupportedOperationException("There should not be a input edge in source operator");
        }

        /** Ends the input of all successors; the edge argument is not used. */
        @Override
        public void endInput(StreamEdge inputEdge) throws Exception {
            endSuccessorsInput();
        }

        /** Not supported by this proxy. */
        @Override
        public void processElement(StreamRecord<OUT> element) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Not supported by this proxy. */
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Not supported by this proxy. */
        @Override
        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            throw new UnsupportedOperationException("Should not come to here");
        }

        /** Ends the input of all successors. */
        @Override
        public void endInput() throws Exception {
            endSuccessorsInput();
        }

        /** Delegates to the wrapped operator. */
        @Override
        public boolean requireState() {
            return getOperator().requireState();
        }
    }

    /**
     * Proxy for a {@link StreamSource} at the head of a chain. It has no input edges; anything
     * handed to its process methods is passed straight to the source's output.
     */
    private static class SourceStreamOperatorProxy<OUT>
    extends AbstractStreamOperatorProxy<OUT>
    implements OneInputStreamOperator<OUT, OUT> {
        /** The wrapped source operator. */
        private final StreamSource<OUT, ?> operator;

        SourceStreamOperatorProxy(StreamSource<OUT, ?> operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(operator, successors);
            this.operator = operator;
        }

        /** Always rejects the edge: this proxy does not accept input edges. */
        @Override
        public void addInputEdge(StreamEdge inputEdge) {
            throw new UnsupportedOperationException("There should not be a input edge in source operator");
        }

        /** Same as {@link #endInput()}; the edge argument is not used. */
        @Override
        public void endInput(StreamEdge inputEdge) throws Exception {
            endInput();
        }

        /** Forwards the element to the source's output. */
        @Override
        public void processElement(StreamRecord<OUT> element) throws Exception {
            operator.getOutput().collect(element);
        }

        /** Forwards the watermark to the source's output. */
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            operator.getOutput().emitWatermark(mark);
        }

        /** Forwards the latency marker to the source's output. */
        @Override
        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            operator.getOutput().emitLatencyMarker(latencyMarker);
        }

        /** Ends the input of all successors. */
        @Override
        public void endInput() throws Exception {
            endSuccessorsInput();
        }

        /** Delegates to the wrapped source. */
        @Override
        public boolean requireState() {
            return operator.requireState();
        }
    }

    /**
     * Proxy around a two-input operator. It delegates processing to the wrapped operator, remembers
     * the input selection the operator last returned (notifying a listener when it changes), and
     * counts the unfinished input edges of each side so that the operator's inputs - and then its
     * successors' inputs - are ended once the last edge of a side has finished.
     */
    private static class TwoInputStreamOperatorProxy<IN1, IN2, OUT>
    extends AbstractStreamOperatorProxy<OUT>
    implements TwoInputStreamOperator<IN1, IN2, OUT> {
        /** The wrapped operator. */
        private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
        /** Edges of the first input that have not ended yet. */
        private volatile int unfinishedInputEdges1;
        /** Edges of the second input that have not ended yet. */
        private volatile int unfinishedInputEdges2;
        /** Selection most recently reported by the operator or forced by an ended input; null until first asked. */
        private TwoInputSelection lastSelection;
        /** Listener notified on selection changes; may be null. */
        private InputSelector.SelectionChangedListener listener;

        TwoInputStreamOperatorProxy(TwoInputStreamOperator<IN1, IN2, OUT> operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(operator, successors);
            this.operator = operator;
        }

        /** Records {@code selection} as the current one and notifies the listener if it differs from the previous one. */
        private void applySelection(TwoInputSelection selection) throws Exception {
            if (selection == lastSelection) {
                return;
            }
            lastSelection = selection;
            if (listener != null) {
                listener.notifySelectionChanged();
            }
        }

        /** Ends the successors' input once neither side has unfinished edges left. */
        private void endSuccessorsIfAllInputsFinished() throws Exception {
            if (unfinishedInputEdges1 == 0 && unfinishedInputEdges2 == 0) {
                endSuccessorsInput();
            }
        }

        /** Asks the operator for its initial selection and remembers it. */
        @Override
        public TwoInputSelection firstInputSelection() {
            lastSelection = operator.firstInputSelection();
            return lastSelection;
        }

        /** Delegates to the operator and tracks the selection it returns. */
        @Override
        public TwoInputSelection processElement1(StreamRecord<IN1> element) throws Exception {
            applySelection(operator.processElement1(element));
            return lastSelection;
        }

        /** Delegates to the operator and tracks the selection it returns. */
        @Override
        public TwoInputSelection processElement2(StreamRecord<IN2> element) throws Exception {
            applySelection(operator.processElement2(element));
            return lastSelection;
        }

        @Override
        public void processWatermark1(Watermark mark) throws Exception {
            operator.processWatermark1(mark);
        }

        @Override
        public void processWatermark2(Watermark mark) throws Exception {
            operator.processWatermark2(mark);
        }

        @Override
        public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
            operator.processLatencyMarker1(latencyMarker);
        }

        @Override
        public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
            operator.processLatencyMarker2(latencyMarker);
        }

        /**
         * Marks one edge of the first input as finished. When it was the last one, the operator's
         * first input is ended and the selection is switched to the second input.
         */
        @Override
        public void endInput1() throws Exception {
            if (--unfinishedInputEdges1 == 0) {
                operator.endInput1();
                applySelection(TwoInputSelection.SECOND);
            } else if (unfinishedInputEdges1 < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
            endSuccessorsIfAllInputsFinished();
        }

        /**
         * Marks one edge of the second input as finished. When it was the last one, the operator's
         * second input is ended and the selection is switched to the first input.
         */
        @Override
        public void endInput2() throws Exception {
            if (--unfinishedInputEdges2 == 0) {
                operator.endInput2();
                applySelection(TwoInputSelection.FIRST);
            } else if (unfinishedInputEdges2 < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
            endSuccessorsIfAllInputsFinished();
        }

        /** Counts the edge as an unfinished input of the side given by its type number (1 or 2). */
        @Override
        public void addInputEdge(StreamEdge inputEdge) {
            switch (inputEdge.getTypeNumber()) {
                case 1:
                    ++unfinishedInputEdges1;
                    break;
                case 2:
                    ++unfinishedInputEdges2;
                    break;
                default:
                    throw new RuntimeException("Unknown stream edge type number " + inputEdge.getTypeNumber());
            }
        }

        /** Ends the input side given by the edge's type number (1 or 2). */
        @Override
        public void endInput(StreamEdge inputEdge) throws Exception {
            switch (inputEdge.getTypeNumber()) {
                case 1:
                    endInput1();
                    break;
                case 2:
                    endInput2();
                    break;
                default:
                    throw new RuntimeException("Unknown stream edge type number " + inputEdge.getTypeNumber());
            }
        }

        /**
         * Tells whether the given input edge is currently selected. An edge on the side opposite to
         * the current selection is never selected; otherwise the answer comes from the memo (when one
         * is supplied and already holds the edge) or from the superclass. Every computed answer is
         * stored in the memo when one is supplied.
         */
        @Override
        public boolean isSelected(StreamEdge inputEdge, Map<StreamEdge, Boolean> visited) {
            Preconditions.checkNotNull(inputEdge);
            if (lastSelection == null) {
                lastSelection = firstInputSelection();
            }
            boolean onDeselectedSide = (inputEdge.getTypeNumber() == 1 && lastSelection == TwoInputSelection.SECOND)
                || (inputEdge.getTypeNumber() == 2 && lastSelection == TwoInputSelection.FIRST);
            if (onDeselectedSide) {
                if (visited != null) {
                    visited.put(inputEdge, false);
                }
                return false;
            }
            if (visited != null) {
                Boolean cached = visited.get(inputEdge);
                if (cached != null) {
                    return cached;
                }
            }
            boolean selected = super.isSelected(inputEdge, visited);
            if (visited != null) {
                visited.put(inputEdge, selected);
            }
            return selected;
        }

        /** Sets the listener to notify on selection changes, replacing any previous one. */
        public void registerSelectionChangedListener(InputSelector.SelectionChangedListener listener) {
            this.listener = listener;
        }

        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            operator.prepareSnapshotPreBarrier(checkpointId);
        }

        @Override
        public boolean requireState() {
            return operator.requireState();
        }
    }

    /**
     * Proxy around a {@link OneInputStreamOperator} that counts its incoming edges and only
     * forwards {@code endInput()} to the operator (and then to the successors) once every
     * registered edge has reported its end.
     *
     * @param <IN> input element type of the wrapped operator
     * @param <OUT> output element type of the wrapped operator
     */
    private static class OneInputStreamOperatorProxy<IN, OUT>
    extends AbstractStreamOperatorProxy<OUT>
    implements OneInputStreamOperator<IN, OUT> {
        /** The wrapped operator that all element-level calls are forwarded to. */
        private final OneInputStreamOperator<IN, OUT> operator;
        /** Number of registered input edges that have not yet signalled their end. */
        private volatile int unfinishedInputEdges;

        OneInputStreamOperatorProxy(OneInputStreamOperator<IN, OUT> operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            super(operator, successors);
            this.operator = operator;
        }

        /** Registers one more input edge that has to finish before the operator's input ends. */
        @Override
        public void addInputEdge(StreamEdge inputEdge) {
            this.unfinishedInputEdges++;
        }

        /** Treats the end of any single edge as one call to {@link #endInput()}. */
        @Override
        public void endInput(StreamEdge inputEdge) throws Exception {
            this.endInput();
        }

        /**
         * Decrements the count of unfinished edges; when it reaches zero the wrapped operator's
         * {@code endInput()} is invoked, followed by the end-of-input of all successors.
         *
         * @throws RuntimeException if called more often than edges were registered
         */
        @Override
        public void endInput() throws Exception {
            final int remaining = --this.unfinishedInputEdges;
            if (remaining < 0) {
                throw new RuntimeException("This input side is finished already, unexpected endInput invoked");
            }
            if (remaining == 0) {
                this.operator.endInput();
                this.endSuccessorsInput();
            }
        }

        /** Forwards the element to the wrapped operator. */
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            this.operator.processElement(element);
        }

        /** Forwards the watermark to the wrapped operator. */
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            this.operator.processWatermark(mark);
        }

        /** Forwards the latency marker to the wrapped operator. */
        @Override
        public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker(latencyMarker);
        }

        /** Forwards the pre-barrier snapshot preparation to the wrapped operator. */
        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }

        /** Returns the wrapped operator's answer to {@code requireState()}. */
        @Override
        public boolean requireState() {
            return this.operator.requireState();
        }
    }

    /**
     * Base class of the operator proxies. It wraps a {@link StreamOperator}, forwards the
     * generic operator calls to it, and keeps the list of successor proxies together with the
     * edge that leads to each of them.
     *
     * @param <OUT> output element type of the wrapped operator
     */
    static abstract class AbstractStreamOperatorProxy<OUT>
    implements StreamOperator<OUT> {
        /** The wrapped operator. */
        private final StreamOperator<OUT> operator;
        /** Successor proxies, each paired with the edge connecting this proxy to it. */
        protected final List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors;

        AbstractStreamOperatorProxy(StreamOperator<OUT> operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            this.operator = operator;
            this.successors = successors;
        }

        /** @return the wrapped operator */
        public StreamOperator<OUT> getOperator() {
            return this.operator;
        }

        /**
         * Creates the proxy matching the runtime type of the given operator. The checks run in
         * a fixed order: one-input operator, two-input operator, {@link StreamSource}, then
         * {@link StreamSourceV2}.
         *
         * @param operator the operator to wrap
         * @param successors successor proxies with their connecting edges
         * @return the newly created proxy
         * @throws RuntimeException if the operator is none of the supported kinds
         */
        public static AbstractStreamOperatorProxy proxy(StreamOperator operator, List<Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge>> successors) {
            final AbstractStreamOperatorProxy created;
            if (operator instanceof OneInputStreamOperator) {
                created = new OneInputStreamOperatorProxy((OneInputStreamOperator)operator, successors);
            } else if (operator instanceof TwoInputStreamOperator) {
                created = new TwoInputStreamOperatorProxy((TwoInputStreamOperator)operator, successors);
            } else if (operator instanceof StreamSource) {
                created = new SourceStreamOperatorProxy((StreamSource)operator, successors);
            } else if (operator instanceof StreamSourceV2) {
                created = new SourceV2StreamOperatorProxy(operator, successors);
            } else {
                throw new RuntimeException("Unknown input stream operator " + operator);
            }
            return created;
        }

        /** Registers an incoming edge with this proxy. */
        public abstract void addInputEdge(StreamEdge var1);

        /** Signals that the given incoming edge has reached its end. */
        public abstract void endInput(StreamEdge var1) throws Exception;

        /**
         * Returns {@code true} only if every successor reports its connecting edge as selected.
         * The first successor that answers {@code false} stops the evaluation.
         *
         * <p>When both {@code inputEdge} and {@code visited} are non-null, a previously stored
         * answer for the edge is returned directly and a newly computed answer is stored.
         *
         * @param inputEdge the edge being asked about; may be {@code null}
         * @param visited optional memo of already-decided edges; may be {@code null}
         * @return whether the edge is selected
         */
        public boolean isSelected(@Nullable StreamEdge inputEdge, @Nullable Map<StreamEdge, Boolean> visited) {
            final boolean memoize = inputEdge != null && visited != null;
            if (memoize) {
                final Boolean cached = visited.get(inputEdge);
                if (cached != null) {
                    return cached;
                }
            }

            boolean allSelected = true;
            for (Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge> next : this.successors) {
                if (!next.f0.isSelected(next.f1, visited)) {
                    allSelected = false;
                    break;
                }
            }

            if (memoize) {
                visited.put(inputEdge, allSelected);
            }
            return allSelected;
        }

        /** Calls {@link #endInput(StreamEdge)} on every successor with its connecting edge. */
        public void endSuccessorsInput() throws Exception {
            for (Tuple2<AbstractStreamOperatorProxy<?>, StreamEdge> next : this.successors) {
                next.f0.endInput(next.f1);
            }
        }

        // ------------------------------------------------------------------
        // Plain delegation to the wrapped operator
        // ------------------------------------------------------------------

        @Override
        public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
            this.operator.setup(containingTask, config, output);
        }

        @Override
        public void initializeState() throws Exception {
            this.operator.initializeState();
        }

        @Override
        public void open() throws Exception {
            this.operator.open();
        }

        @Override
        public void close() throws Exception {
            this.operator.close();
        }

        @Override
        public void dispose() throws Exception {
            this.operator.dispose();
        }

        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            this.operator.prepareSnapshotPreBarrier(checkpointId);
        }

        @Override
        public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
            return this.operator.snapshotState(checkpointId, timestamp, checkpointOptions, storageLocation);
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.operator.notifyCheckpointComplete(checkpointId);
        }

        @Override
        public void setCurrentKey(Object key) {
            this.operator.setCurrentKey(key);
        }

        @Override
        public Object getCurrentKey() {
            return this.operator.getCurrentKey();
        }

        @Override
        public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
            this.operator.setKeyContextElement1(record);
        }

        @Override
        public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
            this.operator.setKeyContextElement2(record);
        }

        @Override
        public ChainingStrategy getChainingStrategy() {
            return this.operator.getChainingStrategy();
        }

        @Override
        public void setChainingStrategy(ChainingStrategy strategy) {
            this.operator.setChainingStrategy(strategy);
        }

        @Override
        public MetricGroup getMetricGroup() {
            return this.operator.getMetricGroup();
        }

        @Override
        public OperatorID getOperatorID() {
            return this.operator.getOperatorID();
        }
    }

    /**
     * Broadcasting collector that hands a fresh {@link StreamRecord} wrapper (sharing the same
     * value) to every output except the last one, which receives the original record.
     *
     * @param <T> element type carried by the records
     */
    private static final class CopyingBroadcastingOutputCollector<T>
    extends BroadcastingOutputCollector<T> {
        public CopyingBroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs, StreamStatusProvider streamStatusProvider) {
            super(outputs, streamStatusProvider);
        }

        /**
         * Emits the record to all outputs; only the last output sees the original instance.
         *
         * @param record the record to broadcast
         */
        @Override
        public void collect(StreamRecord<T> record) {
            final int lastIndex = this.outputs.length - 1;
            for (int idx = 0; idx < lastIndex; idx++) {
                this.outputs[idx].collect(record.copy(record.getValue()));
            }
            // No copy is needed for the final output; nothing happens when there are no outputs.
            if (lastIndex >= 0) {
                this.outputs[lastIndex].collect(record);
            }
        }

        /**
         * Emits the tagged record to all outputs; only the last output sees the original
         * instance.
         *
         * @param outputTag the side-output tag forwarded to each output
         * @param record the record to broadcast
         */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            final int lastIndex = this.outputs.length - 1;
            for (int idx = 0; idx < lastIndex; idx++) {
                this.outputs[idx].collect(outputTag, record.copy(record.getValue()));
            }
            if (lastIndex >= 0) {
                this.outputs[lastIndex].collect(outputTag, record);
            }
        }
    }

    /**
     * Output that fans every record and watermark out to an array of outputs and forwards each
     * latency marker to a single one of them.
     *
     * @param <T> element type carried by the records
     */
    private static class BroadcastingOutputCollector<T>
    implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        /** The outputs that receive everything emitted through this collector. */
        protected final Output<StreamRecord<T>>[] outputs;
        /** Picks the output a latency marker goes to when there is more than one. */
        private final Random random = new XORShiftRandom();
        private final StreamStatusProvider streamStatusProvider;
        /** Records the timestamp of the most recent watermark passed to this collector. */
        private final WatermarkGauge watermarkGauge = new WatermarkGauge();

        public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs, StreamStatusProvider streamStatusProvider) {
            this.outputs = outputs;
            this.streamStatusProvider = streamStatusProvider;
        }

        /**
         * Records the watermark timestamp in the gauge and, if the stream status is active,
         * forwards the watermark to every output.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
            if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                return;
            }
            for (int idx = 0; idx < this.outputs.length; idx++) {
                this.outputs[idx].emitWatermark(mark);
            }
        }

        /**
         * Forwards the marker to exactly one output: the only one if there is a single output,
         * otherwise a randomly chosen one. Does nothing when there are no outputs.
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            final int count = this.outputs.length;
            if (count == 0) {
                return;
            }
            // The random generator is only consulted when there is an actual choice to make.
            final int target = count == 1 ? 0 : this.random.nextInt(count);
            this.outputs[target].emitLatencyMarker(latencyMarker);
        }

        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }

        /** Passes the same record instance to every output. */
        public void collect(StreamRecord<T> record) {
            for (int idx = 0; idx < this.outputs.length; idx++) {
                this.outputs[idx].collect(record);
            }
        }

        /** Passes the same tagged record instance to every output. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            for (int idx = 0; idx < this.outputs.length; idx++) {
                this.outputs[idx].collect(outputTag, record);
            }
        }

        /** Closes every output in array order. */
        public void close() {
            for (int idx = 0; idx < this.outputs.length; idx++) {
                this.outputs[idx].close();
            }
        }
    }

    /**
     * Variant of the second-input chaining output that copies each record's value with the
     * given serializer before passing the record to the operator's second input.
     *
     * @param <T> element type of the operator's second input
     */
    private static final class CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput<T>
    extends ChainingWithSecondInputOfTwoInputStreamOperatorOutput<T> {
        /** Serializer used to copy record values; never {@code null}. */
        private final TypeSerializer<T> serializer;

        public CopyingChainingWithSecondInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<?, T, ?> operator, TypeSerializer<T> serializer, StreamEdge edge, StreamStatusProvider streamStatusProvider) {
            super(operator, streamStatusProvider, edge);
            this.serializer = Preconditions.checkNotNull(serializer);
        }

        /**
         * Counts the record, copies its value, and feeds the copy to the operator's second
         * input. Any failure is rethrown as {@code ExceptionInChainedOperatorException}; a
         * {@link ClassCastException} gets an explanatory message when an output tag is set.
         */
        @Override
        protected final <X> void pushToOperator(StreamRecord<X> record) {
            try {
                // Unchecked by design: a type mismatch shows up later as a ClassCastException.
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                final T valueCopy = this.serializer.copy(typedRecord.getValue());
                final StreamRecord<T> recordCopy = typedRecord.copy(valueCopy);
                this.operator.setKeyContextElement2(recordCopy);
                this.operator.processElement2(recordCopy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                final String explanation = String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId());
                throw new ExceptionInChainedOperatorException(new ClassCastException(explanation));
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }

    /**
     * Variant of the first-input chaining output that copies each record's value with the
     * given serializer before passing the record to the operator's first input.
     *
     * @param <T> element type of the operator's first input
     */
    private static final class CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput<T>
    extends ChainingWithFirstInputOfTwoInputStreamOperatorOutput<T> {
        /** Serializer used to copy record values; never {@code null}. */
        private final TypeSerializer<T> serializer;

        public CopyingChainingWithFirstInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<T, ?, ?> operator, TypeSerializer<T> serializer, StreamEdge edge, StreamStatusProvider streamStatusProvider) {
            super(operator, streamStatusProvider, edge);
            this.serializer = Preconditions.checkNotNull(serializer);
        }

        /**
         * Counts the record, copies its value, and feeds the copy to the operator's first
         * input. Any failure is rethrown as {@code ExceptionInChainedOperatorException}; a
         * {@link ClassCastException} gets an explanatory message when an output tag is set.
         */
        @Override
        protected final <X> void pushToOperator(StreamRecord<X> record) {
            try {
                // Unchecked by design: a type mismatch shows up later as a ClassCastException.
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                final T valueCopy = this.serializer.copy(typedRecord.getValue());
                final StreamRecord<T> recordCopy = typedRecord.copy(valueCopy);
                this.operator.setKeyContextElement1(recordCopy);
                this.operator.processElement1(recordCopy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                final String explanation = String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId());
                throw new ExceptionInChainedOperatorException(new ClassCastException(explanation));
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }

    /**
     * Variant of the one-input chaining output that copies each record's value with the given
     * serializer before passing the record to the operator.
     *
     * @param <T> input element type of the chained operator
     */
    private static final class CopyingChainingWithOneInputStreamOperatorOutput<T>
    extends ChainingWithOneInputStreamOperatorOutput<T> {
        /** Serializer used to copy record values; never {@code null}. */
        private final TypeSerializer<T> serializer;

        public CopyingChainingWithOneInputStreamOperatorOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer, StreamEdge edge, StreamStatusProvider streamStatusProvider) {
            super(operator, streamStatusProvider, edge);
            this.serializer = Preconditions.checkNotNull(serializer);
        }

        /**
         * Counts the record, copies its value, and feeds the copy to the operator. Any failure
         * is rethrown as {@code ExceptionInChainedOperatorException}; a
         * {@link ClassCastException} gets an explanatory message when an output tag is set.
         */
        @Override
        protected final <X> void pushToOperator(StreamRecord<X> record) {
            try {
                // Unchecked by design: a type mismatch shows up later as a ClassCastException.
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                final T valueCopy = this.serializer.copy(typedRecord.getValue());
                final StreamRecord<T> recordCopy = typedRecord.copy(valueCopy);
                this.operator.setKeyContextElement1(recordCopy);
                this.operator.processElement(recordCopy);
            } catch (ClassCastException e) {
                if (this.outputTag == null) {
                    throw new ExceptionInChainedOperatorException(e);
                }
                final String explanation = String.format("%s. Failed to push OutputTag with id '%s' to operator. This can occur when multiple OutputTags with different types but identical names are being used.", e.getMessage(), this.outputTag.getId());
                throw new ExceptionInChainedOperatorException(new ClassCastException(explanation));
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }

    /**
     * Output that pushes records, watermarks and latency markers directly into the SECOND
     * input of a chained {@link TwoInputStreamOperator}.
     *
     * @param <T> element type of the operator's second input
     */
    private static class ChainingWithSecondInputOfTwoInputStreamOperatorOutput<T>
    implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final TwoInputStreamOperator<?, T, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag of the edge this output serves; {@code null} means the main output. */
        protected final OutputTag<T> outputTag;

        public ChainingWithSecondInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<?, T, ?> operator, StreamStatusProvider streamStatusProvider, StreamEdge edge) {
            this.operator = operator;
            this.numRecordsIn = resolveNumRecordsInCounter(operator);
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = edge.getOutputTag();
        }

        /**
         * Looks up the operator's records-in counter; if that fails for any reason the failure
         * is logged and a standalone {@link SimpleCounter} is used instead.
         */
        private static Counter resolveNumRecordsInCounter(StreamOperator<?> operator) {
            try {
                final OperatorIOMetricGroup ioGroup = ((OperatorMetricGroup)operator.getMetricGroup()).getIOMetricGroup();
                return ioGroup.getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                return new SimpleCounter();
            }
        }

        /** Pushes an untagged record to the operator, but only if this output has no tag. */
        public void collect(StreamRecord<T> record) {
            if (this.outputTag == null) {
                this.pushToOperator(record);
            }
        }

        /** Pushes a tagged record to the operator only if the tag equals this output's tag. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            if (this.outputTag != null && this.outputTag.equals(outputTag)) {
                this.pushToOperator(record);
            }
        }

        /**
         * Counts the record, sets it as key context 2 and processes it on the second input.
         * Failures are rethrown as {@code ExceptionInChainedOperatorException}.
         */
        protected <X> void pushToOperator(StreamRecord<X> record) {
            try {
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement2(typedRecord);
                this.operator.processElement2(typedRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark timestamp in the gauge and, while the stream status is active,
         * forwards the watermark to the operator's second input.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            try {
                this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                    return;
                }
                this.operator.processWatermark2(mark);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Forwards the latency marker to the operator's second input. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker2(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /**
     * Output that pushes records, watermarks and latency markers directly into the FIRST
     * input of a chained {@link TwoInputStreamOperator}.
     *
     * @param <T> element type of the operator's first input
     */
    private static class ChainingWithFirstInputOfTwoInputStreamOperatorOutput<T>
    implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final TwoInputStreamOperator<T, ?, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag of the edge this output serves; {@code null} means the main output. */
        protected final OutputTag<T> outputTag;

        public ChainingWithFirstInputOfTwoInputStreamOperatorOutput(TwoInputStreamOperator<T, ?, ?> operator, StreamStatusProvider streamStatusProvider, StreamEdge edge) {
            this.operator = operator;
            this.numRecordsIn = resolveNumRecordsInCounter(operator);
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = edge.getOutputTag();
        }

        /**
         * Looks up the operator's records-in counter; if that fails for any reason the failure
         * is logged and a standalone {@link SimpleCounter} is used instead.
         */
        private static Counter resolveNumRecordsInCounter(StreamOperator<?> operator) {
            try {
                final OperatorIOMetricGroup ioGroup = ((OperatorMetricGroup)operator.getMetricGroup()).getIOMetricGroup();
                return ioGroup.getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                return new SimpleCounter();
            }
        }

        /** Pushes an untagged record to the operator, but only if this output has no tag. */
        public void collect(StreamRecord<T> record) {
            if (this.outputTag == null) {
                this.pushToOperator(record);
            }
        }

        /** Pushes a tagged record to the operator only if the tag equals this output's tag. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            if (this.outputTag != null && this.outputTag.equals(outputTag)) {
                this.pushToOperator(record);
            }
        }

        /**
         * Counts the record, sets it as key context 1 and processes it on the first input.
         * Failures are rethrown as {@code ExceptionInChainedOperatorException}.
         */
        protected <X> void pushToOperator(StreamRecord<X> record) {
            try {
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(typedRecord);
                this.operator.processElement1(typedRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark timestamp in the gauge and, while the stream status is active,
         * forwards the watermark to the operator's first input.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            try {
                this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                    return;
                }
                this.operator.processWatermark1(mark);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Forwards the latency marker to the operator's first input.
         *
         * <p>This output feeds input 1, so the marker must go to
         * {@code processLatencyMarker1}, matching the element and watermark paths of this
         * class. The previous code routed it to {@code processLatencyMarker2}, attributing the
         * marker to the wrong input.
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker1(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /**
     * Output that pushes records, watermarks and latency markers directly into a chained
     * {@link OneInputStreamOperator}.
     *
     * @param <T> input element type of the chained operator
     */
    private static class ChainingWithOneInputStreamOperatorOutput<T>
    implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
        protected final OneInputStreamOperator<T, ?> operator;
        /** Incremented once for every record pushed to the operator. */
        protected final Counter numRecordsIn;
        protected final WatermarkGauge watermarkGauge = new WatermarkGauge();
        protected final StreamStatusProvider streamStatusProvider;
        /** Tag of the edge this output serves; {@code null} means the main output. */
        protected final OutputTag<T> outputTag;

        public ChainingWithOneInputStreamOperatorOutput(OneInputStreamOperator<T, ?> operator, StreamStatusProvider streamStatusProvider, StreamEdge edge) {
            this.operator = operator;
            this.numRecordsIn = resolveNumRecordsInCounter(operator);
            this.streamStatusProvider = streamStatusProvider;
            this.outputTag = edge.getOutputTag();
        }

        /**
         * Looks up the operator's records-in counter; if that fails for any reason the failure
         * is logged and a standalone {@link SimpleCounter} is used instead.
         */
        private static Counter resolveNumRecordsInCounter(StreamOperator<?> operator) {
            try {
                final OperatorIOMetricGroup ioGroup = ((OperatorMetricGroup)operator.getMetricGroup()).getIOMetricGroup();
                return ioGroup.getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                return new SimpleCounter();
            }
        }

        /** Pushes an untagged record to the operator, but only if this output has no tag. */
        public void collect(StreamRecord<T> record) {
            if (this.outputTag == null) {
                this.pushToOperator(record);
            }
        }

        /** Pushes a tagged record to the operator only if the tag equals this output's tag. */
        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            if (this.outputTag != null && this.outputTag.equals(outputTag)) {
                this.pushToOperator(record);
            }
        }

        /**
         * Counts the record, sets it as key context 1 and processes it on the operator.
         * Failures are rethrown as {@code ExceptionInChainedOperatorException}.
         */
        protected <X> void pushToOperator(StreamRecord<X> record) {
            try {
                @SuppressWarnings("unchecked")
                final StreamRecord<T> typedRecord = (StreamRecord<T>) record;
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(typedRecord);
                this.operator.processElement(typedRecord);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /**
         * Records the watermark timestamp in the gauge and, while the stream status is active,
         * forwards the watermark to the operator.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            try {
                this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
                if (!this.streamStatusProvider.getStreamStatus().isActive()) {
                    return;
                }
                this.operator.processWatermark(mark);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Forwards the latency marker to the operator. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            try {
                this.operator.processLatencyMarker(latencyMarker);
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        /** Closes the chained operator, wrapping any failure. */
        public void close() {
            try {
                this.operator.close();
            } catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override
        public Gauge<Long> getWatermarkGauge() {
            return this.watermarkGauge;
        }
    }

    /**
     * An {@link Output} that additionally exposes a gauge for its watermark.
     *
     * @param <T> type of the elements accepted by the output
     */
    public interface WatermarkGaugeExposingOutput<T> extends Output<T> {
        /**
         * @return the gauge exposed by this output
         */
        Gauge<Long> getWatermarkGauge();
    }
}

