package org.apache.flink.table.runtime.util;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.runtime.util.InMemoryBuffer;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer.class */
public class ResettableExternalBuffer implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ResettableExternalBuffer.class);
    /** NOTE(review): presumably the number of read segments each iterator allocates for spilled data — confirm against createFreeMemoryIfNeeded. */
    static final int READ_BUFFER = 2;
    /** Used in {@link #close()} to release {@link #memory}. */
    private final MemoryManager memoryManager;
    /** Creates spill channels and the block channel writers/readers for them. */
    private final IOManager ioManager;
    /** Memory segments handed to the in-memory buffer; released on close. */
    private final List<MemorySegment> memory;
    /** Serializer used to create reusable rows and to read rows back from spilled channels. */
    private final BinaryRowSerializer binaryRowSerializer;
    /** Buffer receiving newly added rows; its segments are written to disk by spill(). */
    private final InMemoryBuffer inMemoryBuffer;
    /** One entry per spill file, in the order the spills happened. */
    private final List<ChannelWithMeta> channelIDs;
    /** Accumulated size of all spills (record buffers times segment size). */
    private long spillSize;
    /** Size of the first segment of {@link #memory}; used as the block size for all offset computations. */
    private int segmentSize;
    /** Fixed-part length reported by the serializer; only assigned when {@link #isFixedLength} is true. */
    private long fixedLength;
    /** Whether the serializer reports rows as fixed length, enabling direct offset computation. */
    private boolean isFixedLength;
    /** Value of {@link #numRows} recorded at each spill, i.e. the cumulative row count up to and including that channel. */
    private final List<Integer> numRowsUntilThisChannel;
    /** Number of rows added since construction or the last reset. */
    private int numRows;
    /** Incremented by newIterator and decremented when an iterator is closed. */
    private int iteratorCount;
    /** Incremented on every reset; iterators compare it with the value captured at creation to detect invalidation. */
    private int resetCount;

    /* loaded from: input_file:org/apache/flink/table/runtime/util/ResettableExternalBuffer$BufferIterator.class */
    public class BufferIterator implements Closeable {
        /** Iterator over the source currently being read (a spilled channel or the in-memory buffer); null until positioned. */
        MutableObjectIterator<BinaryRow> currentIterator;
        /** Segments handed to the spilled-channel reader view; stays null until a row in a spill has to be read. */
        List<MemorySegment> freeMemory;
        /** Reader of the spilled channel currently being read; null when no channel file is open. */
        BlockChannelReader<MemorySegment> fileReader;
        /** -1 = not positioned yet, Integer.MAX_VALUE = reading the in-memory buffer, otherwise an index into channelIDs. */
        int currentChannelID;
        /** Reusable row instance passed to the underlying iterators. */
        BinaryRow reuse;
        /** Row produced by the most recent advanceNext() call; null after a reset or at the end. */
        BinaryRow row;
        /** Index of the first row this iterator yields (clamped to the row count at creation time). */
        int beginRow;
        /** Index of the next row to be read. */
        int nextRow;
        /** Cached in-memory iterator, reused across switches back to the in-memory buffer. */
        InMemoryBuffer.BufferIterator reusableMemoryIterator;
        /** Value of the buffer's resetCount when this iterator was created. */
        int bufferVersion;
        /** Set once close() has completed. */
        boolean closed;

        /**
         * Creates an iterator that starts at the given row.
         *
         * <p>The start row is clamped to the number of rows currently in the buffer, and the
         * buffer's reset counter is captured so that later resets invalidate this iterator.
         *
         * @param startRow index of the first row to yield
         */
        private BufferIterator(int startRow) {
            closed = false;
            freeMemory = null;
            currentChannelID = -1;
            bufferVersion = resetCount;
            reuse = binaryRowSerializer.createInstance();
            beginRow = Math.min(startRow, numRows);
            nextRow = beginRow;
            // Needs beginRow to be set: read segments are only allocated if that row is spilled.
            createFreeMemoryIfNeeded();
        }

        /**
         * Verifies that this iterator may still be used.
         *
         * @throws RuntimeException if the iterator was closed, or if the owning buffer was reset
         *     after the iterator was created
         */
        private void checkValidity() {
            if (closed) {
                throw new RuntimeException("This iterator is closed!");
            }
            final boolean bufferWasReset = bufferVersion != resetCount;
            if (bufferWasReset) {
                throw new RuntimeException("This iterator is no longer valid!");
            }
        }

        /**
         * Rewinds this iterator to its begin row after checking that it is still usable.
         *
         * @throws IOException if closing the current spilled-channel reader fails
         * @throws RuntimeException if the iterator is closed or the buffer has been reset
         */
        public void reset() throws IOException {
            checkValidity();
            resetImpl();
        }

        /**
         * Returns the iterator to its initial, unpositioned state without any validity check.
         *
         * @throws IOException if closing the current spilled-channel reader fails
         */
        private void resetImpl() throws IOException {
            // Close the reader first: if this throws, the position state is left untouched.
            closeCurrentFileReader();
            nextRow = beginRow;
            currentChannelID = -1;
            currentIterator = null;
            row = null;
            reuse.unbindMemorySegment();
        }

        /**
         * Closes this iterator: rewinds it, drops its read segments, closes the cached in-memory
         * iterator and decrements the owning buffer's open-iterator count. Calling it again after
         * a successful close does nothing.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised while closing
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (!closed) {
                try {
                    resetImpl();
                    if (freeMemory != null) {
                        freeMemory.clear();
                    }
                    if (reusableMemoryIterator != null) {
                        reusableMemoryIterator.close();
                    }
                    closed = true;
                    // Direct field access; equivalent to the post-decrement accessor.
                    ResettableExternalBuffer.this.iteratorCount--;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Tells whether rows remain, judged by position only; no validity check is performed.
         *
         * @return true while the next row index is below the buffer's current row count
         */
        public boolean hasNext() {
            return nextRow < numRows;
        }

        /**
         * @return the index of the first row this iterator yields
         */
        public int getBeginRow() {
            return beginRow;
        }

        /**
         * Moves to the next row, switching between spilled channels and the in-memory buffer
         * as needed. On success the row is available through {@link #getRow()}.
         *
         * @return true if a row was read, false if no further source is available
         * @throws RuntimeException if the iterator is invalid, or wrapping an {@link IOException}
         */
        public boolean advanceNext() {
            checkValidity();
            try {
                updateIteratorIfNeeded();
                while (true) {
                    if (currentIterator != null) {
                        row = currentIterator.next(reuse);
                        if (row != null) {
                            nextRow++;
                            return true;
                        }
                    }
                    // Current source missing or exhausted: try to move on to the next one.
                    if (!nextIterator()) {
                        return false;
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Switches {@code currentIterator} to the next data source.
         *
         * @return false if the in-memory buffer (the last source) was already being read,
         *     true if a new source was opened
         * @throws IOException if opening or closing a channel reader fails
         */
        private boolean nextIterator() throws IOException {
            if (currentChannelID == -1) {
                // Not positioned yet: jump straight to the begin row.
                if (isFixedLength) {
                    gotoFixedLengthRow(beginRow);
                } else {
                    gotoVariableLengthRow(beginRow);
                }
            } else if (currentChannelID == Integer.MAX_VALUE) {
                // Already on the in-memory buffer, nothing comes after it.
                return false;
            } else if (currentChannelID < channelIDs.size() - 1) {
                nextSpilledIterator();
            } else {
                // Last spilled channel is done, continue with the in-memory buffer.
                newMemoryIterator();
            }
            return true;
        }

        /**
         * Checks whether the iterator is positioned on the in-memory buffer although its next
         * row index is not beyond the row count recorded at the latest spill (presumably because
         * the in-memory data was spilled after the iterator reached it).
         *
         * @return true if the iterator has to be re-positioned before reading
         */
        private boolean iteratorNeedsUpdate() {
            final int spillCount = numRowsUntilThisChannel.size();
            if (spillCount <= 0 || currentChannelID != Integer.MAX_VALUE) {
                return false;
            }
            return nextRow <= numRowsUntilThisChannel.get(spillCount - 1);
        }

        /**
         * Allocates read segments if they are needed and, when {@link #iteratorNeedsUpdate()}
         * reports so, re-positions the iterator at {@code nextRow}.
         *
         * @throws IOException if re-positioning fails
         */
        private void updateIteratorIfNeeded() throws IOException {
            createFreeMemoryIfNeeded();
            if (!iteratorNeedsUpdate()) {
                return;
            }
            reuse.unbindMemorySegment();
            // Drop the cached in-memory iterator so that a fresh one is created when needed.
            reusableMemoryIterator = null;
            if (isFixedLength) {
                gotoFixedLengthRow(nextRow);
            } else {
                gotoVariableLengthRow(nextRow);
            }
        }

        /**
         * @return the row read by the latest {@link #advanceNext()} call; null if none was read
         */
        public BinaryRow getRow() {
            return row;
        }

        /**
         * Closes and forgets the current spilled-channel reader, if there is one.
         *
         * @throws IOException if closing the reader fails (the reference is kept in that case)
         */
        private void closeCurrentFileReader() throws IOException {
            if (fileReader == null) {
                return;
            }
            fileReader.close();
            fileReader = null;
        }

        /**
         * Positions the iterator at the given row for fixed-length rows by computing the byte
         * offset of the row inside its channel directly.
         *
         * @param targetRow global index of the row to position at
         * @throws IOException if opening the source fails
         */
        private void gotoFixedLengthRow(int targetRow) throws IOException {
            final int channelIndex = upperBound(targetRow, numRowsUntilThisChannel);
            final int rowInChannel = getBeginIndexInChannel(targetRow, channelIndex);

            if (targetRow == numRows) {
                // Past the last row: point at the current end of the in-memory data.
                newMemoryIterator(rowInChannel, inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }

            // Offset = full segments before the row + position of the row inside its segment.
            final long rowsPerSegment = segmentSize / fixedLength;
            final long segmentStart = (rowInChannel / rowsPerSegment) * segmentSize;
            final long offsetInSegment = (rowInChannel % rowsPerSegment) * fixedLength;
            final long offset = segmentStart + offsetInSegment;

            final boolean spilled = channelIndex < numRowsUntilThisChannel.size();
            if (spilled) {
                newSpilledIterator(channelIndex, offset);
            } else {
                newMemoryIterator(rowInChannel, offset);
            }
        }

        /**
         * Positions the iterator at the given row for variable-length rows by opening the
         * containing source at its start and reading forward row by row.
         *
         * @param targetRow global index of the row to position at
         * @throws IOException if opening the source fails
         */
        private void gotoVariableLengthRow(int targetRow) throws IOException {
            final int channelIndex = upperBound(targetRow, numRowsUntilThisChannel);
            final int rowsToSkip = getBeginIndexInChannel(targetRow, channelIndex);

            if (targetRow == numRows) {
                // Past the last row: point at the current end of the in-memory data.
                newMemoryIterator(rowsToSkip, inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }

            final boolean spilled = channelIndex < numRowsUntilThisChannel.size();
            if (spilled) {
                newSpilledIterator(channelIndex);
            } else {
                newMemoryIterator();
            }

            // advanceNext() increments nextRow per row read, so compensate before skipping.
            nextRow -= rowsToSkip;
            int skipped = 0;
            while (skipped < rowsToSkip) {
                advanceNext();
                skipped++;
            }
        }

        /**
         * Opens the spilled channel that follows the current one, from its beginning.
         *
         * @throws IOException if opening the channel fails
         */
        private void nextSpilledIterator() throws IOException {
            final int followingChannel = currentChannelID + 1;
            newSpilledIterator(followingChannel);
        }

        /**
         * Opens the spilled channel with the given index at byte offset 0.
         *
         * @param i index into the list of spilled channels
         * @throws IOException if opening the channel fails
         */
        private void newSpilledIterator(int i) throws IOException {
            newSpilledIterator(i, 0L);
        }

        /**
         * Opens the spilled channel with the given index and positions the read view at the
         * given byte offset inside the channel.
         *
         * <p>The reader is sought to the start of the block containing the offset; the remaining
         * distance inside that block is passed to the input view.
         *
         * @param channelIndex index into the list of spilled channels
         * @param offset byte offset inside the channel at which reading starts
         * @throws IOException if closing the previous reader or opening/seeking the new one fails
         */
        private void newSpilledIterator(int channelIndex, long offset) throws IOException {
            final ChannelWithMeta channel = channelIDs.get(channelIndex);
            currentChannelID = channelIndex;
            closeCurrentFileReader();

            final int blocksToSkip = (int) (offset / segmentSize);
            // Widen before multiplying: int * int would overflow for positions of 2 GiB and more.
            final long seekPosition = (long) blocksToSkip * segmentSize;

            fileReader = ioManager.createBlockChannelReader(channel.getChannel());
            if (offset > 0) {
                fileReader.seekToPosition(seekPosition);
            }

            final HeaderlessChannelReaderInputView inView = new HeaderlessChannelReaderInputView(
                    fileReader,
                    freeMemory,
                    channel.getBlockCount() - blocksToSkip,
                    channel.getNumBytesInLastBlock(),
                    false,
                    offset - seekPosition);
            currentIterator = new PagedChannelReaderInputViewIterator(inView, null, binaryRowSerializer);
        }

        /**
         * Switches to the in-memory buffer, starting at row 0 and byte offset 0.
         *
         * @throws IOException if closing the current spilled-channel reader fails
         */
        private void newMemoryIterator() throws IOException {
            newMemoryIterator(0, 0L);
        }

        /**
         * Switches to the in-memory buffer at the given position.
         *
         * <p>A cached in-memory iterator is reused when present; in that case only the offset is
         * applied and the start row is not passed on.
         *
         * @param startRow row index handed to a newly created in-memory iterator
         * @param offset byte offset inside the in-memory buffer
         * @throws IOException if closing the current spilled-channel reader fails
         */
        private void newMemoryIterator(int startRow, long offset) throws IOException {
            currentChannelID = Integer.MAX_VALUE;
            closeCurrentFileReader();
            if (reusableMemoryIterator != null) {
                reusableMemoryIterator.reset(offset);
            } else {
                reusableMemoryIterator = inMemoryBuffer.newIterator(startRow, offset);
            }
            currentIterator = reusableMemoryIterator;
        }

        /**
         * Converts a global row index into an index relative to the start of its channel.
         *
         * @param globalRow global row index
         * @param channelIndex index of the channel containing the row
         * @return the global index minus the cumulative row count of the preceding channel, or
         *     the global index itself for the first channel
         */
        private int getBeginIndexInChannel(int globalRow, int channelIndex) {
            if (channelIndex <= 0) {
                return globalRow;
            }
            return globalRow - numRowsUntilThisChannel.get(channelIndex - 1);
        }

        /**
         * Tells whether the row with the given index lies in a spilled channel.
         *
         * @param i global row index
         * @return true if at least one spill happened and {@code i} is below the row count
         *     recorded at the latest spill
         */
        public boolean rowInSpill(int i) {
            final int spillCount = numRowsUntilThisChannel.size();
            if (spillCount <= 0) {
                return false;
            }
            return i < numRowsUntilThisChannel.get(spillCount - 1);
        }

        /**
         * Lazily allocates the unpooled read segments used for reading spilled channels.
         *
         * <p>Nothing happens if the segments already exist or if the begin row is not in a spill.
         * The list is published only after all {@code READ_BUFFER} segments were allocated, so a
         * failed allocation does not leave a partially filled list behind.
         */
        private void createFreeMemoryIfNeeded() {
            if (freeMemory != null || !rowInSpill(beginRow)) {
                return;
            }
            final List<MemorySegment> segments = new ArrayList<>(READ_BUFFER);
            for (int k = 0; k < READ_BUFFER; k++) {
                segments.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
            }
            freeMemory = segments;
        }

        /**
         * Binary search for the first element that is strictly greater than the goal.
         *
         * @param goal value to search for
         * @param sortedCounts list to search; the search assumes ascending order
         * @return index of the first element greater than {@code goal}; the list size if no such
         *     element exists; 0 for an empty list
         */
        private int upperBound(int goal, List<Integer> sortedCounts) {
            if (sortedCounts.isEmpty()) {
                return 0;
            }
            final int lastIndex = sortedCounts.size() - 1;
            if (sortedCounts.get(lastIndex) <= goal) {
                return sortedCounts.size();
            }
            int low = 0;
            int high = lastIndex;
            while (low < high) {
                // Unsigned shift keeps the midpoint correct even if low + high overflows.
                final int mid = (low + high) >>> 1;
                if (sortedCounts.get(mid) <= goal) {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            }
            return low;
        }
    }

    /**
     * Creates a buffer that keeps rows in the given memory segments and spills them to disk
     * through the given I/O manager when the in-memory buffer cannot take another row.
     *
     * @param memoryManager manager used to release {@code list} when the buffer is closed
     * @param iOManager I/O manager used to create spill channels, writers and readers
     * @param list memory segments for the in-memory buffer; must contain at least one segment,
     *     the size of the first one is used as the segment size
     * @param abstractRowSerializer row serializer; duplicated if it is a
     *     {@link BinaryRowSerializer}, otherwise a binary serializer is built from its types
     * @throws IllegalArgumentException if {@code list} is null or empty
     */
    public ResettableExternalBuffer(MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list, AbstractRowSerializer abstractRowSerializer) {
        // Fail early with a clear message; the segment size is read from the first segment below.
        Preconditions.checkArgument(list != null && !list.isEmpty(), "memory must contain at least one segment");
        this.memoryManager = memoryManager;
        this.ioManager = iOManager;
        this.memory = list;
        this.binaryRowSerializer = abstractRowSerializer instanceof BinaryRowSerializer
                ? (BinaryRowSerializer) abstractRowSerializer.duplicate()
                : new BinaryRowSerializer(abstractRowSerializer.getTypes());
        this.inMemoryBuffer = new InMemoryBuffer(list, abstractRowSerializer);
        this.channelIDs = new ArrayList<>();
        this.numRowsUntilThisChannel = new ArrayList<>();
        this.spillSize = 0L;
        this.segmentSize = list.get(0).size();
        this.numRows = 0;
        this.iteratorCount = 0;
        this.resetCount = 0;
        this.isFixedLength = this.binaryRowSerializer.isRowFixedLength();
        if (this.isFixedLength) {
            this.fixedLength = this.binaryRowSerializer.getSerializedRowFixedPartLength();
        }
    }

    /**
     * @return the number of rows added since construction or the last reset
     */
    public int size() {
        return numRows;
    }

    /**
     * Computes the total size of the buffer's memory segments.
     *
     * @return number of segments times the segment size, in bytes; computed in {@code long}
     *     arithmetic so that 2 GiB or more of memory does not overflow
     */
    private long memorySize() {
        return (long) memory.size() * segmentSize;
    }

    /**
     * Renders the size of a row for error messages.
     *
     * @param baseRow row to describe
     * @return the size in bytes for a {@link BinaryRow}, {@code "?"} for any other row
     */
    private String getRowSize(BaseRow baseRow) {
        if (!(baseRow instanceof BinaryRow)) {
            return "?";
        }
        final BinaryRow binaryRow = (BinaryRow) baseRow;
        return String.valueOf(binaryRow.getSizeInBytes());
    }

    /**
     * Appends a row. If the in-memory buffer cannot take it, the buffered rows are spilled to
     * disk and the write is retried once.
     *
     * @param baseRow row to append
     * @throws IOException if the row does not fit into an empty in-memory buffer, or if
     *     spilling fails
     */
    public void add(BaseRow baseRow) throws IOException {
        boolean written = inMemoryBuffer.write(baseRow);
        if (!written) {
            // An offset of 0 means the buffer holds nothing, so spilling cannot make room.
            final boolean bufferIsEmpty = inMemoryBuffer.getCurrentDataBufferOffset() == 0;
            if (bufferIsEmpty) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + getRowSize(baseRow) + ", Buffer: " + memorySize());
            }
            spill();
            written = inMemoryBuffer.write(baseRow);
            if (!written) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + getRowSize(baseRow) + ", Buffer: " + memorySize());
            }
        }
        numRows++;
    }

    /**
     * Writes all record buffers of the in-memory buffer to a new spill channel, registers the
     * channel together with the current row count, and resets the in-memory buffer.
     *
     * <p>If writing, querying the size or closing the writer fails, the channel is closed and
     * deleted and the original exception is rethrown; a failure of that cleanup is attached to
     * it as a suppressed exception.
     *
     * @throws IOException if the spill file cannot be written
     */
    private void spill() throws IOException {
        final FileIOChannel.ID channel = ioManager.createChannel();
        final BlockChannelWriter<MemorySegment> writer = ioManager.createBlockChannelWriter(channel);
        final int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
        final ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
        try {
            for (int k = 0; k < numRecordBuffers; k++) {
                writer.writeBlock(segments.get(k));
            }
            LOG.info("here spill the reset buffer data with {} bytes", writer.getSize());
            writer.close();
        } catch (IOException e) {
            try {
                writer.closeAndDelete();
            } catch (IOException cleanupFailure) {
                // Keep the root cause visible instead of letting the cleanup failure mask it.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        // Widen before multiplying so that large spills do not overflow int.
        spillSize += (long) numRecordBuffers * segmentSize;
        channelIDs.add(new ChannelWithMeta(channel, inMemoryBuffer.getNumRecordBuffers(), inMemoryBuffer.getNumBytesInLastBuffer()));
        numRowsUntilThisChannel.add(numRows);
        inMemoryBuffer.reset();
    }

    /**
     * Reports the memory accounted to this buffer: the buffer's own segments plus
     * {@code READ_BUFFER} segments for every open iterator.
     *
     * @return the accounted memory in bytes, computed in {@code long} arithmetic
     */
    public long getUsedMemoryInBytes() {
        final long iteratorMemory = (long) iteratorCount * READ_BUFFER * segmentSize;
        return memorySize() + iteratorMemory;
    }

    /**
     * @return the number of spill channels currently registered
     */
    public int getNumSpillFiles() {
        return channelIDs.size();
    }

    /**
     * @return the accumulated size of all spills since construction or the last reset
     */
    public long getSpillInBytes() {
        return spillSize;
    }

    /**
     * Empties the buffer: deletes all spill files, resets the in-memory buffer and the row
     * count, and bumps the reset counter so that existing iterators become invalid.
     */
    public void reset() {
        clearChannels();
        inMemoryBuffer.reset();
        resetCount++;
        numRows = 0;
    }

    /**
     * Deletes all spill files, releases the memory segments through the memory manager and
     * closes the in-memory buffer.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        clearChannels();
        memoryManager.release(memory);
        inMemoryBuffer.close();
    }

    /**
     * Deletes the files of all spill channels and clears the spill bookkeeping.
     *
     * <p>Deletion is best effort: a file that exists but cannot be deleted is logged as a
     * warning and does not abort the cleanup.
     */
    private void clearChannels() {
        for (ChannelWithMeta meta : channelIDs) {
            final File spillFile = new File(meta.getChannel().getPath());
            if (spillFile.exists() && !spillFile.delete()) {
                LOG.warn("Failed to delete spill file {}", spillFile.getAbsolutePath());
            }
        }
        channelIDs.clear();
        spillSize = 0L;
        numRowsUntilThisChannel.clear();
    }

    /**
     * Creates an iterator that starts at the first row.
     *
     * @return a new iterator beginning at row 0
     */
    public BufferIterator newIterator() {
        return newIterator(0);
    }

    /**
     * Creates an iterator that starts at the given row and counts it as an open iterator.
     *
     * @param i index of the first row to yield; must not be negative
     * @return a new iterator beginning at row {@code i}
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public BufferIterator newIterator(int i) {
        final boolean nonNegative = i >= 0;
        Preconditions.checkArgument(nonNegative, "`beginRow` can't be negative!");
        // Count first, as before: the count is bumped even if construction fails.
        iteratorCount += 1;
        return new BufferIterator(i);
    }

    /**
     * Exposes the live list of spill channels for tests.
     *
     * @return the internal channel list (not a copy)
     */
    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        return channelIDs;
    }

    /**
     * Accessor that post-decrements the open-iterator count of the given buffer.
     *
     * @param resettableExternalBuffer buffer whose iterator count is decremented
     * @return the iterator count before the decrement
     */
    static /* synthetic */ int access$410(ResettableExternalBuffer resettableExternalBuffer) {
        return resettableExternalBuffer.iteratorCount--;
    }
}
