package org.apache.flink.table.temptable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.service.ServiceContext;
import org.apache.flink.table.temptable.util.TableServiceUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/temptable/TableServiceImpl.class */
public class TableServiceImpl implements LifeCycleAware, TableService {
    private TableStorage tableStorage;
    private static final Logger logger = LoggerFactory.getLogger(TableServiceImpl.class);
    private ServiceContext serviceContext;
    private TableServiceMetrics tableServiceMetrics;
    private TableMetaManager tableMetaManager;
    private boolean isLeader;

    public TableServiceImpl(ServiceContext serviceContext) {
        this.serviceContext = serviceContext;
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void open(Configuration configuration) {
        logger.info("FlinkTableService begin open.");
        this.tableStorage = new TableStorage();
        this.tableStorage.open(configuration);
        this.tableServiceMetrics = new TableServiceMetrics(this.serviceContext.getMetricGroup());
        this.isLeader = this.serviceContext.getIndexOfCurrentInstance() == 0;
        if (this.isLeader) {
            this.tableMetaManager = new TableMetaManager();
        }
        logger.info("FlinkTableService end open.");
    }

    @Override // org.apache.flink.service.LifeCycleAware
    public void close() {
        logger.info("FlinkTableService begin close.");
        if (this.tableStorage != null) {
            this.tableStorage.close();
        }
        logger.info("FlinkTableService end close.");
    }

    @Override // org.apache.flink.table.temptable.TableService
    public List<Integer> getPartitions(String str) {
        logger.debug("FlinkTableService receive getPartitionCount request");
        List<ResultPartitionID> resultPartitions = this.tableMetaManager.getResultPartitions(str);
        ArrayList arrayList = new ArrayList();
        Iterator<ResultPartitionID> it = resultPartitions.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf((int) it.next().getPartitionId().getUpperPart()));
        }
        return arrayList;
    }

    @Override // org.apache.flink.table.temptable.TableService
    public int write(String str, int i, byte[] bArr) {
        try {
            logger.debug("FlinkTableService receive write request");
            if (bArr != null) {
                this.tableStorage.write(str, i, bArr);
            }
            int length = bArr == null ? 0 : bArr.length;
            this.tableServiceMetrics.getWriteTotalBytesMetrics().inc(length);
            return length;
        } catch (Exception e) {
            logger.debug("FlinkTableService receive write request, but error occurs: " + e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.table.temptable.TableService
    public void delete(String str, int i) throws Exception {
        try {
            logger.debug("FlinkTableService receive delete request");
            this.tableStorage.delete(str, i);
        } catch (Exception e) {
            logger.debug("FlinkTableService receive delete request, but error occurs: " + e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.table.temptable.TableService
    public void initializePartition(String str, int i) throws Exception {
        logger.debug("FlinkTableService receive initial partition request");
        this.tableStorage.initializePartition(str, i);
    }

    @Override // org.apache.flink.table.temptable.TableService
    public void registerPartition(String str, int i) throws Exception {
        logger.debug("FlinkTableService receive register partition request");
        Preconditions.checkState(this.isLeader, "Only leader can handle register partition request, this should not happen");
        this.tableMetaManager.addResultPartition(str, TableServiceUtil.tablePartitionToResultPartition(str, i));
    }

    @Override // org.apache.flink.table.temptable.TableService
    public void unregisterPartition(String str) throws Exception {
        logger.debug("FlinkTableService receive remove partitions request");
        this.tableMetaManager.removeTablePartitions(str);
    }

    @Override // org.apache.flink.table.temptable.TableService
    public void finishPartition(String str, int i) throws Exception {
        logger.debug("FlinkTableService receive finish partition request");
        this.tableStorage.finishPartition(str, i);
    }
}
