/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.bundle;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

public class RowtimeDeduplicateOperator
extends TriggerableOperator<BaseRow, VoidNamespace, BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1L;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final int rowtimeIndex;
    private final BaseRowTypeInfo valueType;
    private transient Collector<BaseRow> collector;
    private transient Map<BaseRow, BaseRow> buffer;
    private transient TimerService timerService;
    private transient KeyedValueState<BaseRow, BaseRow> pkRow;
    private transient TypeSerializer<BaseRow> valueSer;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;
    private final long minibatchSize;
    private transient int numOfElements;

    public RowtimeDeduplicateOperator(int rowtimeIndex, BaseRowTypeInfo valueType, long minibatchSize) {
        Preconditions.checkArgument(rowtimeIndex >= 0);
        this.rowtimeIndex = rowtimeIndex;
        this.valueType = valueType;
        this.minibatchSize = minibatchSize;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.buffer = new HashMap<BaseRow, BaseRow>();
        this.collector = new StreamRecordCollector<BaseRow>(this.output);
        this.numOfElements = 0;
        this.valueSer = this.valueType.createSerializer();
        KeyedValueStateDescriptor rowStateDesc = new KeyedValueStateDescriptor("rowState", this.getKeySerializer(), this.valueSer);
        this.pkRow = (KeyedValueState)this.getKeyedState((KeyedStateDescriptor)rowStateDesc);
        InternalTimerService<VoidNamespace> internalTimerService = this.getInternalTimerService("deduplicate-timers", VoidNamespaceSerializer.INSTANCE, this);
        this.timerService = new SimpleTimerService(internalTimerService);
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, (Meter)new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long watermark = this.timerService.currentWatermark();
            if (watermark < 0L) {
                return 0L;
            }
            return System.currentTimeMillis() - watermark;
        });
    }

    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        ++this.numOfElements;
        BaseRow inputRow = (BaseRow)element.getValue();
        long timestamp = inputRow.getLong(this.rowtimeIndex);
        if (timestamp <= this.timerService.currentWatermark()) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
            return;
        }
        Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(inputRow));
        BaseRow key = (BaseRow)this.getCurrentKey();
        BaseRow value = this.buffer.get(key);
        if (value == null || this.isFirstRow(value, inputRow)) {
            this.buffer.put(key, this.valueSer.copy(inputRow));
            if (this.minibatchSize > 0L && (long)this.numOfElements > this.minibatchSize) {
                this.finishBundle();
            }
        }
    }

    private void finishBundle() {
        this.numOfElements = 0;
        Map pkRowsMap = this.pkRow.getAll(this.buffer.keySet());
        for (Map.Entry<BaseRow, BaseRow> entry : this.buffer.entrySet()) {
            BaseRow currentKey = entry.getKey();
            BaseRow currentRow2 = entry.getValue();
            BaseRow prevRow = (BaseRow)pkRowsMap.get(currentKey);
            this.deduplicateKeepFirstRow(prevRow, currentKey, currentRow2);
        }
        this.buffer.clear();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.finishBundle();
    }

    public void endInput() throws Exception {
    }

    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    public boolean requireState() {
        return true;
    }

    private void deduplicateKeepFirstRow(BaseRow prevRow, BaseRow currentKey, BaseRow currentRow2) {
        if (!this.isFirstRow(prevRow, currentRow2)) {
            return;
        }
        this.pkRow.put((Object)currentKey, (Object)currentRow2);
        this.setCurrentKey(currentKey);
        if (prevRow != null) {
            this.timerService.deleteEventTimeTimer(prevRow.getLong(this.rowtimeIndex));
        }
        this.timerService.registerEventTimeTimer(currentRow2.getLong(this.rowtimeIndex));
    }

    private boolean isFirstRow(BaseRow prevRow, BaseRow currentRow2) {
        return prevRow == null || currentRow2.getLong(this.rowtimeIndex) < prevRow.getLong(this.rowtimeIndex);
    }

    public void processWatermark(Watermark mark) throws Exception {
        this.finishBundle();
        super.processWatermark(mark);
    }

    public void onEventTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
        this.collector.collect((BaseRow)this.pkRow.get(timer.getKey()));
    }

    public void onProcessingTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
    }
}

