/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictHandlerSepImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EvictablePagePoolSampleImpl
implements EvictablePagePool {
    private static final Logger LOG = LoggerFactory.getLogger(EvictablePagePoolSampleImpl.class);
    private final CacheManager cacheManager;
    private final Map<PageAddress, GRegion> dataPoolMap = new ConcurrentHashMap<PageAddress, GRegion>();
    private final Map<PageAddress, Long> dataPoolSizeMap = new ConcurrentHashMap<PageAddress, Long>();
    private final AtomicLong curDataLen = new AtomicLong(0L);
    private final long maxDataLen;
    private final long extraMaxDataLen;
    private final int minSortedListCountForFlush;
    private volatile List<EvictablePagePool.SortedEntry> lastOrderList = new ArrayList<EvictablePagePool.SortedEntry>();
    private final EvictHandlerSepImpl evictHandlerSep;
    private final GContext gContext;
    private final AtomicBoolean fillPollRunning = new AtomicBoolean(false);
    private final ExecutorService fillPoolExecutor;
    private final Map<GRegion, RegionChosen> regionCursorMap = new HashMap<GRegion, RegionChosen>();
    private final int intervalFillPool = 1000;
    private volatile long lastFillPoolTime = System.currentTimeMillis();
    private final boolean evictBaseOnPageAddressComposite;

    public EvictablePagePoolSampleImpl(EvictHandlerSepImpl evictHandlerSep, GContext gContext, CacheManager cacheManager) {
        this.gContext = gContext;
        this.cacheManager = cacheManager;
        this.evictHandlerSep = evictHandlerSep;
        this.minSortedListCountForFlush = gContext.getGConfiguration().getMinSortedListCountForFlush();
        this.evictBaseOnPageAddressComposite = gContext.getGConfiguration().getEvictBaseOnPageAddressComposite();
        int factor = gContext.getGConfiguration().getEvictPoolFactor();
        long configMaxSize = evictHandlerSep.getMaxPreparedFlushSize() * (long)factor;
        if (configMaxSize > evictHandlerSep.getCurThreadMemLowMark() >> 1) {
            configMaxSize = evictHandlerSep.getCurThreadMemLowMark() >> 1;
        }
        this.maxDataLen = configMaxSize;
        this.extraMaxDataLen = this.maxDataLen + (evictHandlerSep.getMaxPreparedFlushSize() << 1);
        String prefix = gContext.getGConfiguration().getExecutorPrefixName();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "EvictablePagePoolSampleImpl-%d").build();
        this.fillPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE), namedThreadFactory);
    }

    @Override
    public int size() {
        return this.dataPoolMap.size();
    }

    @Override
    public long dataSize() {
        return this.curDataLen.get();
    }

    @Override
    public boolean remove(PageAddress pageAddress) {
        if (this.dataPoolMap.remove(pageAddress) != null) {
            while (this.gContext.isDBNormal()) {
                Long size = this.dataPoolSizeMap.remove(pageAddress);
                if (size == null) continue;
                this.curDataLen.addAndGet(-size.longValue());
                return true;
            }
        }
        return false;
    }

    @Override
    public void partialSubPageFlush(PageAddress pageAddress, int curFlushedSize) {
        Long current = this.dataPoolSizeMap.remove(pageAddress);
        if (current == null) {
            return;
        }
        long now = current < (long)curFlushedSize ? 0L : current - (long)curFlushedSize;
        this.dataPoolSizeMap.put(pageAddress, now);
        this.curDataLen.addAndGet(now - current);
    }

    @Override
    public void add(PageAddress pageAddress, GRegion gRegion) {
        if (this.curDataLen.get() >= this.maxDataLen) {
            return;
        }
        this.internalAdd(pageAddress, gRegion, null);
    }

    @VisibleForTesting
    int internalAdd(PageAddress pageAddress, GRegion gRegion, AtomicLong existedInPool) {
        if (this.evictBaseOnPageAddressComposite) {
            return this.internalAddPageAddressComposite(pageAddress, gRegion, existedInPool);
        }
        return this.internalAddOnlySinglePageAddress(pageAddress, gRegion, existedInPool);
    }

    int internalAddOnlySinglePageAddress(PageAddress pageAddress, GRegion gRegion) {
        return this.internalAddOnlySinglePageAddress(pageAddress, gRegion, null);
    }

    int internalAddPageAddressComposite(PageAddress pageAddress, GRegion gRegion) {
        return this.internalAddPageAddressComposite(pageAddress, gRegion, null);
    }

    int internalAddOnlySinglePageAddress(PageAddress pageAddress, GRegion gRegion, AtomicLong existedInPool) {
        Iterator<PageAddress> pageAddressIterator = pageAddress.pageIterator();
        int totalAddedSize = 0;
        while (pageAddressIterator.hasNext()) {
            PageAddress subPage = pageAddressIterator.next();
            DataPage dataPage = subPage.getDataPageNoReference();
            if (dataPage == null) continue;
            if (gRegion.getGRegionContext().isHugePage(dataPage)) {
                gRegion.getGRegionContext().addHugePage(dataPage);
                continue;
            }
            if (!this.evictHandlerSep.isPageAlreadyInEvict(subPage, subPage)) {
                totalAddedSize += this.doAdd(subPage, gRegion);
                continue;
            }
            if (existedInPool == null) continue;
            existedInPool.incrementAndGet();
        }
        return totalAddedSize;
    }

    int internalAddPageAddressComposite(PageAddress pageAddress, GRegion gRegion, AtomicLong existedInPool) {
        Iterator<PageAddress> pageAddressIterator = pageAddress.pageIterator();
        while (pageAddressIterator.hasNext()) {
            PageAddress subPage = pageAddressIterator.next();
            if (!this.evictHandlerSep.isPageAlreadyInEvict(pageAddress, subPage)) continue;
            if (existedInPool != null) {
                existedInPool.incrementAndGet();
            }
            return 0;
        }
        return this.doAdd(pageAddress, gRegion);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    int doAdd(PageAddress pageAddress, GRegion gRegion) {
        PageAddress lockPageAddress;
        int curMemorySize = pageAddress.getMemorySize();
        if (curMemorySize == 0) {
            return 0;
        }
        PageAddress pageAddress2 = lockPageAddress = pageAddress instanceof PageAddressCompositeImpl ? ((PageAddressCompositeImpl)pageAddress).getMainPageAddress() : pageAddress;
        synchronized (pageAddress2) {
            if (!pageAddress.isPageValid()) {
                return 0;
            }
            if (!this.dataPoolMap.containsKey(pageAddress) && !this.dataPoolSizeMap.containsKey(pageAddress)) {
                this.dataPoolSizeMap.put(pageAddress, Long.valueOf(curMemorySize));
                this.dataPoolMap.put(pageAddress, gRegion);
                this.curDataLen.addAndGet(curMemorySize);
                return curMemorySize;
            }
        }
        return 0;
    }

    @Override
    public boolean containsPage(PageAddress pageAddress) {
        return this.dataPoolMap.containsKey(pageAddress);
    }

    @Override
    public List<EvictablePagePool.SortedEntry> getSortedList() {
        if (this.lastOrderList.size() == 0 && this.curDataLen.get() >= this.maxDataLen) {
            return this.sortedListAndArrangePool(true);
        }
        return this.lastOrderList;
    }

    @VisibleForTesting
    List<EvictablePagePool.SortedEntry> sortedListAndArrangePool(boolean fastSortList) {
        long tickTime = this.cacheManager.getCurrentTickTime();
        List dataList = this.dataPoolMap.entrySet().stream().map(entry -> {
            PageAddress pageAddress = (PageAddress)entry.getKey();
            DataPage dataPage = pageAddress.getDataPageNoReference();
            if (dataPage != null && pageAddress.isPageValid()) {
                return new EvictablePagePool.SortedEntry((PageAddress)entry.getKey(), (GRegion)entry.getValue(), dataPage.score(tickTime));
            }
            return null;
        }).filter(Objects::nonNull).sorted(Comparator.comparingDouble(EvictablePagePool.SortedEntry::getScore)).collect(Collectors.toList());
        if (!fastSortList) {
            for (int index = dataList.size() - 1; this.curDataLen.get() > this.maxDataLen && index >= 0; --index) {
                PageAddress lastPageAddress = ((EvictablePagePool.SortedEntry)dataList.remove((int)index)).pageAddress;
                this.remove(lastPageAddress);
            }
        }
        List<EvictablePagePool.SortedEntry> result = dataList.subList(0, dataList.size() >> 1);
        this.lastOrderList = new ArrayList(result);
        return result;
    }

    @Override
    public boolean tryFillPool(Set<GRegion> regions) {
        if (this.evictHandlerSep.getCurThreadTotalPageUsedMem() < this.evictHandlerSep.getCurThreadMemLowMark() - this.evictHandlerSep.getMaxPreparedFlushSize()) {
            return false;
        }
        if (!this.fillPollRunning.compareAndSet(false, true)) {
            return false;
        }
        long currentTime = System.currentTimeMillis();
        if (this.isPoolValid(currentTime)) {
            this.fillPollRunning.compareAndSet(true, false);
            return false;
        }
        this.fillPoolExecutor.submit(() -> {
            try {
                int loop = 0;
                int filledSize = 0;
                int needAddSize = (int)(this.extraMaxDataLen - this.curDataLen.get());
                while (filledSize < needAddSize && this.gContext.isDBNormal()) {
                    List<RegionChosen> regionAssign = this.assignToRegion(regions, needAddSize - filledSize);
                    boolean print = ++loop % 100 == 0;
                    int thisLoopSize = -1;
                    for (RegionChosen chosenRegion : regionAssign) {
                        if (chosenRegion.assignSize == 0) continue;
                        thisLoopSize = this.doFillPoolPerRegion(chosenRegion, print);
                        filledSize += thisLoopSize;
                    }
                    if (this.extraMaxDataLen <= this.curDataLen.get()) break;
                    if (!print) continue;
                    LOG.info("tryFillPool has run {}, cur needAddSize={} filledSize={} thisLoopSize={} current regionCount({}) audit({})", new Object[]{loop, needAddSize, filledSize, thisLoopSize, regions.size(), this.audit(regions)});
                }
                this.sortedListAndArrangePool(false);
            }
            catch (Throwable e) {
                LOG.error("Internal Bug!", e);
            }
            finally {
                long runTime = System.currentTimeMillis() - currentTime;
                this.cacheManager.getCacheStats().addFillPoolTime(runTime);
                if (runTime > 1000L) {
                    LOG.error("tryFillPool TOO SLOW! {} (ms)", (Object)runTime);
                }
                this.lastFillPoolTime = currentTime;
                this.fillPollRunning.compareAndSet(true, false);
            }
        });
        return true;
    }

    boolean isPoolValid(long currentTime) {
        return this.lastOrderList.size() > this.minSortedListCountForFlush && this.curDataLen.get() >= this.maxDataLen >> 1 && currentTime - this.lastFillPoolTime < 1000L;
    }

    List<RegionChosen> assignToRegion(Set<GRegion> regions, int wantedSize) {
        long canChoseSize = 0L;
        for (GRegion gRegion : regions) {
            RegionChosen regionChosen = this.regionCursorMap.computeIfAbsent(gRegion, RegionChosen::new);
            regionChosen.usedMemory = gRegion.getGRegionContext().getPageStoreStats().getPageUsedMemory();
            regionChosen.assignSize = 0;
            canChoseSize += regionChosen.usedMemory;
        }
        if (canChoseSize <= (long)(wantedSize << 1)) {
            wantedSize = (int)(canChoseSize >> 1);
        }
        ArrayList<RegionChosen> result = new ArrayList<RegionChosen>(this.regionCursorMap.values());
        HashSet<RegionChosen> removeRegionChosen = new HashSet<RegionChosen>();
        int curLoopWanted = wantedSize;
        int curRegionSize = regions.size();
        block1: while (curLoopWanted > 0 && this.gContext.isDBNormal()) {
            int averageWantedSize = curLoopWanted / curRegionSize + (curLoopWanted % curRegionSize == 0 ? 0 : 1);
            for (RegionChosen regionChosen : result) {
                int curAssign;
                if (regionChosen.usedMemory <= (long)(regionChosen.assignSize << 1)) {
                    if (removeRegionChosen.contains(regionChosen)) continue;
                    removeRegionChosen.add(regionChosen);
                    --curRegionSize;
                    continue;
                }
                if (regionChosen.usedMemory - (long)regionChosen.assignSize > (long)(averageWantedSize << 1)) {
                    curAssign = this.checkSize(curLoopWanted < averageWantedSize ? curLoopWanted : averageWantedSize, regionChosen);
                    regionChosen.assignSize += curAssign;
                    if ((curLoopWanted -= curAssign) > 0) continue;
                    break block1;
                }
                if (regionChosen.usedMemory - (long)regionChosen.assignSize <= (long)averageWantedSize) continue;
                curAssign = this.checkSize(curLoopWanted < averageWantedSize >> 1 ? curLoopWanted : averageWantedSize >> 1, regionChosen);
                regionChosen.assignSize += curAssign;
                if ((curLoopWanted -= curAssign) > 0) continue;
                break block1;
            }
        }
        return result;
    }

    private int checkSize(int curAssign, RegionChosen regionChosen) {
        int result = curAssign;
        if ((long)(curAssign + regionChosen.assignSize) > regionChosen.usedMemory >> 1) {
            result = (int)((regionChosen.usedMemory >> 1) - (long)regionChosen.assignSize);
        }
        return result;
    }

    private String audit(Set<GRegion> regions) {
        try {
            long totalPageAddressCount = 0L;
            long totalPageSize = 0L;
            long totalPageInMemSize = 0L;
            for (GRegion region : regions) {
                PageStore pageStore = region.getPageStore();
                Iterator<PageAddress> pageAddressIterator = pageStore.getPageIndex().pageIterator();
                while (pageAddressIterator.hasNext()) {
                    ++totalPageAddressCount;
                    PageAddress pageAddress = pageAddressIterator.next();
                    totalPageSize += (long)pageAddress.getDataLen();
                    totalPageInMemSize += (long)pageAddress.getMemorySize();
                }
            }
            return String.format("totalPageAddressCount=%s,totalPageSize=%s,totalPageInMemSize=%s %s", totalPageAddressCount, totalPageSize, totalPageInMemSize, this.evictHandlerSep);
        }
        catch (Exception e) {
            LOG.warn("audit error", (Throwable)e);
            return "audit error;";
        }
    }

    @VisibleForTesting
    int doFillPoolPerRegion(RegionChosen regionChosen, boolean print) {
        LogicalPageChain[] pages = regionChosen.gRegion.getPageStore().getPageIndex().getPageIndex();
        int scanCount = 0;
        int addedSize = 0;
        int cursor = regionChosen.cursor;
        AtomicLong existedInPool = new AtomicLong(0L);
        while (addedSize < regionChosen.assignSize && this.gContext.isDBNormal()) {
            ++scanCount;
            LogicalPageChain logicalPageChain = pages[cursor];
            if (logicalPageChain != null) {
                for (int i = 0; i <= logicalPageChain.getCurrentPageChainIndex(); ++i) {
                    int addedPerPageAddress;
                    PageAddress pageAddress = logicalPageChain.getPageAddress(i);
                    if (pageAddress == null || (addedPerPageAddress = this.internalAdd(pageAddress, regionChosen.gRegion, existedInPool)) <= 0) continue;
                    addedSize += addedPerPageAddress;
                    break;
                }
            }
            if (++cursor >= pages.length) {
                cursor = 0;
            }
            if (cursor != regionChosen.cursor) continue;
            break;
        }
        if (print) {
            LOG.info("tryFillPool({}) scanCount={}, regionChosen={}, cursor={}, addedSize={}, existedPage={} pages.length={}, dataPool={},lastOrderList={}", new Object[]{this.evictHandlerSep.getName(), scanCount, regionChosen, cursor, addedSize, existedInPool.get(), pages.length, this.dataPoolMap.size(), this.lastOrderList.size()});
        }
        regionChosen.cursor = cursor;
        return addedSize;
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("name", (Object)this.evictHandlerSep.getName()).add("poolCount", this.size()).add("poolDataSize", this.dataSize()).add("lastOrderList", this.lastOrderList == null ? 0 : this.lastOrderList.size()).add("maxDataLen", this.maxDataLen).add("lastFillPoolTime", this.lastFillPoolTime).toString();
    }

    @Override
    public void shutdown() {
        this.fillPoolExecutor.shutdownNow();
    }

    @VisibleForTesting
    Map<PageAddress, GRegion> getDataPoolMap() {
        return this.dataPoolMap;
    }

    @VisibleForTesting
    public long getMaxDataLen() {
        return this.maxDataLen;
    }

    @VisibleForTesting
    public long getExtraMaxDataLen() {
        return this.extraMaxDataLen;
    }

    @VisibleForTesting
    public boolean getFillPollRunning() {
        return this.fillPollRunning.get();
    }

    public boolean isEvictBaseOnPageAddressComposite() {
        return this.evictBaseOnPageAddressComposite;
    }

    @VisibleForTesting
    public Map<GRegion, RegionChosen> getRegionCursorMap() {
        return this.regionCursorMap;
    }

    @VisibleForTesting
    public long getLastFillPoolTime() {
        return this.lastFillPoolTime;
    }

    static class RegionChosen {
        private final GRegion gRegion;
        int cursor = 0;
        long usedMemory = 0L;
        int assignSize = 0;

        RegionChosen(GRegion gRegion) {
            this.gRegion = gRegion;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("gRegion", (Object)this.gRegion.getRegionId()).add("cursor", this.cursor).add("usedMemory", this.usedMemory).add("assignSize", this.assignSize).toString();
        }
    }
}

