/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

@Internal
public class RecordWriterOutput<OUT>
implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
    private StreamRecordWriter<StreamElement> recordWriter;
    private final StreamStatusProvider streamStatusProvider;
    private final OutputTag outputTag;
    private final WatermarkGauge watermarkGauge = new WatermarkGauge();

    public RecordWriterOutput(StreamRecordWriter<StreamRecord<OUT>> recordWriter, OutputTag outputTag, StreamStatusProvider streamStatusProvider) {
        Preconditions.checkNotNull(recordWriter);
        this.outputTag = outputTag;
        this.recordWriter = recordWriter;
        this.streamStatusProvider = (StreamStatusProvider)Preconditions.checkNotNull((Object)streamStatusProvider);
    }

    public void collect(StreamRecord<OUT> record) {
        if (this.outputTag != null) {
            return;
        }
        this.pushToRecordWriter(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
            return;
        }
        this.pushToRecordWriter(record);
    }

    private <X> void pushToRecordWriter(StreamRecord<X> record) {
        try {
            this.recordWriter.emit(record);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public void emitWatermark(Watermark mark) {
        this.watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        if (this.streamStatusProvider.getStreamStatus().isActive()) {
            try {
                this.recordWriter.broadcastEmit(mark);
            }
            catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    public void emitStreamStatus(StreamStatus streamStatus) {
        try {
            this.recordWriter.broadcastEmit(streamStatus);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        try {
            this.recordWriter.randomEmit(latencyMarker);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
        this.recordWriter.broadcastEvent(event);
    }

    public void flush() throws IOException {
        this.recordWriter.flushAll();
    }

    public void close() {
        this.recordWriter.close();
    }

    @Override
    public Gauge<Long> getWatermarkGauge() {
        return this.watermarkGauge;
    }
}

