/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.temptable.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.external.ExternalBlockShuffleServiceOptions;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.service.ServiceInstance;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.temptable.TableServiceException;
import org.apache.flink.table.temptable.io.TableServiceSourceInputGate;
import org.apache.flink.table.temptable.rpc.TableServiceClient;
import org.apache.flink.table.temptable.util.BytesUtil;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.types.RowType;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.apache.flink.table.util.TableProperties;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Parallel source function that reads the partitions of a named table from the table service.
 *
 * <p>When opened, each subtask asks the {@link TableServiceClient} for the table's partitions and
 * keeps a round-robin share of them. While running, it connects to the service instance that
 * owns each assigned partition, pulls buffers through a {@link TableServiceSourceInputGate} and
 * emits the {@link BaseRow}s decoded from those buffers.
 */
public class TableServiceSourceFunction
extends RichParallelSourceFunction<BaseRow> {
    /** Properties handed to the {@link TableServiceClient} when it is opened. */
    private final TableProperties tableProperties;
    /** Name of the table whose partitions are read. */
    private final String tableName;
    /** Row type from which the record serializer is created in {@code open}. */
    private final RowType resultType;
    /** Client created in {@code open}; used to look up partitions and service instances. */
    private TableServiceClient tableServiceClient;
    /** Partitions assigned to this subtask; filled by {@code assignPartitions}. */
    private List<Integer> requestTablePartitions;
    /** The configuration object received in {@code open}; used for the port lookup, Netty and the input gate. */
    private Configuration globalConfig;
    /** Decoder that turns the bytes of received buffers into rows. */
    private BaseRowGenerator baseRowGenerator;
    /** Value of {@code MEMORY_SIZE_PER_BUFFER_IN_BYTES}; passed to the network buffer pool and the Netty config. */
    private int memorySizePerBufferInBytes;
    // NOTE(review): not referenced by name in the visible code; run() passes the same literal (192)
    // as the first argument of NetworkBufferPool — presumably this constant was inlined. Confirm.
    private static final int MAX_SEGMENT_NUMBER = 192;
    /** Single-thread executor created in {@code open} and handed to every input gate. */
    private ExecutorService executorService;
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TableServiceSourceFunction.class);

    /**
     * Creates a source for the given table.
     *
     * @param tableProperties properties later passed to the table service client
     * @param tableName name of the table to read
     * @param resultType2 row type used to build the serializer for the emitted rows
     */
    public TableServiceSourceFunction(TableProperties tableProperties, String tableName, RowType resultType2) {
        // Plain field capture; no validation or I/O happens before open().
        this.resultType = resultType2;
        this.tableName = tableName;
        this.tableProperties = tableProperties;
    }

    /**
     * Initializes the runtime state of this subtask: the row decoder, the buffer size setting,
     * the table service client, the executor handed to input gates, and the list of partitions
     * this subtask is responsible for.
     *
     * @param configuration configuration supplied by the runtime; kept for later lookups
     * @throws Exception if the client cannot be opened or no partitions can be assigned
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        this.globalConfig = configuration;
        this.memorySizePerBufferInBytes = configuration.getInteger(ExternalBlockShuffleServiceOptions.MEMORY_SIZE_PER_BUFFER_IN_BYTES);

        BaseRowSerializer baseRowSerializer = (BaseRowSerializer)DataTypes.createInternalSerializer(this.resultType);
        this.baseRowGenerator = new BaseRowGenerator(baseRowSerializer);

        this.tableServiceClient = new TableServiceClient();
        this.tableServiceClient.open(this.tableProperties);

        this.executorService = Executors.newSingleThreadExecutor();

        // Needs both the opened client and the runtime context, so it runs last.
        this.assignPartitions();
        LOG.info("Table Service Source opened");
    }

    /**
     * Releases the table service client and the executor created in {@code open}.
     *
     * <p>The executor is shut down even when closing the client fails, so a failing client
     * cannot leave the executor thread behind.
     *
     * @throws Exception if closing the table service client fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.tableServiceClient != null) {
                this.tableServiceClient.close();
            }
        } finally {
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
        }
        LOG.info("Table Service Source closed");
    }

    /**
     * Reads every partition assigned to this subtask, one after another, and emits the rows
     * found in each. Does nothing when no partitions have been assigned.
     *
     * @param ctx context used to emit the decoded rows
     * @throws TableServiceException if setting up the connection or reading a partition fails
     * @throws Exception if releasing the per-partition resources fails
     */
    @Override
    public void run(SourceFunction.SourceContext<BaseRow> ctx) throws Exception {
        if (this.requestTablePartitions == null) {
            return;
        }
        for (Integer partitionIndex : this.requestTablePartitions) {
            this.readPartition(ctx, partitionIndex);
        }
    }

    /**
     * Reads a single partition: builds a dedicated buffer pool, connection manager and input
     * gate, drains the gate, and always releases those resources afterwards.
     */
    private void readPartition(SourceFunction.SourceContext<BaseRow> ctx, Integer partitionIndex) throws Exception {
        NetworkBufferPool networkBufferPool = null;
        TableServiceSourceInputGate inputGate = null;
        NettyConnectionManager nettyConnectionManager = null;
        try {
            networkBufferPool = new NetworkBufferPool(MAX_SEGMENT_NUMBER, this.memorySizePerBufferInBytes);
            ConnectionID connectionID = this.createConnectionID(this.tableName, partitionIndex);
            nettyConnectionManager = this.createConnectionManager(connectionID);
            nettyConnectionManager.start((ResultPartitionProvider)new ResultPartitionManager(), new TaskEventDispatcher());
            inputGate = this.createInputGate(this.tableName, partitionIndex, networkBufferPool, connectionID, nettyConnectionManager);
            inputGate.open(this.globalConfig);
            this.handleData(ctx, inputGate);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from the calling thread.
                Thread.currentThread().interrupt();
            }
            LOG.error(e.getMessage(), e);
            // Avoid wrapping an exception that already has the reported type.
            throw e instanceof TableServiceException ? (TableServiceException)e : new TableServiceException(e);
        } finally {
            // Same release order as before, but each step runs even if the previous one throws.
            try {
                if (nettyConnectionManager != null) {
                    nettyConnectionManager.shutdown();
                }
            } finally {
                try {
                    if (inputGate != null) {
                        inputGate.close();
                    }
                } finally {
                    if (networkBufferPool != null) {
                        networkBufferPool.destroyAllBufferPools();
                        networkBufferPool.destroy();
                    }
                }
            }
        }
    }

    /**
     * Drains the input gate: every received buffer is fed to the row decoder and all rows that
     * become complete are emitted; events are only logged. Returns once the gate reports that no
     * further buffer or event is available.
     *
     * <p>A failure is logged and propagated as a {@link TableServiceException} instead of being
     * dropped, so a persistent error can no longer cause the read to be retried indefinitely.
     *
     * @param ctx context used to emit the decoded rows
     * @param inputGate gate to read buffers and events from
     * @throws TableServiceException if reading from the gate or decoding a row fails
     */
    private void handleData(SourceFunction.SourceContext<BaseRow> ctx, TableServiceSourceInputGate inputGate) {
        try {
            while (true) {
                Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
                if (!next.isPresent()) {
                    LOG.debug("reach end of InputGate");
                    return;
                }
                BufferOrEvent bufferOrEvent = next.get();
                if (bufferOrEvent.isBuffer()) {
                    LOG.debug("buffer read");
                    this.baseRowGenerator.setBuffer(bufferOrEvent.getBuffer());
                    // A single buffer may complete zero, one or several rows.
                    while (this.baseRowGenerator.hasNext()) {
                        ctx.collect(this.baseRowGenerator.getNext());
                    }
                } else if (bufferOrEvent.isEvent()) {
                    LOG.debug("event read");
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the calling thread after wrapping.
                Thread.currentThread().interrupt();
            }
            LOG.error(e.getMessage(), e);
            throw e instanceof TableServiceException ? (TableServiceException)e : new TableServiceException(e);
        }
    }

    /**
     * Cancellation hook required by {@link SourceFunction}; this implementation does nothing.
     *
     * <p>NOTE(review): {@code run} does not check any cancellation signal, so calling this method
     * does not stop a read that is in progress. Making cancellation effective needs a shared flag
     * that the read loop observes.
     */
    @Override
    public void cancel() {
    }

    /**
     * Determines which partitions of the table this subtask reads.
     *
     * <p>The partitions reported by the client are sorted and distributed round-robin: the
     * subtask with index {@code i} out of {@code n} takes the entries at positions
     * {@code i, i + n, i + 2n, ...}. Sorting is done on a private copy so the list returned by
     * the client is never modified.
     *
     * @throws TableServiceException if the client reports no partitions for the table
     */
    private void assignPartitions() {
        List<Integer> reported = this.tableServiceClient.getPartitions(this.tableName);
        if (reported == null || reported.isEmpty()) {
            throw new TableServiceException(new RuntimeException("Table Cache do not exists."));
        }
        // Copy before sorting: the client's list may be shared or unmodifiable.
        List<Integer> sorted = new ArrayList<>(reported);
        Collections.sort(sorted);

        int workerCount = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        List<Integer> assigned = new ArrayList<>();
        for (int position = subtaskIndex; position < sorted.size(); position += workerCount) {
            assigned.add(sorted.get(position));
        }
        this.requestTablePartitions = assigned;
    }

    /**
     * Builds the connection id for the service instance responsible for a table partition.
     *
     * <p>The instance is selected through {@code TableServiceUtil.tablePartitionToIndex}; the
     * port comes from {@code FLINK_SHUFFLE_SERVICE_PORT_KEY} in the stored configuration.
     *
     * @param tableName name of the table
     * @param partitionIndex partition to connect to
     * @return connection id pointing at the instance's address and the configured port
     * @throws TableServiceException if no service instance is registered under the computed id
     * @throws IllegalArgumentException if the configured port is outside 1..65535
     */
    private ConnectionID createConnectionID(String tableName, int partitionIndex) {
        Map<Integer, ServiceInstance> instances = this.tableServiceClient.getServiceInstanceMap();
        int instanceId = TableServiceUtil.tablePartitionToIndex(tableName, partitionIndex, instances.size());
        ServiceInstance instance = instances.get(instanceId);
        if (instance == null) {
            throw new TableServiceException(new RuntimeException("serviceInstanceMap does not contains service instance with instanceId = " + instanceId));
        }

        Integer port = this.globalConfig.getInteger(ExternalBlockShuffleServiceOptions.FLINK_SHUFFLE_SERVICE_PORT_KEY);
        boolean portInRange = port != null && port > 0 && port < 65536;
        Preconditions.checkArgument(portInRange, "Invalid port number for ExternalBlockShuffleService: " + port);

        InetSocketAddress address = new InetSocketAddress(instance.getServiceIp(), (int)port);
        return new ConnectionID(address, 0);
    }

    /**
     * Creates a Netty connection manager configured for the address of the given connection.
     * The manager is returned unstarted.
     *
     * @param connectionID connection whose address is used for the Netty configuration
     * @return a new connection manager
     */
    private NettyConnectionManager createConnectionManager(ConnectionID connectionID) {
        return new NettyConnectionManager(this.createNettyConfig(this.globalConfig, connectionID.getAddress()));
    }

    /**
     * Creates an input gate with a single remote channel for one table partition.
     *
     * <p>The gate receives a buffer pool created from {@code networkBufferPool} and one
     * {@link RemoteInputChannel} that targets the result partition derived from the table name
     * and partition index.
     *
     * @param tableName name of the table
     * @param partitionIndex partition to read
     * @param networkBufferPool pool from which the gate's buffer pool is created
     * @param connectionID connection the remote channel uses
     * @param nettyConnectionManager connection manager the remote channel uses
     * @return the configured input gate (not yet opened)
     * @throws IOException if the buffer pool cannot be created
     */
    private TableServiceSourceInputGate createInputGate(String tableName, int partitionIndex, NetworkBufferPool networkBufferPool, ConnectionID connectionID, NettyConnectionManager nettyConnectionManager) throws IOException {
        ResultPartitionID resultPartition = TableServiceUtil.tablePartitionToResultPartition(tableName, partitionIndex);

        String gateName = "TableServiceSource_" + tableName;
        IntermediateDataSetID dataSetId = new IntermediateDataSetID((AbstractID)resultPartition.getPartitionId());
        TableServiceSourceInputGate gate = new TableServiceSourceInputGate(gateName, dataSetId, 1, this.executorService);

        gate.setNetworkProperties(networkBufferPool, 128);
        gate.setBufferPool(networkBufferPool.createBufferPool(64, 64));

        RemoteInputChannel channel = new RemoteInputChannel((SingleInputGate)gate, 0, resultPartition, connectionID, (ConnectionManager)nettyConnectionManager, 0, 0, (Counter)new SimpleCounter());
        // The channel is registered under a partition id rebuilt from the two halves of the original id.
        IntermediateResultPartitionID channelKey = new IntermediateResultPartitionID(resultPartition.getPartitionId().getLowerPart(), resultPartition.getPartitionId().getUpperPart());
        gate.setInputChannel(channelKey, (InputChannel)channel);
        return gate;
    }

    /**
     * Creates the Netty configuration used to connect to the given address.
     *
     * @param configuration configuration forwarded to the Netty config
     * @param address socket address whose IP address is used
     * @return a new Netty configuration using this source's per-buffer memory size
     */
    public NettyConfig createNettyConfig(Configuration configuration, InetSocketAddress address) {
        // Names follow NettyConfig's constructor parameter order — confirm against the Flink version in use.
        final int serverPort = 0;
        final int numberOfSlots = 1;
        return new NettyConfig(address.getAddress(), serverPort, this.memorySizePerBufferInBytes, numberOfSlots, configuration);
    }

    /**
     * Incremental decoder that rebuilds rows from a sequence of network buffers.
     *
     * <p>Each record consists of a 4-byte header, decoded with {@code BytesUtil.bytesToInt} into
     * the payload length, followed by that many payload bytes. Header and payload may each be
     * split across buffer boundaries, so the decoder keeps its progress between calls to
     * {@link #setBuffer(Buffer)}.
     *
     * <p>Usage: call {@code setBuffer}, then loop on {@code hasNext()} / {@code getNext()} until
     * {@code hasNext()} returns {@code false}; at that point the buffer is fully consumed.
     */
    private static class BaseRowGenerator {
        /** No byte of the current record has been read yet. */
        private static final int EMPTY = 0;
        /** Part of the header has been read. */
        private static final int HEADER_HALF_READ = 1;
        /** The header is complete; the payload length has not been decoded yet. */
        private static final int HEADER_READ = 2;
        /** The payload buffer is allocated and partially (possibly not at all) filled. */
        private static final int DATA_HALF_READ = 3;
        /** A complete record is ready to be deserialized. */
        private static final int DATA_READ = 4;
        /** Number of header bytes in front of every record. */
        private static final int HEADER_LENGTH = 4;

        private final BaseRowSerializer<BaseRow> serializer;
        private final byte[] headerBuffer = new byte[HEADER_LENGTH];
        private byte[] dataBuffer;
        private byte[] currentBuffer;
        private int currentBufferOffset;
        private int headerReadBytes;
        private int dataExpectBytes;
        private int dataReadBytes;
        private int status = EMPTY;

        public BaseRowGenerator(BaseRowSerializer<BaseRow> serializer) {
            this.serializer = serializer;
        }

        /**
         * Supplies the next buffer to decode. The readable bytes are copied out and the buffer
         * is recycled in every case, including when the copy or the state check fails.
         *
         * @param buffer buffer received from the input gate
         * @throws IllegalStateException if the previous buffer still has unconsumed bytes
         */
        public void setBuffer(Buffer buffer) {
            try {
                Preconditions.checkState(this.remaining() == 0, "There are some unconsumed bytes");
                byte[] bytes = new byte[buffer.readableBytes()];
                buffer.asByteBuf().readBytes(bytes);
                this.currentBuffer = bytes;
                this.currentBufferOffset = 0;
            } finally {
                buffer.recycleBuffer();
            }
        }

        /**
         * Deserializes the completed record and resets the decoder for the next one.
         *
         * @return the decoded row
         * @throws IllegalStateException if no complete record is available
         */
        BaseRow getNext() {
            Preconditions.checkState(this.status == DATA_READ, "No complete record available; call hasNext() first");
            BaseRow baseRow = BytesUtil.deSerialize(this.dataBuffer, this.dataBuffer.length, this.serializer);
            this.reset();
            return baseRow;
        }

        /**
         * Consumes as many bytes of the current buffer as the current record needs.
         *
         * @return {@code true} if a complete record is ready for {@link #getNext()}
         */
        boolean hasNext() {
            this.tryChangeStatus();
            return this.status == DATA_READ;
        }

        /** Clears the per-record progress; the current buffer and its offset are kept. */
        private void reset() {
            this.dataBuffer = null;
            this.dataExpectBytes = 0;
            this.dataReadBytes = 0;
            this.headerReadBytes = 0;
            this.status = EMPTY;
        }

        /** Advances the state machine until it can make no further progress. */
        private void tryChangeStatus() {
            while (this.changeStatus()) {
            }
        }

        /** Number of bytes of the current buffer that have not been consumed yet. */
        private int remaining() {
            return this.currentBuffer == null ? 0 : this.currentBuffer.length - this.currentBufferOffset;
        }

        /**
         * Performs one step of the state machine.
         *
         * @return {@code true} if the status changed, i.e. another step may make progress
         */
        private boolean changeStatus() {
            int previous = this.status;
            switch (previous) {
                case EMPTY:
                case HEADER_HALF_READ: {
                    this.readHeader();
                    break;
                }
                case HEADER_READ: {
                    this.prepareData();
                    break;
                }
                case DATA_HALF_READ: {
                    this.readData();
                    break;
                }
                case DATA_READ: {
                    break;
                }
                default: {
                    throw new TableServiceException(new RuntimeException("Unsupported status: " + this.status));
                }
            }
            return this.status != previous;
        }

        /** Copies the header bytes that are still missing, as far as the current buffer allows. */
        private void readHeader() {
            int count = Math.min(this.remaining(), HEADER_LENGTH - this.headerReadBytes);
            if (count <= 0) {
                return;
            }
            System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.headerBuffer, this.headerReadBytes, count);
            this.currentBufferOffset += count;
            this.headerReadBytes += count;
            this.status = this.headerReadBytes == HEADER_LENGTH ? HEADER_READ : HEADER_HALF_READ;
        }

        /** Decodes the payload length from the header and allocates the payload buffer. */
        private void prepareData() {
            int length = BytesUtil.bytesToInt(this.headerBuffer);
            if (length < 0) {
                throw new TableServiceException(new RuntimeException("Invalid record length in header: " + length));
            }
            this.dataExpectBytes = length;
            this.dataReadBytes = 0;
            this.dataBuffer = new byte[length];
            // An empty payload is complete immediately, even if the buffer has no bytes left.
            this.status = length == 0 ? DATA_READ : DATA_HALF_READ;
        }

        /** Copies the payload bytes that are still missing, as far as the current buffer allows. */
        private void readData() {
            // Only the outstanding part of the payload may be copied; using the full expected
            // length here would overrun the payload buffer when a record continues in a buffer
            // that holds more bytes than the record still needs.
            int count = Math.min(this.remaining(), this.dataExpectBytes - this.dataReadBytes);
            if (count <= 0) {
                return;
            }
            System.arraycopy(this.currentBuffer, this.currentBufferOffset, this.dataBuffer, this.dataReadBytes, count);
            this.currentBufferOffset += count;
            this.dataReadBytes += count;
            this.status = this.dataReadBytes == this.dataExpectBytes ? DATA_READ : DATA_HALF_READ;
        }
    }
}

