/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.gemini.engine.GRegionID;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.filecache.PageBatchFlusher;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompaction;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompletableFuture;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotStat;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SnapshotCompaction} implementation that keeps, per DFS file id, the number of pages and the
 * number of data bytes recorded through {@link #recordSharedPage(PageAddress)}, and uses those
 * numbers to decide whether pages of a pending snapshot should be written again.
 *
 * <p>The decision is based on the amplification ratio {@code totalFileSize / totalDataSize}. When
 * that ratio exceeds the configured target ratio, the files with the highest per-file ratio
 * ({@code fileSize / recordedDataSize}) are selected and every page of the pending snapshot that
 * lives in one of the selected files is handed to a {@link PageBatchFlusher}.
 */
public class SnapshotCompactionImpl implements SnapshotCompaction {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotCompactionImpl.class);
    private final GContext gContext;
    private final FileManager dfsFileManager;
    private final SnapshotManager.PendingSnapshot pendingSnapshot;
    /** Upper bound for the amplification ratio; validated to be at least 1 in the constructor. */
    private final float targetRatio;
    /** DFS file id -> (number of recorded pages, total data length of the recorded pages). */
    private final Map<Integer, Tuple2<AtomicInteger, AtomicLong>> sharedFiles;
    /** Number of outstanding compaction tasks; the end time is stamped when it drops back to 0. */
    private final AtomicInteger compactionTaskCounter;

    /**
     * Creates the compaction for one pending snapshot.
     *
     * @param gContext context providing the DFS file manager and the configuration
     * @param pendingSnapshot the snapshot whose pages are inspected and possibly re-flushed
     * @throws IllegalArgumentException if the configured target ratio is less than 1
     *     (NOTE(review): exception type assumed from Flink's {@code Preconditions.checkArgument})
     */
    public SnapshotCompactionImpl(GContext gContext, SnapshotManager.PendingSnapshot pendingSnapshot) {
        this.gContext = gContext;
        this.dfsFileManager = gContext.getSupervisor().getDfsFileManager();
        this.pendingSnapshot = pendingSnapshot;
        this.targetRatio = gContext.getGConfiguration().getSnapshotCompactionTargetRatio();
        Preconditions.checkArgument(this.targetRatio >= 1.0f, "Snapshot compaction target ratio can't be less than 1");
        this.sharedFiles = new ConcurrentHashMap<>();
        this.compactionTaskCounter = new AtomicInteger(0);
    }

    /**
     * Accounts one page against the DFS file that holds it: bumps the page count of that file by one
     * and its data size by the page's data length.
     *
     * @param page page whose DFS address identifies the file to account against
     */
    @Override
    public void recordSharedPage(PageAddress page) {
        int fileId = this.dfsFileManager.getSimpleFileID(page.getDfsAddress());
        Tuple2<AtomicInteger, AtomicLong> counters = this.sharedFiles.computeIfAbsent(
                fileId, id -> Tuple2.of(new AtomicInteger(0), new AtomicLong(0L)));
        counters.f0.incrementAndGet();
        counters.f1.addAndGet(page.getDataLen());
    }

    /** @return always {@code true} */
    @Override
    public boolean isAsync() {
        return true;
    }

    /**
     * Evaluates the snapshot, stores the "before compaction" figures in the snapshot statistics and,
     * if the check says compaction is needed, re-flushes the pages of the selected files.
     */
    @Override
    public void run() {
        SnapshotCheckResult checkResult = this.checkSnapshot();
        this.updateSnapshotStat(checkResult);
        if (checkResult.isNeedCompaction) {
            this.transferPages(checkResult);
        }
    }

    /** @return the live (not copied) per-file counters map */
    @VisibleForTesting
    Map<Integer, Tuple2<AtomicInteger, AtomicLong>> getSharedFiles() {
        return this.sharedFiles;
    }

    /**
     * Computes the current amplification ratio and, when it exceeds the target ratio, picks the files
     * to compact.
     *
     * <p>Files are considered in descending order of their own {@code fileSize / dataSize} ratio and
     * are added until the expected overall ratio is no longer above the target. Files whose
     * {@link FileMeta} still has a file writer are never selected.
     *
     * @return the current figures plus, if compaction is needed, the selected files and the expected
     *     figures after compacting them
     */
    @VisibleForTesting
    SnapshotCheckResult checkSnapshot() {
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();
        // Adding 0 presumably just reads the current counter values.
        long totalDataSize = snapshotStat.addAndGetTotalSize(0L);
        long incrementalDataSize = snapshotStat.addAndGetIncrementalSize(0L);
        long totalFileSize = totalDataSize;

        HashSet<Integer> fileOpenedForWrite = new HashSet<>();
        ArrayList<SnapshotFileInfo> fileInfoList = new ArrayList<>();
        for (Map.Entry<Integer, Tuple2<AtomicInteger, AtomicLong>> entry : this.sharedFiles.entrySet()) {
            int fileId = entry.getKey();
            int numPage = entry.getValue().f0.get();
            long sharedDataSize = entry.getValue().f1.get();
            FileMeta fileMeta = this.dfsFileManager.getFileMeta(fileId);
            long fileSize = fileMeta.getFileSize();
            // NOTE(review): a recorded data size of 0 makes this ratio Infinity/NaN (float division).
            float ratio = (float) fileSize / (float) sharedDataSize;
            fileInfoList.add(new SnapshotFileInfo(fileId, fileSize, sharedDataSize, numPage, ratio));
            // Everything in the file that was not recorded for this snapshot counts as overhead.
            totalFileSize += fileSize - sharedDataSize;
            if (fileMeta.getFileWriter() != null) {
                fileOpenedForWrite.add(fileId);
            }
        }

        float currentRatio = totalDataSize == 0L ? 1.0f : (float) totalFileSize / (float) totalDataSize;
        // Highest per-file ratio first.
        fileInfoList.sort((f1, f2) -> Float.compare(f2.ratio, f1.ratio));
        if (LOG.isDebugEnabled()) {
            LOG.debug("current snapshot {} statistics: number of shared files {}, total file size {}, total data size {}, amplification ratio {}",
                    this.pendingSnapshot.getCheckpointId(), fileInfoList.size(), totalFileSize, totalDataSize, currentRatio);
            LOG.debug("current snapshot {} shared file details: {}", this.pendingSnapshot.getCheckpointId(), fileInfoList);
        }
        if (currentRatio <= this.targetRatio) {
            return SnapshotCheckResult.of(totalFileSize, totalDataSize, currentRatio, false, totalFileSize, incrementalDataSize, currentRatio, Collections.emptyMap());
        }

        // totalDataSize is non-zero here: a zero size yields ratio 1.0, which never exceeds the target.
        Map<Integer, SnapshotFileInfo> compactionFiles = new HashMap<>();
        long expectedTotalFileSize = totalFileSize;
        long expectedIncrementalDataSize = incrementalDataSize;
        float expectedTotalRatio = currentRatio;
        for (SnapshotFileInfo info : fileInfoList) {
            if (!(expectedTotalRatio > this.targetRatio)) {
                break;
            }
            if (fileOpenedForWrite.contains(info.fileId)) {
                continue;
            }
            compactionFiles.put(info.fileId, info);
            // NOTE(review): transferPages() *adds* the length of every rewritten page to the
            // incremental size, so subtracting here looks inverted. Kept as-is; confirm intent.
            expectedIncrementalDataSize -= info.dataSize;
            expectedTotalFileSize -= info.fileSize - info.dataSize;
            expectedTotalRatio = (float) expectedTotalFileSize / (float) totalDataSize;
        }
        if (LOG.isDebugEnabled()) {
            // Report the number of files actually selected, not the number of files looked at
            // (files that are open for write are visited but skipped).
            LOG.debug("expected snapshot {} statistics: compact {} files, total file size {}, total data size {}, amplification ratio, {}",
                    this.pendingSnapshot.getCheckpointId(), compactionFiles.size(), expectedTotalFileSize, totalDataSize, expectedTotalRatio);
            LOG.debug("expected snapshot {} files to compact: {}", this.pendingSnapshot.getCheckpointId(), compactionFiles.values());
        }
        return SnapshotCheckResult.of(totalFileSize, totalDataSize, currentRatio, true, expectedTotalFileSize, expectedIncrementalDataSize, expectedTotalRatio, compactionFiles);
    }

    /**
     * Copies the pre-compaction figures of the check into the pending snapshot's statistics.
     *
     * @param snapshotCheckResult result of {@link #checkSnapshot()}
     */
    private void updateSnapshotStat(SnapshotCheckResult snapshotCheckResult) {
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();
        snapshotStat.setTotalFileSizeBeforeCompaction(snapshotCheckResult.totalFileSize);
        snapshotStat.setIncrementalSizeBeforeCompaction(snapshotStat.addAndGetIncrementalSize(0L));
        snapshotStat.setAmplificationRatioBeforeCompaction(snapshotCheckResult.amplificationRatio);
        snapshotStat.setNeedCompaction(snapshotCheckResult.isNeedCompaction);
    }

    /**
     * Hands every page of the pending snapshot that lives in one of the selected files to a
     * {@link PageBatchFlusher} and updates the incremental size/page counters accordingly.
     *
     * <p>Each submitted page is tracked both on the snapshot's result future (running task count)
     * and on the internal task counter. A failed write ends the snapshot and completes the future
     * exceptionally.
     *
     * @param checkResult result of {@link #checkSnapshot()} carrying the files to compact
     */
    private void transferPages(SnapshotCheckResult checkResult) {
        SnapshotCompletableFuture completableFuture = this.pendingSnapshot.getResultFuture();
        Map<Integer, SnapshotFileInfo> compactionFiles = checkResult.compactionFiles;
        Map<String, Map<GRegionID, SnapshotManager.GRegionSnapshotMeta>> regionSnapshotMetas = this.pendingSnapshot.getGRegionSnapshotMeta();
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();

        // Guard task: keeps the counter above 0 until all pages have been submitted, so the end
        // time cannot be stamped by an early callback while this loop is still running.
        this.addTask();
        snapshotStat.setCompactionStartTime(System.currentTimeMillis());

        PageBatchFlusher pageBatchFlusher = this.pendingSnapshot.getDfsPageBatchFlusher();
        if (!pageBatchFlusher.isForceFlush()) {
            // NOTE(review): a fresh flusher with identical settings is used when the snapshot's
            // flusher is not force-flushing; presumably to avoid sharing its batching state - confirm.
            pageBatchFlusher = new PageBatchFlusher(
                    pageBatchFlusher.getBatchNumPages(),
                    pageBatchFlusher.getBatchDataSize(),
                    pageBatchFlusher.isForceFlush(),
                    pageBatchFlusher.getFileCache(),
                    pageBatchFlusher.getEventExecutorGroup());
        }

        for (Map<GRegionID, SnapshotManager.GRegionSnapshotMeta> region : regionSnapshotMetas.values()) {
            for (SnapshotManager.GRegionSnapshotMeta meta : region.values()) {
                Iterator<PageAddress> iterator = meta.getPageIndex().pageIterator();
                while (iterator.hasNext()) {
                    PageAddress page = iterator.next();
                    int fileId = this.dfsFileManager.getSimpleFileID(page.getDfsAddress());
                    if (!compactionFiles.containsKey(fileId)) {
                        continue;
                    }
                    completableFuture.incRunningTask();
                    this.addTask();
                    snapshotStat.addAndGetIncrementalSize(page.getDataLen());
                    snapshotStat.addAndGetIncrementalPages(1);
                    pageBatchFlusher.addPage(page, meta.getGRegionContext(), (success, throwable) -> {
                        if (!success) {
                            LOG.error("Write error when snapshot dfs.");
                            completableFuture.setEndSnapshot();
                            completableFuture.completeExceptionally((Throwable) throwable);
                        }
                        this.removeTask();
                        completableFuture.decRunningTask();
                    });
                }
            }
        }
        pageBatchFlusher.flush();
        // Release the guard task taken before the loop.
        this.removeTask();
    }

    /** Registers one more outstanding compaction task. */
    private void addTask() {
        this.compactionTaskCounter.incrementAndGet();
    }

    /** Marks one task as finished; the task that brings the counter to 0 stamps the end time. */
    private void removeTask() {
        if (this.compactionTaskCounter.decrementAndGet() == 0) {
            this.pendingSnapshot.getSnapshotStat().setCompactionEndTime(System.currentTimeMillis());
        }
    }

    /** Outcome of {@link #checkSnapshot()}: current figures and the expected ones after compaction. */
    static class SnapshotCheckResult {
        long totalFileSize;
        long totalDataSize;
        float amplificationRatio;
        boolean isNeedCompaction;
        /** Files selected for compaction, keyed by file id. */
        Map<Integer, SnapshotFileInfo> compactionFiles;
        long expectedTotalFileSize;
        long expectedIncrementalDataSize;
        float expectedAmplificationRatio;

        SnapshotCheckResult() {
        }

        /**
         * Shorthand for a result without selected files: the expected file size and ratio equal the
         * current ones, the expected incremental data size is 0 and the file map is empty.
         */
        static SnapshotCheckResult of(long totalSnapshotFileSize, long totalDataSize, float amplificationRatio, boolean isNeedCompaction) {
            return SnapshotCheckResult.of(totalSnapshotFileSize, totalDataSize, amplificationRatio, isNeedCompaction, totalSnapshotFileSize, 0L, amplificationRatio, Collections.emptyMap());
        }

        /** Creates a result with every field set from the corresponding argument. */
        static SnapshotCheckResult of(long totalFileSize, long totalDataSize, float amplificationRatio, boolean isNeedCompaction, long expectedTotalFileSize, long expectedIncrementalDataSize, float expectedAmplificationRatio, Map<Integer, SnapshotFileInfo> compactionFiles) {
            SnapshotCheckResult result = new SnapshotCheckResult();
            result.totalFileSize = totalFileSize;
            result.totalDataSize = totalDataSize;
            result.amplificationRatio = amplificationRatio;
            result.isNeedCompaction = isNeedCompaction;
            result.expectedTotalFileSize = expectedTotalFileSize;
            result.expectedIncrementalDataSize = expectedIncrementalDataSize;
            result.expectedAmplificationRatio = expectedAmplificationRatio;
            result.compactionFiles = compactionFiles;
            return result;
        }
    }

    /** Per-file figures gathered by {@link #checkSnapshot()}. */
    static class SnapshotFileInfo {
        int fileId;
        /** Size of the file as reported by its {@link FileMeta}. */
        long fileSize;
        /** Sum of the data lengths of the pages recorded for this file. */
        long dataSize;
        /** Number of pages recorded for this file. */
        int numPage;
        /** {@code fileSize / dataSize}; higher means more overhead. */
        float ratio;

        SnapshotFileInfo(int fileId, long fileSize, long dataSize, int numPage, float ratio) {
            this.fileId = fileId;
            this.fileSize = fileSize;
            this.dataSize = dataSize;
            this.numPage = numPage;
            this.ratio = ratio;
        }

        @Override
        public String toString() {
            return "SnapshotFileInfo={fileId=" + this.fileId + ", fileSize=" + this.fileSize + ", dataSize=" + this.dataSize + ", numPage=" + this.numPage + ", ratio=" + this.ratio + "}";
        }
    }
}

