package org.apache.flink.table.util;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.table.util.InMemoryBuffer;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/util/ResettableExternalBuffer.class */
public class ResettableExternalBuffer implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ResettableExternalBuffer.class);
    static final int READ_BUFFER = 2;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final List<MemorySegment> memory;
    private final BinaryRowSerializer serializer;
    private final InMemoryBuffer inMemoryBuffer;
    private int segmentSize;
    private long fixedLength;
    private boolean isFixedLength;
    private final Set<BufferIterator> iterators;
    private int numRows = 0;
    private boolean freeMemoryInUse = false;
    private final List<ChannelWithMeta> channelIDs = new ArrayList();
    private final List<Integer> numRowsUntilThisChannel = new ArrayList();

    /* loaded from: input_file:org/apache/flink/table/util/ResettableExternalBuffer$BufferIterator.class */
    public class BufferIterator implements RowIterator<BinaryRow>, Closeable {
        MutableObjectIterator<BinaryRow> currentIterator;
        BlockChannelReader<MemorySegment> fileReader;
        int currentChannelID;
        BinaryRow reuse;
        BinaryRow row;
        int beginRow;
        InMemoryBuffer.BufferIterator reusableMemoryIterator;
        List<MemorySegment> freeMemory;
        boolean usingMemBufFreeMemory;
        int nRows;
        boolean closed;

        private BufferIterator(int i) {
            this.currentChannelID = -1;
            this.reuse = ResettableExternalBuffer.this.serializer.m5305createInstance();
            this.freeMemory = null;
            this.usingMemBufFreeMemory = false;
            this.closed = false;
            this.beginRow = Math.min(i, ResettableExternalBuffer.this.numRows);
            this.nRows = ResettableExternalBuffer.this.numRows;
            if (needReadSpilled()) {
                this.freeMemory = new ArrayList();
                if (!ResettableExternalBuffer.this.freeMemoryInUse) {
                    ArrayList<MemorySegment> freeMemory = ResettableExternalBuffer.this.inMemoryBuffer.getFreeMemory();
                    if (freeMemory.size() > 0) {
                        this.freeMemory.addAll(freeMemory);
                        ResettableExternalBuffer.this.freeMemoryInUse = true;
                        this.usingMemBufFreeMemory = true;
                    }
                }
                for (int size = this.freeMemory.size(); size < 2; size++) {
                    this.freeMemory.add(MemorySegmentFactory.allocateUnpooledSegment(ResettableExternalBuffer.this.segmentSize));
                }
            }
        }

        public void reset() throws IOException {
            validateIterator();
            closeCurrentFileReader();
            this.currentChannelID = -1;
            this.currentIterator = null;
            this.reuse.unbindMemorySegment();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.closed) {
                return;
            }
            closeImpl();
            ResettableExternalBuffer.this.iterators.remove(this);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void closeImpl() {
            try {
                reset();
                if (this.usingMemBufFreeMemory) {
                    ResettableExternalBuffer.this.freeMemoryInUse = false;
                }
                if (this.freeMemory != null) {
                    this.freeMemory.clear();
                }
                this.closed = true;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private void validateIterator() {
            if (this.nRows != ResettableExternalBuffer.this.numRows) {
                throw new RuntimeException("This buffer has been modified. This iterator is no longer valid.");
            }
            if (this.closed) {
                throw new RuntimeException("This iterator is closed.");
            }
        }

        @Override // org.apache.flink.table.util.RowIterator
        public boolean advanceNext() {
            validateIterator();
            do {
                try {
                    if (this.currentIterator != null) {
                        BinaryRow binaryRow = (BinaryRow) this.currentIterator.next(this.reuse);
                        this.row = binaryRow;
                        if (binaryRow != null) {
                            return true;
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            } while (nextIterator());
            return false;
        }

        private boolean nextIterator() throws IOException, RuntimeException {
            if (this.currentChannelID == -1) {
                if (ResettableExternalBuffer.this.isFixedLength) {
                    firstFixedLengthIterator();
                    return true;
                }
                firstVariableLengthIterator();
                return true;
            }
            if (this.currentChannelID == Integer.MAX_VALUE) {
                return false;
            }
            if (this.currentChannelID < ResettableExternalBuffer.this.channelIDs.size() - 1) {
                nextSpilledIterator();
                return true;
            }
            newMemoryIterator();
            return true;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.table.util.RowIterator
        public BinaryRow getRow() {
            validateIterator();
            return this.row;
        }

        private void closeCurrentFileReader() throws IOException {
            if (this.fileReader != null) {
                this.fileReader.close();
                this.fileReader = null;
            }
        }

        private void firstFixedLengthIterator() throws IOException {
            int upperBound = upperBound(this.beginRow, ResettableExternalBuffer.this.numRowsUntilThisChannel);
            int beginIndexInChannel = getBeginIndexInChannel(this.beginRow, upperBound);
            if (this.beginRow == ResettableExternalBuffer.this.numRows) {
                newMemoryIterator(beginIndexInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            long j = ResettableExternalBuffer.this.segmentSize / ResettableExternalBuffer.this.fixedLength;
            long j2 = ((beginIndexInChannel / j) * ResettableExternalBuffer.this.segmentSize) + ((beginIndexInChannel % j) * ResettableExternalBuffer.this.fixedLength);
            if (upperBound >= ResettableExternalBuffer.this.numRowsUntilThisChannel.size()) {
                newMemoryIterator(beginIndexInChannel, j2);
                return;
            }
            List list = ResettableExternalBuffer.this.channelIDs;
            this.currentChannelID = upperBound;
            newSpilledIterator((ChannelWithMeta) list.get(upperBound), j2);
        }

        private void firstVariableLengthIterator() throws IOException {
            int upperBound = upperBound(this.beginRow, ResettableExternalBuffer.this.numRowsUntilThisChannel);
            int beginIndexInChannel = getBeginIndexInChannel(this.beginRow, upperBound);
            if (this.beginRow == ResettableExternalBuffer.this.numRows) {
                newMemoryIterator(beginIndexInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            if (upperBound >= ResettableExternalBuffer.this.numRowsUntilThisChannel.size()) {
                newMemoryIterator();
                for (int i = 0; i < beginIndexInChannel; i++) {
                    advanceNext();
                }
                return;
            }
            List list = ResettableExternalBuffer.this.channelIDs;
            this.currentChannelID = upperBound;
            newSpilledIterator((ChannelWithMeta) list.get(upperBound));
            for (int i2 = 0; i2 < beginIndexInChannel; i2++) {
                advanceNext();
            }
        }

        private void nextSpilledIterator() throws IOException {
            List list = ResettableExternalBuffer.this.channelIDs;
            int i = this.currentChannelID + 1;
            this.currentChannelID = i;
            newSpilledIterator((ChannelWithMeta) list.get(i));
        }

        private void newSpilledIterator(ChannelWithMeta channelWithMeta) throws IOException {
            newSpilledIterator(channelWithMeta, 0L);
        }

        private void newSpilledIterator(ChannelWithMeta channelWithMeta, long j) throws IOException {
            closeCurrentFileReader();
            int i = (int) (j / ResettableExternalBuffer.this.segmentSize);
            long j2 = i * ResettableExternalBuffer.this.segmentSize;
            this.fileReader = ResettableExternalBuffer.this.ioManager.createBlockChannelReader(channelWithMeta.getChannel());
            if (j > 0) {
                this.fileReader.seekToPosition(j2);
            }
            this.currentIterator = new PagedChannelReaderInputViewIterator(new HeaderlessChannelReaderInputView(this.fileReader, this.freeMemory, channelWithMeta.getBlockCount() - i, channelWithMeta.getNumBytesInLastBlock(), false, j - j2), null, ResettableExternalBuffer.this.serializer.duplicate());
        }

        private void newMemoryIterator() throws IOException {
            newMemoryIterator(0, 0L);
        }

        private void newMemoryIterator(int i, long j) throws IOException {
            this.currentChannelID = HepProgram.MATCH_UNTIL_FIXPOINT;
            closeCurrentFileReader();
            if (this.reusableMemoryIterator == null) {
                this.reusableMemoryIterator = ResettableExternalBuffer.this.inMemoryBuffer.getIterator(i, j);
            } else {
                this.reusableMemoryIterator.reset();
            }
            this.currentIterator = this.reusableMemoryIterator;
        }

        private int getBeginIndexInChannel(int i, int i2) {
            return i2 > 0 ? i - ((Integer) ResettableExternalBuffer.this.numRowsUntilThisChannel.get(i2 - 1)).intValue() : i;
        }

        private boolean needReadSpilled() {
            return upperBound(this.beginRow, ResettableExternalBuffer.this.numRowsUntilThisChannel) < ResettableExternalBuffer.this.numRowsUntilThisChannel.size();
        }

        private int upperBound(int i, List<Integer> list) {
            if (list.size() == 0) {
                return 0;
            }
            if (list.get(list.size() - 1).intValue() <= i) {
                return list.size();
            }
            int i2 = 0;
            int size = list.size() - 1;
            while (i2 < size) {
                int i3 = (i2 + size) / 2;
                if (list.get(i3).intValue() <= i) {
                    i2 = i3 + 1;
                } else {
                    size = i3;
                }
            }
            return i2;
        }
    }

    public ResettableExternalBuffer(MemoryManager memoryManager, IOManager iOManager, List<MemorySegment> list, BinaryRowSerializer binaryRowSerializer) {
        this.memoryManager = memoryManager;
        this.ioManager = iOManager;
        this.memory = list;
        this.serializer = binaryRowSerializer;
        this.inMemoryBuffer = new InMemoryBuffer(list, binaryRowSerializer);
        this.segmentSize = list.get(0).size();
        this.isFixedLength = binaryRowSerializer.isRowFixedLength();
        if (this.isFixedLength) {
            this.fixedLength = binaryRowSerializer.getSerializedRowFixedPartLength();
        }
        this.iterators = new HashSet();
    }

    public int size() {
        return this.numRows;
    }

    private int memorySize() {
        return this.memory.size() * this.segmentSize;
    }

    public void add(BinaryRow binaryRow) throws IOException {
        if (!this.inMemoryBuffer.write(binaryRow)) {
            if (this.inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + binaryRow.getSizeInBytes() + ", Buffer: " + memorySize());
            }
            spill();
            if (!this.inMemoryBuffer.write(binaryRow)) {
                throw new IOException("Record can't be added to a empty InMemoryBuffer! Record size: " + binaryRow.getSizeInBytes() + ", Buffer: " + memorySize());
            }
        }
        this.numRows++;
    }

    private void spill() throws IOException {
        FileIOChannel.ID createChannel = this.ioManager.createChannel();
        BlockChannelWriter createBlockChannelWriter = this.ioManager.createBlockChannelWriter(createChannel);
        int numRecordBuffers = this.inMemoryBuffer.getNumRecordBuffers();
        ArrayList<MemorySegment> recordBufferSegments = this.inMemoryBuffer.getRecordBufferSegments();
        for (int i = 0; i < numRecordBuffers; i++) {
            try {
                createBlockChannelWriter.writeBlock(recordBufferSegments.get(i));
            } catch (IOException e) {
                createBlockChannelWriter.closeAndDelete();
                throw e;
            }
        }
        LOG.info("here spill the reset buffer data with {} bytes", Long.valueOf(createBlockChannelWriter.getSize()));
        createBlockChannelWriter.close();
        this.channelIDs.add(new ChannelWithMeta(createChannel, this.inMemoryBuffer.getNumRecordBuffers(), this.inMemoryBuffer.getNumBytesInLastBuffer()));
        this.numRowsUntilThisChannel.add(Integer.valueOf(this.numRows));
        this.inMemoryBuffer.reset();
    }

    public void reset() {
        clearChannels();
        this.inMemoryBuffer.reset();
        Iterator<BufferIterator> it = this.iterators.iterator();
        while (it.hasNext()) {
            it.next().closeImpl();
        }
        this.iterators.clear();
        this.numRows = 0;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        clearChannels();
        this.memoryManager.release(this.memory);
        this.inMemoryBuffer.clear();
        Iterator<BufferIterator> it = this.iterators.iterator();
        while (it.hasNext()) {
            it.next().closeImpl();
        }
        this.iterators.clear();
    }

    private void clearChannels() {
        Iterator<ChannelWithMeta> it = this.channelIDs.iterator();
        while (it.hasNext()) {
            File file = new File(it.next().getChannel().getPath());
            if (file.exists()) {
                file.delete();
            }
        }
        this.channelIDs.clear();
        this.numRowsUntilThisChannel.clear();
    }

    public BufferIterator newIterator() {
        return newIterator(0);
    }

    public BufferIterator newIterator(int i) {
        Preconditions.checkArgument(i >= 0, "`beginRow` can't be negative!");
        BufferIterator bufferIterator = new BufferIterator(i);
        this.iterators.add(bufferIterator);
        return bufferIterator;
    }

    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        return this.channelIDs;
    }
}
