package org.apache.flink.table.temptable.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceOptions;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.service.ServiceInstance;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.temptable.TableServiceException;
import org.apache.flink.table.temptable.rpc.TableServiceClient;
import org.apache.flink.table.temptable.util.BytesUtil;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.types.RowType;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.util.TableProperties;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/temptable/io/TableServiceSourceFunction.class */
public class TableServiceSourceFunction extends RichParallelSourceFunction<BaseRow> {
    private final TableProperties tableProperties;
    private final String tableName;
    private final RowType resultType;
    private TableServiceClient tableServiceClient;
    private List<Integer> requestTablePartitions;
    private Configuration globalConfig;
    private BaseRowGenerator baseRowGenerator;
    private int memorySizePerBufferInBytes;
    private static final int MAX_SEGMENT_NUMBER = 192;
    private ExecutorService executorService;
    private static final Logger LOG = LoggerFactory.getLogger(TableServiceSourceFunction.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/temptable/io/TableServiceSourceFunction$BaseRowGenerator.class */
    public static class BaseRowGenerator {
        private BaseRowSerializer<BaseRow> serializer;
        private byte[] dataBuffer;
        private byte[] currentBuffer;
        private int dataBufferOffset;
        private int headerBufferOffset;
        private int currentBufferOffset;
        private int status;
        private int dataExpectBytes;
        private int headerReadBytes;
        private int dataReadBytes;
        private static final int EMPTY = 0;
        private static final int HEADER_HALF_READ = 1;
        private static final int HEADER_READ = 2;
        private static final int DATA_HALF_READ = 3;
        private static final int DATA_READ = 4;
        private byte[] headerBuffer = new byte[4];
        private int headerExpectBytes = 4;

        public BaseRowGenerator(BaseRowSerializer<BaseRow> baseRowSerializer) {
            this.serializer = baseRowSerializer;
        }

        public void setBuffer(Buffer buffer) {
            Preconditions.checkState(this.currentBuffer == null || this.currentBufferOffset == this.currentBuffer.length, "There are some unconsumed bytes");
            this.currentBuffer = new byte[buffer.readableBytes()];
            buffer.asByteBuf().readBytes(this.currentBuffer);
            this.currentBufferOffset = 0;
            buffer.recycleBuffer();
        }

        BaseRow getNext() {
            BaseRow deSerialize = BytesUtil.deSerialize(this.dataBuffer, this.dataBuffer.length, this.serializer);
            reset();
            return deSerialize;
        }

        boolean hasNext() {
            tryChangeStatus();
            return this.status == 4;
        }

        private void reset() {
            this.dataBuffer = null;
            this.dataBufferOffset = 0;
            this.dataExpectBytes = 0;
            this.headerBufferOffset = 0;
            this.dataExpectBytes = 0;
            this.dataReadBytes = 0;
            this.headerReadBytes = 0;
            this.status = 0;
        }

        private void tryChangeStatus() {
            do {
            } while (changeStatus());
        }

        private int remaining() {
            if (this.currentBuffer == null) {
                return 0;
            }
            return this.currentBuffer.length - this.currentBufferOffset;
        }

        private boolean changeStatus() {
            boolean z = false;
            switch (this.status) {
                case 0:
                    if (remaining() > 0) {
                        int min = Math.min(remaining(), this.headerExpectBytes);
                        System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.headerBuffer, this.headerBufferOffset, min);
                        this.currentBufferOffset += min;
                        this.headerBufferOffset += min;
                        this.headerReadBytes += min;
                        this.status = this.headerReadBytes == this.headerExpectBytes ? 2 : 1;
                        z = true;
                        break;
                    }
                    break;
                case 1:
                    if (remaining() > 0) {
                        int min2 = Math.min(remaining(), this.headerExpectBytes - this.headerReadBytes);
                        System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.headerBuffer, this.headerBufferOffset, min2);
                        this.currentBufferOffset += min2;
                        this.headerBufferOffset += min2;
                        this.headerReadBytes += min2;
                        this.status = this.headerReadBytes == this.headerExpectBytes ? 2 : 1;
                        z = this.status == 2;
                        break;
                    }
                    break;
                case 2:
                    this.dataExpectBytes = BytesUtil.bytesToInt(this.headerBuffer);
                    this.dataBuffer = new byte[this.dataExpectBytes];
                    if (remaining() > 0) {
                        int min3 = Math.min(remaining(), this.dataExpectBytes);
                        System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.dataBuffer, this.dataBufferOffset, min3);
                        this.currentBufferOffset += min3;
                        this.dataBufferOffset += min3;
                        this.dataReadBytes += min3;
                        this.status = this.dataReadBytes == this.dataExpectBytes ? 4 : 3;
                        z = true;
                        break;
                    }
                    break;
                case 3:
                    if (remaining() > 0) {
                        int min4 = Math.min(remaining(), this.dataExpectBytes);
                        System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.dataBuffer, this.dataBufferOffset, min4);
                        this.currentBufferOffset += min4;
                        this.dataBufferOffset += min4;
                        this.dataReadBytes += min4;
                        this.status = this.dataReadBytes == this.dataExpectBytes ? 4 : 3;
                        z = this.status == 4;
                        break;
                    }
                    break;
                case 4:
                    break;
                default:
                    throw new TableServiceException(new RuntimeException("Unsupported status: " + this.status));
            }
            return z;
        }
    }

    public TableServiceSourceFunction(TableProperties tableProperties, String str, RowType rowType) {
        this.tableProperties = tableProperties;
        this.tableName = str;
        this.resultType = rowType;
    }

    public void open(Configuration configuration) throws Exception {
        this.baseRowGenerator = new BaseRowGenerator((BaseRowSerializer) DataTypes.createInternalSerializer(this.resultType));
        this.memorySizePerBufferInBytes = configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES);
        this.globalConfig = configuration;
        this.tableServiceClient = new TableServiceClient();
        this.tableServiceClient.open(this.tableProperties);
        this.executorService = Executors.newSingleThreadExecutor();
        assignPartitions();
        LOG.info("Table Service Source opened");
    }

    public void close() throws Exception {
        if (this.tableServiceClient != null) {
            this.tableServiceClient.close();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        LOG.info("Table Service Source closed");
    }

    public void run(SourceFunction.SourceContext<BaseRow> sourceContext) throws Exception {
        if (this.requestTablePartitions != null) {
            for (Integer num : this.requestTablePartitions) {
                NetworkBufferPool networkBufferPool = null;
                TableServiceSourceInputGate tableServiceSourceInputGate = null;
                NettyConnectionManager nettyConnectionManager = null;
                try {
                    try {
                        networkBufferPool = new NetworkBufferPool(192, this.memorySizePerBufferInBytes);
                        ConnectionID createConnectionID = createConnectionID(this.tableName, num.intValue());
                        nettyConnectionManager = createConnectionManager(createConnectionID);
                        nettyConnectionManager.start(new ResultPartitionManager(), new TaskEventDispatcher());
                        tableServiceSourceInputGate = createInputGate(this.tableName, num.intValue(), networkBufferPool, createConnectionID, nettyConnectionManager);
                        tableServiceSourceInputGate.open(this.globalConfig);
                        handleData(sourceContext, tableServiceSourceInputGate);
                        if (nettyConnectionManager != null) {
                            nettyConnectionManager.shutdown();
                        }
                        if (tableServiceSourceInputGate != null) {
                            tableServiceSourceInputGate.close();
                        }
                        if (networkBufferPool != null) {
                            networkBufferPool.destroyAllBufferPools();
                            networkBufferPool.destroy();
                        }
                    } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                        throw new TableServiceException(e);
                    }
                } catch (Throwable th) {
                    if (nettyConnectionManager != null) {
                        nettyConnectionManager.shutdown();
                    }
                    if (tableServiceSourceInputGate != null) {
                        tableServiceSourceInputGate.close();
                    }
                    if (networkBufferPool != null) {
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
                    }
                    throw th;
                }
            }
        }
    }

    private void handleData(SourceFunction.SourceContext<BaseRow> sourceContext, TableServiceSourceInputGate tableServiceSourceInputGate) {
        Optional nextBufferOrEvent;
        while (true) {
            try {
                nextBufferOrEvent = tableServiceSourceInputGate.getNextBufferOrEvent();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                new TableServiceException(e);
            }
            if (!nextBufferOrEvent.isPresent()) {
                LOG.debug("reach end of InputGate");
                return;
            } else if (((BufferOrEvent) nextBufferOrEvent.get()).isBuffer()) {
                LOG.debug("buffer read");
                this.baseRowGenerator.setBuffer(((BufferOrEvent) nextBufferOrEvent.get()).getBuffer());
                while (this.baseRowGenerator.hasNext()) {
                    sourceContext.collect(this.baseRowGenerator.getNext());
                }
            } else if (((BufferOrEvent) nextBufferOrEvent.get()).isEvent()) {
                LOG.debug("event read");
            }
        }
    }

    public void cancel() {
    }

    private void assignPartitions() {
        List<Integer> partitions = this.tableServiceClient.getPartitions(this.tableName);
        if (partitions == null || partitions.isEmpty()) {
            throw new TableServiceException(new RuntimeException("Table Cache do not exists."));
        }
        Collections.sort(partitions);
        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        this.requestTablePartitions = new ArrayList();
        for (int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); indexOfThisSubtask < partitions.size(); indexOfThisSubtask += numberOfParallelSubtasks) {
            this.requestTablePartitions.add(partitions.get(indexOfThisSubtask));
        }
    }

    private ConnectionID createConnectionID(String str, int i) {
        Map<Integer, ServiceInstance> serviceInstanceMap = this.tableServiceClient.getServiceInstanceMap();
        int tablePartitionToIndex = TableServiceUtil.tablePartitionToIndex(str, i, serviceInstanceMap.size());
        ServiceInstance serviceInstance = serviceInstanceMap.get(Integer.valueOf(tablePartitionToIndex));
        if (serviceInstance == null) {
            throw new TableServiceException(new RuntimeException("serviceInstanceMap does not contains service instance with instanceId = " + tablePartitionToIndex));
        }
        Integer valueOf = Integer.valueOf(this.globalConfig.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY));
        Preconditions.checkArgument(valueOf != null && valueOf.intValue() > 0 && valueOf.intValue() < 65536, "Invalid port number for ExternalBlockShuffleService: " + valueOf);
        return new ConnectionID(new InetSocketAddress(serviceInstance.getServiceIp(), valueOf.intValue()), 0);
    }

    private NettyConnectionManager createConnectionManager(ConnectionID connectionID) {
        return new NettyConnectionManager(createNettyConfig(this.globalConfig, connectionID.getAddress()));
    }

    private TableServiceSourceInputGate createInputGate(String str, int i, NetworkBufferPool networkBufferPool, ConnectionID connectionID, NettyConnectionManager nettyConnectionManager) throws IOException {
        ResultPartitionID tablePartitionToResultPartition = TableServiceUtil.tablePartitionToResultPartition(str, i);
        TableServiceSourceInputGate tableServiceSourceInputGate = new TableServiceSourceInputGate("TableServiceSource_" + str, new IntermediateDataSetID(tablePartitionToResultPartition.getPartitionId()), 1, this.executorService);
        tableServiceSourceInputGate.setNetworkProperties(networkBufferPool, 128);
        tableServiceSourceInputGate.setBufferPool(networkBufferPool.createBufferPool(64, 64));
        tableServiceSourceInputGate.setInputChannel(new IntermediateResultPartitionID(tablePartitionToResultPartition.getPartitionId().getLowerPart(), tablePartitionToResultPartition.getPartitionId().getUpperPart()), new RemoteInputChannel(tableServiceSourceInputGate, 0, tablePartitionToResultPartition, connectionID, nettyConnectionManager, 0, 0, new SimpleCounter()));
        return tableServiceSourceInputGate;
    }

    public NettyConfig createNettyConfig(Configuration configuration, InetSocketAddress inetSocketAddress) {
        return new NettyConfig(inetSocketAddress.getAddress(), 0, this.memorySizePerBufferInBytes, 1, configuration);
    }
}
