package org.apache.flink.runtime.state.gemini.engine.snapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.gemini.engine.GRegionID;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.filecache.PageBatchFlusher;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/snapshot/SnapshotCompactionImpl.class */
/**
 * Snapshot stage that measures how much file space the DFS files shared by a pending snapshot
 * occupy compared to the data the snapshot actually references in them, and, when that
 * amplification ratio exceeds the configured target ratio, writes the pages living in the most
 * wasteful files again through a {@link PageBatchFlusher}.
 *
 * <p>Shared pages are registered through {@link #recordSharedPage(PageAddress)}; the check and the
 * optional page transfer happen in {@link #run()}.
 */
public class SnapshotCompactionImpl implements SnapshotCompaction {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotCompactionImpl.class);

    private final GContext gContext;
    private final FileManager dfsFileManager;
    private final SnapshotManager.PendingSnapshot pendingSnapshot;

    /** Amplification ratio above which compaction is attempted; validated to be at least 1. */
    private final float targetRatio;

    /** Per DFS file id: (number of recorded shared pages, summed data length of those pages). */
    private final Map<Integer, Tuple2<AtomicInteger, AtomicLong>> sharedFiles;

    /**
     * Number of outstanding compaction tasks (one guard task for the submission loop plus one per
     * submitted page); the compaction end time is stamped when it drops back to zero.
     */
    private final AtomicInteger compactionTaskCounter;

    /**
     * Outcome of {@link SnapshotCompactionImpl#checkSnapshot()}: the current file/data sizes and
     * amplification ratio, whether compaction is needed, and the values expected once the selected
     * files have been compacted.
     */
    public static class SnapshotCheckResult {
        long totalFileSize;
        long totalDataSize;
        float amplificationRatio;
        boolean isNeedCompaction;
        Map<Integer, SnapshotFileInfo> compactionFiles;
        long expectedTotalFileSize;
        long expectedIncrementalDataSize;
        float expectedAmplificationRatio;

        SnapshotCheckResult() {
        }

        /**
         * Creates a result without any files to compact: the expected file size and ratio equal the
         * current ones and the expected incremental data size is 0.
         */
        static SnapshotCheckResult of(long totalFileSize, long totalDataSize, float amplificationRatio, boolean isNeedCompaction) {
            return of(totalFileSize, totalDataSize, amplificationRatio, isNeedCompaction, totalFileSize, 0L, amplificationRatio, Collections.emptyMap());
        }

        /** Creates a fully populated result; every argument is stored in the field of the same name. */
        static SnapshotCheckResult of(
                long totalFileSize,
                long totalDataSize,
                float amplificationRatio,
                boolean isNeedCompaction,
                long expectedTotalFileSize,
                long expectedIncrementalDataSize,
                float expectedAmplificationRatio,
                Map<Integer, SnapshotFileInfo> compactionFiles) {
            SnapshotCheckResult result = new SnapshotCheckResult();
            result.totalFileSize = totalFileSize;
            result.totalDataSize = totalDataSize;
            result.amplificationRatio = amplificationRatio;
            result.isNeedCompaction = isNeedCompaction;
            result.expectedTotalFileSize = expectedTotalFileSize;
            result.expectedIncrementalDataSize = expectedIncrementalDataSize;
            result.expectedAmplificationRatio = expectedAmplificationRatio;
            result.compactionFiles = compactionFiles;
            return result;
        }
    }

    /** Statistics of one shared DFS file as seen by the pending snapshot. */
    public static class SnapshotFileInfo {
        int fileId;
        /** Size of the file as reported by its {@link FileMeta}. */
        long fileSize;
        /** Summed data length of the pages of this file recorded as shared. */
        long dataSize;
        /** Number of pages of this file recorded as shared. */
        int numPage;
        /** {@code fileSize / dataSize}; the larger, the more space the file wastes. */
        float ratio;

        public SnapshotFileInfo(int fileId, long fileSize, long dataSize, int numPage, float ratio) {
            this.fileId = fileId;
            this.fileSize = fileSize;
            this.dataSize = dataSize;
            this.numPage = numPage;
            this.ratio = ratio;
        }

        @Override
        public String toString() {
            return "SnapshotFileInfo={fileId=" + this.fileId + ", fileSize=" + this.fileSize + ", dataSize=" + this.dataSize + ", numPage=" + this.numPage + ", ratio=" + this.ratio + "}";
        }
    }

    /**
     * Creates the compaction stage for one pending snapshot.
     *
     * @param gContext context supplying the DFS file manager and the configured target ratio
     * @param pendingSnapshot the snapshot whose shared files are checked and compacted
     * @throws IllegalArgumentException if the configured target ratio is below 1
     */
    public SnapshotCompactionImpl(GContext gContext, SnapshotManager.PendingSnapshot pendingSnapshot) {
        this.gContext = gContext;
        this.dfsFileManager = gContext.getSupervisor().getDfsFileManager();
        this.pendingSnapshot = pendingSnapshot;
        this.targetRatio = gContext.getGConfiguration().getSnapshotCompactionTargetRatio();
        Preconditions.checkArgument(this.targetRatio >= 1.0f, "Snapshot compaction target ratio can't be less than 1");
        this.sharedFiles = new ConcurrentHashMap<>();
        this.compactionTaskCounter = new AtomicInteger(0);
    }

    /**
     * Accounts one shared page to the DFS file it lives in: bumps that file's page count by one and
     * its data size by the page's data length.
     *
     * @param pageAddress address of the shared page
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompaction
    public void recordSharedPage(PageAddress pageAddress) {
        Integer fileId = Integer.valueOf(this.dfsFileManager.getSimpleFileID(pageAddress.getDfsAddress()));
        Tuple2<AtomicInteger, AtomicLong> counters =
                this.sharedFiles.computeIfAbsent(fileId, id -> Tuple2.of(new AtomicInteger(0), new AtomicLong(0L)));
        counters.f0.incrementAndGet();
        counters.f1.addAndGet(pageAddress.getDataLen());
    }

    /** This stage always reports itself as asynchronous. */
    @Override // org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotStage
    public boolean isAsync() {
        return true;
    }

    /**
     * Checks the snapshot, publishes the pre-compaction statistics and, if the check asks for it,
     * transfers the pages of the selected files.
     */
    @Override // java.lang.Runnable
    public void run() {
        SnapshotCheckResult checkResult = checkSnapshot();
        updateSnapshotStat(checkResult);
        if (checkResult.isNeedCompaction) {
            transferPages(checkResult);
        }
    }

    @VisibleForTesting
    Map<Integer, Tuple2<AtomicInteger, AtomicLong>> getSharedFiles() {
        return this.sharedFiles;
    }

    /**
     * Computes the amplification ratio of the pending snapshot and, when it is above the target
     * ratio, selects the shared files to compact.
     *
     * <p>The total file size is the snapshot's total data size plus, for every shared file, the
     * part of the file that is not referenced by the snapshot ({@code fileSize - dataSize}). Files
     * are considered in descending order of their own {@code fileSize / dataSize} ratio and are
     * selected until the expected ratio no longer exceeds the target. Files whose
     * {@link FileMeta} still has a file writer are never selected.
     *
     * @return the current and expected statistics together with the selected files
     */
    @VisibleForTesting
    SnapshotCheckResult checkSnapshot() {
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();
        // Adding 0 is used to read the counters (presumably without modifying them).
        long totalDataSize = snapshotStat.addAndGetTotalSize(0L);
        long totalFileSize = totalDataSize;
        long incrementalSize = snapshotStat.addAndGetIncrementalSize(0L);

        HashSet<Integer> filesWithWriter = new HashSet<>();
        ArrayList<SnapshotFileInfo> fileInfos = new ArrayList<>();
        for (Map.Entry<Integer, Tuple2<AtomicInteger, AtomicLong>> entry : this.sharedFiles.entrySet()) {
            int fileId = entry.getKey();
            int numPages = entry.getValue().f0.get();
            long dataSize = entry.getValue().f1.get();
            FileMeta fileMeta = this.dfsFileManager.getFileMeta(fileId);
            long fileSize = fileMeta.getFileSize();
            fileInfos.add(new SnapshotFileInfo(fileId, fileSize, dataSize, numPages, ((float) fileSize) / ((float) dataSize)));
            // Only the unreferenced remainder of the file is extra; dataSize is already in the total.
            totalFileSize += fileSize - dataSize;
            if (fileMeta.getFileWriter() != null) {
                // NOTE(review): a non-null writer presumably means the file is still being written.
                filesWithWriter.add(fileId);
            }
        }

        float ratio = totalDataSize == 0 ? 1.0f : ((float) totalFileSize) / ((float) totalDataSize);
        // Most wasteful files first.
        fileInfos.sort((left, right) -> Float.compare(right.ratio, left.ratio));
        if (LOG.isDebugEnabled()) {
            LOG.debug("current snapshot {} statistics: number of shared files {}, total file size {}, total data size {}, amplification ratio {}",
                    this.pendingSnapshot.getCheckpointId(), fileInfos.size(), totalFileSize, totalDataSize, ratio);
            LOG.debug("current snapshot {} shared file details: {}", this.pendingSnapshot.getCheckpointId(), fileInfos);
        }
        if (ratio <= this.targetRatio) {
            return SnapshotCheckResult.of(totalFileSize, totalDataSize, ratio, false, totalFileSize, incrementalSize, ratio, Collections.emptyMap());
        }

        // totalDataSize cannot be 0 here: a zero total yields ratio 1, which never exceeds the target.
        Map<Integer, SnapshotFileInfo> compactionFiles = new HashMap<>();
        long expectedFileSize = totalFileSize;
        long expectedIncrementalSize = incrementalSize;
        float expectedRatio = ratio;
        for (int i = 0; i < fileInfos.size() && expectedRatio > this.targetRatio; i++) {
            SnapshotFileInfo candidate = fileInfos.get(i);
            if (filesWithWriter.contains(candidate.fileId)) {
                continue;
            }
            compactionFiles.put(candidate.fileId, candidate);
            expectedFileSize -= candidate.fileSize - candidate.dataSize;
            // NOTE(review): the expected incremental size is decreased here although transferPages
            // increases the incremental size for every rewritten page - confirm the intended sign.
            expectedIncrementalSize -= candidate.dataSize;
            expectedRatio = ((float) expectedFileSize) / ((float) totalDataSize);
        }
        if (LOG.isDebugEnabled()) {
            // Report the number of files actually selected, not the number of files examined.
            LOG.debug("expected snapshot {} statistics: compact {} files, total file size {}, total data size {}, amplification ratio, {}",
                    this.pendingSnapshot.getCheckpointId(), compactionFiles.size(), expectedFileSize, totalDataSize, expectedRatio);
            LOG.debug("expected snapshot {} files to compact: {}", this.pendingSnapshot.getCheckpointId(), compactionFiles.values());
        }
        return SnapshotCheckResult.of(totalFileSize, totalDataSize, ratio, true, expectedFileSize, expectedIncrementalSize, expectedRatio, compactionFiles);
    }

    /** Copies the pre-compaction figures of the check result into the snapshot statistics. */
    private void updateSnapshotStat(SnapshotCheckResult snapshotCheckResult) {
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();
        snapshotStat.setTotalFileSizeBeforeCompaction(snapshotCheckResult.totalFileSize);
        snapshotStat.setIncrementalSizeBeforeCompaction(snapshotStat.addAndGetIncrementalSize(0L));
        snapshotStat.setAmplificationRatioBeforeCompaction(snapshotCheckResult.amplificationRatio);
        snapshotStat.setNeedCompaction(snapshotCheckResult.isNeedCompaction);
    }

    /**
     * Walks every page of every region snapshot and hands those located in a file selected for
     * compaction to the page batch flusher, updating the incremental statistics and the running
     * task counters along the way.
     */
    private void transferPages(SnapshotCheckResult snapshotCheckResult) {
        SnapshotCompletableFuture resultFuture = this.pendingSnapshot.getResultFuture();
        Map<Integer, SnapshotFileInfo> compactionFiles = snapshotCheckResult.compactionFiles;
        Map<String, Map<GRegionID, SnapshotManager.GRegionSnapshotMeta>> regionMetas = this.pendingSnapshot.getGRegionSnapshotMeta();
        SnapshotStat snapshotStat = this.pendingSnapshot.getSnapshotStat();

        // Guard task: keeps the counter above zero until all pages have been submitted, so that
        // an early page callback cannot stamp the end time prematurely.
        addTask();
        snapshotStat.setCompactionStartTime(System.currentTimeMillis());

        PageBatchFlusher flusher = this.pendingSnapshot.getDfsPageBatchFlusher();
        if (!flusher.isForceFlush()) {
            // NOTE(review): a non-force-flush flusher is replaced by a fresh one built with the same
            // settings, presumably to keep compaction batches separate - confirm.
            flusher = new PageBatchFlusher(flusher.getBatchNumPages(), flusher.getBatchDataSize(), flusher.isForceFlush(), flusher.getFileCache(), flusher.getEventExecutorGroup());
        }

        for (Map<GRegionID, SnapshotManager.GRegionSnapshotMeta> regions : regionMetas.values()) {
            for (SnapshotManager.GRegionSnapshotMeta regionMeta : regions.values()) {
                Iterator<PageAddress> pages = regionMeta.getPageIndex().pageIterator();
                while (pages.hasNext()) {
                    PageAddress page = pages.next();
                    Integer fileId = Integer.valueOf(this.dfsFileManager.getSimpleFileID(page.getDfsAddress()));
                    if (!compactionFiles.containsKey(fileId)) {
                        continue;
                    }
                    resultFuture.incRunningTask();
                    addTask();
                    snapshotStat.addAndGetIncrementalSize(page.getDataLen());
                    snapshotStat.addAndGetIncrementalPages(1);
                    flusher.addPage(page, regionMeta.getGRegionContext(), (success, error) -> {
                        if (!success) {
                            LOG.error("Write error when snapshot dfs.");
                            resultFuture.setEndSnapshot();
                            resultFuture.completeExceptionally(error);
                        }
                        removeTask();
                        resultFuture.decRunningTask();
                    });
                }
            }
        }
        flusher.flush();
        // Release the guard task taken at the beginning.
        removeTask();
    }

    /** Registers one more outstanding compaction task. */
    private void addTask() {
        this.compactionTaskCounter.incrementAndGet();
    }

    /** Marks one compaction task as done; the last one to finish records the compaction end time. */
    private void removeTask() {
        if (this.compactionTaskCounter.decrementAndGet() == 0) {
            this.pendingSnapshot.getSnapshotStat().setCompactionEndTime(System.currentTimeMillis());
        }
    }
}
