/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.fs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleanerStat;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCleanerMetrics;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FileCleaner} implementation that deletes files in the background.
 *
 * <p>File paths to delete are kept in a concurrent queue. They arrive either directly through
 * {@link #registerFilesToClean(Collection)}, or by pulling the files marked for deletion from
 * registered {@link FileManager}s (on {@link #triggerCleanup(FileManager)} and at the beginning
 * of every periodic cleanup round). A single-threaded scheduled executor drains the queue at the
 * interval returned by {@code getFileCleanInterval()} of the configuration.
 */
public class FileCleanerImpl
implements FileCleaner {
    private static final Logger LOG = LoggerFactory.getLogger(FileCleanerImpl.class);

    /** Pause after every successful deletion in the periodic cleanup, in milliseconds. */
    private static final long PAUSE_AFTER_DELETE_MILLIS = 100L;

    private final GContext gContext;
    private final FileCleanerStat fileCleanerStat;
    /** File managers that are polled for files marked for deletion; backed by a concurrent map. */
    private final Set<FileManager> registeredFileManagers;
    /** Paths of the files waiting to be deleted. */
    private final Queue<String> registeredFiles;
    private final ScheduledThreadPoolExecutor fileCleanUpExecutor;
    /** Set once by {@link #close()}; read by the cleanup thread to stop early. */
    private volatile boolean closed;

    /**
     * Creates the cleaner and its executor. Nothing is scheduled until {@link #start()} is called.
     *
     * @param gContext context supplying the configuration and, optionally, the metrics to which
     *     the cleaner statistics are registered; must not be {@code null}
     */
    public FileCleanerImpl(GContext gContext) {
        this.gContext = Preconditions.checkNotNull(gContext);
        this.registeredFileManagers = Collections.newSetFromMap(new ConcurrentHashMap<FileManager, Boolean>());
        this.fileCleanerStat = new FileCleanerStat();
        FileCleanerMetrics fileCleanerMetrics = gContext.getFileCleanerMetrics();
        if (fileCleanerMetrics != null) {
            fileCleanerMetrics.register(this.fileCleanerStat);
        }
        this.registeredFiles = new ConcurrentLinkedQueue<String>();
        this.fileCleanUpExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExcetorPrefixName() + "FileCleaner-%d").build());
        // Cancelled or pending tasks must not linger in, or run from, the executor after shutdown.
        this.fileCleanUpExecutor.setRemoveOnCancelPolicy(true);
        this.fileCleanUpExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.fileCleanUpExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.closed = false;
        LOG.info("FileCleaner is created.");
    }

    /** Schedules the periodic cleanup, starting immediately. */
    @Override
    public void start() {
        this.fileCleanUpExecutor.scheduleAtFixedRate(new CleanupRunner(), 0L, this.gContext.getGConfiguration().getFileCleanInterval(), TimeUnit.MILLISECONDS);
        LOG.info("FileCleaner is started");
    }

    /** Adds a file manager to the set polled by the periodic cleanup. */
    @Override
    public void registerFileManager(FileManager fileManager) {
        this.registeredFileManagers.add(fileManager);
    }

    /** Removes a file manager from the set polled by the periodic cleanup. */
    @Override
    public void unregisterFileManager(FileManager fileManager) {
        this.registeredFileManagers.remove(fileManager);
    }

    /**
     * Queues the files currently marked for deletion by the given file manager. The request is
     * ignored, with a warning, if the file manager is not registered.
     */
    @Override
    public void triggerCleanup(FileManager fileManager) {
        if (!this.registeredFileManagers.contains(fileManager)) {
            LOG.warn("file manager {} is not registered", fileManager);
            return;
        }
        Set<String> files = fileManager.getMarkedDeletionFiles();
        this.registeredFiles.addAll(files);
        this.fileCleanerStat.addTotalReceivedFile(files.size());
    }

    /**
     * Queues the given file paths for deletion.
     *
     * @return the result of adding the paths to the internal queue
     */
    @Override
    public boolean registerFilesToClean(Collection<String> filesToClean) {
        return this.registeredFiles.addAll(filesToClean);
    }

    /**
     * Stops the background cleanup and deletes every file still queued on the calling thread.
     * Deletion failures are logged and do not abort the shutdown. Calling this method again has
     * no effect other than a warning.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (this.closed) {
                LOG.warn("FileCleaner has been closed");
                return;
            }
            this.closed = true;
        }
        // Interrupts a cleanup round in progress; the runner also observes the closed flag.
        this.fileCleanUpExecutor.shutdownNow();
        String filePath;
        while ((filePath = this.registeredFiles.poll()) != null) {
            try {
                Path path = new Path(filePath);
                path.getFileSystem().delete(path, false);
            }
            catch (Exception e) {
                // The exception is passed as the trailing throwable so its stack trace is logged.
                LOG.warn("Failed to delete file {} when closing file cleaner", filePath, e);
            }
        }
        this.registeredFileManagers.clear();
        LOG.info("FileCleaner is closed");
    }

    /** Returns a snapshot copy of the file paths currently queued for deletion. */
    @VisibleForTesting
    Collection<String> getRegisteredFiles() {
        return new ArrayList<String>(this.registeredFiles);
    }

    /** Logs the current counters; adding zero reads each counter without changing it. */
    private void printStat() {
        LOG.info("FileCleanerStat[ totalReceivedFiles: {}, totalDeletedFiles: {}, totalFailedDeleteFiles: {}]", new Object[]{this.fileCleanerStat.addTotalReceivedFile(0), this.fileCleanerStat.addTotalDeletedFile(0), this.fileCleanerStat.addTotalFailedDeletedFiles(0)});
    }

    /** One periodic cleanup round: collect the marked files, delete the queued files, log stats. */
    private class CleanupRunner
    implements Runnable {
        private CleanupRunner() {
        }

        @Override
        public void run() {
            try {
                this.collectMarkedFiles();
                this.deleteQueuedFiles();
                FileCleanerImpl.this.printStat();
            }
            catch (Exception e) {
                // An exception escaping a task scheduled at fixed rate suppresses all of its
                // subsequent executions, so it is logged here and the schedule is kept alive.
                LOG.warn("Unexpected failure in file cleanup round", e);
            }
        }

        /** Moves the files marked for deletion by every registered file manager into the queue. */
        private void collectMarkedFiles() {
            for (FileManager fileManager : FileCleanerImpl.this.registeredFileManagers) {
                if (FileCleanerImpl.this.closed) {
                    return;
                }
                Set<String> files = fileManager.getMarkedDeletionFiles();
                FileCleanerImpl.this.registeredFiles.addAll(files);
                FileCleanerImpl.this.fileCleanerStat.addTotalReceivedFile(files.size());
            }
        }

        /** Deletes queued files until the queue is empty, the cleaner is closed, or the thread is interrupted. */
        private void deleteQueuedFiles() {
            String filePath;
            while (!FileCleanerImpl.this.closed && (filePath = FileCleanerImpl.this.registeredFiles.poll()) != null) {
                if (!this.deleteFile(filePath)) {
                    continue;
                }
                try {
                    // Pause after each successful deletion before taking the next file.
                    Thread.sleep(PAUSE_AFTER_DELETE_MILLIS);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt status for the executor and stop this round.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Deletes a single file and updates the statistics.
         *
         * @return {@code true} if the delete call completed without throwing
         */
        private boolean deleteFile(String filePath) {
            Path path = null;
            try {
                // Built inside the try so that a malformed path counts as a failed deletion
                // instead of aborting the round.
                path = new Path(filePath);
                path.getFileSystem().delete(path, false);
                FileCleanerImpl.this.fileCleanerStat.addTotalDeletedFile(1);
                LOG.info("Delete file {}", filePath);
                return true;
            }
            catch (Exception e) {
                FileCleanerImpl.this.fileCleanerStat.addTotalFailedDeletedFiles(1);
                LOG.warn("failed to clean up file {}", filePath, e);
                if (FileCleanerImpl.this.closed && path != null) {
                    // The file has already left the queue, so close() will not see it. Retry once.
                    // NOTE(review): presumably covers a delete interrupted by close() - confirm.
                    try {
                        path.getFileSystem().delete(path, false);
                    }
                    catch (Exception ignored) {
                        // Best effort only; the first failure has been logged and counted.
                    }
                }
                return false;
            }
        }
    }
}

