package org.apache.flink.runtime.state.gemini.engine.fs;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.gemini.engine.ExceptionStat;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileManagerMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.DfsDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LocalDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManagerImpl;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/fs/FileManagerImpl.class */
public class FileManagerImpl implements FileManager {
    private static final Logger LOG = LoggerFactory.getLogger(FileManagerImpl.class);
    // Engine context; source of configuration, access numbers, metric groups and the supervisor.
    private final GContext gContext;
    // Name of this manager; used for the metric sub-group and the checker thread name.
    private final String fileManagerIdentifier;
    // Directory under which all files of this manager are created.
    private final Path workingBasePath;
    // Failure threshold handed to every FTFileWriter created by this manager.
    private final Integer writerFailCountThreshold;
    // Number of writer errors at which isValid() starts reporting false.
    private final Integer fileManagerFailCountThreshold;
    // Milliseconds after the first writer error before creating writers is allowed again.
    private final long retryCreateFileWriterInterval;
    // f0: timestamp (ms) of the first error of the current streak, or -1 when there is none;
    // f1: number of errors in the streak. Accessed under synchronized(fileWriterErrorStatus).
    private Tuple2<Long, Integer> fileWriterErrorStatus;
    private ExceptionStat exceptionStat;
    // Message of the IllegalStateException thrown while the manager is not valid.
    private final String invalidMessage;
    // File id -> metadata of every file currently tracked by this manager.
    private final Map<Integer, FileMeta> fileMapping;
    // Allocates and recycles file ids; accessed under synchronized(fileIDGenerator).
    private final FileIDGenerator fileIDGenerator;
    // backendUID and fileSuffix together form the name of each newly created file.
    private final String backendUID;
    private final AtomicLong fileSuffix;
    // When true, snapshot references are tracked and close() marks files instead of deleting the directory.
    private final boolean snapshotStorage;
    // Minimum time (ms) a file is kept after its DB reference dropped to zero.
    private final long fileAliveTimeAfterNoDBReference;
    // Files whose DB reference reached zero, waiting for the periodic deletion check.
    private final SortedSet<FileMeta> waitingDBDeletionFiles;
    // Files released by the DB that are still waiting for their snapshot reference to reach zero.
    private final Set<FileMeta> waitingSnapshotDeletionFiles;
    // Paths of files marked for deletion; drained by getMarkedDeletionFiles().
    private final Set<String> markedDeletionFiles;
    // Single-threaded scheduler running DBDeletionCheckRunner.
    private final ScheduledThreadPoolExecutor fileDeletionCheckExecutor;
    private final FileManagerStat fileManagerStat;
    // Timestamp (ms) of the last statistics log line written by printStat().
    private long lastPrintStatTime;
    // Set once by close(); checked by most public operations.
    private volatile boolean closed;
    private final DataPageUtil dataPageUtil;

    // Only created when the context provides a file manager metric group.
    @Nullable
    private FileManagerMetrics fileManagerMetrics;

    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/fs/FileManagerImpl$DBDeletionCheckRunner.class */
    private class DBDeletionCheckRunner implements Runnable {
        private DBDeletionCheckRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis - FileManagerImpl.this.fileAliveTimeAfterNoDBReference;
            while (!FileManagerImpl.this.closed && !FileManagerImpl.this.waitingDBDeletionFiles.isEmpty()) {
                FileMeta fileMeta = (FileMeta) FileManagerImpl.this.waitingDBDeletionFiles.first();
                long currentAccessNumber = FileManagerImpl.this.getCurrentAccessNumber();
                if (fileMeta.getDiscardAccessNumber() >= currentAccessNumber || fileMeta.getDiscardTimeStamp() >= j) {
                    break;
                }
                FileManagerImpl.this.waitingDBDeletionFiles.remove(fileMeta);
                try {
                    FileReader fileReader = fileMeta.getFileReader();
                    if (fileReader != null) {
                        fileReader.close();
                        fileMeta.setFileReader(null);
                    }
                } catch (Exception e) {
                    FileManagerImpl.LOG.error("failed to close file reader when moving from db deletion set to snapshot set, {}, {}", fileMeta, e);
                }
                FileManagerImpl.this.waitingSnapshotDeletionFiles.add(fileMeta);
                FileManagerImpl.LOG.debug("Add file to waitingSnapshotDeletionFiles {}, current time {}, current access number {}", new Object[]{fileMeta, Long.valueOf(currentTimeMillis), Long.valueOf(currentAccessNumber)});
                if (fileMeta.addAndGetSnapshotReference(0) == 0 && FileManagerImpl.this.waitingSnapshotDeletionFiles.remove(fileMeta)) {
                    FileManagerImpl.this.markFileDeletion(fileMeta);
                    FileManagerImpl.LOG.debug("Mark file deletion by DB {}", fileMeta);
                }
            }
            FileManagerImpl.this.printStat();
        }
    }

    /**
     * Test-only constructor; delegates to the four-argument constructor with the boolean flag
     * set to {@code false}.
     *
     * @param gContext engine context
     * @param str identifier of this file manager
     * @param path working base path
     */
    @VisibleForTesting
    public FileManagerImpl(GContext gContext, String str, Path path) {
        this(gContext, str, path, false);
    }

    /**
     * Test-only constructor that picks the data page utility from the flag: a
     * {@code DfsDataPageUtil} when {@code z} is true, otherwise a {@code LocalDataPageUtil} backed
     * by the supervisor's for-read allocator. Both are created with the configured checksum setting.
     *
     * @param gContext engine context
     * @param str identifier of this file manager
     * @param path working base path
     * @param z flag forwarded unchanged to the main constructor
     */
    @VisibleForTesting
    public FileManagerImpl(GContext gContext, String str, Path path, boolean z) {
        this(gContext, str, path, z, z ? new DfsDataPageUtil(gContext.getGConfiguration().isChecksumEnable()) : new LocalDataPageUtil(gContext.getSupervisor().getForReadAllocator(), gContext.getGConfiguration().isChecksumEnable()));
    }

    /**
     * Creates a file manager working under {@code path}. The deletion checker is only created
     * here; it is scheduled by {@link #start()}.
     *
     * @param gContext engine context, must not be null
     * @param str identifier of this file manager (metric sub-group and thread name)
     * @param path working base path for all files created by this manager
     * @param z whether this manager is a snapshot storage
     * @param dataPageUtil data page utility exposed through {@link #getDataPageUtil()}
     */
    public FileManagerImpl(GContext gContext, String str, Path path, boolean z, DataPageUtil dataPageUtil) {
        this.gContext = Preconditions.checkNotNull(gContext);
        this.fileManagerIdentifier = str;
        this.workingBasePath = path;
        this.fileMapping = new ConcurrentHashMap<>();
        this.fileIDGenerator = new FileIDGenerator(gContext.getGConfiguration().getSubTaskIndex(), gContext.getGConfiguration().getNumParallelSubtasks());
        this.backendUID = gContext.getGConfiguration().getBackendUID();
        this.fileSuffix = new AtomicLong(0L);
        this.snapshotStorage = z;
        this.fileAliveTimeAfterNoDBReference = gContext.getGConfiguration().getFileAliveTimeAfterNoDataReference();

        // Order waiting files by discard access number, then discard timestamp, then file id.
        // The second key used to repeat the access number comparison, which made it dead code;
        // Long.compare also avoids the overflow of subtraction-based comparison.
        Comparator<FileMeta> discardOrder = (left, right) -> {
            int byAccessNumber = Long.compare(left.getDiscardAccessNumber(), right.getDiscardAccessNumber());
            if (byAccessNumber != 0) {
                return byAccessNumber;
            }
            int byTimestamp = Long.compare(left.getDiscardTimeStamp(), right.getDiscardTimeStamp());
            if (byTimestamp != 0) {
                return byTimestamp;
            }
            return Integer.compare(left.getFileId().get(), right.getFileId().get());
        };
        this.waitingDBDeletionFiles = new ConcurrentSkipListSet<>(discardOrder);
        this.waitingSnapshotDeletionFiles = ConcurrentHashMap.newKeySet();
        this.markedDeletionFiles = ConcurrentHashMap.newKeySet();
        this.lastPrintStatTime = 0L;
        this.fileManagerStat = new FileManagerStat();

        // Metrics are optional: they are only registered when the context provides a group.
        MetricGroup fileManagerMetricGroup = gContext.getFileManagerMetricGroup();
        if (fileManagerMetricGroup != null) {
            FileManagerMetrics metrics = new FileManagerMetrics(fileManagerMetricGroup.addGroup(str), gContext.getGConfiguration().getMetricSampleCount());
            metrics.register(this.fileManagerStat);
            metrics.registerUsedFile(this.fileMapping::size);
            metrics.registerWaitingDbDeletion(this.waitingDBDeletionFiles::size);
            metrics.registerWaitingSnapshotDeletion(this.waitingSnapshotDeletionFiles::size);
            metrics.registerMarkDeletionFile(this.markedDeletionFiles::size);
            this.fileManagerMetrics = metrics;
        }

        this.fileDeletionCheckExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "FileManager-" + str + "-%d").build());
        this.fileDeletionCheckExecutor.setRemoveOnCancelPolicy(true);
        this.fileDeletionCheckExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.fileDeletionCheckExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.closed = false;

        this.writerFailCountThreshold = gContext.getGConfiguration().getWriterFailCountThreshold();
        this.fileManagerFailCountThreshold = gContext.getGConfiguration().getFileManagerFailCountThreshold();
        // Multiply as long so that a large configured value cannot overflow int arithmetic.
        this.retryCreateFileWriterInterval = gContext.getGConfiguration().getFileManagerCreateFileWriterRetryInterval() * 1000L;
        // -1 / 0 means "no writer error recorded".
        this.fileWriterErrorStatus = new Tuple2<>(-1L, 0);
        this.exceptionStat = new ExceptionStat();
        if (gContext.getExceptionMetrics() != null) {
            gContext.getExceptionMetrics().register(this.exceptionStat);
        }
        this.invalidMessage = "Can't create file writer anymore, because of exceed the threshold(" + this.fileManagerFailCountThreshold + ") of continues filewriter error";
        this.dataPageUtil = dataPageUtil;
        LOG.info("FileManager is created for {}", path);
    }

    /**
     * Schedules the periodic deletion check: first run after 1500 ms, then repeatedly with the
     * configured file deletion check interval (milliseconds) between runs.
     */
    @Override
    public void start() {
        Runnable deletionCheck = new DBDeletionCheckRunner();
        long checkInterval = this.gContext.getGConfiguration().getFileDeletionCheckInterval();
        this.fileDeletionCheckExecutor.scheduleWithFixedDelay(deletionCheck, 1500L, checkInterval, TimeUnit.MILLISECONDS);
        LOG.info("FileManager is started for {}", this.workingBasePath);
    }

    /** Returns the identifier this file manager was constructed with. */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public String getFileManagerIdentifier() {
        return this.fileManagerIdentifier;
    }

    /** Returns the working base path this file manager was constructed with. */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public Path getBasePath() {
        return this.workingBasePath;
    }

    /**
     * Returns the path of the file with the given id.
     *
     * @param fileID id of a file tracked by this manager
     * @return the path stored in the file's metadata
     */
    @Override
    public String getFilePath(FileID fileID) {
        int rawId = fileID.get();
        return getFilePath(rawId);
    }

    /**
     * Returns the path of the file with the given numeric id.
     *
     * @param i numeric file id
     * @return the path stored in the file's metadata
     * @throws NullPointerException if the id is not in the file mapping
     */
    @Override
    public String getFilePath(int i) {
        FileMeta meta = this.fileMapping.get(i);
        String notFoundMessage = "file not in file mapping for " + i;
        Preconditions.checkNotNull(meta, notFoundMessage);
        return meta.getFilePath();
    }

    /**
     * Looks up the metadata of a file.
     *
     * @param i numeric file id
     * @return the metadata, or {@code null} if the id is not in the file mapping
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public FileMeta getFileMeta(int i) {
        return this.fileMapping.get(Integer.valueOf(i));
    }

    /**
     * Extracts the file id encoded in the upper 32 bits of an address.
     *
     * @param j address
     * @return a new {@code FileIDImpl} wrapping the extracted id
     */
    @Override
    public FileID getFileID(long j) {
        int rawId = getIDFromAddress(j);
        return new FileIDImpl(rawId);
    }

    /**
     * Extracts the numeric file id encoded in the upper 32 bits of an address.
     *
     * @param j address
     * @return the numeric file id
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public int getSimpleFileID(long j) {
        return getIDFromAddress(j);
    }

    /**
     * Builds an address with the file id in the upper 32 bits and the offset in the lower 32 bits,
     * the inverse of {@link #getFileID(long)} / {@link #getFileOffset(long)}.
     *
     * @param fileID id of the file
     * @param j offset inside the file; expected to fit in the lower 32 bits
     * @return the combined address
     */
    @Override
    public long getAddress(FileID fileID, long j) {
        // Widen to long BEFORE shifting: an int shifted by 32 is shifted by 0 in Java, which
        // would leave the id in the low word and OR it into the offset.
        return (((long) fileID.get()) << 32) | j;
    }

    /**
     * Extracts the file offset stored in the lower 32 bits of an address.
     *
     * @param j address
     * @return the lower 32 bits of {@code j} as a non-negative long
     */
    @Override
    public long getFileOffset(long j) {
        final long lowWordMask = 0xFFFFFFFFL;
        return j & lowWordMask;
    }

    /** Test hook: returns whatever the file id generator's {@code get()} currently reports. */
    @VisibleForTesting
    FileID getCurrentFileID() {
        return this.fileIDGenerator.get();
    }

    /** Returns the upper 32 bits of the address (unsigned shift), i.e. the numeric file id. */
    private int getIDFromAddress(long j) {
        return (int) (j >>> 32);
    }

    /**
     * Returns the reader of the file that contains the given address, creating and caching it in
     * the file's metadata on first use. Creation is serialized per file by locking the metadata.
     *
     * @param j address whose upper 32 bits identify the file
     * @return the cached or newly created reader
     * @throws GeminiRuntimeException if the manager is closed or the reader cannot be created
     * @throws NullPointerException if the file is not in the file mapping
     */
    @Override
    public FileReader getFileReader(long j) {
        checkDBStatus();
        int fileId = getIDFromAddress(j);
        FileMeta fileMeta = this.fileMapping.get(fileId);
        Preconditions.checkNotNull(fileMeta, "file not in file mapping for file id " + fileId);

        FileReader fileReader = fileMeta.getFileReader();
        if (fileReader != null) {
            return fileReader;
        }
        synchronized (fileMeta) {
            // Re-check under the lock: another thread may have created the reader meanwhile.
            fileReader = fileMeta.getFileReader();
            if (fileReader != null) {
                return fileReader;
            }
            GeminiInputStream inputStream = null;
            try {
                inputStream = new GeminiInputStream(new Path(fileMeta.getFilePath()));
                fileReader = new FileReaderImpl(inputStream, fileMeta);
                fileMeta.setFileReader(fileReader);
                // close() may have run while the reader was being created; do not leave a reader
                // registered that nobody will close.
                if (this.closed) {
                    fileMeta.setFileReader(null);
                    LOG.warn("File manager has been closed, and close file reader for file {}", fileMeta);
                    throw new GeminiRuntimeException("File manager has been closed");
                }
                return fileReader;
            } catch (Exception e) {
                // Best-effort cleanup. A failure here is only logged so that it cannot replace
                // the exception that actually caused the reader creation to fail.
                try {
                    if (fileReader != null) {
                        fileReader.close();
                    } else if (inputStream != null) {
                        inputStream.close();
                    }
                } catch (Exception closeException) {
                    LOG.error("failed to close input stream {}", inputStream, closeException);
                }
                throw new GeminiRuntimeException("failed to create file reader for " + fileMeta, e);
            }
        }
    }

    /**
     * Allocates a new file id and path, opens a writer for it and registers the file in the file
     * mapping with one DB reference (released again by {@link #closeFileWriter(FileWriter)}).
     *
     * <p>On any failure the opened writer/stream is closed, the file id is recycled, the file is
     * deleted and the failure counter is increased.
     *
     * @return the writer for the new file
     * @throws GeminiRuntimeException if the manager is closed, is currently not valid (too many
     *     writer errors), or the writer cannot be created
     */
    @Override
    public FileWriter createNewFileWriter() {
        checkDBStatus();
        Tuple2<FileID, String> newFilePath = getNewFilePath();
        FileID fileID = newFilePath.f0;
        String filePath = newFilePath.f1;
        FileMeta fileMeta = new FileMeta(filePath, fileID);
        GeminiOutputStream outputStream = null;
        FTFileWriter fileWriter = null;
        try {
            if (!isValid()) {
                throw new IllegalStateException(this.invalidMessage);
            }
            outputStream = new GeminiOutputStream(new Path(filePath));
            fileWriter = new FTFileWriter(outputStream, this, fileID, filePath, this.writerFailCountThreshold.intValue(), this.exceptionStat);
            fileMeta.setFileWriter(fileWriter);
            fileMeta.addAndGetDBReference(1);
            FileMeta previous = this.fileMapping.putIfAbsent(fileID.get(), fileMeta);
            LOG.info("{} file manager put fileID {} with meta {}.", getFileManagerIdentifier(), fileID, fileMeta);
            // Flink's Preconditions templates use %s placeholders, not {}.
            Preconditions.checkState(previous == null, "%s has existed", fileMeta);
            // close() may have run concurrently; undo the registration if it is still there.
            if (this.closed && this.fileMapping.remove(fileID.get()) != null) {
                LOG.warn("File manager has been closed, and close file writer for file {}", fileMeta);
                throw new GeminiRuntimeException("File manager has been closed.");
            }
            this.fileManagerStat.addTotalCreatedFile(1);
            this.fileManagerStat.setMaxUsedFile(this.fileMapping.size());
            LOG.debug("Create new file {}.", fileMeta);
            return fileWriter;
        } catch (Exception e) {
            // Release whatever was opened so far; the writer is presumed to own the stream once
            // it has been constructed. Close failures are only logged to keep the root cause.
            try {
                if (fileWriter != null) {
                    fileWriter.close();
                } else if (outputStream != null) {
                    outputStream.close();
                }
            } catch (Exception closeException) {
                LOG.error("failed to close output stream for {}", filePath, closeException);
            }
            recycleFileID(fileID);
            deleteFile(filePath, false);
            this.fileManagerStat.addTotalFailCreateFile(1);
            LOG.error("Failed to create new file writer for {}", fileID, e);
            throw new GeminiRuntimeException("Failed to create new file writer for " + fileID + ", exception " + e, e);
        }
    }

    /**
     * Closes a writer created by {@link #createNewFileWriter()} and releases the DB reference that
     * was taken for it. A valid writer resets the manager's writer error streak, an invalid one
     * extends it. Failures while closing are logged, not propagated.
     *
     * @param fileWriter writer to close, must not be null
     * @throws GeminiRuntimeException if the manager is closed
     * @throws NullPointerException if the writer is null or its file is not in the file mapping
     */
    @Override
    public void closeFileWriter(FileWriter fileWriter) {
        checkDBStatus();
        Preconditions.checkNotNull(fileWriter);
        FileID fileID = fileWriter.getFileID();
        FileMeta fileMeta = this.fileMapping.get(fileID.get());
        // Flink's Preconditions templates use %s placeholders, not {}.
        Preconditions.checkNotNull(fileMeta, "file not in file mapping for %s", fileID);
        if (fileWriter.isValid()) {
            resetFileWriterErrorCount();
        } else {
            increaseFileWriterErrorCount();
        }
        try {
            fileWriter.close();
            LOG.debug("close file writer successfully: {}", fileMeta);
        } catch (Exception e) {
            LOG.error("failed to close file writer: file id {}", fileMeta, e);
        }
        fileMeta.setFileWriter(null);
        // Drop the reference taken at creation; no data size is released here.
        internalDecDBReference(fileMeta, this.gContext.getAccessNumber(), System.currentTimeMillis(), 0L);
    }

    /**
     * Generates a fresh file id and the URI string of a new file under the working base path.
     * The file name is the backend UID, the snapshot file separator and an increasing suffix.
     *
     * @return the new file id paired with the file URI
     */
    @VisibleForTesting
    Tuple2<FileID, String> getNewFilePath() {
        FileID newFileId;
        synchronized (this.fileIDGenerator) {
            newFileId = this.fileIDGenerator.generate();
        }
        Path newFile = new Path(this.workingBasePath, this.backendUID + SnapshotManagerImpl.SNAPSHOT_FILE_SEPERATOR + this.fileSuffix.getAndIncrement());
        String fileUri = newFile.toUri().toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug("FileID {} with new path {} is ready to create.", newFileId, fileUri);
        }
        return Tuple2.of(newFileId, fileUri);
    }

    /**
     * Adds one DB reference and {@code j2} bytes of data size to the file containing the address.
     *
     * @param j address whose upper 32 bits identify the file
     * @param j2 data size to add
     * @throws GeminiRuntimeException if the manager is closed
     * @throws NullPointerException if the file is not in the file mapping
     */
    @Override
    public void incDBReference(long j, long j2) {
        checkDBStatus();
        int fileId = getIDFromAddress(j);
        FileMeta referenced = this.fileMapping.get(fileId);
        Preconditions.checkNotNull(referenced);
        internalIncDBReference(referenced, j2);
    }

    /**
     * Increments the DB reference of the file and accounts {@code j} bytes to both the file and
     * the manager statistics.
     *
     * @throws IllegalStateException if the reference count is not positive after the increment
     */
    private void internalIncDBReference(FileMeta fileMeta, long j) {
        // The counter checked here is the DB (data) reference, not the snapshot reference.
        Preconditions.checkState(fileMeta.addAndGetDBReference(1) > 0, "data reference should be positive");
        fileMeta.addAndGetDataSize(j);
        this.fileManagerStat.addTotalDataSize(j);
    }

    /**
     * Removes one DB reference and {@code j4} bytes of data size from the file containing the
     * address, recording the given access number and timestamp as its discard point.
     *
     * @param j address whose upper 32 bits identify the file
     * @param j2 access number at discard time
     * @param j3 timestamp at discard time
     * @param j4 data size to release
     * @throws GeminiRuntimeException if the manager is closed
     * @throws NullPointerException if the file is not in the file mapping
     */
    @Override
    public void decDBReference(long j, long j2, long j3, long j4) {
        checkDBStatus();
        int fileId = getIDFromAddress(j);
        FileMeta referenced = this.fileMapping.get(fileId);
        Preconditions.checkNotNull(referenced);
        internalDecDBReference(referenced, j2, j3, j4);
    }

    /**
     * Records the discard point on the file, releases {@code j3} bytes and one DB reference, and
     * queues the file in {@code waitingDBDeletionFiles} once no DB reference is left.
     *
     * @param fileMeta file to release
     * @param j access number at discard time
     * @param j2 timestamp at discard time
     * @param j3 data size to release
     * @throws IllegalStateException if the reference becomes negative or the file is already queued
     */
    private void internalDecDBReference(FileMeta fileMeta, long j, long j2, long j3) {
        // The discard point must be set before the file can enter the ordered waiting set.
        fileMeta.updateDiscardAccessNumberAndTimestamp(j, j2);
        fileMeta.addAndGetDataSize(-j3);
        long remainingReferences = fileMeta.addAndGetDBReference(-1);
        Preconditions.checkState(remainingReferences >= 0, "data reference should not be negative");
        if (remainingReferences == 0) {
            boolean queued = this.waitingDBDeletionFiles.add(fileMeta);
            Preconditions.checkState(queued, "failed to add file to waitingDBDeletionFiles " + fileMeta);
            LOG.debug("add file to waitingDBDeletionFiles {}", fileMeta);
        }
        this.fileManagerStat.addTotalDataSize(-j3);
    }

    /**
     * Adds one snapshot reference to the file. Does nothing beyond the closed check when this
     * manager is not a snapshot storage.
     *
     * @param fileID id of the referenced file
     * @throws GeminiRuntimeException if the manager is closed
     * @throws NullPointerException if the file is not in the file mapping
     */
    @Override
    public void incSnapshotReference(FileID fileID) {
        checkDBStatus();
        if (!this.snapshotStorage) {
            return;
        }
        FileMeta referenced = this.fileMapping.get(fileID.get());
        Preconditions.checkNotNull(referenced);
        long references = referenced.addAndGetSnapshotReference(1);
        Preconditions.checkState(references > 0, "snapshot reference should be positive");
    }

    /**
     * Removes one snapshot reference from the file and marks the file for deletion when it was
     * the last one and the file is already waiting in {@code waitingSnapshotDeletionFiles}.
     * Does nothing beyond the closed check when this manager is not a snapshot storage.
     *
     * @param fileID id of the referenced file
     * @throws GeminiRuntimeException if the manager is closed
     * @throws NullPointerException if the file is not in the file mapping
     * @throws IllegalStateException if the reference becomes negative
     */
    @Override
    public void decSnapshotReference(FileID fileID) {
        checkDBStatus();
        if (!this.snapshotStorage) {
            return;
        }
        FileMeta referenced = this.fileMapping.get(fileID.get());
        Preconditions.checkNotNull(referenced);
        long remainingReferences = referenced.addAndGetSnapshotReference(-1);
        Preconditions.checkState(remainingReferences >= 0, "snapshot reference should not be negative");
        if (remainingReferences != 0) {
            return;
        }
        // Only the caller that actually removes the file from the waiting set marks it.
        if (this.waitingSnapshotDeletionFiles.remove(referenced)) {
            markFileDeletion(referenced);
            LOG.debug("Mark file deletion by snapshot {}", referenced);
        }
    }

    /**
     * Drains the set of paths marked for deletion: every path seen by the iteration is moved into
     * the returned set and removed from the internal one.
     *
     * @return a new mutable set with the drained paths
     */
    @Override
    public Set<String> getMarkedDeletionFiles() {
        Set<String> drained = new HashSet<>();
        for (Iterator<String> paths = this.markedDeletionFiles.iterator(); paths.hasNext(); ) {
            drained.add(paths.next());
            paths.remove();
        }
        return drained;
    }

    /**
     * Rebuilds the file mapping from restored metadata. Every restored file must be new to the
     * mapping and must carry at least one DB or snapshot reference; files without DB reference
     * go straight to {@code waitingSnapshotDeletionFiles}. Afterwards the restored ids are handed
     * to the id generator and the statistics are updated.
     *
     * @param map restored metadata keyed by file id
     * @throws GeminiRuntimeException if a file id is already present in the mapping
     * @throws IllegalStateException if a restored file has neither DB nor snapshot reference
     */
    @Override
    public void restore(Map<Integer, FileMeta.RestoredFileMeta> map) {
        HashSet<FileID> restoredIds = new HashSet<>();
        long restoredDataSize = 0;
        for (Map.Entry<Integer, FileMeta.RestoredFileMeta> restoredEntry : map.entrySet()) {
            int fileId = restoredEntry.getKey().intValue();
            FileMeta.RestoredFileMeta restored = restoredEntry.getValue();

            FileMeta existing = this.fileMapping.get(fileId);
            if (existing != null) {
                LOG.error("file mapping should not have contained file id {}, old path: {}, new path: {}", fileId, existing.getFilePath(), restored.filePath);
                throw new GeminiRuntimeException("file mapping should not have contained file id " + fileId);
            }

            boolean referenced = restored.dbReference != 0 || restored.snapshotReference != 0;
            Preconditions.checkState(referenced, "db reference and snapshot reference can not be both 0: " + restored.id + ", " + restored.filePath);

            FileIDImpl restoredId = new FileIDImpl(restored.id);
            FileMeta restoredMeta = new FileMeta(restored.filePath, restoredId, restored.fileSize, restored.dataSize, restored.dbReference, restored.snapshotReference, restored.canDeleted);
            this.fileMapping.put(fileId, restoredMeta);
            restoredIds.add(restoredId);
            restoredDataSize += restored.dataSize;

            // No DB reference left: the file only lives on because of snapshots.
            if (restoredMeta.addAndGetDBReference(0) == 0) {
                this.waitingSnapshotDeletionFiles.add(restoredMeta);
            }
        }
        this.fileIDGenerator.restoreFileIDs(restoredIds);
        this.fileManagerStat.addTotalDataSize(restoredDataSize);
        int usedFiles = this.fileMapping.size();
        this.fileManagerStat.setMaxUsedFile(usedFiles);
        this.fileManagerStat.setNumberUsedFile(this.fileMapping.size());
        LOG.info("restore file manager successfully at {}", this.workingBasePath);
    }

    /**
     * Returns a read-only view of the file mapping (file id -> metadata). The view is backed by
     * the live map, so later changes to the mapping are visible through it.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public Map<Integer, FileMeta> getFileMapping() {
        return Collections.unmodifiableMap(this.fileMapping);
    }

    /**
     * Returns the paths of the requested files that are currently tracked by this manager.
     * Requested ids that are not in the file mapping are omitted from the result.
     *
     * @param set ids of the files to look up
     * @return a new map from file id to file path
     */
    @Override
    public Map<FileID, String> getFileMapping(Set<FileID> set) {
        Set<Integer> requestedIds = set.stream().map(FileID::get).collect(Collectors.toSet());
        // Match on the entry KEY: testing the id set against the entry itself never matches
        // and always produced an empty result.
        return this.fileMapping.entrySet().stream()
            .filter(entry -> requestedIds.contains(entry.getKey()))
            .collect(Collectors.toMap(
                entry -> new FileIDImpl(entry.getKey()),
                entry -> entry.getValue().getFilePath()));
    }

    /**
     * Records one more writer error: starts a new error streak (remembering the current time)
     * if none is active, increments the streak counter and the exception statistics.
     */
    @Override
    public void increaseFileWriterErrorCount() {
        synchronized (this.fileWriterErrorStatus) {
            Tuple2<Long, Integer> status = this.fileWriterErrorStatus;
            boolean noActiveStreak = status.f0 == -1L;
            if (noActiveStreak) {
                status.f0 = System.currentTimeMillis();
            }
            status.f1 = status.f1 + 1;
        }
        this.exceptionStat.addTotalFileWriterShift(1);
    }

    /** Clears the writer error streak: no start time ({@code -1}) and a count of zero. */
    @Override
    public void resetFileWriterErrorCount() {
        synchronized (this.fileWriterErrorStatus) {
            Tuple2<Long, Integer> status = this.fileWriterErrorStatus;
            status.f0 = -1L;
            status.f1 = 0;
        }
    }

    /**
     * Tells whether new file writers may be created. The manager is invalid while the writer
     * error streak has reached the configured threshold and the retry interval since the first
     * error of the streak has not elapsed; once it has elapsed the streak is reset.
     *
     * @return {@code true} if writers may be created
     */
    @Override
    public boolean isValid() {
        int errorCount;
        long streakStart;
        // Take a consistent snapshot of both values; the decision is made outside the lock.
        synchronized (this.fileWriterErrorStatus) {
            errorCount = this.fileWriterErrorStatus.f1;
            streakStart = this.fileWriterErrorStatus.f0;
        }
        if (errorCount < this.fileManagerFailCountThreshold.intValue()) {
            return true;
        }
        long sinceStreakStart = System.currentTimeMillis() - streakStart;
        if (sinceStreakStart < this.retryCreateFileWriterInterval) {
            return false;
        }
        resetFileWriterErrorCount();
        return true;
    }

    /**
     * Closes the file manager once: stops the deletion checker and closes every open writer and
     * reader (failures are logged). A snapshot storage then marks all deletable files without
     * snapshot reference, clears the mapping and triggers the file cleaner; any other manager
     * deletes its whole working base path. Calling it again only logs a warning.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (this.closed) {
                LOG.warn("FileManager ({}) has been closed", this.workingBasePath);
                return;
            }
            this.closed = true;
            this.fileDeletionCheckExecutor.shutdownNow();

            for (FileMeta fileMeta : this.fileMapping.values()) {
                FileWriter fileWriter = fileMeta.getFileWriter();
                if (fileWriter != null) {
                    try {
                        fileWriter.close();
                    } catch (Exception e) {
                        LOG.error("failed to close writer {}", fileMeta, e);
                    } finally {
                        // Always detach the writer, whether or not closing succeeded.
                        fileMeta.setFileWriter(null);
                    }
                }
                FileReader fileReader = fileMeta.getFileReader();
                if (fileReader != null) {
                    try {
                        fileReader.close();
                    } catch (Exception e) {
                        LOG.error("failed to close reader {}", fileMeta, e);
                    }
                }
            }

            if (this.snapshotStorage) {
                for (FileMeta fileMeta : this.fileMapping.values()) {
                    if (fileMeta.addAndGetSnapshotReference(0) == 0 && fileMeta.canDeleted()) {
                        this.markedDeletionFiles.add(fileMeta.getFilePath());
                        LOG.info("Mark file deletion when close: {}", fileMeta);
                    }
                }
                this.fileMapping.clear();
                this.gContext.getSupervisor().getFileCleaner().triggerCleanup(this);
            } else {
                try {
                    this.workingBasePath.getFileSystem().delete(this.workingBasePath, true);
                    LOG.info("FileManager is not a snapshot storage, delete the whole working base path, {}", this.workingBasePath);
                } catch (Exception e) {
                    // Best effort, but keep the cause in the log so the leftover can be diagnosed.
                    LOG.warn("Fail to delete the working base path {}", this.workingBasePath, e);
                }
            }
            LOG.info("File manager ({}) is closed", this.workingBasePath);
        }
    }

    /** Returns {@code FileManager{<working base path>}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("FileManager{");
        text.append(this.workingBasePath).append("}");
        return text.toString();
    }

    /** Test hook: exposes the internal file id generator. */
    @VisibleForTesting
    FileIDGenerator getFileIDGenerator() {
        return this.fileIDGenerator;
    }

    /** Test hook: exposes the live set of files waiting for the DB deletion check (not a copy). */
    @VisibleForTesting
    SortedSet<FileMeta> getWaitingDBDeletionFiles() {
        return this.waitingDBDeletionFiles;
    }

    /** Test hook: exposes the live set of files waiting for snapshot release (not a copy). */
    @VisibleForTesting
    Set<FileMeta> getWaitingSnapshotDeletionFiles() {
        return this.waitingSnapshotDeletionFiles;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Retires a file: closes its writer and reader (failures are logged), records its path in
     * {@code markedDeletionFiles} when the file may be deleted (deleting it directly if the
     * manager is already closed), removes it from the file mapping and recycles its id.
     *
     * @param fileMeta metadata of the file; must be the instance registered in the file mapping
     * @throws IllegalStateException if the mapping does not hold this very instance
     */
    public void markFileDeletion(FileMeta fileMeta) {
        FileID retiredId = fileMeta.getFileId();
        boolean registered = this.fileMapping.get(retiredId.get()) == fileMeta;
        Preconditions.checkState(registered, "delete a file not in mapping table");
        FileWriter writer = fileMeta.getFileWriter();
        try {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    LOG.error("failed to close writer when marking deletion, {}, ", fileMeta, e);
                } finally {
                    fileMeta.setFileWriter(null);
                }
            }
            FileReader reader = fileMeta.getFileReader();
            if (reader != null) {
                try {
                    reader.close();
                } catch (Exception e) {
                    LOG.error("failed to close reader when marking deletion, {}, {}", fileMeta, e);
                } finally {
                    fileMeta.setFileReader(null);
                }
            }
            if (fileMeta.canDeleted()) {
                this.markedDeletionFiles.add(fileMeta.getFilePath());
                if (this.closed) {
                    deleteFile(fileMeta.getFilePath(), false);
                }
            }
            this.fileMapping.remove(retiredId.get());
            recycleFileID(retiredId);
            this.fileManagerStat.addTotalDeletedFile(1);
        } catch (Throwable failure) {
            // Whatever went wrong, never leave the writer attached to the metadata.
            fileMeta.setFileWriter(null);
            throw failure;
        }
    }

    /**
     * Guards operations that must not run on a closed manager.
     *
     * @throws GeminiRuntimeException if {@link #close()} has already been called
     */
    private void checkDBStatus() {
        if (!this.closed) {
            return;
        }
        throw new GeminiRuntimeException("FileManager (" + this.workingBasePath + ") has been closed");
    }

    /**
     * Hands a file id back to the generator. Runs under the generator's monitor, the same lock
     * used by getNewFilePath() when generating ids.
     */
    private void recycleFileID(FileID fileID) {
        synchronized (this.fileIDGenerator) {
            this.fileIDGenerator.recycleFileID(fileID);
        }
    }

    /**
     * Deletes a file through the file system resolved from its URI. Best effort: any failure is
     * logged as a warning and not propagated.
     *
     * @param str path of the file to delete
     * @param z passed to the file system as the recursive flag
     */
    private void deleteFile(String str, boolean z) {
        try {
            Path target = new Path(str);
            FileSystem fileSystem = FileSystem.get(target.toUri());
            fileSystem.delete(target, z);
        } catch (Exception e) {
            LOG.warn("Fail to delete file {}, {}", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Refreshes the file counters in the statistics object and logs them, at most once per
     * minute and only while debug logging is enabled.
     */
    public void printStat() {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        long now = System.currentTimeMillis();
        if (this.lastPrintStatTime + 60000 >= now) {
            // Last report is less than a minute old.
            return;
        }
        this.lastPrintStatTime = now;
        this.fileManagerStat.setNumberUsedFile(this.fileMapping.size());
        this.fileManagerStat.setNumberWaitingDBDeletionFile(this.waitingDBDeletionFiles.size());
        this.fileManagerStat.setNumberWaitingSnapshotDeletionFile(this.waitingSnapshotDeletionFiles.size());
        this.fileManagerStat.setNumberMarkDeletionFile(this.markedDeletionFiles.size());
        LOG.info("FileManagerStat {}, {}", this.workingBasePath, this.fileManagerStat);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the smaller of the context's current access number and its minimum snapshot access
     * number; the deletion check compares discard access numbers against this value.
     */
    public long getCurrentAccessNumber() {
        long accessNumber = this.gContext.getAccessNumber();
        long minSnapshotAccessNumber = this.gContext.getMinSnapshotAccessNumber();
        return Math.min(accessNumber, minSnapshotAccessNumber);
    }

    /** Returns the data page utility this file manager was constructed with. */
    @Override // org.apache.flink.runtime.state.gemini.engine.fs.FileManager
    public DataPageUtil getDataPageUtil() {
        return this.dataPageUtil;
    }
}
