package org.apache.flink.runtime.state.gemini.engine.filecompaction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.gemini.engine.DbPageIterator;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCompactionMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecompaction/FileCompactionImpl.class */
public class FileCompactionImpl implements FileCompaction {
    private static final Logger LOG = LoggerFactory.getLogger(FileCompactionImpl.class);
    /** Engine context; source of the configuration, the metric group, the supervisor and the DB. */
    private final GContext gContext;
    /** Moves a single page out of a file that was selected for compaction. */
    private final FileCompactionPageTransfer pageTransfer;
    /** File manager obtained from {@link #pageTransfer}; provides the file mapping that is inspected. */
    private final FileManager fileManager;
    /** A compaction is needed when the measured amplification ratio is greater than this value. */
    private float compactionTriggerRatio;
    /** File selection stops once the expected amplification ratio is no longer above this value. */
    private float compactionTargetRatio;
    /** Initial delay and delay between two amplification checks, in milliseconds. */
    private long checkInterval;
    /** Single-threaded scheduler that runs the periodic amplification check. */
    private final ScheduledThreadPoolExecutor checkExecutor;
    /** The compaction currently in progress, or {@code null} when none is running. */
    private volatile RunningCompaction runningCompaction;
    /** Number of compactions started so far; the value after the increment is used as the compaction id. */
    private volatile int compactionCounter;
    /** Statistics updated by the amplification check and at the end of every compaction. */
    private final FileCompactionStat fileCompactionStat;

    /** Metrics wrapper around {@link #fileCompactionStat}; stays {@code null} when the context has no DB metric group. */
    @Nullable
    private FileCompactionMetrics metrics;
    /** Set to {@code true} by the first call of {@link #close()}. */
    private volatile boolean closed;

    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecompaction/FileCompactionImpl$AmplificationCheckRunner.class */
    class AmplificationCheckRunner implements Runnable {
        AmplificationCheckRunner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (FileCompactionImpl.this.runningCompaction == null) {
                CompactionCheckResult checkCompaction = FileCompactionImpl.this.checkCompaction(FileCompactionImpl.this.fileManager.getFileMapping(), false);
                if (FileCompactionImpl.LOG.isDebugEnabled()) {
                    FileCompactionImpl.LOG.debug("Compaction check result: {}", checkCompaction);
                }
                FileCompactionImpl.this.fileCompactionStat.setAmplificationRatio(checkCompaction.amplificationRatio);
                if (checkCompaction.needCompaction()) {
                    FileCompactionImpl.this.startCompaction(checkCompaction);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecompaction/FileCompactionImpl$CompactionCheckResult.class */
    /**
     * Result of one amplification check: the measured sizes and ratio, whether a compaction is
     * needed, and (when files were selected) the expected sizes after compacting them.
     */
    public static class CompactionCheckResult {
        /** Total size of the files that were taken into account. */
        long totalFileSize;
        /** Total data size of the files that were taken into account. */
        long totalDataSize;
        /** Measured amplification ratio. */
        float amplificationRatio;
        /** Whether the check concluded that a compaction is needed. */
        boolean isNeedCompaction;
        /** Files selected for compaction, keyed by file id; empty when nothing was selected. */
        Map<Integer, FileInfo> compactionFiles;
        /** Total file size expected once the selected files are compacted. */
        long expectedTotalFileSize;
        /** Amplification ratio expected once the selected files are compacted. */
        float expectedAmplificationRatio;

        CompactionCheckResult() {
        }

        /** Returns whether the check concluded that a compaction is needed. */
        boolean needCompaction() {
            return isNeedCompaction;
        }

        /**
         * Creates a result without selected files; the expected values equal the measured ones.
         *
         * @param j total file size
         * @param j2 total data size
         * @param f measured amplification ratio
         * @param z whether a compaction is needed
         * @return the new result with an empty map of compaction files
         */
        static CompactionCheckResult of(long j, long j2, float f, boolean z) {
            Map<Integer, FileInfo> noFiles = Collections.emptyMap();
            return of(j, j2, f, z, j, f, noFiles);
        }

        /**
         * Creates a fully specified result.
         *
         * @param j total file size
         * @param j2 total data size
         * @param f measured amplification ratio
         * @param z whether a compaction is needed
         * @param j3 expected total file size after compaction
         * @param f2 expected amplification ratio after compaction
         * @param map files selected for compaction, keyed by file id
         * @return the new result
         */
        static CompactionCheckResult of(long j, long j2, float f, boolean z, long j3, float f2, Map<Integer, FileInfo> map) {
            CompactionCheckResult result = new CompactionCheckResult();
            result.totalFileSize = j;
            result.totalDataSize = j2;
            result.amplificationRatio = f;
            result.isNeedCompaction = z;
            result.expectedTotalFileSize = j3;
            result.expectedAmplificationRatio = f2;
            result.compactionFiles = map;
            return result;
        }

        /** Renders the result; the expected values are only printed when files were selected. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CompactionCheckResult{")
                .append("totalFileSize=").append(totalFileSize)
                .append(", totalDataSize=").append(totalDataSize)
                .append(", amplificationRatio=").append(amplificationRatio)
                .append(", needCompaction=").append(isNeedCompaction);
            boolean hasSelectedFiles = !compactionFiles.isEmpty();
            if (hasSelectedFiles) {
                text.append(", expectedTotalFileSize=").append(expectedTotalFileSize)
                    .append(", expectedAmplificationRatio=").append(expectedAmplificationRatio)
                    .append(", numberOfCompactionFiles=").append(compactionFiles.size());
            }
            return text.append("}").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecompaction/FileCompactionImpl$FileInfo.class */
    /** Mutable per-file statistics gathered by the amplification check. */
    public static class FileInfo {
        /** Id of the file. */
        int fileId;
        /** Size of the file. */
        long fileSize;
        /** Size of the data recorded for the file. */
        long dataSize;
        /** Ratio stored by {@link #set}; the check sorts files by this value. */
        float ratio;
        /** Page count stored by {@link #set}. */
        int numPage;

        FileInfo() {
        }

        /**
         * Overwrites all fields at once.
         *
         * @param i file id
         * @param j file size
         * @param j2 data size
         * @param i2 number of pages
         * @param f ratio
         */
        void set(int i, long j, long j2, int i2, float f) {
            fileId = i;
            fileSize = j;
            dataSize = j2;
            numPage = i2;
            ratio = f;
        }

        @Override
        public String toString() {
            return new StringBuilder("FileInfo={")
                .append("fileId=").append(fileId)
                .append(", fileSize=").append(fileSize)
                .append(", dataSize=").append(dataSize)
                .append(", numPage=").append(numPage)
                .append(", ratio=").append(ratio)
                .append("}")
                .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecompaction/FileCompactionImpl$RunningCompaction.class */
    /**
     * Bookkeeping of one compaction run: outstanding tasks, page counters and timestamps.
     *
     * <p>When the number of outstanding tasks drops to zero the enclosing
     * {@link FileCompactionImpl} is asked to end the compaction.
     */
    public class RunningCompaction {
        /** Id of this compaction. */
        int id;
        /** The check result this compaction was started from. */
        CompactionCheckResult compactionCheckResult;
        /** Number of tasks that have been added but not yet removed. */
        AtomicInteger runningTasks = new AtomicInteger(0);
        /** Pages registered through {@link #addPreparePage}; a plain, non-atomic counter. */
        int pagesPrepareToTransfer = 0;
        /** Total size registered through {@link #addPreparePage}; a plain, non-atomic counter. */
        long pageSizePrepareToTransfer = 0;
        AtomicInteger pagesSuccessfullyTransferred = new AtomicInteger(0);
        AtomicLong pageSizeSuccessfullyTransferred = new AtomicLong(0);
        AtomicInteger pagesFailedToTransfer = new AtomicInteger(0);
        AtomicLong pageSizeFailedToTransfer = new AtomicLong(0);
        /** Start timestamp in milliseconds, assigned by the enclosing class. */
        long startTime;
        /** End timestamp in milliseconds, assigned by the enclosing class. */
        long endTime;

        RunningCompaction(int i, CompactionCheckResult compactionCheckResult) {
            this.id = i;
            this.compactionCheckResult = compactionCheckResult;
        }

        /** Returns {@code true} when no task is outstanding. */
        boolean finished() {
            return runningTasks.get() == 0;
        }

        /** Registers one more outstanding task. */
        void addTask() {
            runningTasks.incrementAndGet();
        }

        /** Marks one task as done and ends the compaction when it was the last one. */
        void removeTask() {
            int remaining = runningTasks.decrementAndGet();
            if (remaining == 0) {
                FileCompactionImpl.this.endCompaction();
            }
        }

        /**
         * Records a page that is about to be transferred.
         *
         * @param j size of the page
         */
        void addPreparePage(long j) {
            pagesPrepareToTransfer += 1;
            pageSizePrepareToTransfer += j;
        }

        /**
         * Records a page whose transfer succeeded.
         *
         * @param j size of the page
         */
        void addSuccessfulPage(long j) {
            pagesSuccessfullyTransferred.incrementAndGet();
            pageSizeSuccessfullyTransferred.addAndGet(j);
        }

        /**
         * Records a page whose transfer failed.
         *
         * @param j size of the page
         */
        void addFailedPage(long j) {
            pagesFailedToTransfer.incrementAndGet();
            pageSizeFailedToTransfer.addAndGet(j);
        }

        @Override
        public String toString() {
            long duration = endTime - startTime;
            return new StringBuilder("RunningCompaction{")
                .append("id=").append(id)
                .append(", totalDataSize=").append(compactionCheckResult.totalDataSize)
                .append(", pagesPrepareToTransfer=").append(pagesPrepareToTransfer)
                .append(", pageSizePrepareToTransfer=").append(pageSizePrepareToTransfer)
                .append(", pagesSuccessfullyTransferred=").append(pagesSuccessfullyTransferred.get())
                .append(", pageSizeSuccessfullyTransferred=").append(pageSizeSuccessfullyTransferred.get())
                .append(", pagesFailedToTransfer=").append(pagesFailedToTransfer.get())
                .append(", pageSizeFailedToTransfer=").append(pageSizeFailedToTransfer.get())
                .append(", startTime=").append(startTime)
                .append(", endTime=").append(endTime)
                .append(", duration=").append(duration)
                .append("}")
                .toString();
        }
    }

    /**
     * Creates the file compaction service. The periodic check is not scheduled here; call
     * {@link #start()} for that.
     *
     * <p>The trigger ratio, target ratio and check interval are read from the configuration of
     * {@code gContext}. Metrics are only registered when the context provides a DB metric group.
     *
     * @param gContext engine context, must not be {@code null}
     * @param fileCompactionPageTransfer page transfer used to move pages; it must not be
     *     {@code null} and must provide a non-null DB file manager
     * @throws NullPointerException if an argument or the DB file manager is {@code null}
     * @throws IllegalArgumentException if a ratio is less than 1, the trigger ratio is not
     *     larger than the target ratio, or the check interval is not positive
     */
    public FileCompactionImpl(GContext gContext, FileCompactionPageTransfer fileCompactionPageTransfer) {
        this.gContext = Preconditions.checkNotNull(gContext);
        this.pageTransfer = Preconditions.checkNotNull(fileCompactionPageTransfer);
        this.fileManager = (FileManager) Preconditions.checkNotNull(fileCompactionPageTransfer.getDbFileManager());
        GConfiguration gConfiguration = gContext.getGConfiguration();
        this.compactionTriggerRatio = gConfiguration.getCompactionTriggerRatio();
        this.compactionTargetRatio = gConfiguration.getCompactionTargetRatio();
        this.checkInterval = gConfiguration.getAmplificationCheckInterval();
        Preconditions.checkArgument(this.compactionTriggerRatio >= 1.0f, "compaction trigger ratio " + this.compactionTriggerRatio + " should not be less than 1");
        Preconditions.checkArgument(this.compactionTargetRatio >= 1.0f, "compaction target ratio " + this.compactionTargetRatio + " should not be less than 1");
        Preconditions.checkArgument(this.compactionTriggerRatio > this.compactionTargetRatio, String.format("compaction trigger ratio %f should be larger than target ratio %f", this.compactionTriggerRatio, this.compactionTargetRatio));
        // Zero is rejected as well, so the message must say "positive" rather than "non-negative".
        Preconditions.checkArgument(this.checkInterval > 0, "amplification check interval " + this.checkInterval + " should be positive");
        this.checkExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "FileCompaction-%d").build());
        // Make shutdown drop cancelled, periodic and delayed tasks instead of keeping them queued.
        this.checkExecutor.setRemoveOnCancelPolicy(true);
        this.checkExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.checkExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.fileCompactionStat = new FileCompactionStat();
        MetricGroup dBMetricGroup = gContext.getDBMetricGroup();
        if (dBMetricGroup != null) {
            this.metrics = new FileCompactionMetrics(dBMetricGroup.addGroup("file_compaction"), gConfiguration.getMetricSampleCount(), gConfiguration.getMetricHistogramWindowSize());
            this.metrics.register(this.fileCompactionStat);
        }
        this.compactionCounter = 0;
        this.closed = false;
        LOG.info("FileCompaction is created, trigger ratio {}, target ratio {}, check interval {}ms", this.compactionTriggerRatio, this.compactionTargetRatio, this.checkInterval);
    }

    /**
     * Schedules the periodic amplification check. The first check runs after one check
     * interval and each following check one interval after the previous one has finished.
     */
    @Override
    public void start() {
        Runnable amplificationCheck = new AmplificationCheckRunner();
        long delayMillis = this.checkInterval;
        this.checkExecutor.scheduleWithFixedDelay(amplificationCheck, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        LOG.info("FileCompaction is started.");
    }

    /** Returns the compaction currently in progress, or {@code null} when none is running. */
    @VisibleForTesting
    RunningCompaction getRunningCompaction() {
        return runningCompaction;
    }

    /** Returns the page transfer this compaction uses to move pages. */
    @VisibleForTesting
    FileCompactionPageTransfer getPageTransfer() {
        return pageTransfer;
    }

    /**
     * Measures the current file amplification and, when requested and necessary, selects the
     * files to compact.
     *
     * <p>Only files with a non-zero DB reference count are taken into account. The
     * amplification ratio is the total file size divided by the total data size of those files,
     * or 1.0 when the total data size is zero. A compaction is needed when the ratio is greater
     * than the trigger ratio.
     *
     * <p>When files are selected, candidates are visited in descending order of their own
     * file-size/data-size ratio. Files that still have a file writer are skipped. Selection
     * stops as soon as the expected ratio is no longer above the target ratio.
     *
     * @param map file metas to inspect
     * @param z if {@code true}, only the statistics are computed and no files are selected
     * @return the check result; its map of compaction files is empty when {@code z} is
     *     {@code true} or no compaction is needed
     */
    @VisibleForTesting
    CompactionCheckResult checkCompaction(Map<Integer, FileMeta> map, boolean z) {
        long totalFileSize = 0;
        long totalDataSize = 0;
        Map<Integer, FileInfo> fileInfos = new HashMap<>();
        // Ids of files that still have a file writer; these are never selected for compaction.
        HashSet<Integer> filesWithWriter = new HashSet<>();
        for (FileMeta fileMeta : map.values()) {
            long fileSize = fileMeta.getFileSize();
            // Presumably adding zero just reads the current counter value.
            long dataSize = fileMeta.addAndGetDataSize(0L);
            int dbReference = fileMeta.addAndGetDBReference(0);
            if (dbReference == 0) {
                continue;
            }
            int fileId = fileMeta.getFileId().get();
            FileInfo fileInfo = fileInfos.get(fileId);
            if (fileInfo == null) {
                fileInfo = new FileInfo();
                fileInfos.put(fileId, fileInfo);
            } else {
                // The same file id was seen before: withdraw its earlier contribution so the
                // values of this entry replace it.
                totalFileSize -= fileInfo.fileSize;
                totalDataSize -= fileInfo.dataSize;
                filesWithWriter.remove(fileId);
            }
            if (fileMeta.getFileWriter() != null) {
                filesWithWriter.add(fileId);
            }
            totalFileSize += fileSize;
            totalDataSize += dataSize;
            // A zero data size yields an infinite (or NaN) ratio; Float.compare orders both
            // above every finite value, so such files are visited first.
            fileInfo.set(fileId, fileSize, dataSize, dbReference, ((float) fileSize) / ((float) dataSize));
        }
        float amplificationRatio = totalDataSize == 0 ? 1.0f : ((float) totalFileSize) / ((float) totalDataSize);
        ArrayList<FileInfo> candidates = new ArrayList<>(fileInfos.values());
        // Highest ratio first.
        candidates.sort((left, right) -> Float.compare(right.ratio, left.ratio));
        if (LOG.isDebugEnabled()) {
            LOG.debug("current file statistics: total files {}, total file size {}, total data size {}, amplification ratio {}", candidates.size(), totalFileSize, totalDataSize, amplificationRatio);
            LOG.debug("current file details: {}", candidates);
        }
        boolean needCompaction = amplificationRatio > this.compactionTriggerRatio;
        if (z || !needCompaction) {
            return CompactionCheckResult.of(totalFileSize, totalDataSize, amplificationRatio, needCompaction);
        }
        // totalDataSize cannot be zero here: a zero total gives a ratio of 1.0, which never
        // exceeds a trigger ratio of at least 1.
        Map<Integer, FileInfo> filesToCompact = new HashMap<>();
        long expectedFileSize = totalFileSize;
        float expectedRatio = amplificationRatio;
        for (int i = 0; i < candidates.size() && expectedRatio > this.compactionTargetRatio; i++) {
            FileInfo candidate = candidates.get(i);
            if (filesWithWriter.contains(candidate.fileId)) {
                continue;
            }
            filesToCompact.put(candidate.fileId, candidate);
            // Compacting the file is expected to reclaim everything except its data.
            expectedFileSize -= candidate.fileSize - candidate.dataSize;
            expectedRatio = ((float) expectedFileSize) / ((float) totalDataSize);
        }
        if (LOG.isDebugEnabled()) {
            // Report the number of selected files, not the number of candidates visited.
            LOG.debug("expected file statistics: compact {} files, total file size {}, total data size {}, amplification ratio {}", filesToCompact.size(), expectedFileSize, totalDataSize, expectedRatio);
            LOG.debug("expected files to compact: {}", filesToCompact.values());
        }
        return CompactionCheckResult.of(totalFileSize, totalDataSize, amplificationRatio, true, expectedFileSize, expectedRatio, filesToCompact);
    }

    /**
     * Starts a compaction for the given check result. Does nothing when the result does not
     * ask for a compaction.
     *
     * <p>All pages of the DB are scanned and every page stored in one of the selected files is
     * handed to the page transfer, using an executor from the flush executor group. Each
     * submitted page counts as one outstanding task that is released by the transfer callback.
     * The compaction ends when the last outstanding task is released.
     *
     * @param compactionCheckResult result of the amplification check that selected the files
     */
    @VisibleForTesting
    void startCompaction(CompactionCheckResult compactionCheckResult) {
        if (!compactionCheckResult.needCompaction()) {
            return;
        }
        this.compactionCounter++;
        RunningCompaction compaction = new RunningCompaction(this.compactionCounter, compactionCheckResult);
        this.runningCompaction = compaction;
        LOG.info("Start compaction {}", compaction.id);
        compaction.startTime = System.currentTimeMillis();
        // Guard task held for the duration of the scan: it keeps the task count above zero so
        // that an early transfer callback cannot end the compaction before the scan is over.
        compaction.addTask();
        try {
            Map<Integer, FileInfo> filesToCompact = compaction.compactionCheckResult.compactionFiles;
            EventExecutorGroup flushExecutorGroup = this.gContext.getSupervisor().getFlushExecutorGroup();
            DbPageIterator pageIterator = this.gContext.getGeminiDB().getDbPageIterator();
            while (pageIterator.valid()) {
                PageAddress currentPage = pageIterator.currentPage();
                if (this.pageTransfer.hasDbFileAddress(currentPage) && filesToCompact.containsKey(this.pageTransfer.getDbFileId(currentPage))) {
                    compaction.addTask();
                    compaction.addPreparePage(currentPage.getDataLen());
                    this.pageTransfer.transferPage(currentPage, pageIterator.currentRegion().getGRegionContext(), flushExecutorGroup.next(), (success, failure) -> {
                        if (success) {
                            compaction.addSuccessfulPage(currentPage.getDataLen());
                        } else {
                            compaction.addFailedPage(currentPage.getDataLen());
                            LOG.debug("file compaction failed", failure);
                        }
                        compaction.removeTask();
                    });
                }
                pageIterator.next();
            }
        } finally {
            // Always release the guard task. If the scan fails and the guard is kept, the task
            // count never reaches zero, the compaction never ends and no further compaction can
            // start.
            // NOTE(review): a synchronous failure of transferPage after addTask() for that page
            // would still leave one task unreleased; confirm whether transferPage can throw.
            compaction.removeTask();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finishes the running compaction: re-measures the amplification, updates the statistics,
     * logs a summary and clears the running compaction.
     *
     * @throws NullPointerException if no compaction is running
     * @throws IllegalStateException if the running compaction still has outstanding tasks
     */
    public void endCompaction() {
        RunningCompaction finishedCompaction = this.runningCompaction;
        Preconditions.checkNotNull(finishedCompaction, "There is no running compaction");
        Preconditions.checkState(finishedCompaction.finished(), "file compaction has not finished");

        // Statistics only; no files are selected here.
        CompactionCheckResult afterCompaction = checkCompaction(this.fileManager.getFileMapping(), true);
        finishedCompaction.endTime = System.currentTimeMillis();
        long duration = finishedCompaction.endTime - finishedCompaction.startTime;
        long transferSize = finishedCompaction.pageSizePrepareToTransfer;
        float amplificationRatio = afterCompaction.amplificationRatio;

        this.fileCompactionStat.addAndGetNumberCompaction(1);
        this.fileCompactionStat.setTransferSize(transferSize);
        this.fileCompactionStat.setCompactionDuration(duration);
        this.fileCompactionStat.setAmplificationRatio(amplificationRatio);

        LOG.info("end compaction {}, duration {}, transfer size {}, amplification after compaction {}", finishedCompaction.id, duration, transferSize, amplificationRatio);
        LOG.debug("completed compaction details {}", finishedCompaction);
        this.runningCompaction = null;
    }

    /**
     * Shuts down the executor that runs the periodic amplification check. Only the first call
     * has an effect; later calls just log a warning.
     */
    @Override
    public void close() {
        synchronized (this) {
            if (!this.closed) {
                this.closed = true;
                this.checkExecutor.shutdownNow();
                LOG.info("File compaction is closed.");
            } else {
                LOG.warn("FileCompaction has been closed");
            }
        }
    }
}
