package org.apache.flink.table.runtime.bundle;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/bundle/RowtimeDeduplicateOperator.class */
public class RowtimeDeduplicateOperator extends TriggerableOperator<BaseRow, VoidNamespace, BaseRow> implements OneInputStreamOperator<BaseRow, BaseRow> {
    private static final long serialVersionUID = 1;
    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = "numLateRecordsDropped";
    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = "lateRecordsDroppedRate";
    private static final String WATERMARK_LATENCY_METRIC_NAME = "watermarkLatency";
    private final int rowtimeIndex;
    private final BaseRowTypeInfo valueType;
    private transient Collector<BaseRow> collector;
    private transient Map<BaseRow, BaseRow> buffer;
    private transient TimerService timerService;
    private transient KeyedValueState<BaseRow, BaseRow> pkRow;
    private transient TypeSerializer<BaseRow> valueSer;
    private transient Counter numLateRecordsDropped;
    private transient Meter lateRecordsDroppedRate;
    private transient Gauge<Long> watermarkLatency;
    private final long minibatchSize;
    private transient int numOfElements;

    public RowtimeDeduplicateOperator(int i, BaseRowTypeInfo baseRowTypeInfo, long j) {
        Preconditions.checkArgument(i >= 0);
        this.rowtimeIndex = i;
        this.valueType = baseRowTypeInfo;
        this.minibatchSize = j;
    }

    @Override // org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator
    public void open() throws Exception {
        super.open();
        this.buffer = new HashMap();
        this.collector = new StreamRecordCollector(this.output);
        this.numOfElements = 0;
        this.valueSer = this.valueType.createSerializer();
        this.pkRow = getKeyedState(new KeyedValueStateDescriptor("rowState", getKeySerializer(), this.valueSer));
        this.timerService = new SimpleTimerService(getInternalTimerService("deduplicate-timers", VoidNamespaceSerializer.INSTANCE, this));
        this.numLateRecordsDropped = this.metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
        this.lateRecordsDroppedRate = this.metrics.meter(LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        this.watermarkLatency = this.metrics.gauge(WATERMARK_LATENCY_METRIC_NAME, () -> {
            long currentWatermark = this.timerService.currentWatermark();
            if (currentWatermark < 0) {
                return 0L;
            }
            return Long.valueOf(System.currentTimeMillis() - currentWatermark);
        });
    }

    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.numOfElements++;
        BaseRow baseRow = (BaseRow) streamRecord.getValue();
        if (baseRow.getLong(this.rowtimeIndex) <= this.timerService.currentWatermark()) {
            this.numLateRecordsDropped.inc();
            this.lateRecordsDroppedRate.markEvent();
            return;
        }
        Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(baseRow));
        BaseRow baseRow2 = (BaseRow) getCurrentKey();
        BaseRow baseRow3 = this.buffer.get(baseRow2);
        if (baseRow3 == null || isFirstRow(baseRow3, baseRow)) {
            this.buffer.put(baseRow2, this.valueSer.copy(baseRow));
            if (this.minibatchSize <= 0 || this.numOfElements <= this.minibatchSize) {
                return;
            }
            finishBundle();
        }
    }

    private void finishBundle() {
        this.numOfElements = 0;
        Map all = this.pkRow.getAll(this.buffer.keySet());
        for (Map.Entry<BaseRow, BaseRow> entry : this.buffer.entrySet()) {
            BaseRow key = entry.getKey();
            deduplicateKeepFirstRow((BaseRow) all.get(key), key, entry.getValue());
        }
        this.buffer.clear();
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        finishBundle();
    }

    public void endInput() throws Exception {
    }

    protected Counter getNumLateRecordsDropped() {
        return this.numLateRecordsDropped;
    }

    protected Gauge<Long> getWatermarkLatency() {
        return this.watermarkLatency;
    }

    public boolean requireState() {
        return true;
    }

    private void deduplicateKeepFirstRow(BaseRow baseRow, BaseRow baseRow2, BaseRow baseRow3) {
        if (isFirstRow(baseRow, baseRow3)) {
            this.pkRow.put(baseRow2, baseRow3);
            setCurrentKey(baseRow2);
            if (baseRow != null) {
                this.timerService.deleteEventTimeTimer(baseRow.getLong(this.rowtimeIndex));
            }
            this.timerService.registerEventTimeTimer(baseRow3.getLong(this.rowtimeIndex));
        }
    }

    private boolean isFirstRow(BaseRow baseRow, BaseRow baseRow2) {
        return baseRow == null || baseRow2.getLong(this.rowtimeIndex) < baseRow.getLong(this.rowtimeIndex);
    }

    public void processWatermark(Watermark watermark) throws Exception {
        finishBundle();
        super.processWatermark(watermark);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onEventTime(InternalTimer<BaseRow, VoidNamespace> internalTimer) throws Exception {
        this.collector.collect(this.pkRow.get(internalTimer.getKey()));
    }

    public void onProcessingTime(InternalTimer<BaseRow, VoidNamespace> internalTimer) throws Exception {
    }
}
