/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.serialization;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;

/**
 * Reassembles one serialized record whose bytes arrive spread over several memory segments.
 *
 * <p>A record is preceded by a 4-byte big-endian length. Either the length itself or the record
 * body may be split across segments; this wrapper gathers the pieces until the record is complete.
 * Records longer than {@link #THRESHOLD_FOR_SPILLING} bytes are written to a temporary file in one
 * of the configured directories instead of being accumulated in the in-memory buffer.
 *
 * <p>Call {@link #clear()} after a record has been consumed to reset the state and to remove any
 * spill file.
 */
final class SpanningWrapper {
    /** Records longer than this many bytes (5 MiB) are spilled to a temporary file. */
    private static final int THRESHOLD_FOR_SPILLING = 0x500000;

    /** Size in bytes of the length prefix that precedes every record. */
    private static final int LENGTH_BYTES = 4;

    /** Buffer size (2 MiB) of the stream used to read a spilled record back. */
    private static final int SPILL_READ_BUFFER_SIZE = 0x200000;

    /** Small buffer that is reused for every record that fits into it. */
    private final byte[] initialBuffer = new byte[1024];
    /** Directories among which spill files are randomly distributed. */
    private final String[] tempDirs;
    private final Random rnd = new Random();
    /** View over {@link #buffer} handed out for records that were gathered in memory. */
    private final DataInputDeserializer serializationReadBuffer;
    /** Collects the bytes of a length prefix that is split across segments. */
    private final ByteBuffer lengthBuffer;
    /** Channel to the spill file while a large record is being written; {@code null} otherwise. */
    private FileChannel spillingChannel;
    /** Current in-memory accumulation buffer; may be a grown replacement of {@link #initialBuffer}. */
    private byte[] buffer;
    /** Length of the record being gathered, or {@code -1} while it is not yet known. */
    private int recordLength;
    /** Number of record body bytes gathered so far (excluding the length prefix). */
    private int accumulatedRecordBytes;
    /** Segment that still holds unread bytes after the current record, if any. */
    private MemorySegment leftOverData;
    private int leftOverStart;
    private int leftOverLimit;
    /** File created by this wrapper for the current spilled record, or {@code null}. */
    private File spillFile;
    /** Reader over the completed spill file, or {@code null} if the record is held in memory. */
    private DataInputViewStreamWrapper spillFileReader;

    /**
     * Creates an empty wrapper.
     *
     * @param tempDirs directories in which spill files for large records may be created
     */
    public SpanningWrapper(String[] tempDirs) {
        this.tempDirs = tempDirs;
        this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES);
        this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
        this.recordLength = -1;
        this.serializationReadBuffer = new DataInputDeserializer();
        this.buffer = this.initialBuffer;
    }

    /**
     * Starts gathering a record of known length, seeding it with all bytes that remain in
     * {@code partial}.
     *
     * @param partial wrapper whose remaining bytes form the beginning of the record body
     * @param nextRecordLength total length of the record body in bytes
     * @throws IOException if the record has to be spilled and the spill file cannot be created or
     *     written
     */
    protected void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
        this.recordLength = nextRecordLength;
        int numBytesChunk = partial.remaining();
        if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
            // Large record: write the first chunk straight to disk.
            this.spillingChannel = this.createSpillingChannel();
            ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
            FileUtils.writeCompletely(this.spillingChannel, toWrite);
        } else {
            this.ensureBufferCapacity(numBytesChunk);
            partial.segment.get(partial.position, this.buffer, 0, numBytesChunk);
        }
        this.accumulatedRecordBytes = numBytesChunk;
    }

    /**
     * Starts gathering a record whose length prefix is itself split: copies the bytes remaining in
     * {@code partial} into the length buffer.
     *
     * @param partial wrapper whose remaining bytes are the first part of the length prefix
     * @throws IOException declared for callers; not thrown by the visible code itself
     */
    protected void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
        partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
    }

    /**
     * Feeds the next chunk of bytes into the record that is currently being gathered.
     *
     * <p>If the length prefix is still incomplete it is completed first. Then as many bytes as the
     * record still needs are copied to the buffer or the spill file. Bytes beyond the end of the
     * record are remembered as left-over data for
     * {@link #moveRemainderToNonSpanningDeserializer(NonSpanningWrapper)}.
     *
     * @param segment segment holding the chunk
     * @param offset position of the first chunk byte within {@code segment}
     * @param numBytes number of chunk bytes available from {@code offset}
     * @throws IOException if writing to, or re-opening, the spill file fails
     */
    protected void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
        int segmentPosition = offset;
        int segmentRemaining = numBytes;

        // A non-zero position means a partial length prefix is pending and must be completed first.
        if (this.lengthBuffer.position() > 0) {
            int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
            segment.get(offset, this.lengthBuffer, toPut);
            if (this.lengthBuffer.hasRemaining()) {
                // The chunk was too short to finish even the length prefix.
                return;
            }
            this.recordLength = this.lengthBuffer.getInt(0);
            this.lengthBuffer.clear();
            segmentPosition += toPut;
            segmentRemaining -= toPut;
            if (this.recordLength > THRESHOLD_FOR_SPILLING) {
                this.spillingChannel = this.createSpillingChannel();
            }
        }

        // Take only what the record still needs, or whatever the chunk can provide.
        int needed = this.recordLength - this.accumulatedRecordBytes;
        int toCopy = Math.min(needed, segmentRemaining);
        if (this.spillingChannel != null) {
            ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
            FileUtils.writeCompletely(this.spillingChannel, toWrite);
        } else {
            this.ensureBufferCapacity(this.accumulatedRecordBytes + toCopy);
            segment.get(segmentPosition, this.buffer, this.accumulatedRecordBytes, toCopy);
        }
        this.accumulatedRecordBytes += toCopy;

        if (toCopy < segmentRemaining) {
            // The segment contains bytes that belong to the following record(s).
            this.leftOverData = segment;
            this.leftOverStart = segmentPosition + toCopy;
            this.leftOverLimit = numBytes + offset;
        }

        if (this.accumulatedRecordBytes == this.recordLength) {
            // Record complete: expose it through the matching input view.
            if (this.spillingChannel == null) {
                this.serializationReadBuffer.setBuffer(this.buffer, 0, this.recordLength);
            } else {
                this.spillingChannel.close();
                BufferedInputStream inStream =
                        new BufferedInputStream(new FileInputStream(this.spillFile), SPILL_READ_BUFFER_SIZE);
                this.spillFileReader = new DataInputViewStreamWrapper(inStream);
            }
        }
    }

    /**
     * Clears {@code deserializer} and, if the last chunk contained bytes past the end of the
     * gathered record, re-initializes it with that left-over range.
     *
     * @param deserializer the non-spanning wrapper that should continue with the left-over bytes
     */
    protected void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
        deserializer.clear();
        if (this.leftOverData != null) {
            deserializer.initializeFromMemorySegment(this.leftOverData, this.leftOverStart, this.leftOverLimit);
        }
    }

    /**
     * @return {@code true} if the record length is known and at least that many body bytes have
     *     been gathered
     */
    protected boolean hasFullRecord() {
        return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
    }

    /**
     * @return the number of bytes gathered so far, counting the length prefix: the full prefix
     *     once the length is known, otherwise only the prefix bytes received up to now
     */
    protected int getNumGatheredBytes() {
        int lengthBytes = this.recordLength >= 0 ? LENGTH_BYTES : this.lengthBuffer.position();
        return this.accumulatedRecordBytes + lengthBytes;
    }

    /**
     * Resets this wrapper to its empty state: restores the initial buffer, forgets the current
     * record and left-over data, closes the spill channel and reader, and deletes the spill file.
     * Closing and deleting are best-effort and never throw.
     */
    public void clear() {
        this.buffer = this.initialBuffer;
        this.serializationReadBuffer.releaseArrays();
        this.recordLength = -1;
        this.lengthBuffer.clear();
        this.leftOverData = null;
        this.accumulatedRecordBytes = 0;

        if (this.spillingChannel != null) {
            try {
                this.spillingChannel.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup: a failing close must not prevent the remaining reset steps.
            }
            this.spillingChannel = null;
        }
        if (this.spillFileReader != null) {
            try {
                this.spillFileReader.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup: a failing close must not prevent deleting the spill file.
            }
            this.spillFileReader = null;
        }
        if (this.spillFile != null) {
            // The result is deliberately ignored; a file that cannot be deleted is left behind.
            this.spillFile.delete();
            this.spillFile = null;
        }
    }

    /**
     * @return the view from which the gathered record can be read: the spill file reader if the
     *     record was spilled and completed, otherwise the in-memory deserializer
     */
    public DataInputView getInputView() {
        if (this.spillFileReader == null) {
            return this.serializationReadBuffer;
        }
        return this.spillFileReader;
    }

    /**
     * Grows the in-memory buffer to at least {@code minLength} bytes (at least doubling it),
     * preserving the bytes accumulated so far.
     */
    private void ensureBufferCapacity(int minLength) {
        if (this.buffer.length < minLength) {
            byte[] newBuffer = new byte[Math.max(minLength, this.buffer.length * 2)];
            System.arraycopy(this.buffer, 0, newBuffer, 0, this.accumulatedRecordBytes);
            this.buffer = newBuffer;
        }
    }

    /**
     * Creates a fresh, uniquely named spill file in a randomly chosen temp directory and opens a
     * read/write channel to it.
     *
     * <p>{@link #spillFile} is assigned only after the file was actually created by this call, so
     * that a failed attempt never leaves the field pointing at a file this wrapper does not own
     * (which {@link #clear()} would otherwise delete).
     *
     * @return a channel to the newly created spill file
     * @throws IllegalStateException if a spill file is already registered
     * @throws IOException if no unique file could be created within the attempt limit, or file
     *     creation/opening fails
     */
    private FileChannel createSpillingChannel() throws IOException {
        if (this.spillFile != null) {
            throw new IllegalStateException("Spilling file already exists.");
        }
        int maxAttempts = 10;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            String directory = this.tempDirs[this.rnd.nextInt(this.tempDirs.length)];
            File candidate = new File(directory, SpanningWrapper.randomString(this.rnd) + ".inputchannel");
            if (!candidate.createNewFile()) {
                // Name collision with an existing file: try another random name.
                continue;
            }
            // Register the file before opening it so clear() removes it even if opening fails.
            this.spillFile = candidate;
            return new RandomAccessFile(candidate, "rw").getChannel();
        }
        throw new IOException("Could not find a unique file channel name in '" + Arrays.toString(this.tempDirs) + "' for spilling large records during deserialization.");
    }

    /** Returns the hex encoding of 20 random bytes drawn from {@code random}. */
    private static String randomString(Random random) {
        byte[] bytes = new byte[20];
        random.nextBytes(bytes);
        return StringUtils.byteToHexString(bytes);
    }
}

