package org.apache.flink.runtime.state.gemini.engine;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContextImpl;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.dbms.SupervisorImpl;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filter.CompositeStateFilter;
import org.apache.flink.runtime.state.gemini.engine.filter.RemoveAllStateFilter;
import org.apache.flink.runtime.state.gemini.engine.filter.TtlStateFilter;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.metrics.CacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.metrics.ExceptionMetrics;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCleanerMetrics;
import org.apache.flink.runtime.state.gemini.engine.metrics.GeminiMetrics;
import org.apache.flink.runtime.state.gemini.engine.metrics.HandlerMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexHashImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotResult;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.runtime.state.gemini.time.ProcessingTimeProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/GeminiDB.class */
public class GeminiDB {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiDB.class);
    private String dbName;
    private Supervisor geminiSupervisor;
    private final Object lock;
    private volatile Status geminiDBStatus;
    private volatile Throwable internalError;
    private final Map<String, GTable> geminiTableMap;
    private GContext gContext;
    private MetricGroup dbMetricGroup;
    private GConfiguration configuration;

    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/GeminiDB$Status.class */
    public enum Status {
        /** Initial status assigned by the constructors; {@code open()} only succeeds from here. */
        INITIALIZE,
        /** Set by {@code open()} (and by the test-only constructor); {@code setInternalError} only applies from here. */
        OPENED,
        /** Not referenced anywhere else in this class. */
        CLOSING,
        /** Set unconditionally by {@code close()}. */
        CLOSED,
        /** Set by {@code setInternalError} when the previous status was {@link #OPENED}. */
        INTERNAL_ERROR
    }

    /**
     * Creates a bare instance for tests.
     *
     * <p>Only the status lock and the table map are initialised; context, supervisor, metric group
     * and configuration stay {@code null}. The status is left at {@link Status#OPENED}.
     */
    @VisibleForTesting
    public GeminiDB() {
        this.lock = new Object();
        // Mirrors the regular life cycle: start at INITIALIZE, then move straight to OPENED.
        this.geminiDBStatus = Status.INITIALIZE;
        this.geminiTableMap = new ConcurrentHashMap<>();
        this.geminiDBStatus = Status.OPENED;
    }

    /**
     * Creates a database in {@link Status#INITIALIZE}; call {@link #open()} to start it.
     *
     * <p>Builds the engine context, registers one metric sub-group per subsystem, installs the
     * state filters and finally creates the supervisor.
     *
     * @param str name of the database, must not be {@code null}
     * @param gConfiguration engine configuration, must not be {@code null}
     * @param i first integer forwarded to the {@code GContextImpl} constructor
     * @param i2 second integer forwarded to the {@code GContextImpl} constructor
     * @param metricGroup parent metric group under which all engine metrics are registered
     */
    public GeminiDB(String str, GConfiguration gConfiguration, int i, int i2, MetricGroup metricGroup) {
        this.lock = new Object();
        this.geminiDBStatus = Status.INITIALIZE;
        this.geminiTableMap = new ConcurrentHashMap<>();
        this.dbName = (String) Preconditions.checkNotNull(str);
        this.configuration = (GConfiguration) Preconditions.checkNotNull(gConfiguration);

        GContext context = new GContextImpl(this, i, i2, gConfiguration);
        this.gContext = context;
        context.setTimeProvider(new ProcessingTimeProvider());

        final int sampleCount = gConfiguration.getMetricSampleCount();
        final int windowSize = gConfiguration.getMetricHistogramWindowSize();

        // Metric wiring: one sub-group per subsystem, all sharing the same sampling settings.
        this.dbMetricGroup = metricGroup;
        context.setDBMetricGroup(metricGroup);
        context.setFileManagerMetricGroup(metricGroup.addGroup("fileManager"));
        // NOTE(review): the next group name (and the exception one below) come from REST message
        // constants; this looks like a decompiler substituting constants for literals — confirm.
        MetricGroup statusGroup = metricGroup.addGroup(JobDetailsInfo.FIELD_NAME_JOB_STATUS);
        context.setGeminiMetric(new GeminiMetrics(statusGroup, sampleCount, windowSize));
        MetricGroup cacheGroup = metricGroup.addGroup("cache");
        context.setCacheMetric(new CacheMetrics(cacheGroup, sampleCount, windowSize));
        MetricGroup handlerGroup = metricGroup.addGroup("handler");
        HandlerMetrics handlerMetrics = new HandlerMetrics(handlerGroup, sampleCount, windowSize);
        context.setHandlerMetric(handlerMetrics);
        MetricGroup fileCacheGroup = metricGroup.addGroup("fileCache");
        context.setFileCacheMetrics(new FileCacheMetrics(fileCacheGroup, sampleCount, windowSize));
        MetricGroup fileCleanerGroup = metricGroup.addGroup("fileCleaner");
        context.setFileCleanerMetrics(new FileCleanerMetrics(fileCleanerGroup, sampleCount, windowSize));
        MetricGroup exceptionGroup = metricGroup.addGroup(JobExceptionsInfo.ExecutionExceptionInfo.FIELD_NAME_EXCEPTION);
        context.setExceptionMetrics(new ExceptionMetrics(exceptionGroup, sampleCount, windowSize));

        // State filters: the TTL filter is only added when the context reports TTL, and it is
        // registered ahead of the remove-all filter.
        CompositeStateFilter stateFilter = new CompositeStateFilter();
        if (context.hasTtl()) {
            stateFilter.addStateFilter(new TtlStateFilter());
        }
        stateFilter.addStateFilter(new RemoveAllStateFilter());
        context.setStateFilter(stateFilter);

        // The supervisor is created last, once the context is fully populated.
        Supervisor supervisor = new SupervisorImpl(context);
        this.geminiSupervisor = supervisor;
        context.setSupervisor(supervisor);
        handlerMetrics.registerMetricsCacheStat(supervisor.getWriteBufferManager());
        LOG.info("GeminiDB is created.");
    }

    /**
     * Changes the status, optionally only if it currently has an expected value.
     *
     * @param status expected current status, or {@code null} to change unconditionally
     * @param status2 status to switch to
     * @return {@code true} if the status was changed, {@code false} if the expectation did not hold
     */
    public boolean setStatus(Status status, Status status2) {
        synchronized (this.lock) {
            boolean unconditional = status == null;
            if (unconditional || this.geminiDBStatus == status) {
                this.geminiDBStatus = status2;
                return true;
            }
            return false;
        }
    }

    /** Returns the current status; this is a plain volatile read that does not take the status lock. */
    public Status getStatus() {
        return geminiDBStatus;
    }

    /** Returns the error recorded by {@link #setInternalError(Throwable)}, or {@code null} if none was recorded. */
    public Throwable getInternalError() {
        return internalError;
    }

    /**
     * Records an internal error and moves the database from {@link Status#OPENED} to
     * {@link Status#INTERNAL_ERROR}. Does nothing if the database is in any other status.
     *
     * <p>The error is stored before the status is published, so a reader that observes
     * {@link Status#INTERNAL_ERROR} and then calls {@link #getInternalError()} sees the error
     * recorded here instead of a stale {@code null}.
     *
     * @param th the error that put the database into the error state
     */
    public void setInternalError(Throwable th) {
        synchronized (this.lock) {
            if (this.geminiDBStatus == Status.OPENED) {
                // Both fields are volatile: write the cause first, then publish the status.
                this.internalError = th;
                this.geminiDBStatus = Status.INTERNAL_ERROR;
            }
        }
    }

    /**
     * Opens the database: moves the status from {@link Status#INITIALIZE} to
     * {@link Status#OPENED} and starts the supervisor.
     *
     * @throws GeminiRuntimeException if the database is not in {@link Status#INITIALIZE}
     */
    public synchronized void open() {
        boolean transitioned = setStatus(Status.INITIALIZE, Status.OPENED);
        if (!transitioned) {
            String currentStatus = this.geminiDBStatus.toString();
            throw new GeminiRuntimeException("open db failed, current status is " + currentStatus);
        }
        this.geminiSupervisor.start();
        LOG.info("GeminiDB is opened");
    }

    /**
     * Starts a snapshot by delegating to the supervisor.
     *
     * @param backendSnapshotMeta description of the snapshot to take
     * @throws Exception whatever the supervisor throws while starting the snapshot
     */
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) throws Exception {
        Supervisor supervisor = this.geminiSupervisor;
        supervisor.startSnapshot(backendSnapshotMeta);
    }

    /**
     * Returns a future that completes with the result of the pending snapshot for the given
     * checkpoint once that snapshot's own result future has completed.
     *
     * <p>The callback is registered with {@code whenCompleteAsync} and no explicit executor. In
     * every outcome the snapshot manager's {@code endSnapshot} is called exactly once with the
     * failure cause, or {@code null} on success:
     * <ul>
     *   <li>if the DB is not in a normal state, the future fails with a
     *       {@link GeminiRuntimeException};</li>
     *   <li>if the pending snapshot failed, the future fails with that cause;</li>
     *   <li>otherwise the snapshot result is fetched, and any failure while fetching it or while
     *       ending the snapshot fails the future.</li>
     * </ul>
     *
     * @param j checkpoint id of the pending snapshot
     * @return future carrying the snapshot result or the failure
     */
    public Future<DBSnapshotResult> getSnapshotResult(long j) {
        CompletableFuture<DBSnapshotResult> resultFuture = new CompletableFuture<>();
        SnapshotManager.PendingSnapshot pendingSnapshot = this.geminiSupervisor.getPendingSnapshot(j);
        pendingSnapshot.getResultFuture().whenCompleteAsync((success, failure) -> {
            if (!this.gContext.isDBNormal()) {
                GeminiRuntimeException abnormal = new GeminiRuntimeException("DB is in abnormal status: " + this.geminiDBStatus.name());
                resultFuture.completeExceptionally(abnormal);
                this.gContext.getSupervisor().getSnapshotManager().endSnapshot(j, abnormal);
                return;
            }
            if (failure != null) {
                resultFuture.completeExceptionally(failure);
                this.gContext.getSupervisor().getSnapshotManager().endSnapshot(j, failure);
                return;
            }

            DBSnapshotResult snapshotResult = null;
            Throwable error = null;
            try {
                snapshotResult = pendingSnapshot.getSnapshotOperation().getSnapshotResult();
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: an Error used to slip past the handler and
                // leave the future completed normally with a null result.
                error = t;
            }
            try {
                this.gContext.getSupervisor().getSnapshotManager().endSnapshot(j, error);
            } catch (Exception e) {
                // Keep the original failure as primary and attach the clean-up failure to it.
                error = ExceptionUtils.firstOrSuppressed(e, error);
            }
            if (error == null) {
                resultFuture.complete(snapshotResult);
            } else {
                resultFuture.completeExceptionally(error);
            }
        });
        return resultFuture;
    }

    /* JADX WARN: Type inference failed for: r0v58, types: [org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile$Reader, java.io.InputStream] */
    public void restoreFromSnapshot(List<DBSnapshotMeta> list, Map<String, GTable> map, int i, int i2) throws Exception {
        Preconditions.checkNotNull(Boolean.valueOf(list.size() == 1), "not consider rescale currently");
        MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMappingSerializer = getFileMappingSerializer();
        long j = 0;
        int i3 = 0;
        String str = null;
        HashMap hashMap = new HashMap();
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> hashMap2 = new HashMap();
        boolean z = false;
        int i4 = 0;
        String str2 = null;
        HashMap hashMap3 = null;
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> map2 = null;
        LOG.info("Start to restore from snapshot for GeminiDB, metas {}, tables {}, region from {} to {}.", new Object[]{list, map, Integer.valueOf(i), Integer.valueOf(i2)});
        for (DBSnapshotMeta dBSnapshotMeta : list) {
            j = dBSnapshotMeta.getCheckPointId();
            ?? reader = SnapshotMetaFile.getReader(new Path(dBSnapshotMeta.getSnapshotMetaPath()));
            Throwable th = null;
            try {
                try {
                    int readInt = reader.readInt();
                    for (int i5 = 0; i5 < readInt; i5++) {
                        String readUTF = reader.readUTF();
                        int readInt2 = reader.readInt();
                        for (int i6 = 0; i6 < readInt2; i6++) {
                            GRegionIDImpl gRegionIDImpl = new GRegionIDImpl(reader.readInt());
                            long readLong = reader.readLong();
                            long readLong2 = reader.readLong();
                            GTable gTable = map.get(readUTF);
                            PageIndex build = new PageIndexHashImpl.Builder(reader, new GRegionContext(this.gContext, readUTF, gRegionIDImpl, gTable.getTableDescription().getPageSerde(), readLong, readLong2)).build();
                            if (build != null) {
                                if (gRegionIDImpl.getIndexID() == 0) {
                                    gTable.setRegion(gRegionIDImpl.getId(), gTable.getTableDescription().createRegion(this.gContext, gTable, gRegionIDImpl, build));
                                } else {
                                    gTable.setIndexRegion(gRegionIDImpl.getId(), gTable.getIndexDescription().createRegion(this.gContext, gTable, gRegionIDImpl, build));
                                }
                            }
                        }
                    }
                    Preconditions.checkState(reader.readBoolean(), "dfs meta should always has file mapping");
                    i3 = reader.readInt();
                    str = reader.readUTF();
                    if (i3 > 0) {
                        for (int i7 = 0; i7 < i3; i7++) {
                            hashMap.put(Integer.valueOf(reader.readInt()), reader.readUTF());
                        }
                        hashMap2 = fileMappingSerializer.deserialize(new DataInputViewStreamWrapper((InputStream) reader));
                    }
                    z = reader.readBoolean();
                    if (z) {
                        i4 = reader.readInt();
                        str2 = reader.readUTF();
                        if (i4 > 0) {
                            hashMap3 = new HashMap();
                            for (int i8 = 0; i8 < i4; i8++) {
                                hashMap3.put(Integer.valueOf(reader.readInt()), reader.readUTF());
                            }
                            map2 = fileMappingSerializer.deserialize(new DataInputViewStreamWrapper((InputStream) reader));
                        }
                    }
                    if (reader != 0) {
                        if (0 != 0) {
                            try {
                                reader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            reader.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (reader != 0) {
                    if (th != null) {
                        try {
                            reader.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        reader.close();
                    }
                }
                throw th3;
            }
        }
        Preconditions.checkNotNull(str);
        SnapshotManager snapshotManager = this.geminiSupervisor.getSnapshotManager();
        snapshotManager.setNeedToBreakLineage(false);
        Map<Long, SnapshotManager.RestoredSnapshot> restore = snapshotManager.restore(j, hashMap, str);
        this.gContext.getSupervisor().getDfsFileManager().restore(mergeDbAndSnapshotFileMeta(i3 == 0 ? new HashMap<>() : getRestoredFileMetaUsedByDB(hashMap, hashMap2, 0 == 0), restore, 0 == 0));
        if (z && i4 > 0) {
            FileManager localFileManager = this.gContext.getSupervisor().getLocalFileManager();
            localFileManager.restore(getRestoredFileMetaUsedByDB(restoreLocalFile(str2, hashMap3, localFileManager.getBasePath()), map2, true));
        }
        LOG.info("Restored successfully from {} for {}, region {} to {}.", new Object[]{list, map, Integer.valueOf(i), Integer.valueOf(i2)});
    }

    /**
     * Returns the table registered under the description's name, creating it on first use.
     *
     * <p>A newly created table is also registered with the supervisor's cache manager and
     * write-buffer manager.
     *
     * @param gTableDescription description of the table to look up or create
     * @return the existing or newly created table
     * @throws GeminiRuntimeException if table creation fails
     */
    public GTable getTableOrCreate(GTableDescription gTableDescription) throws GeminiRuntimeException {
        String tableName = gTableDescription.getTableName();
        return this.geminiTableMap.computeIfAbsent(tableName, ignoredKey -> {
            GTable newTable = gTableDescription.createTable(this.gContext);
            Supervisor supervisor = this.geminiSupervisor;
            supervisor.getCacheManager().addTable(newTable);
            supervisor.getWriteBufferManager().addTableNum(gTableDescription.getTableName());
            return newTable;
        });
    }

    /**
     * Closes the database: the status is forced to {@link Status#CLOSED} regardless of its
     * current value, and only then is the supervisor closed.
     */
    public void close() {
        // A null expectation makes the status change unconditional.
        setStatus(null, Status.CLOSED);
        geminiSupervisor.close();
        LOG.info("GeminiDB is closed");
    }

    /** Returns the engine context created by the main constructor ({@code null} for the test-only constructor). */
    public GContext getGContext() {
        return gContext;
    }

    /** Returns the configuration this database was constructed with. */
    public GConfiguration getConfiguration() {
        return configuration;
    }

    /** Returns the live (not copied) map of table name to table. */
    public Map<String, GTable> getGeminiTableMap() {
        return geminiTableMap;
    }

    /**
     * Builds the serializer for the file mapping stored in snapshot meta files: an
     * {@code Integer}-keyed map whose values are {@code Integer}-keyed maps of
     * {@code Tuple2<Integer, Long>}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> getFileMappingSerializer() {
        // Built inside-out: tuple fields -> tuple -> inner map -> outer map.
        TypeSerializer[] tupleFieldSerializers = new TypeSerializer[]{IntSerializer.INSTANCE, LongSerializer.INSTANCE};
        TupleSerializer tupleSerializer = new TupleSerializer(Tuple2.class, tupleFieldSerializers);
        MapSerializer innerMapSerializer = new MapSerializer(IntSerializer.INSTANCE, tupleSerializer);
        return new MapSerializer<>(IntSerializer.INSTANCE, innerMapSerializer);
    }

    /**
     * Builds one restored file meta per entry of {@code map2}.
     *
     * <p>For each file id, the {@code f0} and {@code f1} components of all its tuples are summed
     * and passed to {@code FileMeta.RestoredFileMeta.of} together with the file's path taken from
     * {@code map}.
     *
     * @param map file id to file path
     * @param map2 file id to a nested map of {@code (Integer, Long)} tuples
     * @param z flag forwarded unchanged to {@code FileMeta.RestoredFileMeta.of}
     * @return restored file meta keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> getRestoredFileMetaUsedByDB(Map<Integer, String> map, Map<Integer, Map<Integer, Tuple2<Integer, Long>>> map2, boolean z) {
        Map<Integer, FileMeta.RestoredFileMeta> restored = new HashMap<>(map.size());
        for (Map.Entry<Integer, Map<Integer, Tuple2<Integer, Long>>> fileEntry : map2.entrySet()) {
            int fileId = fileEntry.getKey();
            String filePath = map.get(fileId);
            int f0Total = 0;
            long f1Total = 0L;
            for (Tuple2<Integer, Long> tuple : fileEntry.getValue().values()) {
                f0Total += tuple.f0;
                f1Total += tuple.f1;
            }
            FileMeta.RestoredFileMeta meta = FileMeta.RestoredFileMeta.of(fileId, filePath, 0L, f1Total, f0Total, 0, z);
            restored.put(fileId, meta);
        }
        return restored;
    }

    /**
     * Merges the files used by the DB with the files referenced by restored snapshots.
     *
     * <p>Starts from a shallow copy of {@code map}. For every file referenced by a snapshot, the
     * existing meta's {@code snapshotReference} is incremented in place; if there is none yet, a
     * new meta with a snapshot reference of 1 is added.
     *
     * @param map file metas used by the DB, keyed by file id
     * @param map2 restored snapshots
     * @param z flag forwarded unchanged to {@code FileMeta.RestoredFileMeta.of} for new entries
     * @return the merged file metas keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> mergeDbAndSnapshotFileMeta(Map<Integer, FileMeta.RestoredFileMeta> map, Map<Long, SnapshotManager.RestoredSnapshot> map2, boolean z) {
        Map<Integer, FileMeta.RestoredFileMeta> merged = new HashMap<>(map);
        for (SnapshotManager.RestoredSnapshot snapshot : map2.values()) {
            for (Map.Entry<Integer, String> mapping : snapshot.getFileMapping().entrySet()) {
                int fileId = mapping.getKey();
                String filePath = mapping.getValue();
                FileMeta.RestoredFileMeta known = merged.get(fileId);
                if (known == null) {
                    merged.put(fileId, FileMeta.RestoredFileMeta.of(fileId, filePath, 0L, 0L, 0, 1, z));
                    continue;
                }
                known.snapshotReference++;
            }
        }
        return merged;
    }

    /**
     * Recreates the local working directory and hard-links every snapshot file into it.
     *
     * <p>An existing working directory is deleted first, so it always starts empty.
     *
     * @param str path of the directory that holds the local snapshot files
     * @param map file id to file name, relative to both directories
     * @param path base path of the local working directory
     * @return file id to absolute path of the hard link created in the working directory
     * @throws IOException if the working directory cannot be created
     * @throws Exception if deleting the old directory or creating a hard link fails
     */
    private Map<Integer, String> restoreLocalFile(String str, Map<Integer, String> map, Path path) throws Exception {
        Map<Integer, String> linkedFiles = new HashMap<>();
        File snapshotDir = new File(new Path(str).toUri().getPath());
        File workingDir = new File(path.toUri().getPath());
        if (workingDir.exists()) {
            FileUtils.deleteDirectory(workingDir);
        }
        if (!workingDir.mkdirs()) {
            // Any old directory was just deleted, so a false return here means creation failed.
            throw new IOException("Failed to create local working directory: " + workingDir);
        }
        for (Map.Entry<Integer, String> entry : map.entrySet()) {
            int fileId = entry.getKey();
            String fileName = entry.getValue();
            File snapshotFile = new File(snapshotDir, fileName);
            File linkFile = new File(workingDir, fileName);
            try {
                Files.createLink(linkFile.toPath(), snapshotFile.toPath());
                linkedFiles.put(fileId, linkFile.getAbsolutePath());
            } catch (Exception e) {
                // Two placeholders for two paths; the trailing exception is logged with its stack trace.
                LOG.error("Fail to create hard link from {} to {}", snapshotFile.getAbsolutePath(), linkFile.getAbsolutePath(), e);
                throw e;
            }
        }
        return linkedFiles;
    }
}
