/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.temptable.rpc;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.Inet4Address;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.service.ServiceInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registry that collects {@link ServiceInstance} announcements over a plain TCP socket.
 *
 * <p>{@link #open(Configuration)} starts a single background thread that binds a
 * {@link ServerSocket} to an ephemeral port and reads one serialized {@link ServiceInstance}
 * from every accepted connection. Received instances are stored by their instance id and
 * can be inspected through {@link #getRegistedServices()}; {@link #isTableServiceReady()}
 * reports whether the number of registered instances equals the expected count passed to
 * the constructor.
 */
public class TableServiceRegistry
implements LifeCycleAware {
    private static final Logger LOG = LoggerFactory.getLogger(TableServiceRegistry.class);
    private String ip;
    private ExecutorService executorService;
    private TableServiceRegistryServer server;
    private final int tableServiceInstanceNum;
    // Created eagerly so that accessors are safe to call before open(); written by the
    // server thread and read by callers, hence a concurrent map.
    private final Map<Integer, ServiceInstance> registedServices = new ConcurrentHashMap<Integer, ServiceInstance>();

    /**
     * @return the local host address resolved in {@link #open(Configuration)}, or
     *         {@code null} if the registry has not been opened yet
     */
    public String getIp() {
        return this.ip;
    }

    /**
     * @return the port the registry server is listening on, or {@code -1} if the registry
     *         has not been opened yet or the server thread has not bound its socket yet
     */
    public int getPort() {
        TableServiceRegistryServer current = this.server;
        return current == null ? -1 : current.getPort();
    }

    /**
     * @param tableServiceInstanceNum number of service instances that must register before
     *                                {@link #isTableServiceReady()} returns {@code true}
     */
    public TableServiceRegistry(int tableServiceInstanceNum) {
        this.tableServiceInstanceNum = tableServiceInstanceNum;
    }

    /**
     * @return a snapshot copy of the currently registered services, keyed by instance id;
     *         modifying the returned map does not affect the registry
     */
    public Map<Integer, ServiceInstance> getRegistedServices() {
        return new HashMap<Integer, ServiceInstance>(this.registedServices);
    }

    /**
     * Resets the set of registered services, resolves the local host address and starts the
     * registry server on a dedicated single-thread executor.
     *
     * @param config not read by this implementation
     * @throws Exception if the local host address cannot be resolved
     */
    @Override
    public void open(Configuration config) throws Exception {
        this.registedServices.clear();
        this.ip = Inet4Address.getLocalHost().getHostAddress();
        this.server = new TableServiceRegistryServer();
        this.executorService = Executors.newSingleThreadExecutor();
        this.executorService.submit(this.server);
        LOG.info("TableServiceRegistry opened.");
    }

    /**
     * Stops the registry server and shuts down its executor. Safe to call even if
     * {@link #open(Configuration)} was never invoked or failed part-way.
     */
    @Override
    public void close() throws Exception {
        if (this.server != null) {
            this.server.stop();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        LOG.info("TableServiceRegistry closed.");
    }

    /**
     * Checks whether the expected number of service instances has registered.
     *
     * @return {@code true} if the number of registered instances equals the expected count;
     *         {@code false} otherwise, including when the registry has not been opened
     * @throws RuntimeException wrapping the recorded failure if the registry server has
     *                          encountered an error
     */
    public boolean isTableServiceReady() {
        TableServiceRegistryServer current = this.server;
        if (current == null) {
            // Not opened yet: nothing can have registered.
            return false;
        }
        if (current.hasError()) {
            throw new RuntimeException(current.getError());
        }
        int found = this.registedServices.size();
        LOG.info("check table service ready, expect: {}, found: {}", this.tableServiceInstanceNum, found);
        return found == this.tableServiceInstanceNum;
    }

    /**
     * Accept loop that reads one serialized {@link ServiceInstance} per connection and stores
     * it in the enclosing registry. Failures are recorded and exposed through
     * {@link #hasError()} / {@link #getError()}.
     */
    private class TableServiceRegistryServer
    implements Runnable {
        private volatile int port = -1;
        // serverSocket and error are written by the server thread and read by other threads
        // (stop(), hasError(), getError()), so they must be volatile to be visible.
        private volatile ServerSocket serverSocket;
        private volatile Throwable error;
        private volatile boolean running = true;

        private TableServiceRegistryServer() {
        }

        /**
         * @return the bound local port, or {@code -1} while the socket has not been bound
         */
        public int getPort() {
            return this.port;
        }

        /**
         * Binds a server socket to an automatically allocated port and processes incoming
         * registrations until {@link #stop()} is called.
         */
        @Override
        public void run() {
            LOG.info("TableServiceRegistryServer begin running");
            try {
                // Port 0 asks the OS for any free port.
                this.serverSocket = new ServerSocket(0);
            }
            catch (Exception e2) {
                this.recordError(e2);
                return;
            }
            this.port = this.serverSocket.getLocalPort();
            try {
                while (this.running) {
                    this.acceptOne();
                }
            }
            finally {
                // Covers the case where stop() ran before the socket was created and
                // therefore could not close it.
                this.closeServerSocket();
            }
            LOG.info("TableServiceRegistryServer end running.");
        }

        /** Accepts a single connection, registers the instance it sends, and closes it. */
        private void acceptOne() {
            Socket client = null;
            try {
                client = this.serverSocket.accept();
                // NOTE(review): this is Java-native deserialization of data read from a
                // network socket; anything able to reach this port can trigger it. Consider
                // a deserialization filter or a non-native wire format.
                // NOTE(review): no read timeout is set, so a client that connects but never
                // sends data blocks the single accept thread.
                ObjectInputStream objectInputStream = new ObjectInputStream(new BufferedInputStream(client.getInputStream()));
                ServiceInstance serviceInstance = (ServiceInstance)objectInputStream.readObject();
                if (serviceInstance != null) {
                    TableServiceRegistry.this.registedServices.put(serviceInstance.getInstanceId(), serviceInstance);
                }
            }
            catch (IOException ie) {
                // An IOException after stop() is the expected result of closing the server
                // socket under a blocked accept(); only report failures while running.
                if (this.running) {
                    this.recordError(ie);
                }
            }
            catch (ClassNotFoundException e3) {
                this.recordError(e3);
            }
            catch (ClassCastException e4) {
                // The peer sent an object that is not a ServiceInstance. Without this the
                // exception would terminate the accept thread without being reported.
                this.recordError(e4);
            }
            finally {
                this.closeQuietly(client);
            }
        }

        /** Stores the failure for {@link #getError()} and logs it. */
        private void recordError(Throwable t) {
            this.error = t;
            LOG.error(t.getMessage(), t);
        }

        /** Closes the client connection (and with it its streams); close failures are only logged. */
        private void closeQuietly(Socket client) {
            if (client == null) {
                return;
            }
            try {
                client.close();
            }
            catch (IOException e2) {
                LOG.warn(e2.getMessage(), e2);
            }
        }

        /** Closes the server socket if it exists; closing an already closed socket is harmless. */
        private void closeServerSocket() {
            ServerSocket socket = this.serverSocket;
            if (socket != null) {
                try {
                    socket.close();
                }
                catch (IOException e2) {
                    LOG.error(e2.getMessage(), e2);
                }
            }
        }

        /**
         * Requests the accept loop to end and closes the server socket so that a blocked
         * {@code accept()} returns.
         */
        public void stop() {
            this.running = false;
            this.closeServerSocket();
        }

        /** @return {@code true} if a failure has been recorded */
        public boolean hasError() {
            return this.error != null;
        }

        /** @return the most recently recorded failure, or {@code null} if there is none */
        public Throwable getError() {
            return this.error;
        }

        /** @return {@code true} until {@link #stop()} has been called */
        public boolean isRunning() {
            return this.running;
        }
    }
}

