/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.temptable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.service.ServiceContext;
import org.apache.flink.service.ServiceDescriptor;
import org.apache.flink.service.UserDefinedService;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkTableServiceFunction
extends RichParallelSourceFunction<BaseRow>
implements StoppableFunction {
    private static final Logger logger = LoggerFactory.getLogger(FlinkTableServiceFunction.class);
    private ServiceDescriptor serviceDescriptor;
    private UserDefinedService service;
    private ExecutorService executorService;
    private volatile boolean closed;

    public FlinkTableServiceFunction(ServiceDescriptor serviceDescriptor) {
        this.serviceDescriptor = serviceDescriptor;
    }

    public void run(SourceFunction.SourceContext<BaseRow> ctx) throws Exception {
        this.executorService.submit(() -> this.service.start());
        while (!this.executorService.isTerminated()) {
            try {
                this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
            }
            catch (Exception e2) {
                logger.error(e2.getMessage(), (Throwable)e2);
            }
        }
        logger.info("Table Service is done.");
    }

    public void cancel() {
        try {
            this.close();
        }
        catch (Exception e2) {
            logger.error(e2.getMessage(), (Throwable)e2);
        }
        logger.info("TableService " + this.getRuntimeContext().getTaskNameWithSubtasks() + " canceled");
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.service = (UserDefinedService)Class.forName(this.serviceDescriptor.getServiceClassName()).newInstance();
        this.service.setServiceContext(new ServiceContext(){

            @Override
            public MetricGroup getMetricGroup() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getMetricGroup();
            }

            @Override
            public int getNumberOfInstances() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getNumberOfParallelSubtasks();
            }

            @Override
            public int getIndexOfCurrentInstance() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getIndexOfThisSubtask();
            }
        });
        this.service.open(this.serviceDescriptor.getConfiguration());
        this.executorService = Executors.newFixedThreadPool(1);
        logger.info("TableService " + this.getRuntimeContext().getTaskNameWithSubtasks() + " opened");
    }

    public void close() throws Exception {
        if (!this.closed) {
            this.service.close();
            TableServiceUtil.shutdownAndAwaitTermination(this.executorService, 10L);
            super.close();
            logger.info("TableService " + this.getRuntimeContext().getTaskNameWithSubtasks() + " closed");
            this.closed = true;
        }
    }

    @Override
    public void stop() {
        try {
            this.close();
        }
        catch (Exception e2) {
            logger.error(e2.getMessage(), (Throwable)e2);
        }
        logger.info("TableService " + this.getRuntimeContext().getTaskNameWithSubtasks() + " stopped");
    }
}

