package org.apache.flink.runtime.state.gemini.engine.snapshot;

import java.io.File;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/snapshot/LocalAndDFSSnapshotOperation.class */
public class LocalAndDFSSnapshotOperation extends SnapshotOperation {
    private static final Logger LOG = LoggerFactory.getLogger(LocalAndDFSSnapshotOperation.class);
    private final FileManager dfsFileManager;
    private final FileManager localFileManager;
    private SnapshotManager.PendingSnapshot pendingSnapshot;

    public LocalAndDFSSnapshotOperation(GContext gContext, SnapshotManager snapshotManager, FileManager fileManager, FileManager fileManager2) {
        super(gContext, snapshotManager);
        this.dfsFileManager = (FileManager) Preconditions.checkNotNull(fileManager);
        this.localFileManager = (FileManager) Preconditions.checkNotNull(fileManager2);
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation
    public SnapshotManager.PendingSnapshot createPendingSnapshot(BackendSnapshotMeta backendSnapshotMeta, long j) {
        long checkpointId = backendSnapshotMeta.getCheckpointId();
        SnapshotCompletableFuture snapshotCompletableFuture = new SnapshotCompletableFuture(this.gContext, this.gContext.getSupervisor().getSnapshotExecutorGroup());
        Path basePath = this.dfsFileManager.getBasePath();
        Path dFSSnapshotMetaPath = ((SnapshotManagerImpl) this.snapshotManager).getDFSSnapshotMetaPath(basePath, checkpointId);
        Path directory = backendSnapshotMeta.getLocalSnapshotDir().getDirectory();
        SnapshotManager.PendingSnapshot pendingSnapshot = new SnapshotManager.PendingSnapshot(checkpointId, backendSnapshotMeta.getTimestamp(), snapshotCompletableFuture, basePath, dFSSnapshotMetaPath, directory, ((SnapshotManagerImpl) this.snapshotManager).getLocalSnapshotMetaPath(directory, checkpointId), j, this);
        snapshotCompletableFuture.setPendingSnapshot(pendingSnapshot);
        this.pendingSnapshot = pendingSnapshot;
        return pendingSnapshot;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation
    public SnapshotManager.PendingSnapshot getPendingSnapshot() {
        return this.pendingSnapshot;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation
    public DBSnapshotResult getSnapshotResult() throws Exception {
        SnapshotMetaFile.Writer writer = null;
        SnapshotMetaFile.Writer writer2 = null;
        try {
            writer = SnapshotMetaFile.getWriter(this.pendingSnapshot.getLocalSnapshotMetaPath());
            RegionSnapshot regionSnapshot = new RegionSnapshot(this.localFileManager, this.dfsFileManager, writer);
            writer2 = SnapshotMetaFile.getWriter(this.pendingSnapshot.getSnapshotMetaPath());
            RegionSnapshot regionSnapshot2 = new RegionSnapshot(null, this.dfsFileManager, writer2);
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            writePageIndex(this.pendingSnapshot.getGRegionSnapshotMeta(), writer, writer2, regionSnapshot, regionSnapshot2, hashMap2, hashMap);
            this.pendingSnapshot.setLocalFileMapping(hashMap2);
            this.pendingSnapshot.setFileMapping(hashMap);
            writeLocalAndDfsFileMapping(this.pendingSnapshot, writer, this.localFileManager, this.dfsFileManager);
            writeLocalAndDfsFileMapping(this.pendingSnapshot, writer2, null, this.dfsFileManager);
            long pos = writer.getPos();
            long pos2 = writer2.getPos();
            SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();
            snapshotStat.addAndGetTotalLoalFiles(hashMap2.size());
            snapshotStat.setLocalMetaFileSize(pos);
            snapshotStat.addAndGetTotalFiles(hashMap.size());
            snapshotStat.setMetaFileSize(pos2);
            createHardLinkForLocalFiles(this.pendingSnapshot.getLocalSnapshotBasePath(), hashMap2.keySet());
            DBSnapshotMeta dBSnapshotMeta = new DBSnapshotMeta(this.pendingSnapshot.getCheckpointId(), this.gContext.getStartRegionId(), this.gContext.getEndRegionId(), pos, snapshotStat.addAndGetTotalLocalSize(0L), snapshotStat.addAndGetLocalIncrementalSize(0L), writer.getFilePath().toUri().toString());
            DBSnapshotMeta dBSnapshotMeta2 = new DBSnapshotMeta(this.pendingSnapshot.getCheckpointId(), this.gContext.getStartRegionId(), this.gContext.getEndRegionId(), pos2, snapshotStat.addAndGetTotalSize(0L), snapshotStat.addAndGetIncrementalSize(0L), writer2.getFilePath().toUri().toString());
            writer.close();
            writer2.close();
            return new DBSnapshotResult(SnapshotDirectory.permanent(this.pendingSnapshot.getLocalSnapshotBasePath()), dBSnapshotMeta, SnapshotDirectory.permanent(this.pendingSnapshot.getSnapshotBasePath()), dBSnapshotMeta2);
        } catch (Exception e) {
            LOG.error("failed to snapshot meta for {}, {}", Long.valueOf(this.pendingSnapshot.getCheckpointId()), e.toString());
            closeAndDeleteWriterQuietly(writer);
            closeAndDeleteWriterQuietly(writer2);
            throw e;
        }
    }

    @VisibleForTesting
    void createHardLinkForLocalFiles(Path path, Set<Integer> set) throws Exception {
        FileSystem fileSystem = path.getFileSystem();
        if (!(fileSystem instanceof LocalFileSystem)) {
            LOG.error("expected LocalFilSystem, actual {}", fileSystem);
            throw new GeminiRuntimeException("hard link only works for local file system, actual " + fileSystem);
        }
        File file = new File(path.toUri());
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            File file2 = new File(this.localFileManager.getFilePath(it.next().intValue()));
            try {
                Files.createLink(new File(file, file2.getName()).toPath(), file2.toPath());
            } catch (Exception e) {
                LOG.error("Fail to create hard link from {} to {}, {}", new Object[]{file2.getAbsolutePath(), path.toUri().toString(), e});
                throw e;
            }
        }
    }
}
