/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.snapshot;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileIDImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DFSSnapshotOperation;
import org.apache.flink.runtime.state.gemini.engine.snapshot.LocalAndDFSSnapshotOperation;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompletableFuture;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SnapshotManager} implementation that keeps track of running (pending) and completed
 * snapshots, of the access numbers held by running snapshots, and of the snapshot meta files
 * that have to be cleaned up once a snapshot is aborted, subsumed or failed.
 *
 * <p>Every mutation of the bookkeeping collections ({@code completedSnapshots},
 * {@code runningSnapshots}, {@code runningSnapshotAccessNumber}) happens while holding the
 * private {@code lock}. The smallest access number of all running snapshots is additionally
 * published through a {@code volatile} field so that it can be read without locking.
 */
public class SnapshotManagerImpl implements SnapshotManager {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotManagerImpl.class);

    /** Name of the directory, relative to a DFS base path, that contains the snapshot meta files. */
    public static final String SNAPSHOT_DIR = "snapshot";
    /** First part of every snapshot meta file name. */
    public static final String SNAPSHOT_FILE_PREFIX = "snapshot";
    /** Separator between {@link #SNAPSHOT_FILE_PREFIX} and the checkpoint id in a meta file name. */
    public static final String SNAPSHOT_FILE_SEPERATOR = "-";

    private final boolean localSnapshotEnabled;
    private final FileManager localFileManager;
    private final FileManager dfsFileManager;
    private boolean needToBreakLineage;
    private final WriteBufferManager writeBufferManager;
    /** Completed snapshots keyed by checkpoint id; only mutated while holding {@link #lock}. */
    private final SortedMap<Long, SnapshotManager.CompletedSnapshot> completedSnapshots;
    /** Running snapshots keyed by checkpoint id; only mutated while holding {@link #lock}. */
    private final SortedMap<Long, SnapshotManager.PendingSnapshot> runningSnapshots;
    /** Access numbers of all running snapshots; only mutated while holding {@link #lock}. */
    private final SortedSet<Long> runningSnapshotAccessNumber;
    /** Smallest element of {@link #runningSnapshotAccessNumber}, or {@code Long.MAX_VALUE} if it is empty. */
    private volatile long minRunningSnapshotAccessNumber;
    private final ExecutorService snapshotExecutor;
    private final GContext gContext;
    private final FileCleaner fileCleaner;
    private final Object lock = new Object();

    /**
     * Creates the manager and its single-threaded snapshot executor.
     *
     * @param gContext context used for configuration, DB status checks, versioning and access numbers
     * @param writeBufferManager write buffer manager that is asked to perform the snapshot
     * @param localFileManager file manager handed to the snapshot operation when local snapshots are enabled
     * @param dfsFileManager file manager whose snapshot references are maintained by this class
     */
    public SnapshotManagerImpl(GContext gContext, WriteBufferManager writeBufferManager, FileManager localFileManager, FileManager dfsFileManager) {
        this.gContext = gContext;
        this.writeBufferManager = writeBufferManager;
        this.localFileManager = localFileManager;
        this.dfsFileManager = dfsFileManager;
        this.completedSnapshots = new TreeMap<>();
        this.runningSnapshots = new TreeMap<>();
        this.runningSnapshotAccessNumber = new TreeSet<>();
        this.minRunningSnapshotAccessNumber = Long.MAX_VALUE;
        this.localSnapshotEnabled = gContext.getGConfiguration().isLocalSnapshotEnabled();
        String prefix = gContext.getGConfiguration().getExcetorPrefixName();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "geminiMainSnapshot-%d").build();
        // Exactly one worker thread, backed by a bounded queue of Short.MAX_VALUE tasks.
        this.snapshotExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE), namedThreadFactory);
        this.fileCleaner = gContext.getSupervisor().getFileCleaner();
        LOG.info("SnapshotManager is created.");
    }

    /** Returns the flag last stored by {@link #setNeedToBreakLineage(boolean)}. */
    @Override
    public boolean isNeedToBreakLineage() {
        return this.needToBreakLineage;
    }

    /**
     * Stores the "break lineage" flag; when it is {@code true}, {@link #restore} restores nothing.
     *
     * @param needToBreakLineage new value of the flag
     */
    @Override
    public void setNeedToBreakLineage(boolean needToBreakLineage) {
        this.needToBreakLineage = needToBreakLineage;
    }

    /**
     * Starts a snapshot for the checkpoint described by {@code backendSnapshotMeta}.
     *
     * <p>Under the lock this checks the DB status, bumps the current version, draws a new access
     * number, registers a pending snapshot and asks the write buffer manager to run the snapshot.
     * If anything inside the snapshot creation fails, the access number is unregistered again and
     * the failure is rethrown unchanged.
     *
     * @param backendSnapshotMeta meta information of the checkpoint to snapshot
     * @throws IllegalArgumentException if a snapshot with the same checkpoint id is already running
     */
    @Override
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) {
        synchronized (this.lock) {
            this.gContext.checkDBStatus();
            long checkpointId = backendSnapshotMeta.getCheckpointId();
            Preconditions.checkArgument(!this.runningSnapshots.containsKey(checkpointId), checkpointId + " is already running.");
            this.gContext.increaseCurVersion();
            long accessNumber = this.gContext.incrementAndGetAccessNumber();
            this.runningSnapshotAccessNumber.add(accessNumber);
            this.refreshMinRunningSnapshotAccessNumber();
            long startTime = System.currentTimeMillis();
            LOG.info("GeminiDB start checkpoint {}, start time {}, access number {}.", checkpointId, startTime, accessNumber);
            try {
                SnapshotOperation snapshotOperation = this.localSnapshotEnabled
                    ? new LocalAndDFSSnapshotOperation(this.gContext, this, this.dfsFileManager, this.localFileManager)
                    : new DFSSnapshotOperation(this.gContext, this, this.dfsFileManager);
                SnapshotManager.PendingSnapshot pendingSnapshot = snapshotOperation.createPendingSnapshot(backendSnapshotMeta, accessNumber);
                this.runningSnapshots.put(checkpointId, pendingSnapshot);
                SnapshotCompletableFuture snapshotCompletableFuture = pendingSnapshot.getResultFuture();
                // The running-task counter is raised around doSnapshot and lowered right after it returns.
                snapshotCompletableFuture.incRunningTask();
                this.writeBufferManager.doSnapshot(snapshotOperation);
                snapshotCompletableFuture.decRunningTask();
                pendingSnapshot.getSnapshotStat().setSyncStartTime(startTime);
                pendingSnapshot.getSnapshotStat().setAsyncStartTime(System.currentTimeMillis());
            }
            catch (Throwable e) {
                // NOTE(review): only the access number is rolled back here; an entry already put into
                // runningSnapshots stays registered - presumably endSnapshot is expected to remove it. Confirm.
                this.runningSnapshotAccessNumber.remove(accessNumber);
                this.refreshMinRunningSnapshotAccessNumber();
                throw e;
            }
        }
    }

    /**
     * Finishes the running snapshot of {@code checkpointId}.
     *
     * <p>If the snapshot succeeded ({@code throwable == null} and not canceled) a snapshot reference
     * is added for each of its files and it is recorded as completed. Otherwise its meta file is
     * registered for cleanup. In both cases the pending snapshot's resources are released and its
     * access number is unregistered. If no such snapshot is running, only a warning is logged.
     *
     * @param checkpointId id of the checkpoint whose snapshot ended
     * @param throwable failure cause, or {@code null} if the snapshot did not fail
     */
    @Override
    public void endSnapshot(long checkpointId, Throwable throwable) {
        synchronized (this.lock) {
            SnapshotManager.PendingSnapshot pendingSnapshot = this.runningSnapshots.remove(checkpointId);
            if (pendingSnapshot == null) {
                LOG.warn("checkpoint {} is not running, and can't be ended.", checkpointId);
                return;
            }
            pendingSnapshot.getSnapshotStat().setCompleteTime(System.currentTimeMillis());
            if (throwable == null && !pendingSnapshot.isCanceled()) {
                Set<Integer> dataFileIDs = new HashSet<>();
                for (int id : pendingSnapshot.getFileMapping().keySet()) {
                    this.dfsFileManager.incSnapshotReference(new FileIDImpl(id));
                    dataFileIDs.add(id);
                }
                String metaFilePath = pendingSnapshot.getSnapshotMetaPath().toUri().toString();
                this.completedSnapshots.put(checkpointId, new SnapshotManager.CompletedSnapshot(checkpointId, metaFilePath, dataFileIDs));
                LOG.info("GeminiDB finished checkpoint {}, SnapshotStat {}", checkpointId, pendingSnapshot.getSnapshotStat());
            } else {
                // A canceled snapshot is discarded silently; a failed one is reported first.
                if (!pendingSnapshot.isCanceled()) {
                    LOG.warn("GeminiDB fail to complete checkpoint {}, exception {}", checkpointId, throwable);
                }
                this.discardCheckpointMetaFile(pendingSnapshot.getSnapshotMetaPath().toUri().toString());
            }
            pendingSnapshot.releaseResource();
            this.runningSnapshotAccessNumber.remove(pendingSnapshot.getAccessNumber());
            this.refreshMinRunningSnapshotAccessNumber();
        }
    }

    /**
     * Returns the smallest access number held by a running snapshot, or {@code Long.MAX_VALUE}
     * when no running snapshot holds one. Reads a {@code volatile} field and takes no lock.
     */
    @Override
    public long getMinRunningSnapshotAccessNumber() {
        return this.minRunningSnapshotAccessNumber;
    }

    /**
     * Looks up the running snapshot of {@code checkpointId}.
     *
     * <p>NOTE(review): unlike the mutating methods this lookup does not take the lock - confirm
     * that callers only use it where that is safe.
     *
     * @param checkpointId id of the checkpoint
     * @return the pending snapshot, never {@code null}
     * @throws GeminiRuntimeException if no snapshot is running for {@code checkpointId}
     */
    @Override
    public SnapshotManager.PendingSnapshot getPendingSnapshot(long checkpointId) {
        SnapshotManager.PendingSnapshot pendingSnapshot = this.runningSnapshots.get(checkpointId);
        if (pendingSnapshot == null) {
            throw new GeminiRuntimeException("there is no pending snapshot " + checkpointId);
        }
        return pendingSnapshot;
    }

    /** Returns the single-threaded executor created in the constructor. */
    @Override
    public ExecutorService getSnapshotExecutor() {
        return this.snapshotExecutor;
    }

    /** Currently does nothing. */
    @Override
    public void notifySnapshotComplete(long snapshotId) {
    }

    /**
     * Handles the abort of snapshot {@code snapshotId}.
     *
     * <p>If the snapshot is still running, its result future is told that the snapshot ended and
     * the pending snapshot is flagged as canceled; it stays registered as running. Otherwise a
     * completed snapshot with that id, if any, is removed and discarded.
     *
     * @param snapshotId id of the aborted snapshot
     */
    @Override
    public void notifySnapshotAbort(long snapshotId) {
        SnapshotManager.CompletedSnapshot snapshotToAbort = null;
        synchronized (this.lock) {
            SnapshotManager.PendingSnapshot pendingSnapshot = this.runningSnapshots.get(snapshotId);
            if (pendingSnapshot != null) {
                pendingSnapshot.resultFuture.setEndSnapshot();
                pendingSnapshot.setCanceled(true);
            } else {
                snapshotToAbort = this.completedSnapshots.remove(snapshotId);
            }
        }
        // Discarding happens after the lock has been released.
        if (snapshotToAbort != null) {
            this.discardCompletedSnapshot(snapshotToAbort);
        }
    }

    /**
     * Removes and discards every completed snapshot whose checkpoint id is less than or equal to
     * {@code snapshotId}.
     *
     * @param snapshotId id up to which (inclusive) completed snapshots are subsumed
     */
    @Override
    public void notifySnapshotSubsume(long snapshotId) {
        Set<SnapshotManager.CompletedSnapshot> snapshotsToAbort = new HashSet<>();
        synchronized (this.lock) {
            // completedSnapshots is sorted by checkpoint id, so iteration can stop at the first larger id.
            Iterator<Map.Entry<Long, SnapshotManager.CompletedSnapshot>> entries = this.completedSnapshots.entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Long, SnapshotManager.CompletedSnapshot> entry = entries.next();
                if (entry.getKey() > snapshotId) {
                    break;
                }
                // Read the value before removing: a Map.Entry is not guaranteed to stay valid once
                // the backing map has been modified.
                SnapshotManager.CompletedSnapshot subsumed = entry.getValue();
                entries.remove();
                snapshotsToAbort.add(subsumed);
            }
        }
        // Discarding happens after the lock has been released.
        for (SnapshotManager.CompletedSnapshot completedSnapshot : snapshotsToAbort) {
            this.discardCompletedSnapshot(completedSnapshot);
        }
    }

    /**
     * Restores the completed-snapshot bookkeeping from the meta files below {@code restoredBasePath}.
     *
     * <p>All readable snapshot meta files except the one of {@code snapshotId} are loaded from
     * disk; the snapshot {@code snapshotId} itself is built from the supplied {@code fileMapping}.
     * When lineage has to be broken nothing is restored.
     *
     * @param snapshotId id of the snapshot being restored
     * @param fileMapping file id to file path mapping of snapshot {@code snapshotId}
     * @param restoredBasePath base path that contains the {@link #SNAPSHOT_DIR} directory
     * @return the restored snapshots keyed by checkpoint id; an empty map if lineage is broken
     */
    @Override
    public Map<Long, SnapshotManager.RestoredSnapshot> restore(long snapshotId, Map<Integer, String> fileMapping, String restoredBasePath) {
        if (this.needToBreakLineage) {
            LOG.info("no snapshot is restored because lineage needs to be broken");
            return Collections.emptyMap();
        }
        Map<Long, SnapshotManager.RestoredSnapshot> snapshots = this.loadSnapshots(restoredBasePath, Collections.singleton(snapshotId));
        String metaFilePath = this.getDFSSnapshotMetaPath(new Path(restoredBasePath), snapshotId).toUri().toString();
        snapshots.put(snapshotId, new SnapshotManager.RestoredSnapshot(snapshotId, metaFilePath, fileMapping));
        this.restoreSnapshots(snapshots);
        LOG.info("restore snapshot manager successfully with {} snapshots: {}", snapshots.size(), snapshots.keySet());
        return snapshots;
    }

    /**
     * Shuts the snapshot executor down via {@code shutdownNow()} and clears all bookkeeping
     * collections while holding the lock.
     */
    @Override
    public void close() throws IOException {
        synchronized (this.lock) {
            this.snapshotExecutor.shutdownNow();
            LOG.info("SnapshotManager is closed");
            this.runningSnapshotAccessNumber.clear();
            this.runningSnapshots.clear();
            this.completedSnapshots.clear();
        }
    }

    /** Returns a read-only view (not a copy) of the completed snapshots. */
    @VisibleForTesting
    Map<Long, SnapshotManager.CompletedSnapshot> getCompletedSnapshots() {
        return Collections.unmodifiableMap(this.completedSnapshots);
    }

    /** Returns a read-only view (not a copy) of the running snapshots. */
    @VisibleForTesting
    Map<Long, SnapshotManager.PendingSnapshot> getRunningSnapshots() {
        return Collections.unmodifiableMap(this.runningSnapshots);
    }

    /** Republishes the smallest running access number, or {@code Long.MAX_VALUE} if none is registered; call with the lock held. */
    private void refreshMinRunningSnapshotAccessNumber() {
        this.minRunningSnapshotAccessNumber = this.runningSnapshotAccessNumber.isEmpty()
            ? Long.MAX_VALUE
            : this.runningSnapshotAccessNumber.first();
    }

    /** Records every restored snapshot as a completed snapshot, using the key set of its file mapping as data file ids. */
    private void restoreSnapshots(Map<Long, SnapshotManager.RestoredSnapshot> snapshots) {
        synchronized (this.lock) {
            for (Map.Entry<Long, SnapshotManager.RestoredSnapshot> entry : snapshots.entrySet()) {
                long checkpointId = entry.getKey();
                SnapshotManager.RestoredSnapshot restoredSnapshot = entry.getValue();
                this.completedSnapshots.put(checkpointId, new SnapshotManager.CompletedSnapshot(checkpointId, restoredSnapshot.getMetaFilePath(), restoredSnapshot.getFileMapping().keySet()));
            }
        }
    }

    /**
     * Reads the snapshot meta files found in the {@link #SNAPSHOT_DIR} directory below
     * {@code restoredDBPath}.
     *
     * <p>Loading is best effort: a directory that cannot be listed yields an empty result, and a
     * file with an unparsable name or unreadable content is logged and skipped.
     *
     * @param restoredDBPath base path that contains the snapshot directory
     * @param excludeSnapshots checkpoint ids whose meta files are skipped
     * @return a mutable map of the successfully loaded snapshots keyed by checkpoint id
     */
    private Map<Long, SnapshotManager.RestoredSnapshot> loadSnapshots(String restoredDBPath, Set<Long> excludeSnapshots) {
        Map<Long, SnapshotManager.RestoredSnapshot> snapshots = new HashMap<>();
        Path metaDirPath = new Path(restoredDBPath, SNAPSHOT_DIR);
        FileStatus[] fileStatusArray;
        try {
            URI metaDirUri = metaDirPath.toUri();
            fileStatusArray = FileSystem.get(metaDirUri).listStatus(metaDirPath);
        }
        catch (Exception e) {
            LOG.error("failed to list dir status for {} when loading snapshots, {}", metaDirPath, e);
            return snapshots;
        }
        if (fileStatusArray == null) {
            return snapshots;
        }
        for (FileStatus fileStatus : fileStatusArray) {
            Path path = fileStatus.getPath();
            long snapshotId;
            try {
                snapshotId = this.getSnapshotID(path.getName());
            }
            catch (Exception e) {
                LOG.error("failed to get snapshot ID caused by {}", e);
                continue;
            }
            if (excludeSnapshots.contains(snapshotId)) {
                LOG.info("skip to load snapshot {}", snapshotId);
                continue;
            }
            try (SnapshotMetaFile.Reader reader = SnapshotMetaFile.getReader(path)) {
                // The long stored 16 bytes before the end of the file is used as the offset of the file mapping section.
                reader.seek(fileStatus.getLen() - 16L);
                long fileMappingOffset = reader.readLong();
                reader.seek(fileMappingOffset);
                boolean hasFileMapping = reader.readBoolean();
                Preconditions.checkState(hasFileMapping, "file mapping should always exist.");
                int fileMappingSize = reader.readInt();
                // One UTF string precedes the mapping entries and is skipped - NOTE(review): its meaning is not visible here.
                reader.readUTF();
                Map<Integer, String> fileIDToPath = new HashMap<>();
                // Each mapping entry is stored as the file path followed by the file id.
                for (int i = 0; i < fileMappingSize; ++i) {
                    String filePath = reader.readUTF();
                    Integer id = reader.readInt();
                    fileIDToPath.put(id, filePath);
                }
                snapshots.put(snapshotId, new SnapshotManager.RestoredSnapshot(snapshotId, path.toUri().toString(), fileIDToPath));
                LOG.info("successfully load snapshot {} with {} files", snapshotId, fileIDToPath.size());
            }
            catch (Exception e) {
                LOG.error("failed to load snapshot {}, {}", snapshotId, e);
            }
        }
        return snapshots;
    }

    /**
     * Builds the DFS meta file path {@code <basePath>/snapshot/snapshot-<checkpointId>}.
     *
     * @param basePath base path of the DFS snapshot
     * @param checkpointId id of the checkpoint
     * @return path of the snapshot meta file
     */
    public Path getDFSSnapshotMetaPath(Path basePath, long checkpointId) {
        String name = SNAPSHOT_FILE_PREFIX + SNAPSHOT_FILE_SEPERATOR + checkpointId;
        return new Path(basePath, new Path(SNAPSHOT_DIR, name));
    }

    /**
     * Builds the local meta file path {@code <basePath>/snapshot-<checkpointId>}.
     *
     * @param basePath base path of the local snapshot
     * @param checkpointId id of the checkpoint
     * @return path of the snapshot meta file
     */
    public Path getLocalSnapshotMetaPath(Path basePath, long checkpointId) {
        String name = SNAPSHOT_FILE_PREFIX + SNAPSHOT_FILE_SEPERATOR + checkpointId;
        return new Path(basePath, name);
    }

    /**
     * Creates a serializer for maps of the shape
     * {@code Integer -> (Integer -> Tuple2<Integer, Long>)}.
     *
     * @return a new serializer instance
     */
    public MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> getFileMappingSerializer() {
        // Tuple2.class is a raw class literal; the double cast only attaches the type arguments.
        @SuppressWarnings("unchecked")
        Class<Tuple2<Integer, Long>> tuple2Class = (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class;
        TupleSerializer<Tuple2<Integer, Long>> tuple2Serializer =
            new TupleSerializer<>(tuple2Class, new TypeSerializer<?>[]{IntSerializer.INSTANCE, LongSerializer.INSTANCE});
        MapSerializer<Integer, Tuple2<Integer, Long>> groupMapSerializer =
            new MapSerializer<>(IntSerializer.INSTANCE, tuple2Serializer);
        return new MapSerializer<>(IntSerializer.INSTANCE, groupMapSerializer);
    }

    /**
     * Extracts the checkpoint id from a meta file name of the form {@code snapshot-<id>}.
     *
     * @param snapshotMetaName file name (without directory) of a snapshot meta file
     * @return the positive checkpoint id encoded in the name
     * @throws IllegalArgumentException if the name does not have the expected form or the id is
     *     not a positive {@code long}; a number parsing failure is attached as the cause
     */
    private long getSnapshotID(String snapshotMetaName) {
        String[] splits = snapshotMetaName.split(SNAPSHOT_FILE_SEPERATOR);
        NumberFormatException parseFailure = null;
        if (splits.length == 2 && SNAPSHOT_FILE_PREFIX.equals(splits[0])) {
            try {
                long snapshotID = Long.parseLong(splits[1]);
                if (snapshotID > 0L) {
                    return snapshotID;
                }
            }
            catch (NumberFormatException e) {
                // Reported below together with the offending file name.
                parseFailure = e;
            }
        }
        throw new IllegalArgumentException("invalid snapshot meta file name " + snapshotMetaName, parseFailure);
    }

    /** Drops the snapshot reference of every data file of the snapshot and registers its meta file for cleanup. */
    private void discardCompletedSnapshot(SnapshotManager.CompletedSnapshot completedSnapshot) {
        for (Integer fileId : completedSnapshot.getDataFileIDs()) {
            this.dfsFileManager.decSnapshotReference(new FileIDImpl(fileId));
        }
        this.discardCheckpointMetaFile(completedSnapshot.getMetaFilePath());
        LOG.info("Discard snapshot {} when this snapshot is notified as useless.", completedSnapshot.getCheckpointID());
    }

    /** Hands the meta file to the file cleaner; a failure to do so is logged and otherwise ignored. */
    private void discardCheckpointMetaFile(String metaFilePath) {
        try {
            this.fileCleaner.registerFilesToClean(Collections.singleton(metaFilePath));
        }
        catch (Exception e) {
            LOG.error("Failed to delete snapshot meta file " + metaFilePath, e);
        }
    }
}

