/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.range;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.sampling.IntermediateSampleData;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.codegen.CodeGenUtils;
import org.apache.flink.table.codegen.GeneratedProjection;
import org.apache.flink.table.codegen.Projection;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.runtime.AbstractStreamOperatorWithMetrics;
import org.apache.flink.table.runtime.range.ReservoirSamplerWithoutReplacement;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;

/**
 * One-input stream operator that hands every incoming {@link BaseRow} to a
 * {@link ReservoirSamplerWithoutReplacement} and, when the input ends, forwards everything the
 * sampler returns from {@code sample()} downstream as {@link IntermediateSampleData} records.
 *
 * <p>Nothing is emitted while elements are being processed; all output is produced in
 * {@link #endInput()}.
 */
@Internal
public class LocalSampleOperator
extends AbstractStreamOperatorWithMetrics<IntermediateSampleData<BaseRow>>
implements OneInputStreamOperator<BaseRow, IntermediateSampleData<BaseRow>> {
    /** Sampler that receives every input row; created in {@link #open()}. */
    private ReservoirSamplerWithoutReplacement sampler;
    /** Wraps the operator output so sampled records can be emitted; created in {@link #open()}. */
    private transient Collector<IntermediateSampleData<BaseRow>> collector;
    /** Generated projection source (name + code); compiled in {@link #open()} and then cleared. */
    private GeneratedProjection localSampleProjection;
    /** First argument passed to the sampler constructor (presumably the sample size - confirm). */
    private int numSample;

    /**
     * Creates the operator.
     *
     * @param localSampleProjection generated projection that is compiled and installed on the
     *     sampler when the operator is opened
     * @param numSample value handed to the sampler as its first constructor argument
     */
    public LocalSampleOperator(GeneratedProjection localSampleProjection, int numSample) {
        this.numSample = numSample;
        this.localSampleProjection = localSampleProjection;
    }

    /**
     * Sets up the output collector and the sampler, then compiles the generated projection and
     * installs a fresh instance of it on the sampler.
     *
     * @throws Exception if the super class fails to open, or the projection class cannot be
     *     compiled or instantiated
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.collector = new StreamRecordCollector<IntermediateSampleData<BaseRow>>(this.output);
        // The second argument is taken from the clock, so it differs on every run
        // (presumably the sampler's random seed - confirm against the sampler).
        this.sampler = new ReservoirSamplerWithoutReplacement(this.numSample, System.nanoTime());

        Class<?> projectionClass = CodeGenUtils.compile(
                this.getUserCodeClassloader(),
                this.localSampleProjection.name(),
                this.localSampleProjection.code());
        // The generated source is no longer needed once it has been compiled.
        this.localSampleProjection = null;

        // Class.newInstance() is deprecated because it rethrows checked exceptions from the
        // constructor without declaring them; going through the Constructor object wraps such
        // failures in InvocationTargetException instead.
        Projection projection = (Projection) projectionClass.getDeclaredConstructor().newInstance();
        this.sampler.setProjection(projection);
    }

    /**
     * Passes the record's row to the sampler. No output is produced here.
     *
     * @param streamRecord the incoming record whose value is collected
     * @throws Exception if the sampler fails to collect the row
     */
    public void processElement(StreamRecord<BaseRow> streamRecord) throws Exception {
        this.sampler.collectPartitionData((BaseRow)streamRecord.getValue());
    }

    /**
     * Emits every element returned by the sampler's {@code sample()} iterator to the collector.
     *
     * @throws Exception if sampling or emitting fails
     */
    public void endInput() throws Exception {
        Iterator<IntermediateSampleData<BaseRow>> samples = this.sampler.sample();
        while (samples.hasNext()) {
            IntermediateSampleData<BaseRow> sample = samples.next();
            this.collector.collect(sample);
        }
    }
}

