package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/vm/EvictablePagePoolSampleImpl.class */
public class EvictablePagePoolSampleImpl implements EvictablePagePool {
    private static final Logger LOG = LoggerFactory.getLogger(EvictablePagePoolSampleImpl.class);
    private final CacheManager cacheManager;
    private final long maxDataLen;
    private final long extraMaxDataLen;
    private final int minSortedListCountForFlush;
    private final EvictHandlerSepImpl evictHandlerSep;
    private final GContext gContext;
    private final ExecutorService fillPoolExecutor;
    private final boolean evictBaseOnPageAddressComposite;
    private final Map<PageAddress, GRegion> dataPoolMap = new ConcurrentHashMap();
    private final Map<PageAddress, Long> dataPoolSizeMap = new ConcurrentHashMap();
    private final AtomicLong curDataLen = new AtomicLong(0);
    private volatile List<EvictablePagePool.SortedEntry> lastOrderList = new ArrayList();
    private final AtomicBoolean fillPollRunning = new AtomicBoolean(false);
    private final Map<GRegion, RegionChosen> regionCursorMap = new HashMap();
    private final int intervalFillPool = 1000;
    private volatile long lastFillPoolTime = System.currentTimeMillis();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/vm/EvictablePagePoolSampleImpl$RegionChosen.class */
    public static class RegionChosen {
        private final GRegion gRegion;
        int cursor = 0;
        long usedMemory = 0;
        int assignSize = 0;

        RegionChosen(GRegion gRegion) {
            this.gRegion = gRegion;
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("gRegion", this.gRegion.getRegionId()).add("cursor", this.cursor).add("usedMemory", this.usedMemory).add("assignSize", this.assignSize).toString();
        }
    }

    public EvictablePagePoolSampleImpl(EvictHandlerSepImpl evictHandlerSepImpl, GContext gContext, CacheManager cacheManager) {
        this.gContext = gContext;
        this.cacheManager = cacheManager;
        this.evictHandlerSep = evictHandlerSepImpl;
        this.minSortedListCountForFlush = gContext.getGConfiguration().getMinSortedListCountForFlush();
        this.evictBaseOnPageAddressComposite = gContext.getGConfiguration().getEvictBaseOnPageAddressComposite();
        long maxPreparedFlushSize = evictHandlerSepImpl.getMaxPreparedFlushSize() * gContext.getGConfiguration().getEvictPoolFactor();
        this.maxDataLen = maxPreparedFlushSize > (evictHandlerSepImpl.getCurThreadMemLowMark() >> 1) ? evictHandlerSepImpl.getCurThreadMemLowMark() >> 1 : maxPreparedFlushSize;
        this.extraMaxDataLen = this.maxDataLen + (evictHandlerSepImpl.getMaxPreparedFlushSize() << 1);
        this.fillPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(32767), new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "EvictablePagePoolSampleImpl-%d").build());
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public int size() {
        return this.dataPoolMap.size();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public long dataSize() {
        return this.curDataLen.get();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public boolean remove(PageAddress pageAddress) {
        if (this.dataPoolMap.remove(pageAddress) == null) {
            return false;
        }
        while (this.gContext.isDBNormal()) {
            Long remove = this.dataPoolSizeMap.remove(pageAddress);
            if (remove != null) {
                this.curDataLen.addAndGet(-remove.longValue());
                return true;
            }
        }
        return false;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public void partialSubPageFlush(PageAddress pageAddress, int i) {
        Long remove = this.dataPoolSizeMap.remove(pageAddress);
        if (remove == null) {
            return;
        }
        long longValue = remove.longValue() < ((long) i) ? 0L : remove.longValue() - i;
        this.dataPoolSizeMap.put(pageAddress, Long.valueOf(longValue));
        this.curDataLen.addAndGet(longValue - remove.longValue());
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public void add(PageAddress pageAddress, GRegion gRegion) {
        if (this.curDataLen.get() >= this.maxDataLen) {
            return;
        }
        internalAdd(pageAddress, gRegion, null);
    }

    @VisibleForTesting
    int internalAdd(PageAddress pageAddress, GRegion gRegion, AtomicLong atomicLong) {
        return this.evictBaseOnPageAddressComposite ? internalAddPageAddressComposite(pageAddress, gRegion, atomicLong) : internalAddOnlySinglePageAddress(pageAddress, gRegion, atomicLong);
    }

    int internalAddOnlySinglePageAddress(PageAddress pageAddress, GRegion gRegion) {
        return internalAddOnlySinglePageAddress(pageAddress, gRegion, null);
    }

    int internalAddPageAddressComposite(PageAddress pageAddress, GRegion gRegion) {
        return internalAddPageAddressComposite(pageAddress, gRegion, null);
    }

    int internalAddOnlySinglePageAddress(PageAddress pageAddress, GRegion gRegion, AtomicLong atomicLong) {
        Iterator<PageAddress> pageIterator = pageAddress.pageIterator();
        int i = 0;
        while (pageIterator.hasNext()) {
            PageAddress next = pageIterator.next();
            DataPage dataPageNoReference = next.getDataPageNoReference();
            if (dataPageNoReference != null) {
                if (gRegion.getGRegionContext().isHugePage(dataPageNoReference)) {
                    gRegion.getGRegionContext().addHugePage(dataPageNoReference);
                } else if (!this.evictHandlerSep.isPageAlreadyInEvict(next, next)) {
                    i += doAdd(next, gRegion);
                } else if (atomicLong != null) {
                    atomicLong.incrementAndGet();
                }
            }
        }
        return i;
    }

    int internalAddPageAddressComposite(PageAddress pageAddress, GRegion gRegion, AtomicLong atomicLong) {
        Iterator<PageAddress> pageIterator = pageAddress.pageIterator();
        while (pageIterator.hasNext()) {
            if (this.evictHandlerSep.isPageAlreadyInEvict(pageAddress, pageIterator.next())) {
                if (atomicLong == null) {
                    return 0;
                }
                atomicLong.incrementAndGet();
                return 0;
            }
        }
        return doAdd(pageAddress, gRegion);
    }

    int doAdd(PageAddress pageAddress, GRegion gRegion) {
        int memorySize = pageAddress.getMemorySize();
        if (memorySize == 0) {
            return 0;
        }
        synchronized ((pageAddress instanceof PageAddressCompositeImpl ? ((PageAddressCompositeImpl) pageAddress).getMainPageAddress() : pageAddress)) {
            if (!pageAddress.isPageValid()) {
                return 0;
            }
            if (this.dataPoolMap.containsKey(pageAddress) || this.dataPoolSizeMap.containsKey(pageAddress)) {
                return 0;
            }
            this.dataPoolSizeMap.put(pageAddress, Long.valueOf(memorySize));
            this.dataPoolMap.put(pageAddress, gRegion);
            this.curDataLen.addAndGet(memorySize);
            return memorySize;
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public boolean containsPage(PageAddress pageAddress) {
        return this.dataPoolMap.containsKey(pageAddress);
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public List<EvictablePagePool.SortedEntry> getSortedList() {
        return (this.lastOrderList.size() != 0 || this.curDataLen.get() < this.maxDataLen) ? this.lastOrderList : sortedListAndArrangePool(true);
    }

    @VisibleForTesting
    List<EvictablePagePool.SortedEntry> sortedListAndArrangePool(boolean z) {
        long currentTickTime = this.cacheManager.getCurrentTickTime();
        List list = (List) this.dataPoolMap.entrySet().stream().map(entry -> {
            PageAddress pageAddress = (PageAddress) entry.getKey();
            DataPage dataPageNoReference = pageAddress.getDataPageNoReference();
            if (dataPageNoReference == null || !pageAddress.isPageValid()) {
                return null;
            }
            return new EvictablePagePool.SortedEntry((PageAddress) entry.getKey(), (GRegion) entry.getValue(), dataPageNoReference.score(currentTickTime));
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).sorted(Comparator.comparingDouble((v0) -> {
            return v0.getScore();
        })).collect(Collectors.toList());
        if (!z) {
            for (int size = list.size() - 1; this.curDataLen.get() > this.maxDataLen && size >= 0; size--) {
                remove(((EvictablePagePool.SortedEntry) list.remove(size)).pageAddress);
            }
        }
        List<EvictablePagePool.SortedEntry> subList = list.subList(0, list.size() >> 1);
        this.lastOrderList = new ArrayList(subList);
        return subList;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public boolean tryFillPool(Set<GRegion> set) {
        if (this.evictHandlerSep.getCurThreadTotalPageUsedMem() < this.evictHandlerSep.getCurThreadMemLowMark() - this.evictHandlerSep.getMaxPreparedFlushSize() || !this.fillPollRunning.compareAndSet(false, true)) {
            return false;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (isPoolValid(currentTimeMillis)) {
            this.fillPollRunning.compareAndSet(true, false);
            return false;
        }
        this.fillPoolExecutor.submit(() -> {
            try {
                try {
                    int i = 0;
                    int i2 = 0;
                    int i3 = (int) (this.extraMaxDataLen - this.curDataLen.get());
                    while (i2 < i3 && this.gContext.isDBNormal()) {
                        List<RegionChosen> assignToRegion = assignToRegion(set, i3 - i2);
                        i++;
                        boolean z = i % 100 == 0;
                        int i4 = -1;
                        for (RegionChosen regionChosen : assignToRegion) {
                            if (regionChosen.assignSize != 0) {
                                i4 = doFillPoolPerRegion(regionChosen, z);
                                i2 += i4;
                            }
                        }
                        if (this.extraMaxDataLen <= this.curDataLen.get()) {
                            break;
                        } else if (z) {
                            LOG.info("tryFillPool has run {}, cur needAddSize={} filledSize={} thisLoopSize={} current regionCount({}) audit({})", new Object[]{Integer.valueOf(i), Integer.valueOf(i3), Integer.valueOf(i2), Integer.valueOf(i4), Integer.valueOf(set.size()), audit(set)});
                        }
                    }
                    sortedListAndArrangePool(false);
                    long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                    this.cacheManager.getCacheStats().addFillPoolTime(currentTimeMillis2);
                    if (currentTimeMillis2 > 1000) {
                        LOG.error("tryFillPool TOO SLOW! {} (ms)", Long.valueOf(currentTimeMillis2));
                    }
                    this.lastFillPoolTime = currentTimeMillis;
                    this.fillPollRunning.compareAndSet(true, false);
                } catch (Throwable th) {
                    LOG.error("Internal Bug!", th);
                    long currentTimeMillis3 = System.currentTimeMillis() - currentTimeMillis;
                    this.cacheManager.getCacheStats().addFillPoolTime(currentTimeMillis3);
                    if (currentTimeMillis3 > 1000) {
                        LOG.error("tryFillPool TOO SLOW! {} (ms)", Long.valueOf(currentTimeMillis3));
                    }
                    this.lastFillPoolTime = currentTimeMillis;
                    this.fillPollRunning.compareAndSet(true, false);
                }
            } catch (Throwable th2) {
                long currentTimeMillis4 = System.currentTimeMillis() - currentTimeMillis;
                this.cacheManager.getCacheStats().addFillPoolTime(currentTimeMillis4);
                if (currentTimeMillis4 > 1000) {
                    LOG.error("tryFillPool TOO SLOW! {} (ms)", Long.valueOf(currentTimeMillis4));
                }
                this.lastFillPoolTime = currentTimeMillis;
                this.fillPollRunning.compareAndSet(true, false);
                throw th2;
            }
        });
        return true;
    }

    boolean isPoolValid(long j) {
        return this.lastOrderList.size() > this.minSortedListCountForFlush && this.curDataLen.get() >= (this.maxDataLen >> 1) && j - this.lastFillPoolTime < 1000;
    }

    List<RegionChosen> assignToRegion(Set<GRegion> set, int i) {
        long j = 0;
        for (GRegion gRegion : set) {
            RegionChosen computeIfAbsent = this.regionCursorMap.computeIfAbsent(gRegion, RegionChosen::new);
            computeIfAbsent.usedMemory = gRegion.getGRegionContext().getPageStoreStats().getPageUsedMemory();
            computeIfAbsent.assignSize = 0;
            j += computeIfAbsent.usedMemory;
        }
        if (j <= (i << 1)) {
            i = (int) (j >> 1);
        }
        ArrayList<RegionChosen> arrayList = new ArrayList(this.regionCursorMap.values());
        HashSet hashSet = new HashSet();
        int i2 = i;
        int size = set.size();
        loop1: while (i2 > 0 && this.gContext.isDBNormal()) {
            int i3 = (i2 / size) + (i2 % size == 0 ? 0 : 1);
            for (RegionChosen regionChosen : arrayList) {
                if (regionChosen.usedMemory > (regionChosen.assignSize << 1)) {
                    if (regionChosen.usedMemory - regionChosen.assignSize <= (i3 << 1)) {
                        if (regionChosen.usedMemory - regionChosen.assignSize > i3) {
                            int checkSize = checkSize(i2 < (i3 >> 1) ? i2 : i3 >> 1, regionChosen);
                            regionChosen.assignSize += checkSize;
                            i2 -= checkSize;
                            if (i2 <= 0) {
                                break loop1;
                            }
                        } else {
                            continue;
                        }
                    } else {
                        int checkSize2 = checkSize(i2 < i3 ? i2 : i3, regionChosen);
                        regionChosen.assignSize += checkSize2;
                        i2 -= checkSize2;
                        if (i2 <= 0) {
                            break loop1;
                        }
                    }
                } else if (!hashSet.contains(regionChosen)) {
                    hashSet.add(regionChosen);
                    size--;
                }
            }
        }
        return arrayList;
    }

    private int checkSize(int i, RegionChosen regionChosen) {
        int i2 = i;
        if (i + regionChosen.assignSize > (regionChosen.usedMemory >> 1)) {
            i2 = (int) ((regionChosen.usedMemory >> 1) - regionChosen.assignSize);
        }
        return i2;
    }

    private String audit(Set<GRegion> set) {
        try {
            long j = 0;
            long j2 = 0;
            long j3 = 0;
            Iterator<GRegion> it = set.iterator();
            while (it.hasNext()) {
                Iterator<PageAddress> pageIterator = it.next().getPageStore().getPageIndex().pageIterator();
                while (pageIterator.hasNext()) {
                    j++;
                    PageAddress next = pageIterator.next();
                    j2 += next.getDataLen();
                    j3 += next.getMemorySize();
                }
            }
            return String.format("totalPageAddressCount=%s,totalPageSize=%s,totalPageInMemSize=%s %s", Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3), this.evictHandlerSep);
        } catch (Exception e) {
            LOG.warn("audit error", e);
            return "audit error;";
        }
    }

    @VisibleForTesting
    int doFillPoolPerRegion(RegionChosen regionChosen, boolean z) {
        int internalAdd;
        LogicalPageChain[] pageIndex = regionChosen.gRegion.getPageStore().getPageIndex().getPageIndex();
        int i = 0;
        int i2 = 0;
        int i3 = regionChosen.cursor;
        AtomicLong atomicLong = new AtomicLong(0L);
        while (i2 < regionChosen.assignSize && this.gContext.isDBNormal()) {
            i++;
            LogicalPageChain logicalPageChain = pageIndex[i3];
            if (logicalPageChain != null) {
                int i4 = 0;
                while (true) {
                    if (i4 <= logicalPageChain.getCurrentPageChainIndex()) {
                        PageAddress pageAddress = logicalPageChain.getPageAddress(i4);
                        if (pageAddress != null && (internalAdd = internalAdd(pageAddress, regionChosen.gRegion, atomicLong)) > 0) {
                            i2 += internalAdd;
                            break;
                        }
                        i4++;
                    } else {
                        break;
                    }
                }
            }
            i3++;
            if (i3 >= pageIndex.length) {
                i3 = 0;
            }
            if (i3 == regionChosen.cursor) {
                break;
            }
        }
        if (z) {
            LOG.info("tryFillPool({}) scanCount={}, regionChosen={}, cursor={}, addedSize={}, existedPage={} pages.length={}, dataPool={},lastOrderList={}", new Object[]{this.evictHandlerSep.getName(), Integer.valueOf(i), regionChosen, Integer.valueOf(i3), Integer.valueOf(i2), Long.valueOf(atomicLong.get()), Integer.valueOf(pageIndex.length), Integer.valueOf(this.dataPoolMap.size()), Integer.valueOf(this.lastOrderList.size())});
        }
        regionChosen.cursor = i3;
        return i2;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("name", this.evictHandlerSep.getName()).add("poolCount", size()).add("poolDataSize", dataSize()).add("lastOrderList", this.lastOrderList == null ? 0 : this.lastOrderList.size()).add("maxDataLen", this.maxDataLen).add("lastFillPoolTime", this.lastFillPoolTime).toString();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool
    public void shutdown() {
        this.fillPoolExecutor.shutdownNow();
    }

    @VisibleForTesting
    Map<PageAddress, GRegion> getDataPoolMap() {
        return this.dataPoolMap;
    }

    @VisibleForTesting
    public long getMaxDataLen() {
        return this.maxDataLen;
    }

    @VisibleForTesting
    public long getExtraMaxDataLen() {
        return this.extraMaxDataLen;
    }

    @VisibleForTesting
    public boolean getFillPollRunning() {
        return this.fillPollRunning.get();
    }

    public boolean isEvictBaseOnPageAddressComposite() {
        return this.evictBaseOnPageAddressComposite;
    }

    @VisibleForTesting
    public Map<GRegion, RegionChosen> getRegionCursorMap() {
        return this.regionCursorMap;
    }

    @VisibleForTesting
    public long getLastFillPoolTime() {
        return this.lastFillPoolTime;
    }
}
