/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.filecompaction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.gemini.engine.DbPageIterator;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompaction;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionStat;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCompactionMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileCompactionImpl
implements FileCompaction {
    private static final Logger LOG = LoggerFactory.getLogger(FileCompactionImpl.class);
    private final GContext gContext;
    private final FileCompactionPageTransfer pageTransfer;
    private final FileManager fileManager;
    private float compactionTriggerRatio;
    private float compactionTargetRatio;
    private long checkInterval;
    private final ScheduledThreadPoolExecutor checkExecutor;
    private volatile RunningCompaction runningCompaction;
    private volatile int compactionCounter;
    private final FileCompactionStat fileCompactionStat;
    @Nullable
    private FileCompactionMetrics metrics;
    private volatile boolean closed;

    public FileCompactionImpl(GContext gContext, FileCompactionPageTransfer pageTransfer) {
        this.gContext = (GContext)Preconditions.checkNotNull((Object)gContext);
        this.pageTransfer = (FileCompactionPageTransfer)Preconditions.checkNotNull((Object)pageTransfer);
        this.fileManager = (FileManager)Preconditions.checkNotNull((Object)pageTransfer.getDbFileManager());
        GConfiguration config = gContext.getGConfiguration();
        this.compactionTriggerRatio = config.getCompactionTriggerRatio();
        this.compactionTargetRatio = config.getCompactionTargetRatio();
        this.checkInterval = config.getAmplificationCheckInterval();
        Preconditions.checkArgument((this.compactionTriggerRatio >= 1.0f ? 1 : 0) != 0, (Object)("compaction trigger ratio " + this.compactionTriggerRatio + " should not be less than 1"));
        Preconditions.checkArgument((this.compactionTargetRatio >= 1.0f ? 1 : 0) != 0, (Object)("compaction target ratio " + this.compactionTargetRatio + " should not be less than 1"));
        Preconditions.checkArgument((this.compactionTriggerRatio > this.compactionTargetRatio ? 1 : 0) != 0, (Object)String.format("compaction trigger ratio %f should be larger than target ratio %f", Float.valueOf(this.compactionTriggerRatio), Float.valueOf(this.compactionTargetRatio)));
        Preconditions.checkArgument((this.checkInterval > 0L ? 1 : 0) != 0, (Object)("amplification check interval " + this.checkInterval + " should be non-negative"));
        this.checkExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "FileCompaction-%d").build());
        this.checkExecutor.setRemoveOnCancelPolicy(true);
        this.checkExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.checkExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.fileCompactionStat = new FileCompactionStat();
        MetricGroup dbMetricGroup = gContext.getDBMetricGroup();
        if (dbMetricGroup != null) {
            this.metrics = new FileCompactionMetrics(dbMetricGroup.addGroup("file_compaction"), config.getMetricSampleCount(), config.getMetricHistogramWindowSize());
            this.metrics.register(this.fileCompactionStat);
        }
        this.compactionCounter = 0;
        this.closed = false;
        LOG.info("FileCompaction is created, trigger ratio {}, target ratio {}, check interval {}ms", new Object[]{Float.valueOf(this.compactionTriggerRatio), Float.valueOf(this.compactionTargetRatio), this.checkInterval});
    }

    @Override
    public void start() {
        this.checkExecutor.scheduleWithFixedDelay(new AmplificationCheckRunner(), this.checkInterval, this.checkInterval, TimeUnit.MILLISECONDS);
        LOG.info("FileCompaction is started.");
    }

    @VisibleForTesting
    RunningCompaction getRunningCompaction() {
        return this.runningCompaction;
    }

    @VisibleForTesting
    FileCompactionPageTransfer getPageTransfer() {
        return this.pageTransfer;
    }

    @VisibleForTesting
    CompactionCheckResult checkCompaction(Map<Integer, FileMeta> fileMetas, boolean onlyCheck) {
        int i;
        boolean isNeedCompaction;
        FileInfo info;
        long totalFileSize = 0L;
        long totalDataSize = 0L;
        HashMap<Integer, FileInfo> infoMap = new HashMap<Integer, FileInfo>();
        HashSet<Integer> fileOpenedForWrite = new HashSet<Integer>();
        for (FileMeta fileMeta : fileMetas.values()) {
            long fileSize = fileMeta.getFileSize();
            long dataSize = fileMeta.addAndGetDataSize(0L);
            int numPage = fileMeta.addAndGetDBReference(0);
            if (numPage == 0) continue;
            int fileId = fileMeta.getFileId().get();
            info = (FileInfo)infoMap.get(fileId);
            if (info == null) {
                info = new FileInfo();
                infoMap.put(fileId, info);
            } else {
                totalFileSize -= info.fileSize;
                totalDataSize -= info.dataSize;
                fileOpenedForWrite.remove(fileId);
            }
            if (fileMeta.getFileWriter() != null) {
                fileOpenedForWrite.add(fileId);
            }
            totalFileSize += fileSize;
            totalDataSize += dataSize;
            float ratio = (float)fileSize / (float)dataSize;
            info.set(fileId, fileSize, dataSize, numPage, ratio);
        }
        float totalRatio = totalDataSize == 0L ? 1.0f : (float)totalFileSize / (float)totalDataSize;
        ArrayList infoList = new ArrayList(infoMap.values());
        infoList.sort((i1, i2) -> Float.compare(i2.ratio, i1.ratio));
        if (LOG.isDebugEnabled()) {
            LOG.debug("current file statistics: total files {}, total file size {}, total data size {}, amplification ratio {}", new Object[]{infoList.size(), totalFileSize, totalDataSize, Float.valueOf(totalRatio)});
            LOG.debug("current file details: {}", infoList);
        }
        boolean bl = isNeedCompaction = totalRatio > this.compactionTriggerRatio;
        if (onlyCheck || !isNeedCompaction) {
            return CompactionCheckResult.of(totalFileSize, totalDataSize, totalRatio, isNeedCompaction);
        }
        HashMap<Integer, FileInfo> compactionFiles = new HashMap<Integer, FileInfo>();
        long expectedTotalFileSize = totalFileSize;
        float expectedTotalRatio = totalRatio;
        for (i = 0; i < infoList.size() && expectedTotalRatio > this.compactionTargetRatio; ++i) {
            info = (FileInfo)infoList.get(i);
            if (fileOpenedForWrite.contains(info.fileId)) continue;
            compactionFiles.put(info.fileId, info);
            expectedTotalRatio = (float)(expectedTotalFileSize -= info.fileSize - info.dataSize) / (float)totalDataSize;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("expected file statistics: compact {} files, total file size {}, total data size {}, amplification ratio {}", new Object[]{i, expectedTotalFileSize, totalDataSize, Float.valueOf(expectedTotalRatio)});
            LOG.debug("expected files to compact: {}", compactionFiles.values());
        }
        return CompactionCheckResult.of(totalFileSize, totalDataSize, totalRatio, true, expectedTotalFileSize, expectedTotalRatio, compactionFiles);
    }

    @VisibleForTesting
    void startCompaction(CompactionCheckResult result) {
        RunningCompaction compaction;
        if (!result.needCompaction()) {
            return;
        }
        ++this.compactionCounter;
        this.runningCompaction = compaction = new RunningCompaction(this.compactionCounter, result);
        LOG.info("Start compaction " + this.compactionCounter);
        compaction.startTime = System.currentTimeMillis();
        Map<Integer, FileInfo> compactionFiles = compaction.compactionCheckResult.compactionFiles;
        EventExecutorGroup executorGroup = this.gContext.getSupervisor().getFlushExecutorGroup();
        DbPageIterator iterator = this.gContext.getGeminiDB().getDbPageIterator();
        compaction.addTask();
        while (iterator.valid()) {
            int fileId;
            PageAddress pa = iterator.currentPage();
            if (this.pageTransfer.hasDbFileAddress(pa) && compactionFiles.containsKey(fileId = this.pageTransfer.getDbFileId(pa))) {
                compaction.addTask();
                compaction.addPreparePage(pa.getDataLen());
                this.pageTransfer.transferPage(pa, iterator.currentRegion().getGRegionContext(), executorGroup.next(), (success, throwable) -> {
                    if (success.booleanValue()) {
                        compaction.addSuccessfulPage(pa.getDataLen());
                    } else {
                        compaction.addFailedPage(pa.getDataLen());
                        LOG.debug("file compaction failed", throwable);
                    }
                    compaction.removeTask();
                });
            }
            iterator.next();
        }
        compaction.removeTask();
    }

    private void endCompaction() {
        Preconditions.checkNotNull((Object)this.runningCompaction, (String)"There is no running compaction");
        Preconditions.checkState((boolean)this.runningCompaction.finished(), (Object)"file compaction has not finished");
        CompactionCheckResult result = this.checkCompaction(this.fileManager.getFileMapping(), true);
        this.runningCompaction.endTime = System.currentTimeMillis();
        long duration = this.runningCompaction.endTime - this.runningCompaction.startTime;
        this.fileCompactionStat.addAndGetNumberCompaction(1);
        this.fileCompactionStat.setTransferSize(this.runningCompaction.pageSizePrepareToTransfer);
        this.fileCompactionStat.setCompactionDuration(duration);
        this.fileCompactionStat.setAmplificationRatio(result.amplificationRatio);
        LOG.info("end compaction {}, duration {}, transfer size {}, amplification after compaction {}", new Object[]{this.runningCompaction.id, duration, this.runningCompaction.pageSizePrepareToTransfer, Float.valueOf(result.amplificationRatio)});
        if (LOG.isDebugEnabled()) {
            LOG.debug("completed compaction details {}", (Object)this.runningCompaction);
        }
        this.runningCompaction = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        FileCompactionImpl fileCompactionImpl = this;
        synchronized (fileCompactionImpl) {
            if (this.closed) {
                LOG.warn("FileCompaction has been closed");
                return;
            }
            this.closed = true;
        }
        this.checkExecutor.shutdownNow();
        LOG.info("File compaction is closed.");
    }

    class RunningCompaction {
        int id;
        CompactionCheckResult compactionCheckResult;
        AtomicInteger runningTasks = new AtomicInteger(0);
        int pagesPrepareToTransfer = 0;
        long pageSizePrepareToTransfer = 0L;
        AtomicInteger pagesSuccessfullyTransferred = new AtomicInteger(0);
        AtomicLong pageSizeSuccessfullyTransferred = new AtomicLong(0L);
        AtomicInteger pagesFailedToTransfer = new AtomicInteger(0);
        AtomicLong pageSizeFailedToTransfer = new AtomicLong(0L);
        long startTime;
        long endTime;

        RunningCompaction(int id, CompactionCheckResult result) {
            this.id = id;
            this.compactionCheckResult = result;
        }

        boolean finished() {
            return this.runningTasks.get() == 0;
        }

        void addTask() {
            this.runningTasks.addAndGet(1);
        }

        void removeTask() {
            if (this.runningTasks.addAndGet(-1) == 0) {
                FileCompactionImpl.this.endCompaction();
            }
        }

        void addPreparePage(long size) {
            ++this.pagesPrepareToTransfer;
            this.pageSizePrepareToTransfer += size;
        }

        void addSuccessfulPage(long size) {
            this.pagesSuccessfullyTransferred.addAndGet(1);
            this.pageSizeSuccessfullyTransferred.addAndGet(size);
        }

        void addFailedPage(long size) {
            this.pagesFailedToTransfer.addAndGet(1);
            this.pageSizeFailedToTransfer.addAndGet(size);
        }

        public String toString() {
            return "RunningCompaction{id=" + this.id + ", totalDataSize=" + this.compactionCheckResult.totalDataSize + ", pagesPrepareToTransfer=" + this.pagesPrepareToTransfer + ", pageSizePrepareToTransfer=" + this.pageSizePrepareToTransfer + ", pagesSuccessfullyTransferred=" + this.pagesSuccessfullyTransferred.get() + ", pageSizeSuccessfullyTransferred=" + this.pageSizeSuccessfullyTransferred.get() + ", pagesFailedToTransfer=" + this.pagesFailedToTransfer.get() + ", pageSizeFailedToTransfer=" + this.pageSizeFailedToTransfer.get() + ", startTime=" + this.startTime + ", endTime=" + this.endTime + ", duration=" + (this.endTime - this.startTime) + "}";
        }
    }

    static class CompactionCheckResult {
        long totalFileSize;
        long totalDataSize;
        float amplificationRatio;
        boolean isNeedCompaction;
        Map<Integer, FileInfo> compactionFiles;
        long expectedTotalFileSize;
        float expectedAmplificationRatio;

        CompactionCheckResult() {
        }

        boolean needCompaction() {
            return this.isNeedCompaction;
        }

        static CompactionCheckResult of(long totalFileSize, long totalDataSize, float amplificationRatio, boolean isNeedCompaction) {
            return CompactionCheckResult.of(totalFileSize, totalDataSize, amplificationRatio, isNeedCompaction, totalFileSize, amplificationRatio, Collections.emptyMap());
        }

        static CompactionCheckResult of(long totalFileSize, long totalDataSize, float amplificationRatio, boolean isNeedCompaction, long expectedTotalFileSize, float expectedAmplificationRatio, Map<Integer, FileInfo> compactionFiles) {
            CompactionCheckResult result = new CompactionCheckResult();
            result.totalFileSize = totalFileSize;
            result.totalDataSize = totalDataSize;
            result.amplificationRatio = amplificationRatio;
            result.isNeedCompaction = isNeedCompaction;
            result.expectedTotalFileSize = expectedTotalFileSize;
            result.expectedAmplificationRatio = expectedAmplificationRatio;
            result.compactionFiles = compactionFiles;
            return result;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("CompactionCheckResult{");
            sb.append("totalFileSize=");
            sb.append(this.totalFileSize);
            sb.append(", totalDataSize=");
            sb.append(this.totalDataSize);
            sb.append(", amplificationRatio=");
            sb.append(this.amplificationRatio);
            sb.append(", needCompaction=");
            sb.append(this.isNeedCompaction);
            if (!this.compactionFiles.isEmpty()) {
                sb.append(", expectedTotalFileSize=");
                sb.append(this.expectedTotalFileSize);
                sb.append(", expectedAmplificationRatio=");
                sb.append(this.expectedAmplificationRatio);
                sb.append(", numberOfCompactionFiles=");
                sb.append(this.compactionFiles.size());
            }
            sb.append("}");
            return sb.toString();
        }
    }

    static class FileInfo {
        int fileId;
        long fileSize;
        long dataSize;
        float ratio;
        int numPage;

        FileInfo() {
        }

        void set(int fileId, long fileSize, long dataSize, int numPage, float ratio) {
            this.fileId = fileId;
            this.fileSize = fileSize;
            this.dataSize = dataSize;
            this.numPage = numPage;
            this.ratio = ratio;
        }

        public String toString() {
            return "FileInfo={fileId=" + this.fileId + ", fileSize=" + this.fileSize + ", dataSize=" + this.dataSize + ", numPage=" + this.numPage + ", ratio=" + this.ratio + "}";
        }
    }

    class AmplificationCheckRunner
    implements Runnable {
        AmplificationCheckRunner() {
        }

        @Override
        public void run() {
            if (FileCompactionImpl.this.runningCompaction == null) {
                CompactionCheckResult result = FileCompactionImpl.this.checkCompaction(FileCompactionImpl.this.fileManager.getFileMapping(), false);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Compaction check result: {}", (Object)result);
                }
                FileCompactionImpl.this.fileCompactionStat.setAmplificationRatio(result.amplificationRatio);
                if (result.needCompaction()) {
                    FileCompactionImpl.this.startCompaction(result);
                }
            }
        }
    }
}

