/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.util.Collector;

/**
 * Reads serialized records out of a memory-mapped file and forwards them to a {@link Collector}.
 *
 * <p>The file is created and mapped by {@link #open(File)}; {@link #collectBuffer(Collector, int)}
 * then decodes whatever records currently occupy the first {@code bufferSize} bytes of the mapping.
 * NOTE(review): judging by the package name the other side of the file is presumably written by an
 * external Python process - confirm against the caller.
 *
 * <p>Two record layouts are supported, selected by the {@code usesByteArray} constructor flag:
 * a plain length-prefixed byte array, or a key tuple followed by a length-prefixed value.
 *
 * <p>The I/O handles are {@code transient}; a deserialized instance must be {@link #open(File) opened}
 * again before use.
 *
 * @param <OUT> type of the records handed to the collector
 */
public class PythonReceiver<OUT>
implements Serializable {
    private static final long serialVersionUID = -2474088929850009968L;
    private transient RandomAccessFile inputRAF;
    private transient FileChannel inputChannel;
    private transient MappedByteBuffer fileBuffer;
    /** Size of the mapped region in bytes (configured value multiplied by 1024). */
    private final long mappedFileSizeBytes;
    /** {@code true}: records are raw byte arrays; {@code false}: records are key-tuple/value pairs. */
    private final boolean readAsByteArray;
    private transient Deserializer<OUT> deserializer;

    /**
     * Creates a receiver; no file is touched until {@link #open(File)} is called.
     *
     * @param config configuration supplying {@link PythonOptions#MMAP_FILE_SIZE}; the value is shifted
     *     left by 10 bits, i.e. presumably given in kilobytes - TODO confirm against the option docs
     * @param usesByteArray whether records are plain byte arrays rather than key-tuple/value pairs
     */
    public PythonReceiver(Configuration config, boolean usesByteArray) {
        this.readAsByteArray = usesByteArray;
        this.mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
    }

    /**
     * (Re-)creates {@code inputFile} with the configured size and maps it into memory read/write.
     *
     * <p>If any step after opening the file fails, the file handle is closed again before the
     * exception propagates, and this receiver's I/O fields are left untouched.
     *
     * @param inputFile file backing the mapping; an existing file at this path is deleted first
     * @throws IOException if the file cannot be created, resized or mapped
     */
    @SuppressWarnings("unchecked")
    public void open(File inputFile) throws IOException {
        // Unchecked by design: the caller picks OUT to match the usesByteArray flag.
        this.deserializer = (Deserializer<OUT>) (this.readAsByteArray
                ? new ByteArrayDeserializer()
                : new TupleDeserializer());

        // getParentFile() is null for a path without a parent component.
        File parentDir = inputFile.getParentFile();
        if (parentDir != null) {
            // Best effort: a missing directory surfaces as an IOException just below.
            parentDir.mkdirs();
        }
        if (inputFile.exists()) {
            inputFile.delete();
        }
        inputFile.createNewFile();

        RandomAccessFile raf = new RandomAccessFile(inputFile, "rw");
        try {
            raf.setLength(this.mappedFileSizeBytes);
            // Write one byte at the very end of the file, then rewind.
            raf.seek(this.mappedFileSizeBytes - 1L);
            raf.writeByte(0);
            raf.seek(0L);
            FileChannel channel = raf.getChannel();
            MappedByteBuffer mapping = channel.map(FileChannel.MapMode.READ_WRITE, 0L, this.mappedFileSizeBytes);

            // Publish the handles only once everything succeeded.
            this.inputRAF = raf;
            this.inputChannel = channel;
            this.fileBuffer = mapping;
        }
        catch (IOException | RuntimeException e) {
            // Do not leak the file handle when setup fails half-way.
            try {
                raf.close();
            }
            catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the channel and the file opened by {@link #open(File)}.
     * Calling this on a receiver that was never opened, or calling it twice, is a no-op.
     *
     * @throws IOException if closing the channel or the file fails
     */
    public void close() throws IOException {
        this.closeMappedFile();
    }

    /** Closes channel and file, making sure the file is closed even if closing the channel fails. */
    private void closeMappedFile() throws IOException {
        FileChannel channel = this.inputChannel;
        RandomAccessFile raf = this.inputRAF;
        this.inputChannel = null;
        this.inputRAF = null;
        try {
            if (channel != null) {
                channel.close();
            }
        }
        finally {
            if (raf != null) {
                raf.close();
            }
        }
    }

    /**
     * Decodes the records stored in the first {@code bufferSize} bytes of the mapped file and passes
     * each of them to {@code c}.
     *
     * @param c collector receiving the decoded records
     * @param bufferSize number of valid bytes at the start of the mapping
     * @throws IOException declared for callers; nothing in this method body throws it directly
     */
    public void collectBuffer(Collector<OUT> c, int bufferSize) throws IOException {
        this.fileBuffer.position(0);
        // Each deserialize() call advances the buffer position past one record.
        while (this.fileBuffer.position() < bufferSize) {
            c.collect(this.deserializer.deserialize());
        }
    }

    /**
     * Instantiates an empty Flink tuple of the given arity.
     *
     * @param size number of fields of the tuple
     * @return a new tuple instance of the class returned by {@code Tuple.getTupleClass(size)}
     * @throws RuntimeException wrapping the reflective failure if the tuple cannot be instantiated
     */
    public static Tuple createTuple(int size) {
        try {
            return Tuple.getTupleClass(size).newInstance();
        }
        catch (IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Decodes one key-tuple/value record from the mapped buffer.
     *
     * <p>Layout: one byte holding the number of keys, then for every key a 4-byte length followed by
     * that many bytes, then a 4-byte length followed by the value bytes.
     */
    private class TupleDeserializer
    implements Deserializer<Tuple2<Tuple, byte[]>> {
        @Override
        public Tuple2<Tuple, byte[]> deserialize() {
            int keyTupleSize = PythonReceiver.this.fileBuffer.get();
            Tuple keys = PythonReceiver.createTuple(keyTupleSize);
            for (int x = 0; x < keyTupleSize; ++x) {
                byte[] data = new byte[PythonReceiver.this.fileBuffer.getInt()];
                PythonReceiver.this.fileBuffer.get(data);
                keys.setField(data, x);
            }
            byte[] value = new byte[PythonReceiver.this.fileBuffer.getInt()];
            PythonReceiver.this.fileBuffer.get(value);
            return new Tuple2<>(keys, value);
        }
    }

    /**
     * Decodes one raw record from the mapped buffer: a 4-byte length followed by that many bytes.
     */
    private class ByteArrayDeserializer
    implements Deserializer<byte[]> {
        @Override
        public byte[] deserialize() {
            int size = PythonReceiver.this.fileBuffer.getInt();
            byte[] value = new byte[size];
            PythonReceiver.this.fileBuffer.get(value);
            return value;
        }
    }

    /**
     * Strategy for decoding a single record at the mapped buffer's current position.
     *
     * @param <T> decoded record type
     */
    private interface Deserializer<T> {
        /** Reads one record and leaves the buffer positioned directly behind it. */
        T deserialize();
    }
}

