package org.apache.flink.runtime.state.gemini.engine.handler;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.LogicChainedPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/handler/EvictHandlerSepImpl.class */
public class EvictHandlerSepImpl implements Handler {
    private static final Logger LOG = LoggerFactory.getLogger(EvictHandlerSepImpl.class);

    /** Handler name; only used by {@link #toString()} in the visible code. */
    private final String name;
    private final GContext gContext;
    private final Supervisor supervisor;
    private final CacheManager cacheManager;
    /** Source of the executors handed to the file cache when a page flush is submitted. */
    private final EventExecutorGroup flushEventExecutorGroup;
    /** Candidate pages that have not yet been handed to the flusher. */
    private final LogicPagePriorityPool pagePriorityPool;

    /** Cache manager low/mid/high memory marks divided by the configured region thread count. */
    private final long curThreadMemLowMark;
    private final long curThreadMemMidMark;
    private final long curThreadMemHighMark;

    /** Upper bound for {@link #preparedFlushedPageSize}; computed as mid mark / 20 in the constructor. */
    private final long maxPreparedFlushSize;
    private final int batchSortCount;

    /**
     * Pages whose flush has been submitted by {@code doPrepareFlush}, in submission order.
     * Parameterized with the diamond operator (the previous raw {@code new LinkedHashMap()}
     * produced an unchecked-assignment warning).
     */
    private final Map<PageAddress, GRegion> readyToEvictDataPageMap = new LinkedHashMap<>();

    /** Sum of page sizes submitted for flush, reduced again when such pages are invalidated. */
    private final AtomicLong preparedFlushedPageSize = new AtomicLong(0);
    /** Sum of page sizes whose flush was submitted and whose completion callback has not run yet. */
    private final AtomicLong runningFlushedPageSize = new AtomicLong(0);
    /** Running total maintained through {@code addPageUsedMemory}. */
    private final AtomicLong curThreadTotalPageUsedMem = new AtomicLong(0);
    /** Sum of page sizes for which a flush completion callback has run. */
    private final AtomicLong curThreadTotalFlushedSize = new AtomicLong(0);
    /** NOTE(review): only read by toString() in the visible code; presumably updated by doEvict. */
    private final AtomicLong curThreadTotalEvictedSize = new AtomicLong(0);
    /** NOTE(review): not referenced by any decompiled method in this file; presumably used by doEvict. */
    private volatile int autoFillCursor = 0;

    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/handler/EvictHandlerSepImpl$LogicPagePriorityPool.class */
    /**
     * Pool of pages that are candidates for flushing, bounded by total data length.
     *
     * <p>Pages are tracked in {@link #dataMap} together with the region they were added for.
     * Once the accumulated data length reaches {@code maxDataLen}, every
     * {@code batchSortCount}-th insertion triggers a trim: the pooled pages are sorted by
     * {@link EvictHandlerSepImpl#getComparableValueFromKey(PageAddress)} and the entries with
     * the highest value are dropped until the pool is within its limit again.
     *
     * <p>The class performs no synchronization of its own.
     */
    public static class LogicPagePriorityPool {
        private final long maxDataLen;
        private final int batchSortCount;
        HashMap<PageAddress, GRegion> dataMap = new HashMap<>();
        private long curDataLen = 0;
        private int curCount = 0;

        /**
         * @param i number of over-limit insertions to accumulate before a sort-and-trim pass
         * @param j data length at which the pool is considered full
         */
        public LogicPagePriorityPool(int i, long j) {
            this.maxDataLen = j;
            this.batchSortCount = i;
        }

        /** @return the number of pages currently pooled */
        public int size() {
            return this.dataMap.size();
        }

        /**
         * Adds a page to the pool and, when the pool is over its limit and enough over-limit
         * insertions have accumulated, trims it.
         *
         * <p>Does nothing when the address no longer resolves to a data page. The reference
         * obtained through {@code getDataPage()} is released exactly once, in the
         * {@code finally} block; earlier versions also released it explicitly on two of the
         * exit paths, which decremented the reference count twice for a single acquisition.
         *
         * @param pageAddress address of the candidate page
         * @param gRegion region associated with the page
         * @throws GeminiRuntimeException if the address is already pooled
         */
        public void add(PageAddress pageAddress, GRegion gRegion) {
            DataPage dataPage = pageAddress.getDataPage();
            if (dataPage == null) {
                return;
            }
            try {
                if (this.dataMap.put(pageAddress, gRegion) != null) {
                    throw new GeminiRuntimeException("InternalBug");
                }
                this.curDataLen += dataPage.getSize();
                if (this.curDataLen < this.maxDataLen) {
                    return;
                }
                this.curCount++;
                if (this.curCount < this.batchSortCount) {
                    // Over the limit, but the batch is not full yet: postpone the sort.
                    return;
                }
                List<SortedEntry> sorted = this.dataMap.keySet().stream()
                        .map(SortedEntry::new)
                        .collect(Collectors.toList());
                Collections.sort(sorted, Comparator.comparingDouble(SortedEntry::getValue));
                // Ascending order: walk from the end so the highest-valued pages leave first.
                for (int idx = sorted.size() - 1; this.curDataLen > this.maxDataLen && idx >= 0; idx--) {
                    remove(sorted.get(idx).pageAddress);
                }
                this.curCount = 0;
            } finally {
                dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
            }
        }

        /**
         * Removes a page from the pool and subtracts its size from the pooled data length.
         *
         * @param pageAddress address to remove
         * @return {@code true} if the address was pooled, {@code false} otherwise
         * @throws GeminiRuntimeException if the address was pooled but no longer resolves to a
         *     data page
         */
        public boolean remove(PageAddress pageAddress) {
            if (this.dataMap.remove(pageAddress) == null) {
                return false;
            }
            DataPage dataPage = pageAddress.getDataPageNoReference();
            if (dataPage == null) {
                throw new GeminiRuntimeException("InternalBug");
            }
            this.curDataLen -= dataPage.getSize();
            return true;
        }

        /**
         * Adjusts the pooled data length by {@code i} (may be negative). Used by callers that
         * remove entries through the map's iterator instead of {@link #remove(PageAddress)}.
         */
        public void addDataLen(int i) {
            this.curDataLen += i;
        }

        @VisibleForTesting
        public HashMap<PageAddress, GRegion> getDataMap() {
            return this.dataMap;
        }

        @VisibleForTesting
        public long getCurDataLen() {
            return this.curDataLen;
        }

        @VisibleForTesting
        public long getSize() {
            return this.dataMap.size();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/handler/EvictHandlerSepImpl$SortedEntry.class */
    /**
     * Pairs a page address with the sort key computed for it at construction time, so the
     * key is evaluated once per page rather than on every comparison.
     */
    public static class SortedEntry {
        double sortedValue;
        PageAddress pageAddress;

        /**
         * @param pageAddress page to wrap; its sort key is taken from
         *     {@link EvictHandlerSepImpl#getComparableValueFromKey(PageAddress)}
         */
        public SortedEntry(PageAddress pageAddress) {
            double key = EvictHandlerSepImpl.getComparableValueFromKey(pageAddress);
            this.pageAddress = pageAddress;
            this.sortedValue = key;
        }

        /** @return the sort key captured when this entry was created */
        public double getValue() {
            return sortedValue;
        }
    }

    /**
     * Creates a handler whose memory marks are the cache manager's marks divided by the
     * configured number of region threads.
     *
     * @param str name of this handler
     * @param gContext engine context supplying the supervisor and configuration
     */
    public EvictHandlerSepImpl(String str, GContext gContext) {
        this.name = str;
        this.gContext = gContext;
        this.supervisor = gContext.getSupervisor();
        this.cacheManager = gContext.getSupervisor().getCacheManager();
        this.flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();

        final int threadCount = gContext.getGConfiguration().getRegionThreadNum();
        this.curThreadMemLowMark = cacheManager.getMemLowMark() / threadCount;
        this.curThreadMemMidMark = cacheManager.getMemMidMark() / threadCount;
        this.curThreadMemHighMark = cacheManager.getMemHighMark() / threadCount;

        // At most 1/20 of the mid mark may be prepared for flushing at any time.
        this.maxPreparedFlushSize = curThreadMemMidMark / 20;
        this.batchSortCount = gContext.getGConfiguration().getBatchSortCount();

        // The candidate pool is allowed to hold 1.5x the prepared-flush budget.
        final long poolLimit = (long) (maxPreparedFlushSize * 1.5d);
        this.pagePriorityPool = new LogicPagePriorityPool(batchSortCount, poolLimit);
    }

    /** Intentionally a no-op: the visible code drives this handler through its other public methods. */
    @Override // org.apache.flink.runtime.state.gemini.engine.handler.Handler
    public void handle() {
    }

    /**
     * Registers a page as a flush candidate and then attempts to prepare flushes.
     * The call is ignored while the tracked memory usage is below the low mark.
     *
     * @param pageAddress address of the candidate page
     * @param gRegion region associated with the page
     */
    public void addEvictablePage(PageAddress pageAddress, GRegion gRegion) {
        boolean belowLowMark = curThreadTotalPageUsedMem.get() < curThreadMemLowMark;
        if (!belowLowMark) {
            pagePriorityPool.add(pageAddress, gRegion);
            doPrepareFlush();
        }
    }

    /**
     * Drops the given pages from both the candidate pool and the ready-to-evict map.
     *
     * <p>For every entry actually removed, the pool is asked to refill by the same count via
     * {@code tryFillPool}. The data length of pages removed from the ready-to-evict map is
     * subtracted from the prepared-flush counter, after which a new prepare-flush pass runs
     * unless memory usage is below the low mark.
     *
     * @param gRegion region whose page index is scanned when refilling the pool
     * @param i forwarded to {@code tryFillPool} as the start index
     * @param i2 forwarded to {@code tryFillPool} as the index to skip
     * @param list addresses of the pages that became invalid
     */
    public void removeInvalidPage(GRegion gRegion, int i, int i2, List<PageAddress> list) {
        int releasedPreparedBytes = 0;
        int removedEntries = 0;
        for (PageAddress invalid : list) {
            if (pagePriorityPool.remove(invalid)) {
                removedEntries++;
            }
            GRegion previous = readyToEvictDataPageMap.remove(invalid);
            if (previous != null) {
                releasedPreparedBytes += invalid.getDataLen();
                removedEntries++;
            }
        }

        if (removedEntries > 0) {
            tryFillPool(gRegion, i, i2, removedEntries);
        }

        if (releasedPreparedBytes <= 0) {
            return;
        }
        preparedFlushedPageSize.addAndGet(-releasedPreparedBytes);
        if (curThreadTotalPageUsedMem.get() >= curThreadMemLowMark) {
            doPrepareFlush();
        }
    }

    /**
     * Scans the region's page index circularly, starting after slot {@code i}, and adds pages
     * that are not yet tracked to the candidate pool.
     *
     * <p>Each time the cursor arrives back at slot {@code i} the chain position being
     * inspected is advanced; at most three chain positions (0..2) are tried. Slot {@code i2}
     * is skipped. Scanning stops once {@code i3} pages were added, the scan budget of three
     * times the index length is used up, or the DB is no longer in normal state.
     *
     * @param gRegion region whose page index is scanned
     * @param i index slot that marks the start/end of one scan round
     * @param i2 index slot that is never inspected
     * @param i3 number of pages to add before stopping
     * @return {@code i} when the pool already holds at least {@code maxPreparedFlushSize}
     *     bytes; otherwise the final cursor position, or 0 if it ran past the index end
     */
    private int tryFillPool(GRegion gRegion, int i, int i2, int i3) {
        if (this.pagePriorityPool.getCurDataLen() >= this.maxPreparedFlushSize) {
            return i;
        }
        LogicChainedPage[] pageIndex = gRegion.getPageStore().getPageIndex().getPageIndex();
        int scanned = 0;
        int added = 0;
        int cursor = i + 1;
        int chainPos = 0;
        while (chainPos < 3 && added < i3 && this.gContext.isDBNormal()) {
            if (cursor >= pageIndex.length) {
                cursor = 0;
            }
            if (cursor == i) {
                // Completed one round: move on to the next chain position and restart.
                chainPos++;
                cursor = i + 1;
                continue;
            }
            if (cursor == i2) {
                cursor++;
                continue;
            }
            scanned++;
            if (scanned >= pageIndex.length * 3) {
                break;
            }
            LogicChainedPage chained = pageIndex[cursor];
            if (chained != null && chained.getCurrentPageChainIndex() >= chainPos) {
                PageAddress candidate = chained.getPageAddress(chainPos);
                if (candidate != null
                        && candidate.getDataPageNoReference() != null
                        && !this.pagePriorityPool.dataMap.containsKey(candidate)
                        && !this.readyToEvictDataPageMap.containsKey(candidate)) {
                    this.pagePriorityPool.add(candidate, gRegion);
                    added++;
                    if (added >= i3) {
                        break;
                    }
                }
            }
            cursor++;
        }
        LOG.debug("tryFillPool scanCount=" + scanned + " ,minAddPage=" + i3 + " ,addedPage=" + added + " ，cursor=" + cursor + " ,pages.length=" + pageIndex.length);
        return cursor >= pageIndex.length ? 0 : cursor;
    }

    /**
     * Moves pages from the candidate pool into the ready-to-evict map and submits each of
     * them to the file cache for flushing.
     *
     * <p>Runs only while there is prepared-flush budget left and memory usage has reached
     * the mid mark. Pages are taken in the pool map's iteration order until the submitted
     * size reaches the remaining budget. The page reference obtained here is released in the
     * flush completion callback; a failed flush puts the DB into internal-error state.
     *
     * @throws GeminiRuntimeException if a pooled address has no data page or is already in
     *     the ready-to-evict map
     */
    private void doPrepareFlush() {
        final long budget = this.maxPreparedFlushSize - this.preparedFlushedPageSize.get();
        if (budget <= 0 || this.curThreadTotalPageUsedMem.get() < this.curThreadMemMidMark) {
            return;
        }

        long submittedBytes = 0;
        int movedPages = 0;
        Iterator<Map.Entry<PageAddress, GRegion>> candidates = this.pagePriorityPool.dataMap.entrySet().iterator();
        while (candidates.hasNext()) {
            final Map.Entry<PageAddress, GRegion> entry = candidates.next();
            final DataPage page = entry.getKey().getDataPage();
            if (page == null) {
                throw new GeminiRuntimeException("Internal Bug");
            }
            final int pageSize = page.getSize();

            // Removed through the iterator, so the pool's length counter is adjusted by hand.
            candidates.remove();
            this.pagePriorityPool.addDataLen(-pageSize);
            movedPages++;

            if (this.readyToEvictDataPageMap.containsKey(entry.getKey())) {
                throw new GeminiRuntimeException("Internal Bug");
            }
            this.readyToEvictDataPageMap.put(entry.getKey(), entry.getValue());
            this.runningFlushedPageSize.addAndGet(pageSize);
            this.preparedFlushedPageSize.addAndGet(pageSize);

            this.gContext.getSupervisor().getFileCache().addPage(entry.getKey(), entry.getValue().getGRegionContext(), this.flushEventExecutorGroup.next(), (success, error) -> {
                this.runningFlushedPageSize.addAndGet(-pageSize);
                this.curThreadTotalFlushedSize.addAndGet(pageSize);
                page.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                if (!success.booleanValue()) {
                    LOG.error("prepare flush {} failed, {}", entry.getKey(), error);
                    this.gContext.setDBInternalError(new GeminiRuntimeException("Prepare flush failed, " + error));
                }
            });

            submittedBytes += pageSize;
            if (submittedBytes >= budget) {
                break;
            }
        }
        LOG.debug("EvictHandler doFlushRegion totalPreparePool({}) readyToEvictDataPageMap({}) expectedSize ({}) flushedPageSize({}) scanPage({}) preparedFlushedPageSize({}),runningFlushedPageSize({})",
                new Object[] {
                    Integer.valueOf(this.pagePriorityPool.size()),
                    Integer.valueOf(this.readyToEvictDataPageMap.size()),
                    Long.valueOf(budget),
                    Long.valueOf(submittedBytes),
                    Integer.valueOf(movedPages),
                    Long.valueOf(this.preparedFlushedPageSize.get()),
                    Long.valueOf(this.runningFlushedPageSize.get())
                });
    }

    /**
     * Adjusts the tracked page memory usage by {@code i} and optionally runs an eviction pass.
     *
     * @param gRegion region passed on to {@code doEvict}
     * @param i memory delta to apply (also passed on to {@code doEvict})
     * @param z when {@code true}, {@code doEvict} is invoked after the counter update
     */
    public void addPageUsedMemory(GRegion gRegion, int i, boolean z) {
        curThreadTotalPageUsedMem.addAndGet(i);
        if (!z) {
            return;
        }
        doEvict(gRegion, i);
    }

    /**
     * Eviction pass invoked from {@code addPageUsedMemory}.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method, so the body below is
     * only a placeholder that always throws {@link UnsupportedOperationException}. The real
     * logic must be recovered from the original sources or the bytecode before this class can
     * be used. The decompiler notes that follow show that the original method ends with a
     * debug log of the pool size, the ready-to-evict map size and the prepared/running flush
     * counters.
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x02e4, code lost:
    
        org.apache.flink.runtime.state.gemini.engine.handler.EvictHandlerSepImpl.LOG.debug("EvictHandler doEvict totalPreparePool({}) readyToEvictDataPageMap({}) expectedSize ({}) evictedSize({}) scanPage({}) readyToEvictDataPageMap({}), preparedFlushedPageSize({}),runningFlushedPageSize({})", new java.lang.Object[]{java.lang.Integer.valueOf(r10.pagePriorityPool.size()), java.lang.Integer.valueOf(r10.readyToEvictDataPageMap.size()), java.lang.Integer.valueOf(r12), java.lang.Integer.valueOf(r15), java.lang.Integer.valueOf(r16), java.lang.Integer.valueOf(r10.readyToEvictDataPageMap.size()), java.lang.Long.valueOf(r10.preparedFlushedPageSize.get()), java.lang.Long.valueOf(r10.runningFlushedPageSize.get())});
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x0352, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void doEvict(org.apache.flink.runtime.state.gemini.engine.GRegion r11, int r12) {
        /*
            Method dump skipped, instructions count: 851
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.state.gemini.engine.handler.EvictHandlerSepImpl.doEvict(org.apache.flink.runtime.state.gemini.engine.GRegion, int):void");
    }

    /** Renders the handler's thresholds, counters and pool sizes for diagnostics. */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("name", name)
                .add("curThreadMemLowMark", curThreadMemLowMark)
                .add("curThreadMemMidMark", curThreadMemMidMark)
                .add("curThreadMemHighMark", curThreadMemHighMark)
                .add("curThreadTotalPageUsedMem", curThreadTotalPageUsedMem)
                .add("maxPreparedFlushSize", maxPreparedFlushSize)
                .add("runningFlushedPageSize", runningFlushedPageSize)
                .add("preparedFlushedPageSize", preparedFlushedPageSize)
                .add("readyToEvictDataPageMapCount", readyToEvictDataPageMap.size())
                .add("logicPagePriorityPoolSize", pagePriorityPool.size())
                .add("logicPagePriorityPoolDataLen", pagePriorityPool.curDataLen)
                .add("curThreadTotalEvictedSize", curThreadTotalEvictedSize)
                .add("curThreadTotalFlushedSize", curThreadTotalFlushedSize)
                .toString();
    }

    /**
     * Computes the sort key used when trimming the candidate pool: the page's request count
     * (treated as 1 when it is 0) divided by the data page's compaction count.
     *
     * <p>The data page is read without taking a reference. The previous body referred to an
     * undefined decompiler register ({@code r0}) instead of the looked-up page and therefore
     * did not compile.
     *
     * @param pageAddress address of the page to rank
     * @return the request-count / compaction-count ratio as a double
     * @throws GeminiRuntimeException if the address does not resolve to a data page
     */
    public static double getComparableValueFromKey(PageAddress pageAddress) {
        DataPage dataPage = pageAddress.getDataPageNoReference();
        if (dataPage == null) {
            throw new GeminiRuntimeException("InternalBug");
        }
        long requestCount = pageAddress.getRequestCount();
        // The numerator is a double, so the division below is floating-point.
        return (requestCount == 0 ? 1.0d : requestCount) / dataPage.getCompactionCount();
    }
}
