package org.apache.flink.table.temptable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.service.ServiceContext;
import org.apache.flink.service.ServiceDescriptor;
import org.apache.flink.service.UserDefinedService;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/temptable/FlinkTableServiceFunction.class */
/**
 * Parallel source function that hosts a {@link UserDefinedService} inside a source task.
 *
 * <p>The function never emits records to its {@code SourceContext}. It instantiates the service
 * class named by the {@link ServiceDescriptor} in {@link #open(Configuration)}, invokes
 * {@code service.start()} on a dedicated single-thread executor in {@code run}, and then blocks
 * until that executor has terminated. The executor is only shut down by {@link #close()}, which
 * is also what {@link #cancel()} and {@link #stop()} delegate to.
 */
public class FlinkTableServiceFunction extends RichParallelSourceFunction<BaseRow> implements StoppableFunction {
    private static final Logger logger = LoggerFactory.getLogger(FlinkTableServiceFunction.class);

    /** Describes which service class to instantiate and the configuration it is opened with. */
    private ServiceDescriptor serviceDescriptor;

    /** Service instance; created in {@link #open(Configuration)}, {@code null} before that. */
    private UserDefinedService service;

    /** Single-thread pool running {@code service.start()}; {@code null} until open() completes. */
    private ExecutorService executorService;

    /** Set once the first call to {@link #close()} has begun; later calls are no-ops. */
    private volatile boolean closed;

    /**
     * Creates the function for the given service.
     *
     * @param serviceDescriptor descriptor supplying the service class name and its configuration
     */
    public FlinkTableServiceFunction(ServiceDescriptor serviceDescriptor) {
        this.serviceDescriptor = serviceDescriptor;
    }

    /**
     * Starts the service on the executor and blocks until the executor has terminated.
     *
     * <p>Termination requires {@link #close()} (directly, or via {@link #cancel()} /
     * {@link #stop()}) to shut the executor down and {@code service.start()} to have returned.
     *
     * @param sourceContext unused; this source produces no records
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void run(SourceFunction.SourceContext<BaseRow> sourceContext) throws Exception {
        this.executorService.submit(() -> {
            try {
                this.service.start();
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded, so a failure of start() would
                // otherwise disappear without a trace. Log it, then rethrow so the task's
                // outcome stays exactly what it was.
                logger.error("Table Service terminated with an exception", e);
                throw e;
            }
        });
        while (!this.executorService.isTerminated()) {
            // InterruptedException is deliberately allowed to propagate: catching it here would
            // clear the thread's interrupt status and keep the loop spinning, making the task
            // ignore interruption.
            this.executorService.awaitTermination(1L, TimeUnit.SECONDS);
        }
        logger.info("Table Service is done.");
    }

    /**
     * Cancels the source by closing the service and shutting the executor down. Failures during
     * close are logged, not rethrown.
     */
    @Override
    public void cancel() {
        closeAndLog("canceled");
    }

    /**
     * Instantiates the service reflectively, hands it a {@link ServiceContext} backed by this
     * function's runtime context, opens it with the descriptor's configuration, and creates the
     * executor used by {@code run}.
     *
     * @param configuration passed through to {@code super.open}
     * @throws Exception if the service class cannot be loaded or instantiated, or if opening the
     *     service fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        // getDeclaredConstructor().newInstance() replaces the deprecated Class#newInstance();
        // constructor failures now surface wrapped in InvocationTargetException.
        this.service = (UserDefinedService) Class.forName(this.serviceDescriptor.getServiceClassName())
                .getDeclaredConstructor()
                .newInstance();
        this.service.setServiceContext(new ServiceContext() {
            @Override // org.apache.flink.service.ServiceContext
            public MetricGroup getMetricGroup() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getMetricGroup();
            }

            @Override // org.apache.flink.service.ServiceContext
            public int getNumberOfInstances() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getNumberOfParallelSubtasks();
            }

            @Override // org.apache.flink.service.ServiceContext
            public int getIndexOfCurrentInstance() {
                return FlinkTableServiceFunction.this.getRuntimeContext().getIndexOfThisSubtask();
            }
        });
        this.service.open(this.serviceDescriptor.getConfiguration());
        this.executorService = Executors.newFixedThreadPool(1);
        logger.info("TableService {} opened", getRuntimeContext().getTaskNameWithSubtasks());
    }

    /**
     * Closes the service and shuts the executor down. Only the first invocation does any work;
     * every later (or concurrent) invocation returns immediately.
     *
     * <p>The executor is shut down even when {@code service.close()} throws, so the wait loop in
     * {@code run} can still finish. Fields that were never assigned because
     * {@link #open(Configuration)} failed part-way are skipped.
     *
     * @throws Exception whatever {@code service.close()} or {@code super.close()} throws
     */
    @Override
    public void close() throws Exception {
        // close() is reachable from cancel(), stop() and direct callers. Claim the flag
        // atomically so service.close() runs at most once.
        // NOTE(review): the SourceFunction contract is understood to invoke cancel() from a
        // thread other than the one running run()/close() - confirm for this Flink version.
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        try {
            if (this.service != null) {
                this.service.close();
            }
        } finally {
            try {
                if (this.executorService != null) {
                    // shutdown() does not interrupt the running task; the executor terminates
                    // once service.start() returns.
                    this.executorService.shutdown();
                }
            } finally {
                super.close();
            }
        }
        logger.info("TableService {} closed", getRuntimeContext().getTaskNameWithSubtasks());
    }

    /**
     * Stops the source by closing the service and shutting the executor down. Failures during
     * close are logged, not rethrown.
     */
    @Override // org.apache.flink.api.common.functions.StoppableFunction
    public void stop() {
        closeAndLog("stopped");
    }

    /** Runs {@link #close()}, logs any failure instead of propagating it, then logs the outcome. */
    private void closeAndLog(String outcome) {
        try {
            close();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        logger.info("TableService {} {}", getRuntimeContext().getTaskNameWithSubtasks(), outcome);
    }
}
