package org.apache.flink.table.temptable.rpc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.service.ServiceInstance;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.temptable.TableServiceException;
import org.apache.flink.table.temptable.TableServiceOptions;
import org.apache.flink.table.temptable.util.BytesUtil;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/temptable/rpc/TableServiceClient.class */
public class TableServiceClient implements LifeCycleAware {
    private Logger logger = LoggerFactory.getLogger(TableServiceClient.class);
    private String lastTableName = null;
    private int lastPartitionIndex = -1;
    private Bootstrap bootstrap;
    private ChannelFuture channelFuture;
    private EventLoopGroup eventLoopGroup;
    private TableServiceClientHandler clientHandler;
    private volatile TableServiceBuffer writeBuffer;
    private int writeBufferSize;
    private String tableServiceId;
    private Map<Integer, ServiceInstance> serviceInstanceMap;

    public Map<Integer, ServiceInstance> getServiceInstanceMap() {
        return this.serviceInstanceMap;
    }

    public List<Integer> getPartitions(String str) {
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(0);
        ArrayList arrayList = new ArrayList();
        if (serviceInstance != null) {
            TableServiceClientHandler tableServiceClientHandler = new TableServiceClientHandler();
            ChannelFuture channelFuture = null;
            try {
                try {
                    channelFuture = getTempBootstrap(tableServiceClientHandler).connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
                    List<Integer> partitions = tableServiceClientHandler.getPartitions(str);
                    if (partitions != null && !partitions.isEmpty()) {
                        arrayList.addAll(partitions);
                    }
                    if (channelFuture != null) {
                        try {
                            channelFuture.channel().close().sync();
                        } catch (InterruptedException e) {
                            this.logger.error(e.getMessage(), e);
                        }
                    }
                } catch (Exception e2) {
                    this.logger.error(e2.getMessage(), e2);
                    throw new TableServiceException(new RuntimeException(e2.getMessage(), e2));
                }
            } catch (Throwable th) {
                if (channelFuture != null) {
                    try {
                        channelFuture.channel().close().sync();
                    } catch (InterruptedException e3) {
                        this.logger.error(e3.getMessage(), e3);
                    }
                }
                throw th;
            }
        }
        return arrayList;
    }

    public void unregisterPartitions(String str) {
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(0);
        if (serviceInstance != null) {
            TableServiceClientHandler tableServiceClientHandler = new TableServiceClientHandler();
            ChannelFuture channelFuture = null;
            try {
                try {
                    channelFuture = getTempBootstrap(tableServiceClientHandler).connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
                    tableServiceClientHandler.unregisterPartitions(str);
                    if (channelFuture != null) {
                        try {
                            channelFuture.channel().close().sync();
                        } catch (InterruptedException e) {
                            this.logger.error(e.getMessage(), e);
                        }
                    }
                } catch (Exception e2) {
                    this.logger.error(e2.getMessage(), e2);
                    throw new TableServiceException(new RuntimeException(e2.getMessage(), e2));
                }
            } catch (Throwable th) {
                if (channelFuture != null) {
                    try {
                        channelFuture.channel().close().sync();
                    } catch (InterruptedException e3) {
                        this.logger.error(e3.getMessage(), e3);
                    }
                }
                throw th;
            }
        }
    }

    private Bootstrap getTempBootstrap(final TableServiceClientHandler tableServiceClientHandler) {
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(this.eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.table.temptable.rpc.TableServiceClient.1
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(65536, 0, 4, -4, 4)});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{tableServiceClientHandler});
                }
            });
            return bootstrap;
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
            throw new TableServiceException(new RuntimeException(e.getMessage(), e));
        }
    }

    public void write(String str, int i, BaseRow baseRow, BaseRowSerializer baseRowSerializer) throws Exception {
        ensureConnected(str, i);
        if (this.writeBuffer == null) {
            this.writeBuffer = new TableServiceBuffer(str, i, this.writeBufferSize);
        }
        byte[] serialize = BytesUtil.serialize(baseRow, baseRowSerializer);
        if (this.writeBuffer.getByteBuffer().remaining() >= serialize.length) {
            this.writeBuffer.getByteBuffer().put(serialize);
            return;
        }
        this.writeBuffer.getByteBuffer().flip();
        byte[] bArr = new byte[this.writeBuffer.getByteBuffer().remaining()];
        this.writeBuffer.getByteBuffer().get(bArr);
        this.writeBuffer.getByteBuffer().clear();
        this.clientHandler.write(str, i, bArr);
        this.clientHandler.write(str, i, serialize);
    }

    public void finish(String str, int i) throws Exception {
        ensureConnected(str, i);
        this.clientHandler.finishPartition(str, i);
    }

    public void initializePartition(String str, int i) throws Exception {
        ensureConnected(str, i);
        this.clientHandler.initializePartition(str, i);
    }

    public void deletePartition(String str, int i) throws Exception {
        ensureConnected(str, i);
        this.clientHandler.deletePartition(str, i);
    }

    public void registerPartition(String str, int i) throws Exception {
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(0);
        if (serviceInstance != null) {
            TableServiceClientHandler tableServiceClientHandler = new TableServiceClientHandler();
            ChannelFuture channelFuture = null;
            try {
                try {
                    channelFuture = getTempBootstrap(tableServiceClientHandler).connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
                    tableServiceClientHandler.registerPartition(str, i);
                    if (channelFuture != null) {
                        try {
                            channelFuture.channel().close().sync();
                        } catch (InterruptedException e) {
                            this.logger.error(e.getMessage(), e);
                        }
                    }
                } catch (Exception e2) {
                    this.logger.error(e2.getMessage(), e2);
                    throw new TableServiceException(new RuntimeException(e2.getMessage(), e2));
                }
            } catch (Throwable th) {
                if (channelFuture != null) {
                    try {
                        channelFuture.channel().close().sync();
                    } catch (InterruptedException e3) {
                        this.logger.error(e3.getMessage(), e3);
                    }
                }
                throw th;
            }
        }
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void open(Configuration configuration) throws Exception {
        this.tableServiceId = configuration.getString(TableServiceOptions.TABLE_SERVICE_ID);
        this.logger.info("TableServiceClient open with tableServiceId = " + this.tableServiceId);
        this.writeBufferSize = configuration.getInteger(TableServiceOptions.TABLE_SERVICE_CLIENT_WRITE_BUFFER_SIZE);
        this.eventLoopGroup = new NioEventLoopGroup();
        this.serviceInstanceMap = new HashMap();
        this.serviceInstanceMap.putAll(TableServiceUtil.buildTableServiceInstance(configuration));
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void close() throws Exception {
        flush();
        if (this.channelFuture != null) {
            try {
                this.channelFuture.channel().close().sync();
            } catch (InterruptedException e) {
                this.logger.error(e.getMessage(), e);
            }
        }
        if (this.eventLoopGroup == null || this.eventLoopGroup.isShutdown()) {
            return;
        }
        this.eventLoopGroup.shutdownGracefully();
    }

    public void flush() throws Exception {
        if (this.writeBuffer != null) {
            this.writeBuffer.getByteBuffer().flip();
            if (this.writeBuffer.getByteBuffer().hasRemaining()) {
                ensureConnected(this.writeBuffer.getTableName(), this.writeBuffer.getPartitionIndex());
                byte[] bArr = new byte[this.writeBuffer.getByteBuffer().remaining()];
                this.writeBuffer.getByteBuffer().get(bArr);
                this.clientHandler.write(this.writeBuffer.getTableName(), this.writeBuffer.getPartitionIndex(), bArr);
                this.writeBuffer.getByteBuffer().clear();
            }
        }
    }

    private void ensureConnected(String str, int i) throws Exception {
        if (str.equals(this.lastTableName) && i == this.lastPartitionIndex) {
            return;
        }
        try {
            if (this.bootstrap != null) {
                try {
                    this.channelFuture.channel().close().sync();
                    this.bootstrap = null;
                    this.channelFuture = null;
                } catch (InterruptedException e) {
                    this.logger.error(e.getMessage(), e);
                    this.bootstrap = null;
                    this.channelFuture = null;
                }
            }
            this.lastTableName = str;
            this.lastPartitionIndex = i;
            if (this.serviceInstanceMap == null || this.serviceInstanceMap.isEmpty()) {
                throw new TableServiceException(new RuntimeException("serviceInstanceMap is empty"));
            }
            int tablePartitionToIndex = TableServiceUtil.tablePartitionToIndex(str, i, this.serviceInstanceMap.size());
            ServiceInstance serviceInstance = this.serviceInstanceMap.get(Integer.valueOf(tablePartitionToIndex));
            if (serviceInstance == null) {
                throw new TableServiceException(new RuntimeException("serviceInstanceMap does not contains service instance with instanceId = " + tablePartitionToIndex));
            }
            this.logger.info("build client with ip = " + serviceInstance.getServiceIp() + ", port = " + serviceInstance.getServicePort());
            this.bootstrap = new Bootstrap();
            this.clientHandler = new TableServiceClientHandler();
            this.bootstrap.group(this.eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.table.temptable.rpc.TableServiceClient.2
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(65536, 0, 4, -4, 4)});
                    socketChannel.pipeline().addLast(new ChannelHandler[]{TableServiceClient.this.clientHandler});
                }
            });
            try {
                this.channelFuture = this.bootstrap.connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
                this.logger.info("build client end");
            } catch (InterruptedException e2) {
                this.logger.error(e2.getMessage(), e2);
                throw new TableServiceException(e2);
            }
        } catch (Throwable th) {
            this.bootstrap = null;
            this.channelFuture = null;
            throw th;
        }
    }
}
