/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sort;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedSorter;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.sort.RecordComparator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.util.Collector;

/**
 * One-input operator that buffers at most {@code limitEnd} rows of its input - the ones that
 * sort first according to the generated {@link RecordComparator} - and emits them when the
 * input ends.
 *
 * <p>Rows are kept in a priority queue whose ordering is the reverse of the comparator, so the
 * head of the queue is always the buffered row that sorts last and is the one evicted when a
 * better row arrives.
 *
 * <p>When {@code isGlobal} is set, the buffered rows are sorted and only positions
 * {@code [limitStart, limitEnd)} are emitted. Otherwise every buffered row is emitted in the
 * queue's iteration order (which is not sorted) and {@code limitStart} is not applied.
 */
public class SortLimitOperator
extends AbstractStreamOperatorWithMetrics<BaseRow>
implements OneInputStreamOperator<BaseRow, BaseRow> {
    /**
     * Upper bound used only for pre-sizing the heap. The queue grows on demand, so a larger
     * limit is still honoured; it is just not allocated eagerly.
     */
    private static final int MAX_INITIAL_HEAP_CAPACITY = 1024;

    private final boolean isGlobal;
    private final long limitStart;
    private final long limitEnd;
    private GeneratedSorter gSorter;
    private transient PriorityQueue<BaseRow> heap;
    private transient Collector<BaseRow> collector;
    private transient RecordComparator comparator;
    private transient AbstractRowSerializer<BaseRow> inputSer;

    /**
     * @param isGlobal whether {@link #endInput()} emits a sorted, offset-trimmed result
     *     ({@code true}) or simply every buffered row ({@code false})
     * @param limitStart index of the first row to emit in global mode
     * @param limitEnd exclusive end index; also the maximum number of rows buffered
     * @param gSorter generated comparator code plus the serializers/comparators it is
     *     initialised with
     */
    public SortLimitOperator(boolean isGlobal, long limitStart, long limitEnd, GeneratedSorter gSorter) {
        this.isGlobal = isGlobal;
        this.limitStart = limitStart;
        this.limitEnd = limitEnd;
        this.gSorter = gSorter;
    }

    /**
     * Compiles and initialises the generated comparator, then creates the heap and the output
     * collector.
     *
     * @throws Exception if the superclass, code compilation or comparator instantiation fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.inputSer = (AbstractRowSerializer)this.getOperatorConfig().getTypeSerializerIn1(this.getUserCodeClassloader());
        this.comparator = (RecordComparator)CodeGenUtils.compile(this.getContainingTask().getUserCodeClassLoader(), this.gSorter.comparator().name(), this.gSorter.comparator().code()).newInstance();
        this.comparator.init(this.gSorter.serializers(), this.gSorter.comparators());
        // The generated code is not needed once the comparator exists; drop the reference.
        this.gSorter = null;
        // PriorityQueue rejects an initial capacity below 1, and a plain (int) cast of a large
        // long limit can wrap to a negative value, so clamp into [1, MAX_INITIAL_HEAP_CAPACITY].
        int initialCapacity = (int)Math.max(1L, Math.min(this.limitEnd, (long)MAX_INITIAL_HEAP_CAPACITY));
        // Arguments are swapped on purpose: the queue head becomes the row that sorts last.
        this.heap = new PriorityQueue<BaseRow>(initialCapacity, (o1, o2) -> this.comparator.compare(o2, o1));
        this.collector = new StreamRecordCollector<BaseRow>(this.output);
    }

    /**
     * Offers one input row to the heap. The row is stored if the heap holds fewer than
     * {@code limitEnd} rows, or if it sorts before the current head (which is then evicted).
     *
     * @param element the incoming record
     * @throws Exception declared by the operator interface
     */
    public void processElement(StreamRecord<BaseRow> element) throws Exception {
        if (this.limitEnd <= 0L) {
            // Nothing can ever be emitted, and peek() on the empty heap would yield null.
            return;
        }
        BaseRow record = (BaseRow)element.getValue();
        if ((long)this.heap.size() < this.limitEnd) {
            // Store a copy because the row is retained past this call.
            // NOTE(review): presumably the caller may reuse the incoming object - confirm.
            this.heap.add(this.inputSer.copy(record));
            return;
        }
        BaseRow last = this.heap.peek();
        if (this.comparator.compare(last, record) > 0) {
            this.heap.poll();
            this.heap.add(this.inputSer.copy(record));
        }
    }

    /**
     * Emits the buffered rows. In global mode they are sorted with the comparator and only
     * indices {@code [limitStart, min(limitEnd, size))} are collected; otherwise all buffered
     * rows are collected in heap iteration order.
     *
     * @throws Exception declared by the operator interface
     */
    public void endInput() throws Exception {
        if (!this.isGlobal) {
            for (BaseRow row : this.heap) {
                this.collector.collect(row);
            }
            return;
        }
        ArrayList<BaseRow> sorted = new ArrayList<BaseRow>(this.heap);
        sorted.sort((o1, o2) -> this.comparator.compare((BaseRow)o1, (BaseRow)o2));
        int end = (int)Math.min(this.limitEnd, (long)sorted.size());
        // Clamp before narrowing so an out-of-int-range or negative offset cannot wrap into a
        // bogus index.
        int start = (int)Math.min(Math.max(this.limitStart, 0L), (long)end);
        for (int i = start; i < end; ++i) {
            this.collector.collect(sorted.get(i));
        }
    }
}

