package org.apache.flink.streaming.runtime.streamstatus;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;

/**
 * Merges the watermarks and stream statuses arriving on a fixed number of input channels into a
 * single output watermark and a single output stream status, which are handed to a
 * {@link ValveOutputHandler}.
 *
 * <p>The valve keeps one {@link InputChannelStatus} per channel plus the last watermark and the
 * last stream status it emitted. The emitted watermark only ever increases: a candidate value is
 * forwarded only when it is strictly greater than the previously emitted one.
 */
@Internal
public class StatusWatermarkValve {

    /** Receiver of every watermark and stream status emitted by this valve. */
    private final ValveOutputHandler outputHandler;

    /** Per-channel state, indexed by channel index. */
    private final InputChannelStatus[] channelStatuses;

    /** Timestamp of the most recently emitted watermark; starts at {@code Long.MIN_VALUE}. */
    private long lastOutputWatermark;

    /** Most recently emitted stream status; starts as {@code StreamStatus.ACTIVE}. */
    private StreamStatus lastOutputStreamStatus;

    /**
     * Mutable state the valve tracks for a single input channel.
     */
    @VisibleForTesting
    public static class InputChannelStatus {

        /** Highest watermark timestamp accepted from this channel so far. */
        protected long watermark;

        /** Current stream status of this channel. */
        protected StreamStatus streamStatus;

        /**
         * Whether this channel takes part in the minimum-watermark computation. The valve sets it
         * once the channel's watermark is at or above the last emitted watermark and clears it
         * when the channel turns idle.
         */
        protected boolean isWatermarkAligned;

        protected InputChannelStatus() {
        }

        /**
         * Tells whether any of the given channels currently reports an active stream status.
         *
         * @param inputChannelStatusArr the channel states to inspect
         * @return {@code true} as soon as one active channel is found, {@code false} otherwise
         */
        public static boolean hasActiveChannels(InputChannelStatus[] inputChannelStatusArr) {
            for (int idx = 0; idx < inputChannelStatusArr.length; idx++) {
                if (inputChannelStatusArr[idx].streamStatus.isActive()) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Callback interface through which the valve publishes its results.
     */
    public interface ValveOutputHandler {

        /** Invoked with each new watermark emitted by the valve. */
        void handleWatermark(Watermark watermark);

        /** Invoked whenever the valve's overall stream status toggles. */
        void handleStreamStatus(StreamStatus streamStatus);
    }

    /**
     * Creates a valve in which every channel starts active, aligned, and with a watermark of
     * {@code Long.MIN_VALUE}; the valve itself starts active with the same watermark.
     *
     * @param i number of input channels; validated to be greater than zero
     * @param valveOutputHandler receiver of emitted watermarks and stream statuses; validated to
     *     be non-null
     */
    public StatusWatermarkValve(int i, ValveOutputHandler valveOutputHandler) {
        Preconditions.checkArgument(i > 0);

        this.channelStatuses = new InputChannelStatus[i];
        for (int channel = 0; channel < this.channelStatuses.length; channel++) {
            final InputChannelStatus initial = new InputChannelStatus();
            initial.watermark = Long.MIN_VALUE;
            initial.streamStatus = StreamStatus.ACTIVE;
            initial.isWatermarkAligned = true;
            this.channelStatuses[channel] = initial;
        }

        this.outputHandler = Preconditions.checkNotNull(valveOutputHandler);
        this.lastOutputWatermark = Long.MIN_VALUE;
        this.lastOutputStreamStatus = StreamStatus.ACTIVE;
    }

    /**
     * Feeds a watermark received on one channel into the valve.
     *
     * <p>The watermark is dropped when the valve as a whole is not active, when the channel is
     * not active, or when its timestamp does not exceed the channel's current watermark.
     * Otherwise the channel's watermark is advanced, the channel is marked aligned if it has
     * caught up with the last emitted watermark, and a new minimum across aligned channels is
     * emitted if one exists.
     *
     * @param watermark the incoming watermark
     * @param i index of the channel the watermark arrived on
     */
    public void inputWatermark(Watermark watermark, int i) {
        // The valve-level check comes first so that the channel array is not touched at all
        // while the valve is idle.
        if (!lastOutputStreamStatus.isActive()) {
            return;
        }

        final InputChannelStatus channel = channelStatuses[i];
        if (!channel.streamStatus.isActive()) {
            return;
        }

        final long incoming = watermark.getTimestamp();
        if (incoming <= channel.watermark) {
            // Not an advance for this channel; nothing to do.
            return;
        }

        channel.watermark = incoming;
        if (!channel.isWatermarkAligned && incoming >= lastOutputWatermark) {
            channel.isWatermarkAligned = true;
        }
        findAndOutputNewMinWatermarkAcrossAlignedChannels();
    }

    /**
     * Feeds a stream status received on one channel into the valve.
     *
     * <p>Only inputs that toggle the channel (active to idle, or idle to active) have an effect;
     * a status equal to the channel's current one is ignored.
     *
     * @param streamStatus the incoming stream status
     * @param i index of the channel the status arrived on
     */
    public void inputStreamStatus(StreamStatus streamStatus, int i) {
        if (streamStatus.isIdle() && channelStatuses[i].streamStatus.isActive()) {
            markChannelIdle(channelStatuses[i]);
        } else if (streamStatus.isActive() && channelStatuses[i].streamStatus.isIdle()) {
            markChannelActive(channelStatuses[i]);
        }
    }

    /**
     * Applies an active-to-idle toggle to the given channel and emits whatever follows from it.
     */
    private void markChannelIdle(InputChannelStatus channel) {
        channel.streamStatus = StreamStatus.IDLE;
        channel.isWatermarkAligned = false;

        // Further watermark output is attempted only when this channel's watermark equals the
        // value that was emitted last.
        final boolean atLastOutput = channel.watermark == lastOutputWatermark;

        if (InputChannelStatus.hasActiveChannels(channelStatuses)) {
            if (atLastOutput) {
                findAndOutputNewMinWatermarkAcrossAlignedChannels();
            }
            return;
        }

        // No active channel is left: emit the maximum across all channels if applicable, then
        // report the valve itself as idle.
        if (atLastOutput) {
            findAndOutputMaxWatermarkAcrossAllChannels();
        }
        lastOutputStreamStatus = StreamStatus.IDLE;
        outputHandler.handleStreamStatus(lastOutputStreamStatus);
    }

    /**
     * Applies an idle-to-active toggle to the given channel and, if the valve itself was idle,
     * switches the valve back to active and reports that.
     */
    private void markChannelActive(InputChannelStatus channel) {
        channel.streamStatus = StreamStatus.ACTIVE;

        // The channel is aligned right away if its retained watermark is not behind the last
        // emitted watermark.
        if (channel.watermark >= lastOutputWatermark) {
            channel.isWatermarkAligned = true;
        }

        if (lastOutputStreamStatus.isIdle()) {
            lastOutputStreamStatus = StreamStatus.ACTIVE;
            outputHandler.handleStreamStatus(lastOutputStreamStatus);
        }
    }

    /**
     * Computes the smallest watermark among aligned channels and emits it when at least one
     * channel is aligned and the value is strictly greater than the last emitted watermark.
     */
    private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
        boolean anyAligned = false;
        long minWatermark = Long.MAX_VALUE;

        for (InputChannelStatus status : channelStatuses) {
            if (!status.isWatermarkAligned) {
                continue;
            }
            anyAligned = true;
            if (status.watermark < minWatermark) {
                minWatermark = status.watermark;
            }
        }

        if (anyAligned && minWatermark > lastOutputWatermark) {
            lastOutputWatermark = minWatermark;
            outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
        }
    }

    /**
     * Computes the largest watermark among all channels, aligned or not, and emits it when it is
     * strictly greater than the last emitted watermark.
     */
    private void findAndOutputMaxWatermarkAcrossAllChannels() {
        long maxWatermark = Long.MIN_VALUE;
        for (InputChannelStatus status : channelStatuses) {
            if (status.watermark > maxWatermark) {
                maxWatermark = status.watermark;
            }
        }

        if (maxWatermark <= lastOutputWatermark) {
            return;
        }
        lastOutputWatermark = maxWatermark;
        outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
    }

    /**
     * Exposes the tracked state of one channel.
     *
     * @param i index of the channel; validated to lie within {@code [0, number of channels)}
     * @return the live (not copied) state object of that channel
     */
    @VisibleForTesting
    protected InputChannelStatus getInputChannelStatus(int i) {
        final int channelCount = channelStatuses.length;
        final boolean inRange = i >= 0 && i < channelCount;
        Preconditions.checkArgument(inRange, "Invalid channel index. Number of input channels: " + channelCount);
        return channelStatuses[i];
    }
}
