/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.handler;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.handler.Handler;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.LogicChainedPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EvictHandlerSepImpl
implements Handler {
    private static final Logger LOG = LoggerFactory.getLogger(EvictHandlerSepImpl.class);
    private final String name;
    private final GContext gContext;
    private final Supervisor supervisor;
    private final CacheManager cacheManager;
    private final EventExecutorGroup flushEventExecutorGroup;
    private final Map<PageAddress, GRegion> readyToEvictDataPageMap = new LinkedHashMap<PageAddress, GRegion>();
    private final AtomicLong preparedFlushedPageSize = new AtomicLong(0L);
    private final AtomicLong runningFlushedPageSize = new AtomicLong(0L);
    private final LogicPagePriorityPool pagePriorityPool;
    private final AtomicLong curThreadTotalPageUsedMem = new AtomicLong(0L);
    private final AtomicLong curThreadTotalFlushedSize = new AtomicLong(0L);
    private final AtomicLong curThreadTotalEvictedSize = new AtomicLong(0L);
    private final long curThreadMemLowMark;
    private final long curThreadMemMidMark;
    private final long curThreadMemHighMark;
    private final long maxPreparedFlushSize;
    private final int batchSortCount;
    private volatile int autoFillCursor = 0;

    public EvictHandlerSepImpl(String name, GContext gContext) {
        this.name = name;
        this.gContext = gContext;
        this.supervisor = gContext.getSupervisor();
        this.cacheManager = gContext.getSupervisor().getCacheManager();
        this.flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();
        int totalThread = gContext.getGConfiguration().getRegionThreadNum();
        this.curThreadMemLowMark = this.cacheManager.getMemLowMark() / (long)totalThread;
        this.curThreadMemMidMark = this.cacheManager.getMemMidMark() / (long)totalThread;
        this.curThreadMemHighMark = this.cacheManager.getMemHighMark() / (long)totalThread;
        this.maxPreparedFlushSize = this.curThreadMemMidMark / 20L;
        this.batchSortCount = gContext.getGConfiguration().getBatchSortCount();
        this.pagePriorityPool = new LogicPagePriorityPool(this.batchSortCount, (long)((double)this.maxPreparedFlushSize * 1.5));
    }

    @Override
    public void handle() {
    }

    public void addEvictablePage(PageAddress pageAddress, GRegion gRegion) {
        if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemLowMark) {
            return;
        }
        this.pagePriorityPool.add(pageAddress, gRegion);
        this.doPrepareFlush();
    }

    public void removeInvalidPage(GRegion gRegion, int curIndex, int relatedIndex, List<PageAddress> pageAddressList) {
        int totalRemoveReadToEvictSize = 0;
        int totalRemoveCount = 0;
        for (PageAddress pageAddress : pageAddressList) {
            totalRemoveCount += this.pagePriorityPool.remove(pageAddress) ? 1 : 0;
            if (this.readyToEvictDataPageMap.remove(pageAddress) == null) continue;
            totalRemoveReadToEvictSize += pageAddress.getDataLen();
            ++totalRemoveCount;
        }
        if (totalRemoveCount > 0) {
            this.tryFillPool(gRegion, curIndex, relatedIndex, totalRemoveCount);
        }
        if (totalRemoveReadToEvictSize > 0) {
            this.preparedFlushedPageSize.addAndGet(-totalRemoveReadToEvictSize);
            if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemLowMark) {
                return;
            }
            this.doPrepareFlush();
        }
    }

    private int tryFillPool(GRegion gRegion, int curIndex, int relatedIndex, int minAddPage) {
        if (this.pagePriorityPool.getCurDataLen() >= this.maxPreparedFlushSize) {
            return curIndex;
        }
        LogicChainedPage[] pages = gRegion.getPageStore().getPageIndex().getPageIndex();
        int scanCount = 0;
        int addedPage = 0;
        int cursor = curIndex + 1;
        int indexDeep = 0;
        while (indexDeep < 3 && addedPage < minAddPage && this.gContext.isDBNormal()) {
            PageAddress pageAddress;
            if (cursor >= pages.length) {
                cursor = 0;
            }
            if (cursor == curIndex) {
                ++indexDeep;
                cursor = curIndex + 1;
                continue;
            }
            if (cursor == relatedIndex) {
                ++cursor;
                continue;
            }
            if (++scanCount >= pages.length * 3) break;
            LogicChainedPage logicChainedPage = pages[cursor];
            if (logicChainedPage != null && logicChainedPage.getCurrentPageChainIndex() >= indexDeep && (pageAddress = logicChainedPage.getPageAddress(indexDeep)) != null && pageAddress.getDataPageNoReference() != null && !this.pagePriorityPool.dataMap.containsKey(pageAddress) && !this.readyToEvictDataPageMap.containsKey(pageAddress)) {
                this.pagePriorityPool.add(pageAddress, gRegion);
                if (++addedPage >= minAddPage) break;
            }
            ++cursor;
        }
        LOG.debug("tryFillPool scanCount=" + scanCount + " ,minAddPage=" + minAddPage + " ,addedPage=" + addedPage + " \uff0ccursor=" + cursor + " ,pages.length=" + pages.length);
        return cursor >= pages.length ? 0 : cursor;
    }

    private void doPrepareFlush() {
        long needPrepareFlush = this.maxPreparedFlushSize - this.preparedFlushedPageSize.get();
        if (needPrepareFlush <= 0L) {
            return;
        }
        if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemMidMark) {
            return;
        }
        long flushedSize = 0L;
        Iterator<Map.Entry<PageAddress, GRegion>> pagesIterator = this.pagePriorityPool.dataMap.entrySet().iterator();
        int scanPage = 0;
        while (pagesIterator.hasNext()) {
            Map.Entry<PageAddress, GRegion> entry = pagesIterator.next();
            DataPage dataPage = entry.getKey().getDataPage();
            if (dataPage == null) {
                throw new GeminiRuntimeException("Internal Bug");
            }
            int dataPageSize = dataPage.getSize();
            pagesIterator.remove();
            this.pagePriorityPool.addDataLen(-dataPageSize);
            ++scanPage;
            if (this.readyToEvictDataPageMap.containsKey(entry.getKey())) {
                throw new GeminiRuntimeException("Internal Bug");
            }
            this.readyToEvictDataPageMap.put(entry.getKey(), entry.getValue());
            this.runningFlushedPageSize.addAndGet(dataPageSize);
            this.preparedFlushedPageSize.addAndGet(dataPageSize);
            EventExecutor flushEventExecutor = this.flushEventExecutorGroup.next();
            this.gContext.getSupervisor().getFileCache().addPage(entry.getKey(), entry.getValue().getGRegionContext(), flushEventExecutor, (success, throwable) -> {
                this.runningFlushedPageSize.addAndGet(-dataPageSize);
                this.curThreadTotalFlushedSize.addAndGet(dataPageSize);
                dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                if (!success.booleanValue()) {
                    LOG.error("prepare flush {} failed, {}", entry.getKey(), throwable);
                    this.gContext.setDBInternalError(new GeminiRuntimeException("Prepare flush failed, " + throwable));
                }
            });
            if ((flushedSize += (long)dataPageSize) < needPrepareFlush) continue;
            break;
        }
        LOG.debug("EvictHandler doFlushRegion totalPreparePool({}) readyToEvictDataPageMap({}) expectedSize ({}) flushedPageSize({}) scanPage({}) preparedFlushedPageSize({}),runningFlushedPageSize({})", new Object[]{this.pagePriorityPool.size(), this.readyToEvictDataPageMap.size(), needPrepareFlush, flushedSize, scanPage, this.preparedFlushedPageSize.get(), this.runningFlushedPageSize.get()});
    }

    public void addPageUsedMemory(GRegion gRegion, int logicPageSize, boolean needEvict) {
        this.curThreadTotalPageUsedMem.addAndGet(logicPageSize);
        if (needEvict) {
            this.doEvict(gRegion, logicPageSize);
        }
    }

    private void doEvict(GRegion gRegion, int expectedSize) {
        if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemHighMark) {
            return;
        }
        if (this.readyToEvictDataPageMap.size() == 0 && this.preparedFlushedPageSize.get() == 0L) {
            this.autoFillCursor = this.tryFillPool(gRegion, this.autoFillCursor, this.autoFillCursor, this.batchSortCount);
            return;
        }
        long startTime = System.currentTimeMillis();
        int evictedSize = 0;
        int totalScanPageCount = 0;
        int totalEvictPageCount = 0;
        while (evictedSize < expectedSize && this.gContext.isDBNormal()) {
            long curRunningSize = this.runningFlushedPageSize.get();
            Iterator<Map.Entry<PageAddress, GRegion>> readyIterator = this.readyToEvictDataPageMap.entrySet().iterator();
            while (readyIterator.hasNext()) {
                Map.Entry<PageAddress, GRegion> entry = readyIterator.next();
                PageAddress pageAddress = entry.getKey();
                ++totalScanPageCount;
                DataPage dataPage = pageAddress.getDataPageNoReference();
                if (dataPage == null) {
                    throw new GeminiRuntimeException("Internal Bug");
                }
                if (!this.gContext.getSupervisor().getFileCache().isCached(pageAddress)) continue;
                evictedSize += dataPage.getSize();
                ++totalEvictPageCount;
                pageAddress.setDataPage(null);
                if (dataPage.getGBinaryHashMap().getGByteBuffer().getCnt() != 1) {
                    this.gContext.getSupervisor().getDiscardOrEvictPageReleaseManager().addMonitorPageStillHaveReference(dataPage.getGBinaryHashMap().getGByteBuffer(), ReferenceCount.ReleaseType.Discard, pageAddress);
                }
                dataPage.delReferenceCount(ReferenceCount.ReleaseType.Discard);
                entry.getValue().getGRegionContext().getPageStoreStats().addPageUsedMemory(entry.getValue(), -dataPage.getSize(), false);
                readyIterator.remove();
                this.curThreadTotalEvictedSize.addAndGet(dataPage.getSize());
                this.cacheManager.getCacheStats().addPageCacheEvictSize(dataPage.getSize());
                this.preparedFlushedPageSize.addAndGet(-dataPage.getSize());
                if (evictedSize < expectedSize) continue;
                break;
            }
            long nowTime = System.currentTimeMillis();
            if (evictedSize < expectedSize) {
                this.cacheManager.getCacheStats().addEvictBlock(1);
                if (curRunningSize >= (long)(expectedSize - evictedSize)) {
                    while (curRunningSize - this.runningFlushedPageSize.get() < (long)(expectedSize - evictedSize) && this.gContext.isDBNormal()) {
                        this.autoFillCursor = this.tryFillPool(gRegion, this.autoFillCursor, this.autoFillCursor, this.batchSortCount);
                        LOG.info("EvictHandler doEvict blocking {}ms ,have run ({})ms,expectedSize({}), evictedSize({}),scanPageCount({}) readyToEvictDataPageMap({}), beforeRunning({}), preparedFlushedPageSize({}), runningFlushedPageSize({})", new Object[]{System.currentTimeMillis() - nowTime, nowTime - startTime, expectedSize, evictedSize, totalScanPageCount, this.readyToEvictDataPageMap.size(), curRunningSize, this.preparedFlushedPageSize.get(), this.runningFlushedPageSize.get()});
                    }
                    continue;
                }
                LOG.info("EvictHandler doEvict NOT WORK,have run ({})ms,expectedSize({}), evictedSize({}),scanPageCount({}) readyToEvictDataPageMap({}), preparedFlushedPageSize({}), runningFlushedPageSize({})", new Object[]{nowTime - startTime, expectedSize, evictedSize, totalScanPageCount, this.readyToEvictDataPageMap.size(), this.preparedFlushedPageSize.get(), this.runningFlushedPageSize.get()});
                this.autoFillCursor = this.tryFillPool(gRegion, this.autoFillCursor, this.autoFillCursor, this.batchSortCount);
                this.doPrepareFlush();
                continue;
            }
            this.autoFillCursor = this.tryFillPool(gRegion, this.autoFillCursor, this.autoFillCursor, totalEvictPageCount);
            break;
        }
        LOG.debug("EvictHandler doEvict totalPreparePool({}) readyToEvictDataPageMap({}) expectedSize ({}) evictedSize({}) scanPage({}) readyToEvictDataPageMap({}), preparedFlushedPageSize({}),runningFlushedPageSize({})", new Object[]{this.pagePriorityPool.size(), this.readyToEvictDataPageMap.size(), expectedSize, evictedSize, totalScanPageCount, this.readyToEvictDataPageMap.size(), this.preparedFlushedPageSize.get(), this.runningFlushedPageSize.get()});
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("name", (Object)this.name).add("curThreadMemLowMark", this.curThreadMemLowMark).add("curThreadMemMidMark", this.curThreadMemMidMark).add("curThreadMemHighMark", this.curThreadMemHighMark).add("curThreadTotalPageUsedMem", (Object)this.curThreadTotalPageUsedMem).add("maxPreparedFlushSize", this.maxPreparedFlushSize).add("runningFlushedPageSize", (Object)this.runningFlushedPageSize).add("preparedFlushedPageSize", (Object)this.preparedFlushedPageSize).add("readyToEvictDataPageMapCount", this.readyToEvictDataPageMap.size()).add("logicPagePriorityPoolSize", this.pagePriorityPool.size()).add("logicPagePriorityPoolDataLen", this.pagePriorityPool.curDataLen).add("curThreadTotalEvictedSize", (Object)this.curThreadTotalEvictedSize).add("curThreadTotalFlushedSize", (Object)this.curThreadTotalFlushedSize).toString();
    }

    public static double getComparableValueFromKey(PageAddress pageAddress) {
        DataPage dataPage = pageAddress.getDataPageNoReference();
        if (dataPage == null) {
            throw new GeminiRuntimeException("InternalBug");
        }
        long requestCount = pageAddress.getRequestCount();
        double requestCountDouble = requestCount == 0L ? 1.0 : (double)requestCount;
        return requestCountDouble / (double)dataPage.getCompactionCount();
    }

    public static class SortedEntry {
        double sortedValue;
        PageAddress pageAddress;

        public SortedEntry(PageAddress pageAddress) {
            this.pageAddress = pageAddress;
            this.sortedValue = EvictHandlerSepImpl.getComparableValueFromKey(pageAddress);
        }

        public double getValue() {
            return this.sortedValue;
        }
    }

    public static class LogicPagePriorityPool {
        HashMap<PageAddress, GRegion> dataMap = new HashMap();
        private long curDataLen = 0L;
        private final long maxDataLen;
        private int curCount = 0;
        private final int batchSortCount;

        public LogicPagePriorityPool(int batchSortCount, long maxDataLen) {
            this.maxDataLen = maxDataLen;
            this.batchSortCount = batchSortCount;
        }

        public int size() {
            return this.dataMap.size();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void add(PageAddress pageAddress, GRegion gRegion) {
            block10: {
                DataPage dataPage = pageAddress.getDataPage();
                if (dataPage == null) {
                    return;
                }
                try {
                    if (this.dataMap.put(pageAddress, gRegion) == null) {
                        this.curDataLen += (long)dataPage.getSize();
                        if (this.curDataLen < this.maxDataLen) {
                            return;
                        }
                        ++this.curCount;
                        if (this.curCount < this.batchSortCount) {
                            return;
                        }
                        List dataList = this.dataMap.keySet().stream().map(k -> new SortedEntry((PageAddress)k)).collect(Collectors.toList());
                        Collections.sort(dataList, Comparator.comparingDouble(SortedEntry::getValue));
                        for (int index = dataList.size() - 1; this.curDataLen > this.maxDataLen && index >= 0; --index) {
                            PageAddress lastPageAddress = ((SortedEntry)dataList.get((int)index)).pageAddress;
                            this.remove(lastPageAddress);
                        }
                        this.curCount = 0;
                        break block10;
                    }
                    throw new GeminiRuntimeException("InternalBug");
                }
                finally {
                    dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
            }
        }

        public boolean remove(PageAddress pageAddress) {
            if (this.dataMap.remove(pageAddress) != null) {
                DataPage dataPage = pageAddress.getDataPageNoReference();
                if (dataPage == null) {
                    throw new GeminiRuntimeException("InternalBug");
                }
                this.curDataLen -= (long)dataPage.getSize();
                return true;
            }
            return false;
        }

        public void addDataLen(int dataLen) {
            this.curDataLen += (long)dataLen;
        }

        @VisibleForTesting
        public HashMap<PageAddress, GRegion> getDataMap() {
            return this.dataMap;
        }

        @VisibleForTesting
        public long getCurDataLen() {
            return this.curDataLen;
        }

        @VisibleForTesting
        public long getSize() {
            return this.dataMap.size();
        }
    }
}

