/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.temptable.rpc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.service.ServiceInstance;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.temptable.TableServiceException;
import org.apache.flink.table.temptable.TableServiceOptions;
import org.apache.flink.table.temptable.rpc.TableServiceBuffer;
import org.apache.flink.table.temptable.rpc.TableServiceClientHandler;
import org.apache.flink.table.temptable.util.BytesUtil;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.apache.flink.table.typeutils.BaseRowSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty based client that talks to the table service instances described by the
 * {@link Configuration} handed to {@link #open(Configuration)}.
 *
 * <p>Two kinds of connections are used:
 * <ul>
 *   <li>a long-lived connection, re-established by {@code ensureConnected} whenever the
 *       (table, partition) target changes, used by {@link #write}, {@link #flush},
 *       {@link #finish}, {@link #initializePartition} and {@link #deletePartition};</li>
 *   <li>short-lived connections to the service instance stored under key {@code 0}, used by
 *       {@link #getPartitions}, {@link #registerPartition} and {@link #unregisterPartitions}.</li>
 * </ul>
 *
 * <p>Rows passed to {@link #write} are accumulated in a client side buffer and only sent when the
 * buffer overflows, when the write target changes, or when {@link #flush()} / {@link #close()} is
 * called.
 *
 * <p>NOTE(review): apart from the {@code volatile} write buffer reference there is no
 * synchronization in this class; it presumably is meant to be used by a single thread — confirm.
 */
public class TableServiceClient implements LifeCycleAware {
    private static final Logger LOG = LoggerFactory.getLogger(TableServiceClient.class);

    /** Key of the service instance that partition listing/registration requests are sent to. */
    private static final int FIRST_INSTANCE_ID = 0;

    /*
     * Frame decoder settings (see Netty's LengthFieldBasedFrameDecoder): a 4 byte length field at
     * offset 0 whose value includes the field itself (hence the -4 adjustment); the 4 byte prefix
     * is stripped before the frame reaches the client handler.
     */
    private static final int MAX_FRAME_LENGTH = 65536;
    private static final int LENGTH_FIELD_OFFSET = 0;
    private static final int LENGTH_FIELD_LENGTH = 4;
    private static final int LENGTH_ADJUSTMENT = -4;
    private static final int INITIAL_BYTES_TO_STRIP = 4;

    private String lastTableName = null;
    private int lastPartitionIndex = -1;
    private Bootstrap bootstrap;
    private ChannelFuture channelFuture;
    private EventLoopGroup eventLoopGroup;
    private TableServiceClientHandler clientHandler;
    private volatile TableServiceBuffer writeBuffer;
    private int writeBufferSize;
    private String tableServiceId;
    private Map<Integer, ServiceInstance> serviceInstanceMap;

    /**
     * Returns the service instances known to this client, keyed by instance id.
     *
     * @return the internal (live, mutable) map; {@code null} before {@link #open(Configuration)}
     */
    public Map<Integer, ServiceInstance> getServiceInstanceMap() {
        return this.serviceInstanceMap;
    }

    /**
     * Asks the service instance stored under key {@code 0} for the partitions of a table.
     *
     * @param tableName name of the table
     * @return a new mutable list with the reported partitions; empty when no instance is stored
     *         under key {@code 0} or when the handler reports none
     * @throws TableServiceException if connecting or the request fails
     */
    public List<Integer> getPartitions(String tableName) {
        List<Integer> partitions = new ArrayList<>();
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(FIRST_INSTANCE_ID);
        if (serviceInstance == null) {
            return partitions;
        }
        TableServiceClientHandler handler = new TableServiceClientHandler();
        Bootstrap tempBootstrap = this.getTempBootstrap(handler);
        ChannelFuture tempChannel = null;
        try {
            tempChannel = tempBootstrap.connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
            List<Integer> subPartitions = handler.getPartitions(tableName);
            if (subPartitions != null && !subPartitions.isEmpty()) {
                partitions.addAll(subPartitions);
            }
        }
        catch (Exception e) {
            throw logAndWrap(e);
        }
        finally {
            closeChannel(tempChannel);
        }
        return partitions;
    }

    /**
     * Asks the service instance stored under key {@code 0} to unregister the partitions of a
     * table. Does nothing when no instance is stored under that key.
     *
     * @param tableName name of the table
     * @throws TableServiceException if connecting or the request fails
     */
    public void unregisterPartitions(String tableName) {
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(FIRST_INSTANCE_ID);
        if (serviceInstance == null) {
            return;
        }
        TableServiceClientHandler handler = new TableServiceClientHandler();
        Bootstrap tempBootstrap = this.getTempBootstrap(handler);
        ChannelFuture tempChannel = null;
        try {
            tempChannel = tempBootstrap.connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
            handler.unregisterPartitions(tableName);
        }
        catch (Exception e) {
            throw logAndWrap(e);
        }
        finally {
            closeChannel(tempChannel);
        }
    }

    /**
     * Builds a bootstrap for a short-lived connection that dispatches to the given handler.
     *
     * @param handler handler appended to the channel pipeline
     * @return the configured (not yet connected) bootstrap
     * @throws TableServiceException if the bootstrap cannot be configured
     */
    private Bootstrap getTempBootstrap(final TableServiceClientHandler handler) {
        try {
            return this.newBootstrap(handler);
        }
        catch (Exception e) {
            throw logAndWrap(e);
        }
    }

    /**
     * Buffers one serialized row for the given partition.
     *
     * <p>If the buffer still holds rows of a different (table, partition) they are flushed to
     * their own partition first. When the row does not fit into the remaining buffer space the
     * buffered bytes (if any) are sent, followed by the row itself as a separate request.
     *
     * @param tableName         target table
     * @param partitionIndex    target partition
     * @param row2              row to write
     * @param baseRowSerializer serializer used to turn the row into bytes
     * @throws Exception if connecting, serializing or sending fails
     */
    public void write(String tableName, int partitionIndex, BaseRow row2, BaseRowSerializer baseRowSerializer) throws Exception {
        TableServiceBuffer buffer = this.writeBuffer;
        if (buffer != null && !(tableName.equals(buffer.getTableName()) && partitionIndex == buffer.getPartitionIndex())) {
            // The buffer is bound to the partition it was created for: send what it holds to that
            // partition before switching, otherwise those rows would end up in the new one.
            this.flush();
            buffer = null;
        }
        this.ensureConnected(tableName, partitionIndex);
        if (buffer == null) {
            buffer = new TableServiceBuffer(tableName, partitionIndex, this.writeBufferSize);
            this.writeBuffer = buffer;
        }
        byte[] serialized = BytesUtil.serialize(row2, baseRowSerializer);
        if (buffer.getByteBuffer().remaining() >= serialized.length) {
            buffer.getByteBuffer().put(serialized);
            return;
        }
        byte[] pending = drain(buffer);
        if (pending.length > 0) {
            // Like flush(), never send an empty payload.
            this.clientHandler.write(tableName, partitionIndex, pending);
        }
        this.clientHandler.write(tableName, partitionIndex, serialized);
    }

    /**
     * Sends a finish request for the given partition. Buffered rows are not flushed by this call.
     *
     * @throws Exception if connecting or the request fails
     */
    public void finish(String tableName, int partitionIndex) throws Exception {
        this.ensureConnected(tableName, partitionIndex);
        this.clientHandler.finishPartition(tableName, partitionIndex);
    }

    /**
     * Sends an initialize request for the given partition.
     *
     * @throws Exception if connecting or the request fails
     */
    public void initializePartition(String tableName, int partitionIndex) throws Exception {
        this.ensureConnected(tableName, partitionIndex);
        this.clientHandler.initializePartition(tableName, partitionIndex);
    }

    /**
     * Sends a delete request for the given partition.
     *
     * @throws Exception if connecting or the request fails
     */
    public void deletePartition(String tableName, int partitionIndex) throws Exception {
        this.ensureConnected(tableName, partitionIndex);
        this.clientHandler.deletePartition(tableName, partitionIndex);
    }

    /**
     * Asks the service instance stored under key {@code 0} to register a partition. Does nothing
     * when no instance is stored under that key.
     *
     * @param tableName      name of the table
     * @param partitionIndex partition to register
     * @throws TableServiceException if connecting or the request fails
     */
    public void registerPartition(String tableName, int partitionIndex) throws Exception {
        ServiceInstance serviceInstance = this.serviceInstanceMap.get(FIRST_INSTANCE_ID);
        if (serviceInstance == null) {
            return;
        }
        TableServiceClientHandler handler = new TableServiceClientHandler();
        Bootstrap tempBootstrap = this.getTempBootstrap(handler);
        ChannelFuture tempChannel = null;
        try {
            tempChannel = tempBootstrap.connect(serviceInstance.getServiceIp(), serviceInstance.getServicePort()).sync();
            handler.registerPartition(tableName, partitionIndex);
        }
        catch (Exception e) {
            throw logAndWrap(e);
        }
        finally {
            closeChannel(tempChannel);
        }
    }

    /**
     * Reads the client settings and the service instance list from the configuration and creates
     * the event loop group used by all connections of this client.
     *
     * @param config configuration providing the table service id, the write buffer size and the
     *               service instances
     * @throws Exception if the configuration cannot be read
     */
    @Override
    public void open(Configuration config) throws Exception {
        this.tableServiceId = config.getString(TableServiceOptions.TABLE_SERVICE_ID);
        LOG.info("TableServiceClient open with tableServiceId = {}", this.tableServiceId);
        this.writeBufferSize = config.getInteger(TableServiceOptions.TABLE_SERVICE_CLIENT_WRITE_BUFFER_SIZE);
        Map<Integer, ServiceInstance> instances = new HashMap<>();
        instances.putAll(TableServiceUtil.buildTableServiceInstance(config));
        this.serviceInstanceMap = instances;
        // Created last so that a failure while reading the configuration does not leak threads.
        this.eventLoopGroup = new NioEventLoopGroup();
    }

    /**
     * Flushes buffered rows, closes the long-lived connection and shuts the event loop group
     * down. The connection and the event loop group are released even when flushing fails.
     *
     * @throws Exception if flushing or closing the channel fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.flush();
        }
        finally {
            try {
                this.disconnect();
            }
            finally {
                if (this.eventLoopGroup != null && !this.eventLoopGroup.isShutdown()) {
                    this.eventLoopGroup.shutdownGracefully();
                }
            }
        }
    }

    /**
     * Sends all buffered rows to the partition the buffer was created for. Does nothing when
     * nothing has been buffered.
     *
     * @throws Exception if connecting or sending fails
     */
    public void flush() throws Exception {
        TableServiceBuffer buffer = this.writeBuffer;
        if (buffer == null || buffer.getByteBuffer().position() == 0) {
            return;
        }
        // Connect before draining so that a failed connect leaves the buffered rows untouched.
        this.ensureConnected(buffer.getTableName(), buffer.getPartitionIndex());
        byte[] pending = drain(buffer);
        this.clientHandler.write(buffer.getTableName(), buffer.getPartitionIndex(), pending);
    }

    /**
     * Makes sure the long-lived connection points at the service instance responsible for the
     * given partition, reconnecting when the target differs from the previous one.
     *
     * <p>The new target is only remembered after the connection has been established, so a failed
     * attempt is retried by the next call instead of being mistaken for a live connection.
     *
     * @throws TableServiceException if no matching service instance is known or the calling
     *                               thread is interrupted while connecting
     * @throws Exception             if the connection attempt fails
     */
    private void ensureConnected(String tableName, int partitionIndex) throws Exception {
        if (tableName.equals(this.lastTableName) && partitionIndex == this.lastPartitionIndex) {
            return;
        }
        this.disconnect();
        if (this.serviceInstanceMap == null || this.serviceInstanceMap.isEmpty()) {
            throw new TableServiceException(new RuntimeException("serviceInstanceMap is empty"));
        }
        int pickIndex = TableServiceUtil.tablePartitionToIndex(tableName, partitionIndex, this.serviceInstanceMap.size());
        ServiceInstance pickedServiceInfo = this.serviceInstanceMap.get(pickIndex);
        if (pickedServiceInfo == null) {
            throw new TableServiceException(new RuntimeException("serviceInstanceMap does not contains service instance with instanceId = " + pickIndex));
        }
        LOG.info("build client with ip = {}, port = {}", pickedServiceInfo.getServiceIp(), pickedServiceInfo.getServicePort());
        TableServiceClientHandler newHandler = new TableServiceClientHandler();
        Bootstrap newBootstrap = this.newBootstrap(newHandler);
        ChannelFuture newChannel;
        try {
            newChannel = newBootstrap.connect(pickedServiceInfo.getServiceIp(), pickedServiceInfo.getServicePort()).sync();
        }
        catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
            throw new TableServiceException(e);
        }
        this.bootstrap = newBootstrap;
        this.clientHandler = newHandler;
        this.channelFuture = newChannel;
        this.lastTableName = tableName;
        this.lastPartitionIndex = partitionIndex;
        LOG.info("build client end");
    }

    /** Creates a bootstrap on the shared event loop group whose pipeline ends in {@code handler}. */
    private Bootstrap newBootstrap(final TableServiceClientHandler handler) {
        Bootstrap configured = new Bootstrap();
        configured.group(this.eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
                    socketChannel.pipeline().addLast(handler);
                }
            });
        return configured;
    }

    /** Closes the long-lived connection (if any) and forgets the target it was bound to. */
    private void disconnect() {
        try {
            closeChannel(this.channelFuture);
        }
        finally {
            this.bootstrap = null;
            this.channelFuture = null;
            this.clientHandler = null;
            this.lastTableName = null;
            this.lastPartitionIndex = -1;
        }
    }

    /**
     * Closes the channel behind {@code future} and waits for the close to complete; a
     * {@code null} future is ignored. An interrupt while waiting is logged and the interrupt flag
     * of the calling thread is restored.
     */
    private static void closeChannel(ChannelFuture future) {
        if (future == null) {
            return;
        }
        try {
            future.channel().close().sync();
        }
        catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
            Thread.currentThread().interrupt();
        }
    }

    /** Copies the buffered bytes out of {@code buffer} and resets it for further writes. */
    private static byte[] drain(TableServiceBuffer buffer) {
        buffer.getByteBuffer().flip();
        byte[] pending = new byte[buffer.getByteBuffer().remaining()];
        buffer.getByteBuffer().get(pending);
        buffer.getByteBuffer().clear();
        return pending;
    }

    /**
     * Logs {@code e} and wraps it the way callers of this class have always seen failures:
     * a {@link TableServiceException} around a {@link RuntimeException} carrying the original
     * message and cause. Restores the interrupt flag when {@code e} signals an interrupt.
     */
    private static TableServiceException logAndWrap(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        LOG.error(e.getMessage(), e);
        return new TableServiceException(new RuntimeException(e.getMessage(), e));
    }
}

