package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressSingleImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictablePagePool;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/vm/EvictHandlerSepImpl.class */
public class EvictHandlerSepImpl {
    private static final Logger LOG = LoggerFactory.getLogger(EvictHandlerSepImpl.class);

    /** Name of this handler; reported by {@link #getName()} and {@link #toString()}. */
    private final String name;

    /** Engine context; consulted for configuration, DB health and error reporting. */
    private final GContext gContext;

    /** Supervisor obtained from the context; gives access to the file cache and bloom filter manager. */
    private final Supervisor supervisor;

    /** Cache manager obtained from the supervisor; source of the memory marks, tick time and cache stats. */
    private final CacheManager cacheManager;

    /** Executor group whose {@code next()} executor is handed to the file cache for every submitted page. */
    private final EventExecutorGroup flushEventExecutorGroup;

    /** Pool of pages that are candidates for a prepare-flush. */
    private final EvictablePagePool pagePriorityPool;

    /** Cache manager low mark divided by the configured region thread number. */
    private final long curThreadMemLowMark;

    /** Cache manager mid mark divided by the configured region thread number. */
    private final long curThreadMemMidMark;

    /** Cache manager high mark divided by the configured region thread number. */
    private final long curThreadMemHighMark;

    /** Configured upper bound for the amount of prepared-flush data. */
    private final int maxPreparedFlushSize;

    /** Current prepared-flush budget; starts at {@link #maxPreparedFlushSize} and is adjusted by {@code tryPrepareFlush}. */
    private volatile int dynamicPreparedFlushSize;

    /**
     * Pages that have been submitted to the file cache and wait to be evicted, mapped to their
     * region. A {@link LinkedHashMap}, so iteration follows insertion order.
     */
    private final Map<PageAddress, GRegion> readyToEvictDataPageMap = new LinkedHashMap<>();

    /** Total size of the pages currently accounted for in {@link #readyToEvictDataPageMap}. */
    private final AtomicLong preparedFlushedPageSize = new AtomicLong(0);

    /** Total size of pages submitted to the file cache whose completion callback has not run yet. */
    private final AtomicLong flushingPageSize = new AtomicLong(0);

    /** Registered regions; every region is mapped to itself, only the key set is read. */
    private final Map<GRegion, GRegion> allRegions = new ConcurrentHashMap<>();

    /** Running total maintained by {@code addPageUsedMemory}. */
    private final AtomicLong curThreadTotalPageUsedMem = new AtomicLong(0);

    /** Total size of pages whose file cache submission has completed (successfully or not). */
    private final AtomicLong curThreadTotalFlushedSize = new AtomicLong(0);

    /** Total data length evicted by {@code doEvict}. */
    private final AtomicLong curThreadTotalEvictedSize = new AtomicLong(0);

    /**
     * Minimum interval, in milliseconds, between two adjustments of
     * {@link #dynamicPreparedFlushSize} before it may be shrunk (see {@code changeMaxFlushSizeTimeOut}).
     */
    private final long intervalShrinkDynamicPreparedFlushSize = 300000;

    /** Wall-clock time ({@link System#currentTimeMillis()}) of the last change to {@link #dynamicPreparedFlushSize}. */
    private volatile long lastTimeChangeMaxFlushSize = System.currentTimeMillis();

    /**
     * Builds an evict handler from the given engine context.
     *
     * <p>The three memory marks are the cache manager's marks divided by the configured region
     * thread number; the prepared-flush budget starts at the configured maximum.
     *
     * @param str name of this handler
     * @param gContext engine context providing the supervisor and the configuration
     */
    public EvictHandlerSepImpl(String str, GContext gContext) {
        name = str;
        this.gContext = gContext;
        supervisor = gContext.getSupervisor();
        cacheManager = gContext.getSupervisor().getCacheManager();
        flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();

        final int threadCount = gContext.getGConfiguration().getRegionThreadNum();
        curThreadMemLowMark = cacheManager.getMemLowMark() / threadCount;
        curThreadMemMidMark = cacheManager.getMemMidMark() / threadCount;
        curThreadMemHighMark = cacheManager.getMemHighMark() / threadCount;

        maxPreparedFlushSize = gContext.getGConfiguration().getMaxPreparedFlushSize();
        dynamicPreparedFlushSize = maxPreparedFlushSize;

        pagePriorityPool = new EvictablePagePoolSampleImpl(this, gContext, cacheManager);

        LOG.info(
            "EvictHandlerSepImpl {} curThreadMemLowMark={},curThreadMemMidMark={},curThreadMemHighMark={},maxPreparedFlushSize={},pagePriorityPool={}",
            str,
            curThreadMemLowMark,
            curThreadMemMidMark,
            curThreadMemHighMark,
            maxPreparedFlushSize,
            pagePriorityPool);
    }

    /**
     * Drops a page address from the eviction bookkeeping.
     *
     * <p>The address itself is removed from the priority pool; every address produced by its
     * {@code pageIterator()} is removed from the priority pool and from the ready-to-evict map.
     * For each ready-to-evict removal the prepared-flush size is reduced by that page's data length.
     *
     * @param pageAddress the address to remove
     * @return the number of successful removals (pool and ready-to-evict removals are counted separately)
     */
    public int removeInvalidPage(PageAddress pageAddress) {
        // The iterator is obtained before anything is removed, as in the original ordering.
        Iterator<PageAddress> pages = pageAddress.pageIterator();

        int removedCount = 0;
        if (pagePriorityPool.remove(pageAddress)) {
            removedCount++;
        }

        while (pages.hasNext()) {
            PageAddress page = pages.next();
            if (pagePriorityPool.remove(page)) {
                removedCount++;
            }
            if (readyToEvictDataPageMap.remove(page) == null) {
                continue;
            }
            preparedFlushedPageSize.addAndGet(-page.getDataLen());
            removedCount++;
        }
        return removedCount;
    }

    /**
     * Adds a page to the priority pool.
     *
     * @param pageAddress address of the page
     * @param gRegion region the page belongs to
     */
    public void addPage(PageAddress pageAddress, GRegion gRegion) {
        pagePriorityPool.add(pageAddress, gRegion);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adjusts the dynamic prepared-flush budget and triggers a prepare-flush when used memory has
     * reached the mid mark.
     *
     * <p>Below the mid mark nothing is flushed; the priority pool is only refilled once used memory
     * is within {@code maxPreparedFlushSize} of the low mark.
     *
     * @param i requested size; raises the dynamic budget when it exceeds it and is forwarded to
     *     {@code doPrepareFlush} as its first argument
     */
    public void tryPrepareFlush(int i) {
        if (curThreadTotalPageUsedMem.get() < curThreadMemMidMark) {
            boolean closeToLowMark =
                curThreadTotalPageUsedMem.get() >= curThreadMemLowMark - maxPreparedFlushSize;
            if (closeToLowMark) {
                tryFillPool();
            }
            return;
        }

        final long now = System.currentTimeMillis();
        if (dynamicPreparedFlushSize < i) {
            // The request does not fit into the current budget: grow it.
            dynamicPreparedFlushSize = maxPreparedFlushSize + i;
            lastTimeChangeMaxFlushSize = now;
        } else if (dynamicPreparedFlushSize > maxPreparedFlushSize
                && maxPreparedFlushSize > i
                && changeMaxFlushSizeTimeOut(now)) {
            // The budget was grown earlier and has not changed for a while: keep 3/4 of the surplus.
            int surplus = dynamicPreparedFlushSize - maxPreparedFlushSize;
            dynamicPreparedFlushSize = maxPreparedFlushSize + surplus * 3 / 4;
            lastTimeChangeMaxFlushSize = now;
        }

        long remainingBudget = dynamicPreparedFlushSize - preparedFlushedPageSize.get();
        doPrepareFlush(i, (int) remainingBudget);
    }

    /**
     * Tells whether enough time has passed since the dynamic prepared-flush size was last changed.
     *
     * @param j current time in milliseconds, on the same clock as {@code lastTimeChangeMaxFlushSize}
     * @return {@code true} when at least {@code intervalShrinkDynamicPreparedFlushSize} milliseconds
     *     have elapsed since the last change
     */
    boolean changeMaxFlushSizeTimeOut(long j) {
        // Use the named interval field instead of repeating its value as a magic number.
        return j - this.lastTimeChangeMaxFlushSize >= this.intervalShrinkDynamicPreparedFlushSize;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks whether a page is already tracked by this handler.
     *
     * @param pageAddress address looked up in the priority pool
     * @param pageAddress2 address looked up in the ready-to-evict map
     * @return {@code true} if either lookup succeeds
     */
    public boolean isPageAlreadyInEvict(PageAddress pageAddress, PageAddress pageAddress2) {
        if (pagePriorityPool.containsPage(pageAddress)) {
            return true;
        }
        return readyToEvictDataPageMap.containsKey(pageAddress2);
    }

    /**
     * Walks the sorted candidate list of the priority pool and submits pages for flushing (via
     * {@code prepareFlushSinglePage}) until about {@code i2} bytes have been submitted.
     *
     * <p>Does nothing unless {@code i2} is positive and used memory has reached the mid mark.
     * After every pass over the sorted list the pool is refilled; the method returns once the
     * prepared-flush size exceeds {@code i}, otherwise it logs a "NOT WORK" message.
     *
     * <p>NOTE(review): when a pass submits nothing and the prepared-flush size stays at or below
     * {@code i}, the outer loop runs again and logs on every pass for as long as the DB is normal
     * - confirm this cannot spin in practice.
     *
     * @param i size the prepared-flush total must exceed for a pass to count as successful
     *     (logged as {@code minSize})
     * @param i2 number of bytes this call tries to submit (logged as {@code bestFlushSize})
     */
    @VisibleForTesting
    void doPrepareFlush(int i, int i2) {
        if (i2 > 0 && this.curThreadTotalPageUsedMem.get() >= this.curThreadMemMidMark) {
            // j: bytes submitted by this call; i3: number of single pages attempted.
            long j = 0;
            int i3 = 0;
            while (j < i2 && this.gContext.isDBNormal()) {
                Iterator<EvictablePagePool.SortedEntry> it = getSortedList().iterator();
                while (it.hasNext() && this.gContext.isDBNormal()) {
                    EvictablePagePool.SortedEntry next = it.next();
                    PageAddress pageAddress = next.pageAddress;
                    GRegion gRegion = next.region;
                    if (!this.pagePriorityPool.containsPage(pageAddress)) {
                        // The pool no longer knows this page: only drop the stale list entry.
                        it.remove();
                    } else if (!pageAddress.isPageValid() || this.readyToEvictDataPageMap.containsKey(pageAddress)) {
                        // Invalid, or already waiting for eviction: drop it from list and pool.
                        it.remove();
                        this.pagePriorityPool.remove(pageAddress);
                    } else {
                        // pageAddress2: single page still to be submitted for this entry (may stay null).
                        // z: whether the entry is finished and must leave the pool and the list.
                        PageAddress pageAddress2 = null;
                        boolean z = false;
                        if (pageAddress instanceof PageAddressSingleImpl) {
                            pageAddress2 = pageAddress;
                            z = true;
                        } else {
                            if (!(pageAddress instanceof PageAddressCompositeImpl)) {
                                throw new GeminiRuntimeException("Internal Bug: error PageAddress");
                            }
                            PageAddress[] subPageAddress = ((PageAddressCompositeImpl) pageAddress).getSubPageAddress();
                            long currentTickTime = this.cacheManager.getCurrentTickTime();
                            // z2: set when the byte budget is reached while submitting sub pages.
                            boolean z2 = false;
                            // Keep sub pages that have a data page, are valid and are not yet ready to
                            // evict, and visit them in ascending score order.
                            Iterator it2 = ((List) Arrays.stream(subPageAddress).map(pageAddress3 -> {
                                DataPage dataPageNoReference = pageAddress3.getDataPageNoReference();
                                if (dataPageNoReference == null || !pageAddress3.isPageValid() || this.readyToEvictDataPageMap.containsKey(pageAddress3)) {
                                    return null;
                                }
                                return new EvictablePagePool.SortedEntry(pageAddress3, gRegion, dataPageNoReference.score(currentTickTime));
                            }).filter((v0) -> {
                                return Objects.nonNull(v0);
                            }).sorted(Comparator.comparingDouble((v0) -> {
                                return v0.getScore();
                            })).collect(Collectors.toList())).iterator();
                            while (true) {
                                if (!it2.hasNext()) {
                                    break;
                                }
                                int prepareFlushSinglePage = prepareFlushSinglePage(((EvictablePagePool.SortedEntry) it2.next()).pageAddress, gRegion);
                                if (prepareFlushSinglePage > 0) {
                                    // Report the submitted sub page size to the pool for this composite entry.
                                    this.pagePriorityPool.partialSubPageFlush(pageAddress, prepareFlushSinglePage);
                                }
                                j += prepareFlushSinglePage;
                                // Counted even when nothing was submitted (return value 0).
                                i3++;
                                if (j >= i2) {
                                    z2 = true;
                                    break;
                                }
                            }
                            if (!z2) {
                                // All sub pages were visited without reaching the budget: submit the main
                                // page as well (unless invalid or already ready) and retire the entry.
                                PageAddress mainPageAddress = ((PageAddressCompositeImpl) pageAddress).getMainPageAddress();
                                pageAddress2 = (!pageAddress.isPageValid() || this.readyToEvictDataPageMap.containsKey(mainPageAddress)) ? null : mainPageAddress;
                                z = true;
                            }
                        }
                        if (pageAddress2 != null) {
                            j += prepareFlushSinglePage(pageAddress2, gRegion);
                            i3++;
                        }
                        if (z) {
                            this.pagePriorityPool.remove(pageAddress);
                            it.remove();
                        }
                        if (j >= i2) {
                            // Budget reached: stop scanning the sorted list.
                            break;
                        }
                    }
                }
                tryFillPool();
                if (this.preparedFlushedPageSize.get() > i) {
                    return;
                } else {
                    LOG.info("EvictHandler doFlushRegion NOT WORK bestFlushSize({}) minSize({}) flushedPageSize({}) preparedFlushedPageSize({}) scanPage({}) {}", new Object[]{Integer.valueOf(i2), Integer.valueOf(i), Long.valueOf(j), Long.valueOf(this.preparedFlushedPageSize.get()), Integer.valueOf(i3), toString()});
                }
            }
        }
    }

    /**
     * Fetches the sorted candidate list from the priority pool.
     *
     * @return whatever {@code pagePriorityPool.getSortedList()} returns
     */
    List<EvictablePagePool.SortedEntry> getSortedList() {
        return pagePriorityPool.getSortedList();
    }

    /**
     * Asks the priority pool to refill itself from all registered regions.
     *
     * @return the result reported by {@code pagePriorityPool.tryFillPool}
     */
    boolean tryFillPool() {
        return pagePriorityPool.tryFillPool(allRegions.keySet());
    }

    /**
     * Submits one single page to the file cache and registers it as ready to evict.
     *
     * <p>The page size is added to both the flushing and the prepared-flush counters. When the file
     * cache callback runs, the flushing counter is reduced again, the flushed total is increased
     * and the data page is released; a failed submission is logged and reported to the context as
     * an internal DB error.
     *
     * <p>NOTE(review): {@code getDataPage()} appears to hand out a reference that the callback
     * releases. The early return for a non-null page that is no longer valid does not release it -
     * confirm whether that path leaks a reference.
     *
     * @param pageAddress address of the page; must be a {@code PageAddressSingleImpl}
     * @param gRegion region the page belongs to
     * @return the size of the submitted page, or 0 if the page has no data or is not valid
     * @throws GeminiRuntimeException if the page is already in the ready-to-evict map
     */
    private int prepareFlushSinglePage(PageAddress pageAddress, GRegion gRegion) {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl, "Internal Bug");

        final DataPage page = pageAddress.getDataPage();
        if (page == null || !pageAddress.isPageValid()) {
            return 0;
        }

        final int pageSize = page.getSize();
        if (readyToEvictDataPageMap.containsKey(pageAddress)) {
            throw new GeminiRuntimeException("Internal Bug");
        }

        readyToEvictDataPageMap.put(pageAddress, gRegion);
        flushingPageSize.addAndGet(pageSize);
        preparedFlushedPageSize.addAndGet(pageSize);

        supervisor.getFileCache().addPage(
            pageAddress,
            gRegion.getGRegionContext(),
            flushEventExecutorGroup.next(),
            (bool, th) -> {
                // Bookkeeping happens on success and on failure alike.
                flushingPageSize.addAndGet(-pageSize);
                curThreadTotalFlushedSize.addAndGet(pageSize);
                page.release();
                if (!bool.booleanValue()) {
                    LOG.error("prepare flush {} failed", pageAddress, th);
                    gContext.setDBInternalError(new GeminiRuntimeException("Prepare flush failed, " + th));
                }
            });
        return pageSize;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Adjusts the used-memory counter and, if requested, evicts once the high mark is exceeded.
     *
     * @param i change of used memory; added to the counter as is, so negative values reduce it
     * @param z whether eviction may be triggered by this update
     */
    public void addPageUsedMemory(int i, boolean z) {
        long usedAfterUpdate = this.curThreadTotalPageUsedMem.addAndGet(i);
        if (!z || usedAfterUpdate <= this.curThreadMemHighMark) {
            return;
        }
        // The excess is a long. A plain (int) cast wraps for values above Integer.MAX_VALUE and can
        // produce a non-positive size, which doEvict ignores. Clamp instead of wrapping.
        long excess = usedAfterUpdate - this.curThreadMemHighMark;
        doEvict((int) Math.min(excess, (long) Integer.MAX_VALUE));
    }

    /**
     * Evicts ready-to-evict pages until at least {@code i} bytes of data length have been evicted.
     *
     * <p>Only pages that the file cache reports as cached are evicted. For each of them the bloom
     * filter is registered, the data page is detached from the address and released, and the
     * counters and statistics are updated. When a pass over the map does not evict enough and used
     * memory is still at or above the high mark, an evict block is recorded, more pages are
     * prepared for flushing and the map is scanned again.
     *
     * @param i number of bytes to evict; non-positive values are ignored
     * @throws GeminiRuntimeException if a valid page in the map has no data page
     */
    void doEvict(int i) {
        if (i <= 0) {
            return;
        }
        if (readyToEvictDataPageMap.size() == 0 && preparedFlushedPageSize.get() == 0) {
            // Nothing has been prepared yet; only refill the pool and report.
            tryFillPool();
            LOG.info("EvictHandler doEvict NOT WORK expectedSize({}) {}", i, toString());
            return;
        }

        final long startMillis = System.currentTimeMillis();
        int evictedBytes = 0;
        int scannedPages = 0;
        while (evictedBytes < i && gContext.isDBNormal()) {
            final long flushingAtPassStart = flushingPageSize.get();
            Iterator<Map.Entry<PageAddress, GRegion>> candidates = readyToEvictDataPageMap.entrySet().iterator();
            while (candidates.hasNext()) {
                Map.Entry<PageAddress, GRegion> candidate = candidates.next();
                PageAddress address = candidate.getKey();
                Preconditions.checkArgument(address instanceof PageAddressSingleImpl);
                scannedPages++;

                DataPage page = address.getDataPageNoReference();
                if (page == null) {
                    if (address.isPageValid()) {
                        throw new GeminiRuntimeException("Internal Bug");
                    }
                    // Invalid page without data: left in the map, nothing to evict here.
                    continue;
                }
                if (!supervisor.getFileCache().isCached(address)) {
                    // Not yet in the file cache; try again on a later pass.
                    continue;
                }

                evictedBytes += address.getDataLen();
                supervisor.getBloomFilterManager().addBloomFilter(address, page);
                address.setDataPage(null);
                page.release();
                candidate.getValue().getGRegionContext().getPageStoreStats()
                    .addPageUsedMemory(candidate.getValue(), -address.getDataLen(), false);
                candidates.remove();
                preparedFlushedPageSize.addAndGet(-address.getDataLen());
                curThreadTotalEvictedSize.addAndGet(address.getDataLen());
                cacheManager.getCacheStats().addPageCacheEvictSize(address.getDataLen());
                if (evictedBytes >= i) {
                    break;
                }
            }

            final long passEndMillis = System.currentTimeMillis();
            if (evictedBytes >= i) {
                // Target reached; the loop condition ends the method.
                continue;
            }
            if (curThreadTotalPageUsedMem.get() < curThreadMemHighMark) {
                return;
            }
            cacheManager.getCacheStats().addEvictBlock(1);
            tryPrepareFlush(i - evictedBytes);
            if (flushingAtPassStart <= i && flushingPageSize.get() <= i - evictedBytes) {
                LOG.info(
                    "EvictHandler doEvict NOT WORK,have run ({})ms,expectedSize({}), evictedSize({}), scanPageCount({}), currentRunning({}) {}",
                    passEndMillis - startMillis,
                    i,
                    evictedBytes,
                    scannedPages,
                    flushingAtPassStart,
                    toString());
            }
        }
    }

    /**
     * Renders the handler's name, memory marks, counters and pool state for log output.
     *
     * @return a string built with {@code MoreObjects.toStringHelper}
     */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
            .add("name", name)
            .add("curThreadMemLowMark", curThreadMemLowMark)
            .add("curThreadMemMidMark", curThreadMemMidMark)
            .add("curThreadMemHighMark", curThreadMemHighMark)
            .add("curThreadTotalPageUsedMem", curThreadTotalPageUsedMem)
            .add("maxPreparedFlushSize", maxPreparedFlushSize)
            .add("flushingPageSize", flushingPageSize)
            .add("preparedFlushedPageSize", preparedFlushedPageSize)
            .add("readyToEvictDataPageMapCount", readyToEvictDataPageMap.size())
            .add("logicPagePriorityPoolSize", pagePriorityPool.size())
            .add("logicPagePriorityPoolDataLen", pagePriorityPool.dataSize())
            .add("curThreadTotalEvictedSize", curThreadTotalEvictedSize)
            .add("curThreadTotalFlushedSize", curThreadTotalFlushedSize)
            .add("dynamicPreparedFlushSize", dynamicPreparedFlushSize)
            .add("pagePriorityPool", pagePriorityPool.toString())
            .toString();
    }

    /**
     * Registers a region; registered regions are offered to the priority pool by {@code tryFillPool}.
     *
     * @param gRegion the region to register
     */
    public void addRegion(GRegion gRegion) {
        allRegions.put(gRegion, gRegion);
    }

    /**
     * @return the configured maximum prepared-flush size, widened to {@code long}
     */
    public long getMaxPreparedFlushSize() {
        return maxPreparedFlushSize;
    }

    /**
     * @return the current value of the dynamic prepared-flush size
     */
    @VisibleForTesting
    public int getDynamicPreparedFlushSize() {
        return dynamicPreparedFlushSize;
    }

    /**
     * @return the low memory mark of this handler
     */
    public long getCurThreadMemLowMark() {
        return curThreadMemLowMark;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the current value of the used-memory counter
     */
    @VisibleForTesting
    public long getCurThreadTotalPageUsedMem() {
        return curThreadTotalPageUsedMem.get();
    }

    /**
     * @return the mid memory mark of this handler
     */
    @VisibleForTesting
    public long getCurThreadMemMidMark() {
        return curThreadMemMidMark;
    }

    /**
     * @return the priority pool used by this handler
     */
    @VisibleForTesting
    EvictablePagePool getEvictablePagePool() {
        return pagePriorityPool;
    }

    /**
     * @return the internal ready-to-evict map itself, not a copy
     */
    @VisibleForTesting
    public Map<PageAddress, GRegion> getReadyToEvictDataPageMap() {
        return readyToEvictDataPageMap;
    }

    /**
     * @return the current value of the prepared-flush size counter
     */
    @VisibleForTesting
    long getPreparedFlushedPageSize() {
        return preparedFlushedPageSize.get();
    }

    /** Shuts down the priority pool. */
    public void shutdown() {
        pagePriorityPool.shutdown();
    }

    /**
     * @return the name given to this handler at construction
     */
    public String getName() {
        return name;
    }

    /**
     * @return the time in milliseconds at which the dynamic prepared-flush size was last changed
     */
    public long getLastTimeChangeMaxFlushSize() {
        return lastTimeChangeMaxFlushSize;
    }

    /**
     * Classifies the used memory, increased by {@code i}, against the three memory marks.
     *
     * @param i additional amount added to the current used memory before classifying
     * @return {@code Normal} below the low mark, {@code Low} below the mid mark, {@code Middle}
     *     below the high mark, otherwise {@code High}
     */
    public EvictPolicy.MemoryUsedWaterMark getMemoryUsedWaterMark(int i) {
        // Read the counter once so that all comparisons are made against the same value; reading
        // it per comparison could mix different values when the counter changes in between.
        final long projectedUsage = this.curThreadTotalPageUsedMem.get() + ((long) i);
        if (projectedUsage < this.curThreadMemLowMark) {
            return EvictPolicy.MemoryUsedWaterMark.Normal;
        }
        if (projectedUsage < this.curThreadMemMidMark) {
            return EvictPolicy.MemoryUsedWaterMark.Low;
        }
        if (projectedUsage < this.curThreadMemHighMark) {
            return EvictPolicy.MemoryUsedWaterMark.Middle;
        }
        return EvictPolicy.MemoryUsedWaterMark.High;
    }
}
