package org.apache.flink.table.temptable.rpc;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Inet4Address;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.service.ServiceInstance;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/temptable/rpc/TableServiceRegistry.class */
public class TableServiceRegistry implements LifeCycleAware {
    private static final Logger LOG = LoggerFactory.getLogger(TableServiceRegistry.class);
    private String ip;
    private ExecutorService executorService;
    private TableServiceRegistryServer server;
    private final int tableServiceInstanceNum;
    private Map<Integer, ServiceInstance> registedServices;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/temptable/rpc/TableServiceRegistry$TableServiceRegistryServer.class */
    public class TableServiceRegistryServer implements Runnable {
        private volatile int port;
        private ServerSocket serverSocket;
        private Throwable error;
        private volatile boolean running;

        private TableServiceRegistryServer() {
            this.port = -1;
            this.running = true;
        }

        public int getPort() {
            return this.port;
        }

        @Override // java.lang.Runnable
        public void run() {
            TableServiceRegistry.LOG.info("TableServiceRegistryServer begin running");
            try {
                this.serverSocket = new ServerSocket(0);
                this.port = this.serverSocket.getLocalPort();
                while (this.running) {
                    try {
                        ServiceInstance serviceInstance = (ServiceInstance) new ObjectInputStream(new BufferedInputStream(this.serverSocket.accept().getInputStream())).readObject();
                        if (serviceInstance != null) {
                            TableServiceRegistry.this.registedServices.put(Integer.valueOf(serviceInstance.getInstanceId()), serviceInstance);
                        }
                    } catch (IOException e) {
                        if (this.running) {
                            this.error = e;
                            TableServiceRegistry.LOG.error(e.getMessage(), e);
                        }
                    } catch (ClassNotFoundException e2) {
                        this.error = e2;
                        TableServiceRegistry.LOG.error(e2.getMessage(), e2);
                    }
                }
                TableServiceRegistry.LOG.info("TableServiceRegistryServer end running.");
            } catch (Exception e3) {
                this.error = e3;
                TableServiceRegistry.LOG.error(e3.getMessage(), e3);
            }
        }

        public void stop() {
            this.running = false;
            if (this.serverSocket != null) {
                try {
                    this.serverSocket.close();
                } catch (IOException e) {
                    TableServiceRegistry.LOG.error(e.getMessage(), e);
                }
            }
        }

        public boolean hasError() {
            return this.error != null;
        }

        public Throwable getError() {
            return this.error;
        }

        public boolean isRunning() {
            return this.running;
        }
    }

    public String getIp() {
        return this.ip;
    }

    public int getPort() {
        return this.server.getPort();
    }

    public TableServiceRegistry(int i) {
        this.tableServiceInstanceNum = i;
    }

    public Map<Integer, ServiceInstance> getRegistedServices() {
        return new HashMap(this.registedServices);
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void open(Configuration configuration) throws Exception {
        this.registedServices = new ConcurrentHashMap();
        this.ip = Inet4Address.getLocalHost().getHostAddress();
        this.server = new TableServiceRegistryServer();
        this.executorService = Executors.newSingleThreadExecutor();
        this.executorService.submit(this.server);
        LOG.info("TableServiceRegistry opened.");
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void close() throws Exception {
        this.server.stop();
        TableServiceUtil.shutdownAndAwaitTermination(this.executorService, 10L);
        LOG.info("TableServiceRegistry closed.");
    }

    public boolean isTableServiceReady() {
        if (this.server.hasError()) {
            throw new RuntimeException(this.server.getError());
        }
        LOG.info("check table service ready, expect: " + this.tableServiceInstanceNum + ", found: " + this.registedServices.size());
        return this.registedServices.size() == this.tableServiceInstanceNum;
    }
}
