package org.apache.flink.table.runtime.operator.sort;

import java.util.Iterator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state2.keyed.KeyedListState;
import org.apache.flink.runtime.state2.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state2.keyed.KeyedValueState;
import org.apache.flink.runtime.state2.keyed.KeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.typeutils.TypeUtils$;
import scala.Predef$;
import scala.reflect.ScalaSignature;

/* compiled from: OnlyRowTimeSortOperator.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]a\u0001B\u0001\u0003\u0001E\u0011qc\u00148msJ{w\u000fV5nKN{'\u000f^(qKJ\fGo\u001c:\u000b\u0005\r!\u0011\u0001B:peRT!!\u0002\u0004\u0002\u0011=\u0004XM]1u_JT!a\u0002\u0005\u0002\u000fI,h\u000e^5nK*\u0011\u0011BC\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u00171\tQA\u001a7j].T!!\u0004\b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005y\u0011aA8sO\u000e\u00011C\u0001\u0001\u0013!\t\u0019B#D\u0001\u0003\u0013\t)\"A\u0001\tT_J$()Y:f\u001fB,'/\u0019;pe\"Aq\u0003\u0001BC\u0002\u0013%\u0001$\u0001\u0007j]B,HOU8x)f\u0004X-F\u0001\u001a!\rQRdH\u0007\u00027)\u0011A\u0004C\u0001\nif\u0004X-\u001e;jYNL!AH\u000e\u0003\u001f\t\u000b7/\u001a*poRK\b/Z%oM>\u0004\"\u0001I\u0012\u000e\u0003\u0005R!A\t\u0005\u0002\u0015\u0011\fG/\u00194pe6\fG/\u0003\u0002%C\t9!)Y:f%><\b\u0002\u0003\u0014\u0001\u0005\u0003\u0005\u000b\u0011B\r\u0002\u001b%t\u0007/\u001e;S_^$\u0016\u0010]3!\u0011!A\u0003A!b\u0001\n\u0013I\u0013A\u0003:poRLW.Z%eqV\t!\u0006\u0005\u0002,]5\tAFC\u0001.\u0003\u0015\u00198-\u00197b\u0013\tyCFA\u0002J]RD\u0001\"\r\u0001\u0003\u0002\u0003\u0006IAK\u0001\fe><H/[7f\u0013\u0012D\b\u0005C\u00034\u0001\u0011\u0005A'\u0001\u0004=S:LGO\u0010\u000b\u0004kY:\u0004CA\n\u0001\u0011\u00159\"\u00071\u0001\u001a\u0011\u0015A#\u00071\u0001+\u0011%I\u0004\u00011AA\u0002\u0013%!(A\u0007uS6,G*[:u'R\fG/Z\u000b\u0002wA!AH\u0011# \u001b\u0005i$B\u0001 
@\u0003\u0015YW-_3e\u0015\t\u0001\u0015)\u0001\u0004ti\u0006$XM\r\u0006\u0003\u000f)I!aQ\u001f\u0003\u001d-+\u00170\u001a3MSN$8\u000b^1uKB\u0011QIS\u0007\u0002\r*\u0011q\tS\u0001\u0005Y\u0006twMC\u0001J\u0003\u0011Q\u0017M^1\n\u0005-3%\u0001\u0002'p]\u001eD\u0011\"\u0014\u0001A\u0002\u0003\u0007I\u0011\u0002(\u0002#QLW.\u001a'jgR\u001cF/\u0019;f?\u0012*\u0017\u000f\u0006\u0002P%B\u00111\u0006U\u0005\u0003#2\u0012A!\u00168ji\"91\u000bTA\u0001\u0002\u0004Y\u0014a\u0001=%c!1Q\u000b\u0001Q!\nm\na\u0002^5nK2K7\u000f^*uCR,\u0007\u0005\u000b\u0002U/B\u00111\u0006W\u0005\u000332\u0012\u0011\u0002\u001e:b]NLWM\u001c;\t\u0013m\u0003\u0001\u0019!a\u0001\n\u0013a\u0016!\u00067bgR$&/[4hKJLgn\u001a+t'R\fG/Z\u000b\u0002;B!AH\u00181E\u0013\tyVHA\bLKf,GMV1mk\u0016\u001cF/\u0019;f!\t\tG-D\u0001c\u0015\t\u0019\u0017)A\u0003ti\u0006$X-\u0003\u0002fE\niak\\5e\u001d\u0006lWm\u001d9bG\u0016D\u0011b\u001a\u0001A\u0002\u0003\u0007I\u0011\u00025\u000231\f7\u000f\u001e+sS\u001e<WM]5oOR\u001b8\u000b^1uK~#S-\u001d\u000b\u0003\u001f&Dqa\u00154\u0002\u0002\u0003\u0007Q\f\u0003\u0004l\u0001\u0001\u0006K!X\u0001\u0017Y\u0006\u001cH\u000f\u0016:jO\u001e,'/\u001b8h)N\u001cF/\u0019;fA!\u0012!n\u0016\u0005\u0006]\u0002!\te\\\u0001\u0005_B,g\u000eF\u0001P\u0011\u0015\t\b\u0001\"\u0011s\u00039\u0001(o\\2fgN,E.Z7f]R$\"aT:\t\u000bQ\u0004\b\u0019A;\u0002\u0005%t\u0007c\u0001<}?5\tqO\u0003\u0002ys\u0006a1\u000f\u001e:fC6\u0014XmY8sI*\u0011qA\u001f\u0006\u0003w*\t\u0011b\u001d;sK\u0006l\u0017N\\4\n\u0005u<(\u0001D*ue\u0016\fWNU3d_J$\u0007BB@\u0001\t\u0003\n\t!A\u0006p]\u00163XM\u001c;US6,GcA(\u0002\u0004!9\u0011Q\u0001@A\u0002\u0005\u001d\u0011!\u0002;j[\u0016\u0014\bCBA\u0005\u0003'y\u0002-\u0004\u0002\u0002\f)!\u0011QBA\b\u0003%y\u0007/\u001a:bi>\u00148OC\u0002\u0002\u0012i\f1!\u00199j\u0013\u0011\t)\"a\u0003\u0003\u001b%sG/\u001a:oC2$\u0016.\\3s\u0001")
/* loaded from: input_file:org/apache/flink/table/runtime/operator/sort/OnlyRowTimeSortOperator.class */
/**
 * Operator that buffers incoming rows by their rowtime value and emits them when the
 * event-time timer registered for that value fires.
 *
 * <p>Each accepted row is appended to a list state keyed by the row's rowtime, and an
 * event-time timer is registered for the same timestamp. When a timer fires, every row
 * buffered under the firing timestamp is forwarded to the collector, the buffer entry is
 * removed, and the timestamp is remembered as the last triggering timestamp. Rows whose
 * rowtime is not strictly greater than that remembered timestamp are dropped.
 *
 * <p>NOTE(review): the resulting output order presumably relies on the timer service firing
 * timers in ascending timestamp order; that is not visible in this class - confirm against
 * {@code SortBaseOperator} and the timer service.
 */
public class OnlyRowTimeSortOperator extends SortBaseOperator {
    /** Type information of the input rows; used to build the row serializer for the buffer state. */
    private final BaseRowTypeInfo<BaseRow> inputRowType;

    /** Index of the field from which the rowtime is read as a {@code long}. */
    private final int rowtimeIdx;

    /** Buffered rows, keyed by their rowtime value. Initialized in {@link #open()}. */
    private transient KeyedListState<Long, BaseRow> timeListState;

    /**
     * Timestamp of the most recent timer that actually emitted rows, stored under
     * {@link VoidNamespace#INSTANCE}. Initialized in {@link #open()}.
     */
    private transient KeyedValueState<VoidNamespace, Long> lastTriggeringTsState;

    /**
     * Creates the operator.
     *
     * @param baseRowTypeInfo type information of the input rows
     * @param i index of the rowtime field inside each input row
     */
    public OnlyRowTimeSortOperator(BaseRowTypeInfo<BaseRow> baseRowTypeInfo, int i) {
        this.inputRowType = baseRowTypeInfo;
        this.rowtimeIdx = i;
    }

    /**
     * Opens the parent operator and then obtains the two keyed states used by this operator:
     * the per-rowtime row buffer and the last-triggering-timestamp value.
     */
    @Override
    public void open() {
        super.open();

        // Raw descriptor types are kept as-is: the generic signatures of the state2
        // descriptors and of getKeyedState are declared outside this file.
        KeyedListStateDescriptor rowBufferDescriptor = new KeyedListStateDescriptor(
                "timeListState",
                LongSerializer.INSTANCE,
                TypeUtils$.MODULE$.createSerializer((TypeInformation<?>) this.inputRowType));
        this.timeListState = (KeyedListState) getKeyedState(rowBufferDescriptor);

        KeyedValueStateDescriptor lastTriggerDescriptor = new KeyedValueStateDescriptor(
                "lastTriggeringTsState",
                VoidNamespaceSerializer.INSTANCE,
                LongSerializer.INSTANCE);
        this.lastTriggeringTsState = (KeyedValueState) getKeyedState(lastTriggerDescriptor);
    }

    /**
     * Buffers the incoming row under its rowtime and registers an event-time timer for it,
     * unless the row is late.
     *
     * <p>A row is considered late when a last triggering timestamp has been recorded and the
     * row's rowtime is less than or equal to it; late rows are silently discarded.
     *
     * @param streamRecord record wrapping the input row
     */
    public void processElement(StreamRecord<BaseRow> streamRecord) {
        final BaseRow row = (BaseRow) streamRecord.getValue();
        final long rowtime = row.getLong(this.rowtimeIdx);
        final Long lastTriggeringTs = (Long) this.lastTriggeringTsState.get(VoidNamespace.INSTANCE);

        // Late row: its timestamp has already been flushed by an earlier timer.
        if (lastTriggeringTs != null && rowtime <= lastTriggeringTs.longValue()) {
            return;
        }

        this.timeListState.add(Long.valueOf(rowtime), row);
        timerService().registerEventTimeTimer(rowtime);
    }

    /**
     * Emits every row buffered under the firing timer's timestamp.
     *
     * <p>If at least one row was emitted, the buffer entry for that timestamp is removed and
     * the timestamp is stored as the last triggering timestamp. If nothing was buffered for
     * the timestamp, no state is modified.
     *
     * @param internalTimer the event-time timer that fired
     */
    @Override
    public void onEventTime(InternalTimer<BaseRow, VoidNamespace> internalTimer) {
        final long firedAt = internalTimer.getTimestamp();
        final Long stateKey = Long.valueOf(firedAt);

        // Raw iterator kept on purpose: the element type returned by the state API is
        // declared outside this file.
        Iterator bufferedRows = this.timeListState.iterator(stateKey);

        boolean emittedAny = false;
        while (bufferedRows.hasNext()) {
            collector().collect(bufferedRows.next());
            emittedAny = true;
        }

        // Only touch state when the timer actually flushed something.
        if (emittedAny) {
            this.timeListState.remove(Long.valueOf(firedAt));
            this.lastTriggeringTsState.put(VoidNamespace.INSTANCE, Long.valueOf(firedAt));
        }
    }
}
