/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressSingleImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePoolSampleImpl;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EvictHandlerSepImpl {
    private static final Logger LOG = LoggerFactory.getLogger(EvictHandlerSepImpl.class);
    private final String name;
    private final GContext gContext;
    private final Supervisor supervisor;
    private final CacheManager cacheManager;
    private final EventExecutorGroup flushEventExecutorGroup;
    private final Map<PageAddress, GRegion> readyToEvictDataPageMap = new LinkedHashMap<PageAddress, GRegion>();
    private final AtomicLong preparedFlushedPageSize = new AtomicLong(0L);
    private final AtomicLong flushingPageSize = new AtomicLong(0L);
    private final EvictablePagePool pagePriorityPool;
    private final Map<GRegion, GRegion> allRegions = new ConcurrentHashMap<GRegion, GRegion>();
    private final AtomicLong curThreadTotalPageUsedMem = new AtomicLong(0L);
    private final AtomicLong curThreadTotalFlushedSize = new AtomicLong(0L);
    private final AtomicLong curThreadTotalEvictedSize = new AtomicLong(0L);
    private final long curThreadMemLowMark;
    private final long curThreadMemMidMark;
    private final long curThreadMemHighMark;
    private final int maxPreparedFlushSize;
    private volatile int dynamicPreparedFlushSize;
    private final long intervalShrinkDynamicPreparedFlushSize = 300000L;
    private volatile long lastTimeChangeMaxFlushSize = System.currentTimeMillis();

    public EvictHandlerSepImpl(String name, GContext gContext) {
        this.name = name;
        this.gContext = gContext;
        this.supervisor = gContext.getSupervisor();
        this.cacheManager = gContext.getSupervisor().getCacheManager();
        this.flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();
        int totalThread = gContext.getGConfiguration().getRegionThreadNum();
        this.curThreadMemLowMark = this.cacheManager.getMemLowMark() / (long)totalThread;
        this.curThreadMemMidMark = this.cacheManager.getMemMidMark() / (long)totalThread;
        this.curThreadMemHighMark = this.cacheManager.getMemHighMark() / (long)totalThread;
        this.dynamicPreparedFlushSize = this.maxPreparedFlushSize = gContext.getGConfiguration().getMaxPreparedFlushSize();
        this.pagePriorityPool = new EvictablePagePoolSampleImpl(this, gContext, this.cacheManager);
        LOG.info("EvictHandlerSepImpl {} curThreadMemLowMark={},curThreadMemMidMark={},curThreadMemHighMark={},maxPreparedFlushSize={},pagePriorityPool={}", new Object[]{name, this.curThreadMemLowMark, this.curThreadMemMidMark, this.curThreadMemHighMark, this.maxPreparedFlushSize, this.pagePriorityPool});
    }

    public int removeInvalidPage(PageAddress pageAddress) {
        Iterator<PageAddress> pageAddressIterator = pageAddress.pageIterator();
        int delete = 0;
        delete += this.pagePriorityPool.remove(pageAddress) ? 1 : 0;
        while (pageAddressIterator.hasNext()) {
            PageAddress subPageAddress = pageAddressIterator.next();
            delete += this.pagePriorityPool.remove(subPageAddress) ? 1 : 0;
            if (this.readyToEvictDataPageMap.remove(subPageAddress) == null) continue;
            this.preparedFlushedPageSize.addAndGet(-subPageAddress.getDataLen());
            ++delete;
        }
        return delete;
    }

    public void addPage(PageAddress pageAddress, GRegion gRegion) {
        this.pagePriorityPool.add(pageAddress, gRegion);
    }

    void tryPrepareFlush(int minSize) {
        if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemMidMark) {
            if (this.curThreadTotalPageUsedMem.get() >= this.curThreadMemLowMark - (long)this.maxPreparedFlushSize) {
                this.tryFillPool();
            }
            return;
        }
        long currentTime = System.currentTimeMillis();
        if (this.dynamicPreparedFlushSize < minSize) {
            this.dynamicPreparedFlushSize = this.maxPreparedFlushSize + minSize;
            this.lastTimeChangeMaxFlushSize = currentTime;
        } else if (this.dynamicPreparedFlushSize > this.maxPreparedFlushSize && this.maxPreparedFlushSize > minSize && this.changeMaxFlushSizeTimeOut(currentTime)) {
            this.dynamicPreparedFlushSize = this.maxPreparedFlushSize + (this.dynamicPreparedFlushSize - this.maxPreparedFlushSize) * 3 / 4;
            this.lastTimeChangeMaxFlushSize = currentTime;
        }
        this.doPrepareFlush(minSize, (int)((long)this.dynamicPreparedFlushSize - this.preparedFlushedPageSize.get()));
    }

    boolean changeMaxFlushSizeTimeOut(long currentTime) {
        return currentTime - this.lastTimeChangeMaxFlushSize >= 300000L;
    }

    boolean isPageAlreadyInEvict(PageAddress oriPageAddress, PageAddress subPageAddress) {
        return this.pagePriorityPool.containsPage(oriPageAddress) || this.readyToEvictDataPageMap.containsKey(subPageAddress);
    }

    @VisibleForTesting
    void doPrepareFlush(int minSize, int bestFlushSize) {
        if (bestFlushSize <= 0) {
            return;
        }
        if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemMidMark) {
            return;
        }
        long flushedSize = 0L;
        int scanPage = 0;
        while (flushedSize < (long)bestFlushSize && this.gContext.isDBNormal()) {
            Iterator<EvictablePagePool.SortedEntry> dataListIterator = this.getSortedList().iterator();
            while (dataListIterator.hasNext() && this.gContext.isDBNormal()) {
                EvictablePagePool.SortedEntry sortedEntry = dataListIterator.next();
                PageAddress pageAddress = sortedEntry.pageAddress;
                GRegion gRegion = sortedEntry.region;
                if (!this.pagePriorityPool.containsPage(pageAddress)) {
                    dataListIterator.remove();
                    continue;
                }
                if (!pageAddress.isPageValid() || this.readyToEvictDataPageMap.containsKey(pageAddress)) {
                    dataListIterator.remove();
                    this.pagePriorityPool.remove(pageAddress);
                    continue;
                }
                PageAddress finalPageAddress = null;
                boolean needRemoveFromPool = false;
                if (pageAddress instanceof PageAddressSingleImpl) {
                    finalPageAddress = pageAddress;
                    needRemoveFromPool = true;
                } else if (pageAddress instanceof PageAddressCompositeImpl) {
                    PageAddress[] subPages = ((PageAddressCompositeImpl)pageAddress).getSubPageAddress();
                    long tickTime = this.cacheManager.getCurrentTickTime();
                    List dataList = Arrays.stream(subPages).map(pa -> {
                        DataPage dataPage = pa.getDataPageNoReference();
                        if (dataPage != null && pa.isPageValid() && !this.readyToEvictDataPageMap.containsKey(pa)) {
                            return new EvictablePagePool.SortedEntry((PageAddress)pa, gRegion, dataPage.score(tickTime));
                        }
                        return null;
                    }).filter(Objects::nonNull).sorted(Comparator.comparingDouble(EvictablePagePool.SortedEntry::getScore)).collect(Collectors.toList());
                    boolean flushEnough = false;
                    for (EvictablePagePool.SortedEntry entry : dataList) {
                        int curFlushedSize = this.prepareFlushSinglePage(entry.pageAddress, gRegion);
                        if (curFlushedSize > 0) {
                            this.pagePriorityPool.partialSubPageFlush(pageAddress, curFlushedSize);
                        }
                        ++scanPage;
                        if ((flushedSize += (long)curFlushedSize) < (long)bestFlushSize) continue;
                        flushEnough = true;
                        break;
                    }
                    if (!flushEnough) {
                        PageAddress mainPageAddress = ((PageAddressCompositeImpl)pageAddress).getMainPageAddress();
                        finalPageAddress = !pageAddress.isPageValid() || this.readyToEvictDataPageMap.containsKey(mainPageAddress) ? null : mainPageAddress;
                        needRemoveFromPool = true;
                    }
                } else {
                    throw new GeminiRuntimeException("Internal Bug: error PageAddress");
                }
                if (finalPageAddress != null) {
                    flushedSize += (long)this.prepareFlushSinglePage(finalPageAddress, gRegion);
                    ++scanPage;
                }
                if (needRemoveFromPool) {
                    this.pagePriorityPool.remove(pageAddress);
                    dataListIterator.remove();
                }
                if (flushedSize < (long)bestFlushSize) continue;
                break;
            }
            this.tryFillPool();
            if (this.preparedFlushedPageSize.get() > (long)minSize) break;
            LOG.info("EvictHandler doFlushRegion NOT WORK bestFlushSize({}) minSize({}) flushedPageSize({}) preparedFlushedPageSize({}) scanPage({}) {}", new Object[]{bestFlushSize, minSize, flushedSize, this.preparedFlushedPageSize.get(), scanPage, this.toString()});
        }
    }

    List<EvictablePagePool.SortedEntry> getSortedList() {
        return this.pagePriorityPool.getSortedList();
    }

    boolean tryFillPool() {
        return this.pagePriorityPool.tryFillPool(this.allRegions.keySet());
    }

    private int prepareFlushSinglePage(PageAddress pageAddress, GRegion gRegion) {
        Preconditions.checkArgument((boolean)(pageAddress instanceof PageAddressSingleImpl), (Object)"Internal Bug");
        DataPage dataPage = pageAddress.getDataPage();
        if (dataPage == null || !pageAddress.isPageValid()) {
            return 0;
        }
        int dataPageSize = dataPage.getSize();
        if (this.readyToEvictDataPageMap.containsKey(pageAddress)) {
            throw new GeminiRuntimeException("Internal Bug");
        }
        this.readyToEvictDataPageMap.put(pageAddress, gRegion);
        this.flushingPageSize.addAndGet(dataPageSize);
        this.preparedFlushedPageSize.addAndGet(dataPageSize);
        EventExecutor flushEventExecutor = this.flushEventExecutorGroup.next();
        this.supervisor.getFileCache().addPage(pageAddress, gRegion.getGRegionContext(), flushEventExecutor, (success, throwable) -> {
            this.flushingPageSize.addAndGet(-dataPageSize);
            this.curThreadTotalFlushedSize.addAndGet(dataPageSize);
            dataPage.release();
            if (!success.booleanValue()) {
                LOG.error("prepare flush {} failed", (Object)pageAddress, throwable);
                this.gContext.setDBInternalError(new GeminiRuntimeException("Prepare flush failed, " + throwable));
            }
        });
        return dataPageSize;
    }

    void addPageUsedMemory(int logicPageSize, boolean needEvict) {
        long mem = this.curThreadTotalPageUsedMem.addAndGet(logicPageSize);
        if (needEvict && mem > this.curThreadMemHighMark) {
            this.doEvict((int)(mem - this.curThreadMemHighMark));
        }
    }

    void doEvict(int expectedSize) {
        if (expectedSize <= 0) {
            return;
        }
        if (this.readyToEvictDataPageMap.size() == 0 && this.preparedFlushedPageSize.get() == 0L) {
            this.tryFillPool();
            LOG.info("EvictHandler doEvict NOT WORK expectedSize({}) {}", (Object)expectedSize, (Object)this.toString());
            return;
        }
        long startTime = System.currentTimeMillis();
        int evictedSize = 0;
        int totalScanPageCount = 0;
        while (evictedSize < expectedSize && this.gContext.isDBNormal()) {
            long running = this.flushingPageSize.get();
            Iterator<Map.Entry<PageAddress, GRegion>> readyIterator = this.readyToEvictDataPageMap.entrySet().iterator();
            while (readyIterator.hasNext()) {
                Map.Entry<PageAddress, GRegion> entry = readyIterator.next();
                PageAddress pageAddress = entry.getKey();
                Preconditions.checkArgument((boolean)(pageAddress instanceof PageAddressSingleImpl));
                ++totalScanPageCount;
                DataPage dataPage = pageAddress.getDataPageNoReference();
                if (dataPage == null) {
                    if (!pageAddress.isPageValid()) continue;
                    throw new GeminiRuntimeException("Internal Bug");
                }
                if (!this.supervisor.getFileCache().isCached(pageAddress)) continue;
                evictedSize += pageAddress.getDataLen();
                this.supervisor.getBloomFilterManager().addBloomFilter(pageAddress, dataPage);
                pageAddress.setDataPage(null);
                dataPage.release();
                entry.getValue().getGRegionContext().getPageStoreStats().addPageUsedMemory(entry.getValue(), -pageAddress.getDataLen(), false);
                readyIterator.remove();
                this.preparedFlushedPageSize.addAndGet(-pageAddress.getDataLen());
                this.curThreadTotalEvictedSize.addAndGet(pageAddress.getDataLen());
                this.cacheManager.getCacheStats().addPageCacheEvictSize(pageAddress.getDataLen());
                if (evictedSize < expectedSize) continue;
                break;
            }
            long nowTime = System.currentTimeMillis();
            if (evictedSize >= expectedSize) continue;
            if (this.curThreadTotalPageUsedMem.get() < this.curThreadMemHighMark) break;
            this.cacheManager.getCacheStats().addEvictBlock(1);
            this.tryPrepareFlush(expectedSize - evictedSize);
            if (running > (long)expectedSize || this.flushingPageSize.get() > (long)(expectedSize - evictedSize)) continue;
            LOG.info("EvictHandler doEvict NOT WORK,have run ({})ms,expectedSize({}), evictedSize({}), scanPageCount({}), currentRunning({}) {}", new Object[]{nowTime - startTime, expectedSize, evictedSize, totalScanPageCount, running, this.toString()});
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("name", (Object)this.name).add("curThreadMemLowMark", this.curThreadMemLowMark).add("curThreadMemMidMark", this.curThreadMemMidMark).add("curThreadMemHighMark", this.curThreadMemHighMark).add("curThreadTotalPageUsedMem", (Object)this.curThreadTotalPageUsedMem).add("maxPreparedFlushSize", this.maxPreparedFlushSize).add("flushingPageSize", (Object)this.flushingPageSize).add("preparedFlushedPageSize", (Object)this.preparedFlushedPageSize).add("readyToEvictDataPageMapCount", this.readyToEvictDataPageMap.size()).add("logicPagePriorityPoolSize", this.pagePriorityPool.size()).add("logicPagePriorityPoolDataLen", this.pagePriorityPool.dataSize()).add("curThreadTotalEvictedSize", (Object)this.curThreadTotalEvictedSize).add("curThreadTotalFlushedSize", (Object)this.curThreadTotalFlushedSize).add("dynamicPreparedFlushSize", this.dynamicPreparedFlushSize).add("pagePriorityPool", (Object)this.pagePriorityPool.toString()).toString();
    }

    public void addRegion(GRegion gRegion) {
        this.allRegions.put(gRegion, gRegion);
    }

    public long getMaxPreparedFlushSize() {
        return this.maxPreparedFlushSize;
    }

    @VisibleForTesting
    public int getDynamicPreparedFlushSize() {
        return this.dynamicPreparedFlushSize;
    }

    public long getCurThreadMemLowMark() {
        return this.curThreadMemLowMark;
    }

    @VisibleForTesting
    long getCurThreadTotalPageUsedMem() {
        return this.curThreadTotalPageUsedMem.get();
    }

    @VisibleForTesting
    public long getCurThreadMemMidMark() {
        return this.curThreadMemMidMark;
    }

    @VisibleForTesting
    EvictablePagePool getEvictablePagePool() {
        return this.pagePriorityPool;
    }

    @VisibleForTesting
    public Map<PageAddress, GRegion> getReadyToEvictDataPageMap() {
        return this.readyToEvictDataPageMap;
    }

    @VisibleForTesting
    long getPreparedFlushedPageSize() {
        return this.preparedFlushedPageSize.get();
    }

    public void shutdown() {
        this.pagePriorityPool.shutdown();
    }

    public String getName() {
        return this.name;
    }

    public long getLastTimeChangeMaxFlushSize() {
        return this.lastTimeChangeMaxFlushSize;
    }

    public EvictPolicy.MemoryUsedWaterMark getMemoryUsedWaterMark(int addSize) {
        if (this.curThreadTotalPageUsedMem.get() + (long)addSize < this.curThreadMemLowMark) {
            return EvictPolicy.MemoryUsedWaterMark.Normal;
        }
        if (this.curThreadTotalPageUsedMem.get() + (long)addSize < this.curThreadMemMidMark) {
            return EvictPolicy.MemoryUsedWaterMark.Low;
        }
        if (this.curThreadTotalPageUsedMem.get() + (long)addSize < this.curThreadMemHighMark) {
            return EvictPolicy.MemoryUsedWaterMark.Middle;
        }
        return EvictPolicy.MemoryUsedWaterMark.High;
    }
}

