/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.python.api.streaming.data.SingleElementPushBackIterator;

/**
 * Base class for senders that hand serialized records to another process through a
 * memory-mapped file.
 *
 * <p>{@link #open(File)} creates the file and maps it into memory, {@link #sendBuffer}
 * fills the mapped region with as many serialized records as fit, and {@link #close()}
 * releases the underlying file handles.
 *
 * <p>Every serialized record starts with a single type byte ({@link #TYPE_ARRAY},
 * {@link #TYPE_KEY_VALUE} or {@link #TYPE_VALUE_VALUE}) followed by the raw payload bytes.
 */
public abstract class PythonSender
implements Serializable {
    private static final long serialVersionUID = -2004095650353962110L;

    /** Type byte written in front of a plain {@code byte[]} record. */
    public static final byte TYPE_ARRAY = 63;
    /** Type byte written in front of a (key tuple, value) record. */
    public static final byte TYPE_KEY_VALUE = 62;
    /** Type byte written in front of a (value, value) record. */
    public static final byte TYPE_VALUE_VALUE = 61;

    private transient RandomAccessFile outputRAF;
    private transient FileChannel outputChannel;
    private transient MappedByteBuffer fileBuffer;

    /** Size in bytes of the memory-mapped file, and therefore of a single send buffer. */
    private final long mappedFileSizeBytes;
    private final Configuration config;

    /**
     * Creates a sender whose buffer size is derived from the given configuration.
     *
     * @param config configuration providing {@code PythonOptions.MMAP_FILE_SIZE}
     */
    protected PythonSender(Configuration config) {
        this.config = config;
        // The configured value is multiplied by 1024 (<< 10); presumably the option is
        // expressed in kilobytes -- confirm against PythonOptions.
        this.mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
    }

    /**
     * Creates (or re-creates) the given file, sizes it to the configured buffer size and
     * maps it into memory for read/write access.
     *
     * @param outputFile file backing the memory-mapped buffer
     * @throws IOException if the file cannot be opened, resized or mapped
     */
    public void open(File outputFile) throws IOException {
        // Only the parent directories have to exist; calling mkdirs() on the file itself
        // would first create a directory at the file's location.
        File parentDir = outputFile.getParentFile();
        if (parentDir != null) {
            parentDir.mkdirs();
        }
        // Best effort: any real problem with the path surfaces when the file is opened below.
        if (outputFile.exists()) {
            outputFile.delete();
        }
        outputFile.createNewFile();

        RandomAccessFile raf = new RandomAccessFile(outputFile, "rw");
        try {
            raf.setLength(this.mappedFileSizeBytes);
            // Touch the last byte so the file really spans the full mapped size.
            raf.seek(this.mappedFileSizeBytes - 1L);
            raf.writeByte(0);
            raf.seek(0L);
            FileChannel channel = raf.getChannel();
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0L, this.mappedFileSizeBytes);
            // Publish the handles only once everything succeeded.
            this.outputRAF = raf;
            this.outputChannel = channel;
            this.fileBuffer = mapped;
        } catch (IOException | RuntimeException e) {
            // Do not leak the file handle when sizing or mapping fails.
            try {
                raf.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the file handles backing the memory-mapped buffer.
     *
     * @throws IOException if closing the channel or the file fails
     */
    public void close() throws IOException {
        this.closeMappedFile();
    }

    /**
     * Closes the channel and the random access file. The file is closed even when closing
     * the channel fails, and the method is a no-op for handles that were never opened.
     */
    private void closeMappedFile() throws IOException {
        try {
            if (this.outputChannel != null) {
                this.outputChannel.close();
            }
        } finally {
            if (this.outputRAF != null) {
                this.outputRAF.close();
            }
        }
    }

    /**
     * Writes as many serialized records as fit into the mapped buffer, starting at
     * position zero. The first record that does not fit into the remaining space is pushed
     * back onto the iterator so that it can be sent with the next buffer.
     *
     * @param input iterator supplying the records to send
     * @param serializer serializer used to convert each record into bytes
     * @param <IN> record type
     * @return number of bytes written to the buffer
     * @throws RuntimeException if a single serialized record is larger than the whole buffer
     * @throws IOException declared for subclasses and callers; not thrown by this
     *     implementation directly
     */
    protected <IN> int sendBuffer(SingleElementPushBackIterator<IN> input, Serializer<IN> serializer) throws IOException {
        this.fileBuffer.clear();
        while (input.hasNext()) {
            IN value = input.next();
            ByteBuffer serialized = serializer.serialize(value);
            if ((long) serialized.remaining() > this.mappedFileSizeBytes) {
                throw new RuntimeException("Serialized object does not fit into a single buffer.");
            }
            if (serialized.remaining() > this.fileBuffer.remaining()) {
                // Buffer is full: keep the record for the next call.
                input.pushBack(value);
                break;
            }
            this.fileBuffer.put(serialized);
        }
        return this.fileBuffer.position();
    }

    /**
     * Selects the serializer matching the runtime type of the given sample value.
     *
     * <ul>
     *   <li>{@code byte[]} -&gt; array serializer</li>
     *   <li>{@code Tuple2} whose first field is a {@code byte[]} -&gt; value pair serializer</li>
     *   <li>{@code Tuple2} whose first field is a {@code Tuple} -&gt; key/value pair serializer</li>
     * </ul>
     *
     * @param value sample value used to pick the serializer
     * @param <IN> record type
     * @return serializer for values of the same kind as {@code value}
     * @throws IllegalArgumentException if the value (including {@code null}) is of an
     *     unsupported type
     */
    protected <IN> Serializer<IN> getSerializer(IN value) {
        if (value instanceof byte[]) {
            // Safe: IN is byte[] at runtime.
            @SuppressWarnings("unchecked")
            Serializer<IN> arraySerializer = (Serializer<IN>) new ArraySerializer();
            return arraySerializer;
        }
        // Guard the cast so unsupported inputs reach the IllegalArgumentException below
        // instead of failing with a ClassCastException or NullPointerException.
        if (value instanceof Tuple2) {
            Object first = ((Tuple2<?, ?>) value).f0;
            if (first instanceof byte[]) {
                @SuppressWarnings("unchecked")
                Serializer<IN> valuePairSerializer = (Serializer<IN>) new ValuePairSerializer();
                return valuePairSerializer;
            }
            if (first instanceof Tuple) {
                @SuppressWarnings("unchecked")
                Serializer<IN> keyValueSerializer = (Serializer<IN>) new KeyValuePairSerializer();
                return keyValueSerializer;
            }
        }
        throw new IllegalArgumentException("This object can't be serialized: " + value);
    }

    /**
     * Serializes a (key tuple, value) pair as: type byte, key arity byte, the bytes of every
     * key field in order, then the value bytes. Every key field must be a {@code byte[]}.
     */
    private static class KeyValuePairSerializer
    extends Serializer<Tuple2<Tuple, byte[]>> {
        private KeyValuePairSerializer() {
        }

        @Override
        public void serializeInternal(Tuple2<Tuple, byte[]> value) {
            Tuple key = value.f0;
            byte[] payload = value.f1;
            int arity = key.getArity();

            int keySize = 0;
            for (int i = 0; i < arity; ++i) {
                keySize += ((byte[]) key.getField(i)).length;
            }

            // Exactly two header bytes (type + arity) precede the key and value bytes.
            this.buffer = ByteBuffer.allocate(2 + keySize + payload.length);
            this.buffer.put(TYPE_KEY_VALUE);
            this.buffer.put((byte) arity);
            for (int i = 0; i < arity; ++i) {
                this.buffer.put((byte[]) key.getField(i));
            }
            this.buffer.put(payload);
        }
    }

    /** Serializes a pair of byte arrays as: type byte, first array bytes, second array bytes. */
    private static class ValuePairSerializer
    extends Serializer<Tuple2<byte[], byte[]>> {
        private ValuePairSerializer() {
        }

        @Override
        public void serializeInternal(Tuple2<byte[], byte[]> value) {
            byte[] first = value.f0;
            byte[] second = value.f1;
            this.buffer = ByteBuffer.allocate(1 + first.length + second.length);
            this.buffer.put(TYPE_VALUE_VALUE);
            this.buffer.put(first);
            this.buffer.put(second);
        }
    }

    /** Serializes a single byte array as: type byte followed by the array bytes. */
    private static class ArraySerializer
    extends Serializer<byte[]> {
        private ArraySerializer() {
        }

        @Override
        public void serializeInternal(byte[] value) {
            this.buffer = ByteBuffer.allocate(value.length + 1);
            this.buffer.put(TYPE_ARRAY);
            this.buffer.put(value);
        }
    }

    /**
     * Converts a value into a {@link ByteBuffer} that is ready for reading.
     *
     * @param <T> type of the values handled by this serializer
     */
    protected static abstract class Serializer<T> {
        /** Buffer populated by {@link #serializeInternal(Object)}. */
        protected ByteBuffer buffer;

        protected Serializer() {
        }

        /**
         * Serializes the value and flips the resulting buffer so that its contents can be
         * read from position zero up to the number of bytes written.
         *
         * @param value value to serialize
         * @return the flipped buffer holding the serialized bytes
         */
        public ByteBuffer serialize(T value) {
            this.serializeInternal(value);
            this.buffer.flip();
            return this.buffer;
        }

        /**
         * Allocates {@link #buffer} and writes the serialized form of the value into it,
         * without flipping.
         */
        protected abstract void serializeInternal(T var1);
    }
}

