package org.apache.flink.runtime.state.gemini.engine.snapshot;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileIDImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.metrics.SnapshotCompactionMetrics;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompactionImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/snapshot/SnapshotManagerImpl.class */
public class SnapshotManagerImpl implements SnapshotManager {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotManagerImpl.class);
    public static final String SNAPSHOT_DIR = "snapshot";
    public static final String SNAPSHOT_FILE_PREFIX = "snapshot";
    public static final String SNAPSHOT_FILE_SEPERATOR = "-";
    private final boolean localSnapshotEnabled;
    private final FileManager localFileManager;
    private final FileManager dfsFileManager;
    private boolean needToBreakLineage;
    private final WriteBufferManager writeBufferManager;
    private final ExecutorService snapshotExecutor;
    private final GContext gContext;
    private final FileCleaner fileCleaner;

    @Nullable
    private SnapshotCompactionMetrics snapshotCompactionMetrics;
    private final Object lock = new Object();
    private final SortedMap<Long, SnapshotManager.CompletedSnapshot> completedSnapshots = new TreeMap();
    private final SortedMap<Long, SnapshotManager.PendingSnapshot> runningSnapshots = new TreeMap();
    private final SortedSet<Long> runningSnapshotAccessNumber = new TreeSet();
    private volatile long minRunningSnapshotAccessNumber = Long.MAX_VALUE;
    private final SnapshotCompactionStat snapshotCompactionStat = new SnapshotCompactionStat();

    public SnapshotManagerImpl(GContext gContext, WriteBufferManager writeBufferManager, FileManager fileManager, FileManager fileManager2) {
        this.gContext = gContext;
        this.writeBufferManager = writeBufferManager;
        this.localFileManager = fileManager;
        this.dfsFileManager = fileManager2;
        this.localSnapshotEnabled = gContext.getGConfiguration().isLocalSnapshotEnabled();
        this.snapshotExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(32767), new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "geminiMainSnapshot-%d").build());
        this.fileCleaner = gContext.getSupervisor().getFileCleaner();
        MetricGroup dBMetricGroup = gContext.getDBMetricGroup();
        if (dBMetricGroup != null) {
            this.snapshotCompactionMetrics = new SnapshotCompactionMetrics(dBMetricGroup.addGroup("snapshot_compaction"), gContext.getGConfiguration().getMetricSampleCount(), gContext.getGConfiguration().getMetricHistogramWindowSize());
            this.snapshotCompactionMetrics.register(this.snapshotCompactionStat);
        }
        LOG.info("SnapshotManager is created.");
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public String getNameSpace() {
        return this.dfsFileManager.getBasePath().toUri().toString();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public boolean isNeedToBreakLineage() {
        return this.needToBreakLineage;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void setNeedToBreakLineage(boolean z) {
        this.needToBreakLineage = z;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) {
        synchronized (this.lock) {
            this.gContext.checkDBStatus();
            long checkpointId = backendSnapshotMeta.getCheckpointId();
            Preconditions.checkArgument(!this.runningSnapshots.containsKey(Long.valueOf(checkpointId)), checkpointId + " is already running.");
            this.gContext.increaseCurVersion();
            long incrementAndGetAccessNumber = this.gContext.incrementAndGetAccessNumber();
            this.runningSnapshotAccessNumber.add(Long.valueOf(incrementAndGetAccessNumber));
            this.minRunningSnapshotAccessNumber = this.runningSnapshotAccessNumber.first().longValue();
            long currentTimeMillis = System.currentTimeMillis();
            LOG.info("GeminiDB start checkpoint {}, start time {}, access number {}.", new Object[]{Long.valueOf(checkpointId), Long.valueOf(currentTimeMillis), Long.valueOf(incrementAndGetAccessNumber)});
            try {
                SnapshotOperation localAndDFSSnapshotOperation = this.localSnapshotEnabled ? new LocalAndDFSSnapshotOperation(this.gContext, this, this.dfsFileManager, this.localFileManager) : new DFSSnapshotOperation(this.gContext, this, this.dfsFileManager);
                localAndDFSSnapshotOperation.setForceFlushPage(this.needToBreakLineage);
                SnapshotManager.PendingSnapshot createPendingSnapshot = localAndDFSSnapshotOperation.createPendingSnapshot(backendSnapshotMeta, incrementAndGetAccessNumber);
                this.runningSnapshots.put(Long.valueOf(checkpointId), createPendingSnapshot);
                SnapshotCompletableFuture resultFuture = createPendingSnapshot.getResultFuture();
                resultFuture.incRunningTask();
                this.writeBufferManager.doSnapshot(localAndDFSSnapshotOperation);
                resultFuture.decRunningTask();
                createPendingSnapshot.getSnapshotStat().setSyncStartTime(currentTimeMillis);
                createPendingSnapshot.getSnapshotStat().setAsyncStartTime(System.currentTimeMillis());
            } catch (Throwable th) {
                this.runningSnapshotAccessNumber.remove(Long.valueOf(incrementAndGetAccessNumber));
                this.minRunningSnapshotAccessNumber = !this.runningSnapshotAccessNumber.isEmpty() ? this.runningSnapshotAccessNumber.first().longValue() : Long.MAX_VALUE;
                throw th;
            }
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void endSnapshot(long j, Throwable th) {
        synchronized (this.lock) {
            SnapshotManager.PendingSnapshot remove = this.runningSnapshots.remove(Long.valueOf(j));
            if (remove != null) {
                remove.getSnapshotStat().setCompleteTime(System.currentTimeMillis());
                HashSet hashSet = new HashSet();
                if (th != null || remove.isCanceled()) {
                    if (!remove.isCanceled()) {
                        LOG.warn("GeminiDB fail to complete checkpoint {}, exception {}", Long.valueOf(j), th);
                    }
                    discardCheckpointMetaFile(remove.getSnapshotMetaPath().toUri().toString());
                } else {
                    checkFileAmplification(remove);
                    Iterator<Integer> it = remove.getFileMapping().keySet().iterator();
                    while (it.hasNext()) {
                        int intValue = it.next().intValue();
                        this.dfsFileManager.incSnapshotReference(new FileIDImpl(intValue));
                        hashSet.add(Integer.valueOf(intValue));
                    }
                    this.completedSnapshots.put(Long.valueOf(j), new SnapshotManager.CompletedSnapshot(j, remove.getSnapshotMetaPath().toUri().toString(), hashSet));
                    updateSnapshotCompactionStat(remove.getSnapshotStat());
                    LOG.info("GeminiDB finished checkpoint {}, SnapshotStat {}", Long.valueOf(j), remove.getSnapshotStat());
                }
                remove.releaseResource();
                this.runningSnapshotAccessNumber.remove(Long.valueOf(remove.getAccessNumber()));
                this.minRunningSnapshotAccessNumber = !this.runningSnapshotAccessNumber.isEmpty() ? this.runningSnapshotAccessNumber.first().longValue() : Long.MAX_VALUE;
            } else {
                LOG.warn("checkpoint {} is not running, and can't be ended.", Long.valueOf(j));
            }
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public long getMinRunningSnapshotAccessNumber() {
        return this.minRunningSnapshotAccessNumber;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public SnapshotManager.PendingSnapshot getPendingSnapshot(long j) {
        SnapshotManager.PendingSnapshot pendingSnapshot = this.runningSnapshots.get(Long.valueOf(j));
        if (pendingSnapshot == null) {
            throw new GeminiRuntimeException("there is no pending snapshot " + j);
        }
        return pendingSnapshot;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public ExecutorService getSnapshotExecutor() {
        return this.snapshotExecutor;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void notifySnapshotComplete(long j) {
        if (this.needToBreakLineage) {
            this.needToBreakLineage = false;
            LOG.info("As checkpoint {} completed, we would no longer need to flush all pages out.", Long.valueOf(j));
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void notifySnapshotAbort(long j) {
        SnapshotManager.CompletedSnapshot completedSnapshot = null;
        boolean z = false;
        synchronized (this.lock) {
            SnapshotManager.PendingSnapshot pendingSnapshot = this.runningSnapshots.get(Long.valueOf(j));
            if (pendingSnapshot != null) {
                pendingSnapshot.resultFuture.setEndSnapshot();
                pendingSnapshot.setCanceled(true);
                z = true;
            }
            if (!z) {
                completedSnapshot = this.completedSnapshots.remove(Long.valueOf(j));
            }
        }
        if (completedSnapshot != null) {
            discardCompletedSnapshot(completedSnapshot);
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public void notifySnapshotSubsume(long j) {
        HashSet hashSet = new HashSet();
        synchronized (this.lock) {
            Iterator<Map.Entry<Long, SnapshotManager.CompletedSnapshot>> it = this.completedSnapshots.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, SnapshotManager.CompletedSnapshot> next = it.next();
                if (next.getKey().longValue() > j) {
                    break;
                }
                it.remove();
                hashSet.add(next.getValue());
            }
        }
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            discardCompletedSnapshot((SnapshotManager.CompletedSnapshot) it2.next());
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager
    public Map<Long, SnapshotManager.RestoredSnapshot> restore(long j, Map<Integer, String> map, String str) {
        Map<Long, SnapshotManager.RestoredSnapshot> emptyMap;
        if (this.needToBreakLineage) {
            emptyMap = Collections.emptyMap();
            LOG.info("no snapshot is restored because lineage needs to be broken");
        } else {
            emptyMap = loadSnapshots(str, Collections.singleton(Long.valueOf(j)));
            emptyMap.put(Long.valueOf(j), new SnapshotManager.RestoredSnapshot(j, getDFSSnapshotMetaPath(new Path(str), j).toUri().toString(), map));
            restoreSnapshots(emptyMap);
            LOG.info("restore snapshot manager successfully with {} snapshots: {} from {}.", new Object[]{Integer.valueOf(emptyMap.size()), emptyMap.keySet(), str});
        }
        return emptyMap;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (this.lock) {
            this.snapshotExecutor.shutdownNow();
            LOG.info("SnapshotManager is closed");
            this.runningSnapshotAccessNumber.clear();
            this.runningSnapshots.clear();
            this.completedSnapshots.clear();
        }
    }

    @VisibleForTesting
    public Map<Long, SnapshotManager.CompletedSnapshot> getCompletedSnapshots() {
        return Collections.unmodifiableMap(this.completedSnapshots);
    }

    @VisibleForTesting
    Map<Long, SnapshotManager.PendingSnapshot> getRunningSnapshots() {
        return Collections.unmodifiableMap(this.runningSnapshots);
    }

    private void restoreSnapshots(Map<Long, SnapshotManager.RestoredSnapshot> map) {
        synchronized (this.lock) {
            for (Map.Entry<Long, SnapshotManager.RestoredSnapshot> entry : map.entrySet()) {
                long longValue = entry.getKey().longValue();
                SnapshotManager.RestoredSnapshot value = entry.getValue();
                this.completedSnapshots.put(Long.valueOf(longValue), new SnapshotManager.CompletedSnapshot(longValue, value.getMetaFilePath(), value.getFileMapping().keySet()));
            }
        }
    }

    private Map<Long, SnapshotManager.RestoredSnapshot> loadSnapshots(String str, Set<Long> set) {
        HashMap hashMap = new HashMap();
        Path path = new Path(str, "snapshot");
        try {
            FileStatus[] listStatus = FileSystem.get(path.toUri()).listStatus(path);
            if (listStatus == null) {
                return hashMap;
            }
            for (FileStatus fileStatus : listStatus) {
                Path path2 = fileStatus.getPath();
                try {
                    long snapshotID = getSnapshotID(path2.getName());
                    if (set.contains(Long.valueOf(snapshotID))) {
                        LOG.info("skip to load snapshot {}", Long.valueOf(snapshotID));
                    } else {
                        try {
                            SnapshotMetaFile.Reader reader = SnapshotMetaFile.getReader(path2);
                            Throwable th = null;
                            try {
                                try {
                                    reader.seek(fileStatus.getLen() - 16);
                                    reader.seek(reader.readLong());
                                    Preconditions.checkState(reader.readBoolean(), "file mapping should always exist.");
                                    int readInt = reader.readInt();
                                    reader.readUTF();
                                    HashMap hashMap2 = new HashMap();
                                    for (int i = 0; i < readInt; i++) {
                                        String readUTF = reader.readUTF();
                                        Integer valueOf = Integer.valueOf(reader.readInt());
                                        reader.readLong();
                                        hashMap2.put(valueOf, readUTF);
                                    }
                                    hashMap.put(Long.valueOf(snapshotID), new SnapshotManager.RestoredSnapshot(snapshotID, path2.toUri().toString(), hashMap2));
                                    LOG.info("successfully load snapshot {} with {} files", Long.valueOf(snapshotID), Integer.valueOf(hashMap2.size()));
                                    if (reader != null) {
                                        if (0 != 0) {
                                            try {
                                                reader.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            reader.close();
                                        }
                                    }
                                } catch (Throwable th3) {
                                    th = th3;
                                    throw th3;
                                    break;
                                }
                            } finally {
                            }
                        } catch (Exception e) {
                            LOG.error("failed to load snapshot {}, {}", Long.valueOf(snapshotID), e);
                        }
                    }
                } catch (Exception e2) {
                    LOG.error("failed to get snapshot ID.", e2);
                }
            }
            return hashMap;
        } catch (Exception e3) {
            LOG.error("failed to list dir status for {} when loading snapshots, {}", path, e3);
            return hashMap;
        }
    }

    public Path getDFSSnapshotMetaPath(Path path, long j) {
        return new Path(path, new Path("snapshot", "snapshot-" + j));
    }

    public Path getLocalSnapshotMetaPath(Path path, long j) {
        return new Path(path, "snapshot-" + j);
    }

    public MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> getFileMappingSerializer() {
        return new MapSerializer<>(IntSerializer.INSTANCE, new MapSerializer(IntSerializer.INSTANCE, new TupleSerializer(Tuple2.class, new TypeSerializer[]{IntSerializer.INSTANCE, LongSerializer.INSTANCE})));
    }

    private long getSnapshotID(String str) {
        String[] split = str.split(SNAPSHOT_FILE_SEPERATOR);
        if (split.length == 2 && "snapshot".equals(split[0])) {
            try {
                long longValue = Long.valueOf(split[1]).longValue();
                if (longValue > 0) {
                    return longValue;
                }
            } catch (Exception e) {
            }
        }
        throw new IllegalArgumentException("invalid snapshot meta file name " + str);
    }

    private void discardCompletedSnapshot(SnapshotManager.CompletedSnapshot completedSnapshot) {
        Iterator<Integer> it = completedSnapshot.getDataFileIDs().iterator();
        while (it.hasNext()) {
            this.dfsFileManager.decSnapshotReference(new FileIDImpl(it.next().intValue()));
        }
        discardCheckpointMetaFile(completedSnapshot.getMetaFilePath());
        LOG.info("Discard snapshot {} when this snapshot is notified as useless.", Long.valueOf(completedSnapshot.getCheckpointID()));
    }

    private void discardCheckpointMetaFile(String str) {
        try {
            this.fileCleaner.registerFilesToClean(Collections.singleton(str));
        } catch (Exception e) {
            LOG.error("Failed to delete snapshot meta file " + str, e);
        }
    }

    private void updateSnapshotCompactionStat(SnapshotStat snapshotStat) {
        this.snapshotCompactionStat.setAmplificationRatioBeforeCompaction(snapshotStat.getAmplificationRatioBeforeCompaction());
        if (snapshotStat.isNeedCompaction()) {
            this.snapshotCompactionStat.addAndGetNumberCompaction(1);
        }
        this.snapshotCompactionStat.setCompactionSize(snapshotStat.addAndGetIncrementalSize(0L) - snapshotStat.getIncrementalSizeBeforeCompaction());
        this.snapshotCompactionStat.setActualAmplificationRatio(snapshotStat.getActualAmplificationRatio());
        this.snapshotCompactionStat.setCompactionDuration(snapshotStat.getCompactionEndTime() - snapshotStat.getCompactionStartTime());
    }

    private void checkFileAmplification(SnapshotManager.PendingSnapshot pendingSnapshot) {
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMapping = pendingSnapshot.getFileMapping();
        ArrayList arrayList = new ArrayList();
        long j = 0;
        long j2 = 0;
        ArrayList arrayList2 = new ArrayList();
        for (Map.Entry<Integer, Map<Integer, Tuple2<Integer, Long>>> entry : fileMapping.entrySet()) {
            int intValue = entry.getKey().intValue();
            int intValue2 = ((Integer) entry.getValue().values().stream().map(tuple2 -> {
                return (Integer) tuple2.f0;
            }).reduce(0, (v0, v1) -> {
                return Integer.sum(v0, v1);
            })).intValue();
            long longValue = ((Long) entry.getValue().values().stream().map(tuple22 -> {
                return (Long) tuple22.f1;
            }).reduce(0L, (v0, v1) -> {
                return Long.sum(v0, v1);
            })).longValue();
            FileMeta fileMeta = this.dfsFileManager.getFileMeta(intValue);
            long fileSize = fileMeta.getFileSize();
            if (fileMeta.addAndGetSnapshotReference(0) != 0) {
                arrayList2.add(Integer.valueOf(intValue));
            }
            float f = ((float) fileSize) / ((float) longValue);
            j += fileSize;
            j2 += longValue;
            if (LOG.isDebugEnabled()) {
                arrayList.add(new SnapshotCompactionImpl.SnapshotFileInfo(intValue, fileSize, longValue, intValue2, f));
            }
        }
        float f2 = ((float) j) / ((float) j2);
        if (LOG.isDebugEnabled()) {
            arrayList.sort((snapshotFileInfo, snapshotFileInfo2) -> {
                return Float.compare(snapshotFileInfo2.ratio, snapshotFileInfo.ratio);
            });
            LOG.debug("completed snapshot {} statistics: number of total files {}, number of shared files {}, total file size {}, total snapshot data size {}, amplification ratio {}", new Object[]{Long.valueOf(pendingSnapshot.getCheckpointId()), Integer.valueOf(fileMapping.size()), Integer.valueOf(arrayList2.size()), Long.valueOf(j), Long.valueOf(j2), Float.valueOf(f2)});
            LOG.debug("completed snapshot {} file details: {}", Long.valueOf(pendingSnapshot.getCheckpointId()), arrayList);
            LOG.debug("completed snapshot {} shared files: {}", Long.valueOf(pendingSnapshot.getCheckpointId()), arrayList2);
        }
        pendingSnapshot.getSnapshotStat().setActualAmplificationRatio(f2);
        this.snapshotCompactionStat.setActualAmplificationRatio(f2);
    }
}
