/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.util;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.ChannelWithMeta;
import org.apache.flink.table.runtime.util.InMemoryBuffer;
import org.apache.flink.table.runtime.util.PagedChannelReaderInputViewIterator;
import org.apache.flink.table.runtime.util.ResettableRowBuffer;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A row buffer that keeps rows in an {@link InMemoryBuffer} and, whenever a row can no longer be
 * written to it, writes the buffered segments to a new channel obtained from the {@link IOManager}
 * before retrying the write.
 *
 * <p>Rows are read back through {@link BufferIterator}, which walks the spilled channels in the
 * order they were written and then the rows still held in memory. Calling {@link #reset()} bumps
 * an internal version counter; iterators created before the reset refuse further use.
 *
 * <p>Nothing in this class synchronizes access to its state.
 */
public class ResettableExternalBuffer implements ResettableRowBuffer<BinaryRow> {
    private static final Logger LOG = LoggerFactory.getLogger(ResettableExternalBuffer.class);

    /** Number of read segments each iterator allocates for reading spilled channels. */
    static final int READ_BUFFER = 2;

    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final List<MemorySegment> memory;
    private final BinaryRowSerializer binaryRowSerializer;
    private final InMemoryBuffer inMemoryBuffer;

    /** One entry per spilled channel, in spill order. */
    private final List<ChannelWithMeta> channelIDs;

    /** Accumulated size of all spills, counted in whole segments. */
    private long spillSize;

    /** Size of the first segment of {@link #memory}; used as the size of every segment. */
    private final int segmentSize;

    /** Serialized fixed-part length of a row; only meaningful when {@link #isFixedLength} is set. */
    private final long fixedLength;

    /** Whether the serializer reports that rows have a fixed length. */
    private final boolean isFixedLength;

    /** Entry {@code i} is the value of {@link #numRows} at the moment channel {@code i} was spilled. */
    private final List<Integer> numRowsUntilThisChannel;

    private int numRows;

    /** Number of iterators handed out by {@link #newIterator(int)} and not yet closed. */
    private int iteratorCount;

    /** Incremented on every {@link #reset()}; iterators compare against it to detect staleness. */
    private int resetCount;

    /**
     * Creates a buffer backed by the given memory segments.
     *
     * @param memoryManager manager the segments are released to on {@link #close()}
     * @param ioManager used to create spill channels and their readers/writers
     * @param memory segments backing the in-memory part; must contain at least one segment
     *     because the first one determines the segment size
     * @param serializer row serializer; duplicated if it already is a {@link BinaryRowSerializer},
     *     otherwise a new {@link BinaryRowSerializer} is built from its types
     */
    public ResettableExternalBuffer(MemoryManager memoryManager, IOManager ioManager, List<MemorySegment> memory, AbstractRowSerializer serializer) {
        this.memoryManager = memoryManager;
        this.ioManager = ioManager;
        this.memory = memory;
        this.binaryRowSerializer = serializer instanceof BinaryRowSerializer
                ? (BinaryRowSerializer) serializer.duplicate()
                : new BinaryRowSerializer(serializer.getTypes());
        this.inMemoryBuffer = new InMemoryBuffer(memory, serializer);
        this.channelIDs = new ArrayList<ChannelWithMeta>();
        this.spillSize = 0L;
        this.segmentSize = memory.get(0).size();
        this.numRowsUntilThisChannel = new ArrayList<Integer>();
        this.numRows = 0;
        this.iteratorCount = 0;
        this.resetCount = 0;
        this.isFixedLength = this.binaryRowSerializer.isRowFixedLength();
        // The fixed length is only queried when rows are fixed-length; it stays 0 otherwise.
        this.fixedLength = this.isFixedLength
                ? this.binaryRowSerializer.getSerializedRowFixedPartLength()
                : 0L;
    }

    /** Returns the number of rows added since construction or the last {@link #reset()}. */
    @Override
    public int size() {
        return numRows;
    }

    /** Total bytes of the backing segments, computed in {@code long} to avoid int overflow. */
    private long memorySize() {
        return (long) memory.size() * segmentSize;
    }

    /** Renders the row size for error messages; {@code "?"} when the row is not a {@link BinaryRow}. */
    private String getRowSize(BaseRow row2) {
        if (row2 instanceof BinaryRow) {
            return String.valueOf(((BinaryRow) row2).getSizeInBytes());
        }
        return "?";
    }

    /**
     * Appends a row. If the in-memory buffer rejects the write, its content is spilled and the
     * write is retried once.
     *
     * @param row2 the row to append
     * @throws IOException if the row is rejected by an empty in-memory buffer (before or after
     *     spilling), or if spilling fails
     */
    @Override
    public void add(BaseRow row2) throws IOException {
        if (!inMemoryBuffer.write(row2)) {
            // Offset 0 means nothing is buffered, so spilling would not free any space.
            if (inMemoryBuffer.getCurrentDataBufferOffset() == 0L) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + getRowSize(row2) + ", Buffer: " + memorySize());
            }
            spill();
            if (!inMemoryBuffer.write(row2)) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + getRowSize(row2) + ", Buffer: " + memorySize());
            }
        }
        numRows++;
    }

    /**
     * Writes all record segments of the in-memory buffer to a fresh channel, records the channel
     * and the row count reached so far, then resets the in-memory buffer.
     *
     * @throws IOException if writing fails; the partially written channel is closed and deleted
     */
    private void spill() throws IOException {
        FileIOChannel.ID channel = ioManager.createChannel();
        BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
        int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
        List<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
        try {
            for (int i = 0; i < numRecordBuffers; i++) {
                writer.writeBlock(segments.get(i));
            }
            LOG.info("here spill the reset buffer data with {} bytes", writer.getSize());
            writer.close();
        } catch (IOException e) {
            writer.closeAndDelete();
            throw e;
        }
        // Widen before multiplying; the int product can overflow for large spills.
        spillSize += (long) numRecordBuffers * segmentSize;
        channelIDs.add(new ChannelWithMeta(channel, numRecordBuffers, inMemoryBuffer.getNumBytesInLastBuffer()));
        numRowsUntilThisChannel.add(numRows);
        inMemoryBuffer.reset();
    }

    /**
     * Returns the bytes of the backing segments plus {@link #READ_BUFFER} segments for every open
     * iterator. The per-iterator part is counted whether or not that iterator has actually
     * allocated its read segments.
     */
    public long getUsedMemoryInBytes() {
        return memorySize() + (long) iteratorCount * READ_BUFFER * segmentSize;
    }

    /** Returns the number of channels currently holding spilled data. */
    public int getNumSpillFiles() {
        return channelIDs.size();
    }

    /** Returns the accumulated spill size in bytes, counted in whole segments. */
    public long getSpillInBytes() {
        return spillSize;
    }

    /**
     * Discards all rows: deletes the spill files, resets the in-memory buffer and invalidates
     * every iterator created so far.
     */
    @Override
    public void reset() {
        clearChannels();
        inMemoryBuffer.reset();
        numRows = 0;
        resetCount++;
    }

    /** Deletes the spill files, releases the backing segments and closes the in-memory buffer. */
    @Override
    public void close() {
        clearChannels();
        memoryManager.release(memory);
        inMemoryBuffer.close();
    }

    /**
     * Deletes every spill file that still exists and clears the spill bookkeeping. Deletion is
     * best-effort: a failure is logged and does not abort the cleanup.
     */
    private void clearChannels() {
        for (ChannelWithMeta meta : channelIDs) {
            File f = new File(meta.getChannel().getPath());
            if (f.exists() && !f.delete()) {
                LOG.warn("Failed to delete spill file {}", f);
            }
        }
        channelIDs.clear();
        spillSize = 0L;
        numRowsUntilThisChannel.clear();
    }

    /** Creates an iterator positioned before the first row. */
    public BufferIterator newIterator() {
        return newIterator(0);
    }

    /**
     * Creates an iterator positioned before the given row.
     *
     * @param beginRow zero-based index of the first row to return; values beyond the current row
     *     count are clamped to it
     * @throws IllegalArgumentException if {@code beginRow} is negative
     */
    public BufferIterator newIterator(int beginRow) {
        Preconditions.checkArgument(beginRow >= 0, "`beginRow` can't be negative!");
        iteratorCount++;
        return new BufferIterator(beginRow);
    }

    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        return channelIDs;
    }

    /**
     * Iterator over the rows of the enclosing buffer, starting at a fixed begin row. It reads
     * spilled channels first and then the in-memory rows, and can be rewound with
     * {@link #reset()}. It becomes unusable once closed or once the enclosing buffer is reset.
     */
    public class BufferIterator implements ResettableRowBuffer.ResettableIterator<BinaryRow> {
        MutableObjectIterator<BinaryRow> currentIterator;

        /** Read segments for spilled channels; allocated lazily, only when the begin row is spilled. */
        List<MemorySegment> freeMemory = null;

        BlockChannelReader<MemorySegment> fileReader;

        /**
         * {@code -1} before the first positioning, the channel index while reading a spilled
         * channel, and {@link Integer#MAX_VALUE} while reading the in-memory buffer.
         */
        int currentChannelID = -1;

        BinaryRow reuse = binaryRowSerializer.createInstance();
        BinaryRow row;
        int beginRow;
        int nextRow;
        InMemoryBuffer.BufferIterator reusableMemoryIterator;

        /** Value of the buffer's reset counter when this iterator was created. */
        int bufferVersion;

        boolean closed;

        private BufferIterator(int beginRow) {
            this.beginRow = Math.min(beginRow, numRows);
            this.nextRow = this.beginRow;
            this.bufferVersion = resetCount;
            this.closed = false;
            createFreeMemoryIfNeeded();
        }

        /** Rejects use after {@link #close()} or after the enclosing buffer was reset. */
        private void checkValidity() {
            if (closed) {
                throw new RuntimeException("This iterator is closed!");
            }
            if (bufferVersion != resetCount) {
                throw new RuntimeException("This iterator is no longer valid!");
            }
        }

        /**
         * Rewinds the iterator to its begin row.
         *
         * @throws IOException if closing the current file reader fails
         */
        @Override
        public void reset() throws IOException {
            checkValidity();
            resetImpl();
        }

        private void resetImpl() throws IOException {
            closeCurrentFileReader();
            nextRow = beginRow;
            currentChannelID = -1;
            currentIterator = null;
            row = null;
            reuse.unbindMemorySegment();
        }

        /**
         * Releases the iterator's resources and decrements the buffer's open-iterator count.
         * Calling it again is a no-op.
         */
        @Override
        public void close() {
            if (closed) {
                return;
            }
            try {
                resetImpl();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (freeMemory != null) {
                freeMemory.clear();
            }
            if (reusableMemoryIterator != null) {
                reusableMemoryIterator.close();
            }
            closed = true;
            iteratorCount--;
        }

        /** Returns whether rows remain after the current position, by row count alone. */
        public boolean hasNext() {
            return nextRow < numRows;
        }

        public int getBeginRow() {
            return beginRow;
        }

        /**
         * Moves to the next row, switching to the next source (spilled channel or memory) when the
         * current one is exhausted.
         *
         * @return {@code true} if a row is available through {@link #getRow()}
         * @throws RuntimeException wrapping any {@link IOException}, or if the iterator is closed
         *     or stale
         */
        @Override
        public boolean advanceNext() {
            checkValidity();
            try {
                updateIteratorIfNeeded();
                do {
                    if (currentIterator != null && (row = currentIterator.next(reuse)) != null) {
                        nextRow++;
                        return true;
                    }
                } while (nextIterator());
                return false;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Selects the next row source.
         *
         * @return {@code false} when the in-memory source was already the current one, meaning no
         *     further source exists
         */
        private boolean nextIterator() throws IOException {
            if (currentChannelID == -1) {
                // First positioning: jump straight to the begin row.
                if (isFixedLength) {
                    gotoFixedLengthRow(beginRow);
                } else {
                    gotoVariableLengthRow(beginRow);
                }
                return true;
            }
            if (currentChannelID == Integer.MAX_VALUE) {
                return false;
            }
            if (currentChannelID < channelIDs.size() - 1) {
                nextSpilledIterator();
            } else {
                newMemoryIterator();
            }
            return true;
        }

        /**
         * Returns whether the iterator was reading from memory but its next row now lies at or
         * below the row count recorded for the most recent spill.
         */
        private boolean iteratorNeedsUpdate() {
            int size = numRowsUntilThisChannel.size();
            return size > 0
                    && currentChannelID == Integer.MAX_VALUE
                    && nextRow <= numRowsUntilThisChannel.get(size - 1);
        }

        private void updateIteratorIfNeeded() throws IOException {
            createFreeMemoryIfNeeded();
            if (iteratorNeedsUpdate()) {
                reuse.unbindMemorySegment();
                // Drop the memory iterator so that a fresh one is created when memory is reached.
                reusableMemoryIterator = null;
                if (isFixedLength) {
                    gotoFixedLengthRow(nextRow);
                } else {
                    gotoVariableLengthRow(nextRow);
                }
            }
        }

        /** Returns the row produced by the last successful {@link #advanceNext()}. */
        @Override
        public BinaryRow getRow() {
            return row;
        }

        private void closeCurrentFileReader() throws IOException {
            if (fileReader != null) {
                fileReader.close();
                fileReader = null;
            }
        }

        /**
         * Positions on {@code beginRow} for fixed-length rows by computing the byte offset inside
         * the owning channel (or the in-memory buffer) directly.
         */
        private void gotoFixedLengthRow(int beginRow) throws IOException {
            int beginChannel = upperBound(beginRow, numRowsUntilThisChannel);
            int beginRowInChannel = getBeginIndexInChannel(beginRow, beginChannel);
            if (beginRow == numRows) {
                // Positioned past the last row: start at the in-memory buffer's current offset.
                newMemoryIterator(beginRowInChannel, inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            // The offset is built from whole segments plus whole rows inside the last segment.
            long numRecordsInSegment = (long) segmentSize / fixedLength;
            long offset = (long) beginRowInChannel / numRecordsInSegment * (long) segmentSize
                    + (long) beginRowInChannel % numRecordsInSegment * fixedLength;
            if (beginChannel < numRowsUntilThisChannel.size()) {
                newSpilledIterator(beginChannel, offset);
            } else {
                newMemoryIterator(beginRowInChannel, offset);
            }
        }

        /**
         * Positions on {@code beginRow} for variable-length rows by opening the owning source at
         * its start and skipping the preceding rows one by one.
         */
        private void gotoVariableLengthRow(int beginRow) throws IOException {
            int beginChannel = upperBound(beginRow, numRowsUntilThisChannel);
            int beginRowInChannel = getBeginIndexInChannel(beginRow, beginChannel);
            if (beginRow == numRows) {
                newMemoryIterator(beginRowInChannel, inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            if (beginChannel < numRowsUntilThisChannel.size()) {
                newSpilledIterator(beginChannel);
            } else {
                newMemoryIterator();
            }
            // advanceNext() increments nextRow for each skipped row, so compensate up front.
            nextRow -= beginRowInChannel;
            for (int i = 0; i < beginRowInChannel; i++) {
                advanceNext();
            }
        }

        private void nextSpilledIterator() throws IOException {
            newSpilledIterator(currentChannelID + 1);
        }

        private void newSpilledIterator(int channelID) throws IOException {
            newSpilledIterator(channelID, 0L);
        }

        /**
         * Opens the given spilled channel and starts reading at {@code offset} bytes into it.
         * The reader seeks to the start of the segment containing the offset; the remainder is
         * passed to the input view as the offset within that segment.
         */
        private void newSpilledIterator(int channelID, long offset) throws IOException {
            ChannelWithMeta channel = channelIDs.get(channelID);
            currentChannelID = channelID;
            closeCurrentFileReader();
            int segmentNum = (int) (offset / (long) segmentSize);
            // Widen before multiplying; the int product can overflow for large channels.
            long seekPosition = (long) segmentNum * segmentSize;
            fileReader = ioManager.createBlockChannelReader(channel.getChannel());
            if (offset > 0L) {
                fileReader.seekToPosition(seekPosition);
            }
            HeaderlessChannelReaderInputView inView = new HeaderlessChannelReaderInputView(fileReader, freeMemory, channel.getBlockCount() - segmentNum, channel.getNumBytesInLastBlock(), false, offset - seekPosition);
            currentIterator = new PagedChannelReaderInputViewIterator<BinaryRow>((ChannelReaderInputView)inView, null, binaryRowSerializer);
        }

        private void newMemoryIterator() throws IOException {
            newMemoryIterator(0, 0L);
        }

        /**
         * Switches to the in-memory source. An existing memory iterator is reused and reset to
         * {@code offset}; in that case {@code beginRow} is not used.
         */
        private void newMemoryIterator(int beginRow, long offset) throws IOException {
            currentChannelID = Integer.MAX_VALUE;
            closeCurrentFileReader();
            if (reusableMemoryIterator == null) {
                reusableMemoryIterator = inMemoryBuffer.newIterator(beginRow, offset);
            } else {
                reusableMemoryIterator.reset(offset);
            }
            currentIterator = reusableMemoryIterator;
        }

        /** Converts a global row index into an index relative to the start of its channel. */
        private int getBeginIndexInChannel(int beginRow, int beginChannel) {
            if (beginChannel > 0) {
                return beginRow - numRowsUntilThisChannel.get(beginChannel - 1);
            }
            return beginRow;
        }

        /** Returns whether the given row index lies below the row count of the latest spill. */
        public boolean rowInSpill(int rowNum) {
            int size = numRowsUntilThisChannel.size();
            return size > 0 && rowNum < numRowsUntilThisChannel.get(size - 1);
        }

        /** Allocates the read segments once, and only if the begin row has been spilled. */
        private void createFreeMemoryIfNeeded() {
            if (freeMemory == null && rowInSpill(beginRow)) {
                freeMemory = new ArrayList<MemorySegment>();
                for (int i = 0; i < READ_BUFFER; i++) {
                    freeMemory.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
                }
            }
        }

        /**
         * Binary search for the index of the first element strictly greater than {@code goal}.
         *
         * @return that index, or {@code list.size()} when no element is greater (including the
         *     empty list)
         */
        private int upperBound(int goal, List<Integer> list) {
            if (list.isEmpty()) {
                return 0;
            }
            if (list.get(list.size() - 1) <= goal) {
                return list.size();
            }
            int head = 0;
            int tail = list.size() - 1;
            while (head < tail) {
                // Unsigned shift keeps the midpoint correct even if head + tail overflows.
                int mid = (head + tail) >>> 1;
                if (list.get(mid) <= goal) {
                    head = mid + 1;
                } else {
                    tail = mid;
                }
            }
            return head;
        }
    }
}

