/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.TwoInputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

/**
 * Two-input operator that drives a {@link BroadcastProcessFunction}.
 *
 * <p>Records arriving on input 1 are passed to {@code processElement(...)} together with a
 * read-only context; records arriving on input 2 are passed to
 * {@code processBroadcastElement(...)} together with a read-write context. Both contexts look up
 * broadcast state in the same map, which is filled in {@link #open()} from the descriptors given
 * to the constructor.
 *
 * @param <IN1> type of the records on the first input
 * @param <IN2> type of the records on the second input
 * @param <OUT> type of the emitted records
 */
@Internal
public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
        extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
        implements TwoInputStreamOperator<IN1, IN2, OUT> {

    private static final long serialVersionUID = -1869740381935471752L;

    /** Message used when a requested descriptor was not registered with this operator. */
    private static final String UNKNOWN_STATE_MESSAGE =
            "The requested state does not exist. Check for typos in your state descriptor, or specify the state descriptor in the datastream.broadcast(...) call if you forgot to register it.";

    /** Timestamp of the last watermark seen by {@link #processWatermark(Watermark)}. */
    private long currentWatermark = Long.MIN_VALUE;

    /** Descriptors of every broadcast state the user function may request. */
    private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

    private transient TimestampedCollector<OUT> collector;

    /** Broadcast states keyed by their descriptor; populated in {@link #open()}. */
    private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;

    private transient ReadWriteContextImpl rwContext;

    private transient ReadOnlyContextImpl rContext;

    /**
     * Creates the operator.
     *
     * @param function the user function to invoke for each record
     * @param broadcastStateDescriptors descriptors of the broadcast states to register; must not
     *     be {@code null}
     */
    public CoBroadcastWithNonKeyedOperator(
            BroadcastProcessFunction<IN1, IN2, OUT> function,
            List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
        super(function);
        this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
    }

    /**
     * Builds the collector, resolves one broadcast state per registered descriptor from the
     * operator state backend, and creates the two contexts handed to the user function.
     */
    @Override
    public void open() throws Exception {
        super.open();

        collector = new TimestampedCollector<>(output);

        broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
        for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
            broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor));
        }

        rwContext = new ReadWriteContextImpl(
                getExecutionConfig(), userFunction, broadcastStates, getProcessingTimeService());
        rContext = new ReadOnlyContextImpl(
                getExecutionConfig(), userFunction, broadcastStates, getProcessingTimeService());
    }

    @Override
    public TwoInputSelection firstInputSelection() {
        return TwoInputSelection.ANY;
    }

    /**
     * Forwards a record from the first input to {@code processElement(...)} of the user function.
     *
     * @param element the record to process
     * @return always {@link TwoInputSelection#ANY}
     */
    @Override
    public TwoInputSelection processElement1(StreamRecord<IN1> element) throws Exception {
        collector.setTimestamp(element);
        rContext.setElement(element);
        try {
            userFunction.processElement(element.getValue(), rContext, collector);
        } finally {
            // Clear the reference even when the user function throws, so the context never keeps
            // pointing at a record that is no longer being processed.
            rContext.setElement(null);
        }
        return TwoInputSelection.ANY;
    }

    /**
     * Forwards a record from the second input to {@code processBroadcastElement(...)} of the user
     * function.
     *
     * @param element the record to process
     * @return always {@link TwoInputSelection#ANY}
     */
    @Override
    public TwoInputSelection processElement2(StreamRecord<IN2> element) throws Exception {
        collector.setTimestamp(element);
        rwContext.setElement(element);
        try {
            userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
        } finally {
            // Same reasoning as in processElement1: always drop the reference.
            rwContext.setElement(null);
        }
        return TwoInputSelection.ANY;
    }

    /** No action is taken when the first input ends. */
    @Override
    public void endInput1() throws Exception {
    }

    /** No action is taken when the second input ends. */
    @Override
    public void endInput2() throws Exception {
    }

    /** Delegates to the superclass and then records the watermark's timestamp. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        currentWatermark = mark.getTimestamp();
    }

    /** Context passed to {@code processElement(...)}; exposes broadcast state read-only. */
    private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final ProcessingTimeService timerService;

        /** Record currently being processed, or {@code null} between records. */
        private StreamRecord<IN1> element;

        /**
         * @param executionConfig config used to initialise descriptor serializers
         * @param function enclosing instance required by the inner-class superclass
         * @param broadcastStates the states this context may hand out
         * @param timerService source of the current processing time
         */
        ReadOnlyContextImpl(
                ExecutionConfig executionConfig,
                BroadcastProcessFunction<IN1, IN2, OUT> function,
                Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                ProcessingTimeService timerService) {
            // The superclass is an inner class of BroadcastProcessFunction, so it must be
            // constructed against an enclosing function instance.
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN1> e) {
            this.element = e;
        }

        /**
         * @return the timestamp of the current record, or {@code null} if it carries none
         * @throws IllegalStateException if no record is currently being processed
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(element != null, "No element is currently being processed.");
            return element.hasTimestamp() ? Long.valueOf(element.getTimestamp()) : null;
        }

        /**
         * Emits {@code value} to the side output identified by {@code outputTag}.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         * @throws IllegalStateException if no record is currently being processed
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            Preconditions.checkState(element != null, "No element is currently being processed.");
            // NOTE(review): the timestamp is copied without consulting hasTimestamp(), unlike
            // timestamp() above. Behaviour kept as in the original; confirm this is intended.
            output.collect(outputTag, new StreamRecord<X>(value, element.getTimestamp()));
        }

        @Override
        public long currentProcessingTime() {
            return timerService.getCurrentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }

        /**
         * Looks up the broadcast state registered under {@code stateDescriptor}.
         *
         * @throws IllegalArgumentException if the descriptor was not registered with the operator
         */
        @Override
        public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);

            // The map erases K and V; the entry was stored under this same descriptor.
            @SuppressWarnings("unchecked")
            ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(UNKNOWN_STATE_MESSAGE);
            }
            return state;
        }
    }

    /** Context passed to {@code processBroadcastElement(...)}; exposes writable broadcast state. */
    private class ReadWriteContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.Context {

        private final ExecutionConfig config;

        private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;

        private final ProcessingTimeService timerService;

        /** Record currently being processed, or {@code null} between records. */
        private StreamRecord<IN2> element;

        /**
         * @param executionConfig config used to initialise descriptor serializers
         * @param function enclosing instance required by the inner-class superclass
         * @param broadcastStates the states this context may hand out
         * @param timerService source of the current processing time
         */
        ReadWriteContextImpl(
                ExecutionConfig executionConfig,
                BroadcastProcessFunction<IN1, IN2, OUT> function,
                Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
                ProcessingTimeService timerService) {
            // See ReadOnlyContextImpl: the inner-class superclass needs an enclosing instance.
            function.super();
            this.config = Preconditions.checkNotNull(executionConfig);
            this.states = Preconditions.checkNotNull(broadcastStates);
            this.timerService = Preconditions.checkNotNull(timerService);
        }

        void setElement(StreamRecord<IN2> e) {
            this.element = e;
        }

        /**
         * @return the timestamp of the current record, or {@code null} if it carries none
         *     (kept consistent with the read-only context)
         * @throws IllegalStateException if no record is currently being processed
         */
        @Override
        public Long timestamp() {
            Preconditions.checkState(element != null, "No element is currently being processed.");
            return element.hasTimestamp() ? Long.valueOf(element.getTimestamp()) : null;
        }

        /**
         * Looks up the broadcast state registered under {@code stateDescriptor}.
         *
         * @throws IllegalArgumentException if the descriptor was not registered with the operator
         */
        @Override
        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
            Preconditions.checkNotNull(stateDescriptor);
            stateDescriptor.initializeSerializerUnlessSet(config);

            // The map erases K and V; the entry was stored under this same descriptor.
            @SuppressWarnings("unchecked")
            BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
            if (state == null) {
                throw new IllegalArgumentException(UNKNOWN_STATE_MESSAGE);
            }
            return state;
        }

        /**
         * Emits {@code value} to the side output identified by {@code outputTag}.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         * @throws IllegalStateException if no record is currently being processed
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            Preconditions.checkArgument(outputTag != null, "OutputTag must not be null.");
            Preconditions.checkState(element != null, "No element is currently being processed.");
            // NOTE(review): timestamp copied without consulting hasTimestamp(); behaviour kept
            // as in the original. Confirm this is intended.
            output.collect(outputTag, new StreamRecord<X>(value, element.getTimestamp()));
        }

        @Override
        public long currentProcessingTime() {
            return timerService.getCurrentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }
    }
}

