/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;

public class TimestampsAndPeriodicWatermarksOperator<T>
extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
implements OneInputStreamOperator<T, T>,
ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;
    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.currentWatermark = Long.MIN_VALUE;
        this.watermarkInterval = this.getExecutionConfig().getAutoWatermarkInterval();
        if (this.watermarkInterval > 0L) {
            long now = this.getProcessingTimeService().getCurrentProcessingTime();
            this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        long newTimestamp = ((AssignerWithPeriodicWatermarks)this.userFunction).extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        this.output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        Watermark newWatermark = ((AssignerWithPeriodicWatermarks)this.userFunction).getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = newWatermark.getTimestamp();
            this.output.emitWatermark(newWatermark);
        }
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
        }
    }

    @Override
    public void endInput() throws Exception {
    }

    @Override
    public void close() throws Exception {
        super.close();
        Watermark newWatermark = ((AssignerWithPeriodicWatermarks)this.userFunction).getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = newWatermark.getTimestamp();
            this.output.emitWatermark(newWatermark);
        }
    }
}

