/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.page;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiShutDownException;
import org.apache.flink.runtime.state.gemini.engine.filter.StateFilter;
import org.apache.flink.runtime.state.gemini.engine.handler.PageCompactHandler;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.LogicChainedPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexContext;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexContextHashImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexHashImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink;
import org.apache.flink.runtime.state.gemini.engine.page.PageStatus;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryKey;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinaryHashMap;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.WaterMark;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHashPageStore<K, V>
implements PageStore<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractHashPageStore.class);
    protected final PageIndexHashImpl<K> pageIndex;
    protected final GRegionContext gRegionContext;
    protected final EventExecutor eventExecutor;
    protected final CacheManager cacheManager;
    protected final GContext gContext;
    protected final SnapshotManager geminiSnapshotManager;
    private final int spilledPageSizeThresholdLow;
    private final int spilledPageSizeThresholdMiddle;
    private final int spilledPageSizeThresholdHigh;
    private final int maxCompactionChainThreshold;
    protected final GRegion gRegion;
    private final PageCompactHandler pageCompactHandler;
    private final int inMemoryCompactionThreshold;
    private final int maxRunningMajorCompaction;
    private final int maxRunningMinorCompaction;
    protected final PageSerdeFlink<K, V> pageSerdeFlink;

    public AbstractHashPageStore(GRegion gRegion, EventExecutor eventExecutor) {
        this(gRegion, null, eventExecutor);
    }

    public AbstractHashPageStore(GRegion gRegion, @Nullable PageIndex pageIndex, EventExecutor eventExecutor) {
        this.gRegion = gRegion;
        this.gRegionContext = gRegion.getGRegionContext();
        this.eventExecutor = eventExecutor;
        GConfiguration configuration = this.gRegionContext.getGContext().getGConfiguration();
        this.pageIndex = pageIndex != null ? (PageIndexHashImpl)pageIndex : new PageIndexHashImpl(configuration, this, this.gRegionContext.getPageStoreStats());
        this.gContext = this.gRegionContext.getGContext();
        this.cacheManager = this.gContext.getSupervisor().getCacheManager();
        this.gRegionContext.getPageStoreStats().setPageSizeRate(configuration.getPageSizeRateBetweenPOJOAndHeap());
        this.spilledPageSizeThresholdLow = configuration.getSpilledPageSizeThresholdUnderLowMark();
        this.spilledPageSizeThresholdMiddle = configuration.getSpilledPageSizeThresholdUnderMiddleMark();
        this.spilledPageSizeThresholdHigh = configuration.getSpilledPageSizeThresholdUnderHighMark();
        this.geminiSnapshotManager = this.gContext.getSupervisor().getSnapshotManager();
        this.maxCompactionChainThreshold = configuration.getMaxCompactionChainThreshold();
        this.inMemoryCompactionThreshold = configuration.getInMemoryCompactionThreshold();
        this.maxRunningMajorCompaction = configuration.getMaxRunningMajorCompaction();
        this.maxRunningMinorCompaction = configuration.getMaxRunningMinorCompaction();
        this.pageSerdeFlink = this.gRegionContext.getPageSerdeFlink();
        this.pageCompactHandler = new PageCompactHandler(){

            @Override
            public void doAsyncMajorCompaction(LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, long version) {
                AbstractHashPageStore.this.doMajorCompaction(logicChainedPage, curPageIndex, curChainIndex, version);
            }

            @Override
            public void doAsyncMinorCompaction(LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, long version, boolean force) {
                AbstractHashPageStore.this.doMinorCompaction(logicChainedPage, curPageIndex, curChainIndex, version, force);
            }

            @Override
            public void doSyncReplace(LogicChainedPage logicChainedPage, int curPageIndex, int oldCompatedPageSize, int oldMemPageSize, long oldRequstCount, int inclusiveCompactionStartChainIndex, int inclusiveCompactionEndChainIndex, DataPage compactedDataPage, List<PageAddress> invalidPageAddressList, int relatedIndex) {
                AbstractHashPageStore.this.doSyncReplaceLogicPage(logicChainedPage, curPageIndex, oldCompatedPageSize, oldMemPageSize, oldRequstCount, inclusiveCompactionStartChainIndex, inclusiveCompactionEndChainIndex, compactedDataPage, invalidPageAddressList, false, relatedIndex);
            }

            @Override
            public void doAsyncMinorCompactionByRead(LogicChainedPage logicPageID, int curPageIndex, int curChainIndex, Map<Integer, DataPage> fetchedDataPageMap) {
                AbstractHashPageStore.this.doMinorCompactionByRead(logicPageID, curPageIndex, curChainIndex, fetchedDataPageMap);
            }
        };
    }

    @Override
    public EventExecutor getExecutor() {
        return this.eventExecutor;
    }

    @Override
    public boolean contains(K key) {
        return this.get(key) != null;
    }

    @Override
    public PageIndex<K> getPageIndex() {
        return this.pageIndex;
    }

    @Override
    public void addPage(PageIndexContext pageIndexContext, List<Tuple2<K, GSValue<V>>> dataSet, long version) {
        LogicChainedPage currentLogicPageID = pageIndexContext.getPageID();
        if (currentLogicPageID == PageIndexHashImpl.NO_PAGE) {
            String msg = "BUG! addOrMergePage receive NO_PAGE request.";
            LOG.error(msg);
            throw new GeminiRuntimeException(msg);
        }
        if (dataSet == null || dataSet.isEmpty()) {
            this.compactPage(pageIndexContext, version);
        } else {
            this.doWriteDataToPage(pageIndexContext, dataSet, version);
        }
    }

    @Override
    public void compactPage(PageIndexContext pageIndexContext, long version) {
        try {
            LogicChainedPage logicChainedPage = pageIndexContext.getPageID();
            int curPageIndex = pageIndexContext.getPageIndexID();
            if (logicChainedPage != this.pageIndex.getLogicPage(curPageIndex)) {
                return;
            }
            if (logicChainedPage.getCurrentPageChainIndex() <= 0) {
                return;
            }
            if (!logicChainedPage.getPageStatus().canCompaction()) {
                return;
            }
            int curChainIndex = logicChainedPage.getCurrentPageChainIndex();
            LogicChainedPage compactionLogicChainedPage = logicChainedPage;
            if (logicChainedPage.getCurrentPageChainIndex() >= this.maxCompactionChainThreshold) {
                if (logicChainedPage.getPageStatus().canCompaction()) {
                    this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(1);
                    if (this.cacheManager.getCacheStats().getRuningMajorCompactedPages() > this.maxRunningMajorCompaction) {
                        this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(-1);
                        this.tryLaunchMinorCompaction(version, logicChainedPage, curPageIndex, curChainIndex, compactionLogicChainedPage, true);
                        return;
                    }
                    if (!logicChainedPage.setPageStatus(PageStatus.Normal, PageStatus.Compacting)) {
                        this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(-1);
                        return;
                    }
                    EventExecutor eventExecutor = this.gContext.getSupervisor().getCompactionExecutorGroup().next();
                    eventExecutor.submit(() -> {
                        try {
                            this.pageCompactHandler.doAsyncMajorCompaction(compactionLogicChainedPage, curPageIndex, curChainIndex, version);
                        }
                        catch (GeminiShutDownException ignore) {
                            LOG.debug("GeminiDB has shutdown!", (Throwable)ignore);
                        }
                        catch (Exception e) {
                            LOG.error("async major compaction failed", (Throwable)e);
                        }
                    });
                }
            } else if (logicChainedPage.getCurrentPageChainIndex() > this.inMemoryCompactionThreshold) {
                this.tryLaunchMinorCompaction(version, logicChainedPage, curPageIndex, curChainIndex, compactionLogicChainedPage, false);
            }
        }
        catch (Exception e) {
            LOG.error("Bug " + e.getMessage(), (Throwable)e);
            throw new GeminiRuntimeException(e);
        }
    }

    private void tryLaunchMinorCompaction(long version, LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, LogicChainedPage compactionLogicChainedPage, boolean force) {
        if (logicChainedPage.getPageStatus().canCompaction()) {
            PageAddress pageAddress;
            DataPage dataPage;
            this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(1);
            if (!force && this.cacheManager.getCacheStats().getRuningMinorCompactedPages() > this.maxRunningMinorCompaction) {
                this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(-1);
                return;
            }
            int memCandidatePage = 0;
            long lastSumCompactedThreshold = -1L;
            for (int startCompactionIndex = curChainIndex; startCompactionIndex >= 0 && (dataPage = (pageAddress = logicChainedPage.getPageAddress(startCompactionIndex)).getDataPageNoReference()) != null; --startCompactionIndex) {
                if (!force) {
                    long compactedCount = dataPage.getCompactionCount();
                    if (lastSumCompactedThreshold == -1L) {
                        lastSumCompactedThreshold = compactedCount;
                    } else {
                        if (lastSumCompactedThreshold < compactedCount) break;
                        lastSumCompactedThreshold += compactedCount;
                    }
                }
                ++memCandidatePage;
            }
            if (memCandidatePage <= this.inMemoryCompactionThreshold) {
                this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(-1);
                return;
            }
            if (!logicChainedPage.setPageStatus(PageStatus.Normal, PageStatus.Compacting)) {
                this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(-1);
                return;
            }
            EventExecutor eventExecutor = this.gContext.getSupervisor().getCompactionExecutorGroup().next();
            eventExecutor.submit(() -> {
                try {
                    this.pageCompactHandler.doAsyncMinorCompaction(compactionLogicChainedPage, curPageIndex, curChainIndex, version, force);
                }
                catch (GeminiShutDownException ignore) {
                    LOG.debug("GeminiDB has shutdown!", (Throwable)ignore);
                }
                catch (Exception e) {
                    LOG.error("async minor compaction failed", (Throwable)e);
                }
            });
        }
    }

    private LogicChainedPage doSyncReplaceLogicPage(LogicChainedPage logicChainedPage, int curPageIndex, int oldCompatedPageSize, int oldMemPageSize, long oldRequstCount, int inclusiveCompactionStartChainIndex, int inclusiveCompactionEndChainIndex, DataPage compactedDataPage, List<PageAddress> invalidPageAddressList, boolean isSplit, int relatedIndex) {
        int i;
        if (isSplit) {
            if (this.pageIndex.getLogicPage(curPageIndex) != PageIndexHashImpl.WAIT_SPLITTING) {
                if (compactedDataPage != null) {
                    compactedDataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                return null;
            }
        } else if (logicChainedPage != this.pageIndex.getLogicPage(curPageIndex)) {
            if (compactedDataPage != null) {
                compactedDataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
            }
            return null;
        }
        int compactedPageSize = 0;
        PageAddress compatedPageAddress = null;
        LogicChainedPage compactedLogicChainedPage = this.pageIndex.newLogicChainedPage();
        for (i = 0; i < inclusiveCompactionStartChainIndex; ++i) {
            compactedLogicChainedPage.insertPage(logicChainedPage.getPageAddress(i));
        }
        if (compactedDataPage != null) {
            compatedPageAddress = compactedLogicChainedPage.createPage(oldRequstCount, compactedDataPage);
            compactedPageSize = compactedDataPage.getSize();
        }
        for (i = inclusiveCompactionEndChainIndex + 1; i <= logicChainedPage.getCurrentPageChainIndex(); ++i) {
            compactedLogicChainedPage.insertPage(logicChainedPage.getPageAddress(i));
        }
        compactedLogicChainedPage.addPageSize(logicChainedPage.getPageSize() - oldCompatedPageSize + compactedPageSize);
        this.pageIndex.updateLogicPage(curPageIndex, compactedLogicChainedPage);
        this.cacheManager.getEvictPolicy().removeInvalidPage(this.gRegion, curPageIndex, relatedIndex, invalidPageAddressList);
        if (compactedDataPage != null) {
            this.cacheManager.getEvictPolicy().addEvictablePage(this.gRegion, compatedPageAddress);
        }
        this.gRegionContext.getPageStoreStats().addLogicPageSize(compactedLogicChainedPage.getPageSize() - logicChainedPage.getPageSize());
        this.gRegionContext.getPageStoreStats().addPageUsedMemory(this.gRegion, compactedPageSize - oldMemPageSize);
        this.gRegionContext.getPageStoreStats().addLogicPageChainLen(compactedLogicChainedPage.getCurrentPageChainIndex() - logicChainedPage.getCurrentPageChainIndex());
        this.gRegionContext.getPageStoreStats().addLogicPageChainCapacity(compactedLogicChainedPage.getPageChainCapacity() - logicChainedPage.getPageChainCapacity());
        this.gContext.getSupervisor().discardPage(this.gRegionContext, invalidPageAddressList);
        return compactedLogicChainedPage;
    }

    public void doMinorCompaction(LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, long version, boolean force) {
        PageAddress pageAddress;
        DataPage dataPage2;
        int startCompactionIndex;
        if (logicChainedPage != this.pageIndex.getLogicPage(curPageIndex)) {
            this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(-1);
            return;
        }
        ArrayList<DataPage> canCompactPageListReversedOrder = new ArrayList<DataPage>();
        ArrayList<PageAddress> invalidPageAddressList = new ArrayList<PageAddress>();
        int oldPageSize = 0;
        long oldRequstCount = 0L;
        long lastSumCompactedThreshold = -1L;
        for (startCompactionIndex = curChainIndex; startCompactionIndex >= 0 && (dataPage2 = (pageAddress = logicChainedPage.getPageAddress(startCompactionIndex)).getDataPage()) != null; --startCompactionIndex) {
            if (!force) {
                long compactedCount = dataPage2.getCompactionCount();
                if (lastSumCompactedThreshold == -1L) {
                    lastSumCompactedThreshold = compactedCount;
                } else if (lastSumCompactedThreshold >= compactedCount) {
                    lastSumCompactedThreshold += compactedCount;
                } else {
                    dataPage2.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                    break;
                }
            }
            oldPageSize += dataPage2.getSize();
            canCompactPageListReversedOrder.add(dataPage2);
            invalidPageAddressList.add(pageAddress);
            oldRequstCount += pageAddress.getRequestCount();
        }
        if (!this.gContext.isDBNormal()) {
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        if (canCompactPageListReversedOrder.size() <= this.inMemoryCompactionThreshold) {
            logicChainedPage.setPageStatus(PageStatus.Compacting, PageStatus.Normal);
            this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(-1);
            canCompactPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
            return;
        }
        int inclusiveCompactionStartChainIndex = startCompactionIndex + 1;
        this.gRegionContext.getPageStoreStats().addMinorCompactedPages(canCompactPageListReversedOrder.size());
        DataPage compactedDataPage = this.doCompactPage(inclusiveCompactionStartChainIndex == 0, canCompactPageListReversedOrder, this.gContext.getCurVersion(), curPageIndex);
        canCompactPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
        long finalOldRequstCount = oldRequstCount;
        int finalOldPageSize = oldPageSize;
        this.getExecutor().submit(() -> {
            try {
                this.pageCompactHandler.doSyncReplace(logicChainedPage, curPageIndex, finalOldPageSize, finalOldPageSize, finalOldRequstCount, inclusiveCompactionStartChainIndex, curChainIndex, compactedDataPage, invalidPageAddressList, curPageIndex);
                this.gRegionContext.getPageStoreStats().addRuningMinorCompactedPages(-1);
            }
            catch (GeminiShutDownException e) {
                LOG.warn("GeminiDB has shutdown!");
            }
        });
    }

    public void doMajorCompaction(LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, long version) {
        if (logicChainedPage != this.pageIndex.getLogicPage(curPageIndex)) {
            this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(-1);
            return;
        }
        ArrayList<DataPage> dataPageListReversedOrder = new ArrayList<DataPage>();
        ArrayList<PageAddress> invalidPageAddressList = new ArrayList<PageAddress>();
        long oldRequstCount = 0L;
        int oldCompactedPageSize = 0;
        int oldMemPageSize = 0;
        for (int cix = curChainIndex; cix >= 0 && this.gContext.isDBNormal(); --cix) {
            PageAddress pageAddress = logicChainedPage.getPageAddress(cix);
            DataPage dataPage2 = pageAddress.getDataPage();
            if (dataPage2 == null) {
                this.cacheManager.getCacheStats().addPageForceFetchByCompactionCount();
                dataPage2 = this.gContext.getSupervisor().getFetchPolicy().fetch(pageAddress, logicChainedPage, cix, this.gRegionContext, false, false);
            } else {
                oldMemPageSize += dataPage2.getSize();
            }
            oldCompactedPageSize += dataPage2.getSize();
            dataPageListReversedOrder.add(dataPage2);
            invalidPageAddressList.add(pageAddress);
            oldRequstCount += pageAddress.getRequestCount();
        }
        if (!this.gContext.isDBNormal()) {
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        if (dataPageListReversedOrder.isEmpty()) {
            throw new GeminiRuntimeException("BUG");
        }
        this.gRegionContext.getPageStoreStats().addMajorCompactedPages(dataPageListReversedOrder.size());
        DataPage compactedDataPage = this.doCompactPage(true, dataPageListReversedOrder, this.gContext.getCurVersion(), curPageIndex);
        dataPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
        long finalOldRequstCount = oldRequstCount;
        int finalOldCompactedPageSize = oldCompactedPageSize;
        int finalOldMemPageSize = oldMemPageSize;
        this.getExecutor().submit(() -> {
            try {
                this.pageCompactHandler.doSyncReplace(logicChainedPage, curPageIndex, finalOldCompactedPageSize, finalOldMemPageSize, finalOldRequstCount, 0, curChainIndex, compactedDataPage, invalidPageAddressList, curPageIndex);
                this.gRegionContext.getPageStoreStats().addRuningMajorCompactedPages(-1);
            }
            catch (GeminiShutDownException e) {
                LOG.warn("GeminiDB has shutdown!");
            }
        });
    }

    public void doMinorCompactionByRead(LogicChainedPage logicChainedPage, int curPageIndex, int curChainIndex, Map<Integer, DataPage> fetchedDataPageMap) {
        int startCompactionIndex;
        if (logicChainedPage != this.pageIndex.getLogicPage(curPageIndex)) {
            this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(-1);
            return;
        }
        ArrayList<DataPage> canCompactPageListReversedOrder = new ArrayList<DataPage>();
        ArrayList<PageAddress> invalidPageAddressList = new ArrayList<PageAddress>();
        int oldCompactedPageSize = 0;
        int oldMemPageSize = 0;
        long oldRequstCount = 0L;
        for (startCompactionIndex = curChainIndex; startCompactionIndex >= 0; --startCompactionIndex) {
            PageAddress pageAddress = logicChainedPage.getPageAddress(startCompactionIndex);
            DataPage dataPage2 = pageAddress.getDataPage();
            if (dataPage2 == null) {
                dataPage2 = fetchedDataPageMap.get(startCompactionIndex);
                if (dataPage2 == null) break;
                dataPage2.addReferenceCount();
            } else {
                oldMemPageSize += dataPage2.getSize();
            }
            oldCompactedPageSize += dataPage2.getSize();
            canCompactPageListReversedOrder.add(dataPage2);
            invalidPageAddressList.add(pageAddress);
            oldRequstCount += pageAddress.getRequestCount();
        }
        if (!this.gContext.isDBNormal()) {
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        if (canCompactPageListReversedOrder.size() < 2) {
            logicChainedPage.setPageStatus(PageStatus.Compacting, PageStatus.Normal);
            this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(-1);
            canCompactPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
            return;
        }
        int inclusiveCompactionStartChainIndex = startCompactionIndex + 1;
        this.gRegionContext.getPageStoreStats().addMinorCompactedPages(canCompactPageListReversedOrder.size());
        DataPage compactedDataPage = this.doCompactPage(inclusiveCompactionStartChainIndex == 0, canCompactPageListReversedOrder, this.gContext.getCurVersion(), curPageIndex);
        canCompactPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
        long finalOldRequstCount = oldRequstCount;
        int finalOldCompactedPageSize = oldCompactedPageSize;
        int finalOldMemPageSize = oldMemPageSize;
        this.getExecutor().submit(() -> {
            try {
                this.pageCompactHandler.doSyncReplace(logicChainedPage, curPageIndex, finalOldCompactedPageSize, finalOldMemPageSize, finalOldRequstCount, inclusiveCompactionStartChainIndex, curChainIndex, compactedDataPage, invalidPageAddressList, curPageIndex);
                this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(-1);
            }
            catch (GeminiShutDownException e) {
                LOG.warn("GeminiDB has shutdown!");
            }
        });
    }

    @Override
    public void splitPage(PageIndexContext pageIndexContext) {
        Tuple2<DataPage, DataPage> splitDataPages;
        LogicChainedPage currentLogicPage = pageIndexContext.getPageID();
        PageIndexContextHashImpl uPageIndexContext = (PageIndexContextHashImpl)pageIndexContext;
        int curBucketNum = uPageIndexContext.getCurBucketNum();
        int curIndex = uPageIndexContext.getCurIndex();
        int destIndex = (curBucketNum = this.pageIndex.getBucketNumASPageFinishSplit(curBucketNum, curIndex)) + curIndex;
        if (this.pageIndex.getLogicPage(destIndex) != PageIndexHashImpl.WAIT_SPLITTING || this.pageIndex.getLogicPage(curIndex) != currentLogicPage) {
            return;
        }
        ArrayList<DataPage> dataPageListReversedOrder = new ArrayList<DataPage>();
        ArrayList<PageAddress> invalidPageAddressList = new ArrayList<PageAddress>();
        long oldRequestNum = 0L;
        int oldCompactedPageSize = 0;
        int oldMemPageSize = 0;
        for (int cix = currentLogicPage.getCurrentPageChainIndex(); cix >= 0 && this.gContext.isDBNormal(); --cix) {
            PageAddress pageAddress = currentLogicPage.getPageAddress(cix);
            DataPage dataPage2 = pageAddress.getDataPage();
            if (dataPage2 == null) {
                this.cacheManager.getCacheStats().addPageForceFetchByCompactionCount();
                dataPage2 = this.gContext.getSupervisor().getFetchPolicy().fetch(pageAddress, currentLogicPage, cix, this.gRegionContext, false, false);
            } else {
                oldMemPageSize += dataPage2.getSize();
            }
            oldCompactedPageSize += dataPage2.getSize();
            dataPageListReversedOrder.add(dataPage2);
            invalidPageAddressList.add(pageAddress);
            oldRequestNum += pageAddress.getRequestCount();
        }
        if (!this.gContext.isDBNormal()) {
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        if (dataPageListReversedOrder.isEmpty()) {
            return;
        }
        DataPage mergeDataPage = this.doCompactPage(true, dataPageListReversedOrder, this.gContext.getCurVersion(), pageIndexContext.getPageIndexID());
        dataPageListReversedOrder.forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
        Tuple2<DataPage, DataPage> tuple2 = splitDataPages = mergeDataPage == null ? new Tuple2<DataPage, DataPage>(null, null) : mergeDataPage.split(curBucketNum, curIndex, this.gContext.getSupervisor().getAllocator(), this.gContext.getInPageGCompressAlgorithm());
        if (mergeDataPage != null) {
            mergeDataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
        }
        if (splitDataPages.f1 == null && splitDataPages.f0 != null) {
            this.doSyncReplaceLogicPage(currentLogicPage, curIndex, oldCompactedPageSize, oldMemPageSize, oldRequestNum, 0, currentLogicPage.getCurrentPageChainIndex(), (DataPage)splitDataPages.f0, invalidPageAddressList, false, destIndex);
            this.pageIndex.updateLogicPage(destIndex, PageIndexHashImpl.NO_PAGE);
            return;
        }
        if (splitDataPages.f0 == null && splitDataPages.f1 != null) {
            this.doSyncReplaceLogicPage(currentLogicPage, destIndex, oldCompactedPageSize, oldMemPageSize, oldRequestNum, 0, currentLogicPage.getCurrentPageChainIndex(), (DataPage)splitDataPages.f1, invalidPageAddressList, true, curIndex);
            this.pageIndex.updateLogicPage(curIndex, PageIndexHashImpl.NO_PAGE);
            return;
        }
        if (splitDataPages.f0 == null && splitDataPages.f1 == null) {
            this.pageIndex.updateLogicPage(destIndex, PageIndexHashImpl.NO_PAGE);
            this.pageIndex.updateLogicPage(curIndex, PageIndexHashImpl.NO_PAGE);
            this.cacheManager.getEvictPolicy().removeInvalidPage(this.gRegion, curIndex, destIndex, invalidPageAddressList);
            this.gRegionContext.getPageStoreStats().addLogicPageCount(-1);
            this.gRegionContext.getPageStoreStats().addLogicPageChainLen(0 - currentLogicPage.getCurrentPageChainIndex() - 1);
            this.gRegionContext.getPageStoreStats().addLogicPageChainCapacity(0 - currentLogicPage.getPageChainCapacity());
            this.gRegionContext.getPageStoreStats().addLogicPageSize(0 - currentLogicPage.getPageSize());
            this.gRegionContext.getPageStoreStats().addPageUsedMemory(this.gRegion, 0 - oldMemPageSize);
            this.gContext.getSupervisor().discardPage(this.gRegionContext, invalidPageAddressList);
            return;
        }
        LogicChainedPage pageSpit1 = this.pageIndex.newLogicChainedPage();
        LogicChainedPage pageSpit2 = this.pageIndex.newLogicChainedPage();
        PageAddress pageAddressSplit1 = pageSpit1.createPage(oldRequestNum / 2L, (DataPage)splitDataPages.f0);
        PageAddress pageAddressSplit2 = pageSpit2.createPage(oldRequestNum - oldRequestNum / 2L, (DataPage)splitDataPages.f1);
        pageSpit1.addPageSize(pageAddressSplit1.getDataLen());
        pageSpit2.addPageSize(pageAddressSplit2.getDataLen());
        this.pageIndex.updateLogicPage(destIndex, pageSpit2);
        this.pageIndex.updateLogicPage(curIndex, pageSpit1);
        this.cacheManager.getEvictPolicy().removeInvalidPage(this.gRegion, curIndex, destIndex, invalidPageAddressList);
        this.cacheManager.getEvictPolicy().addEvictablePage(this.gRegion, pageAddressSplit1);
        this.cacheManager.getEvictPolicy().addEvictablePage(this.gRegion, pageAddressSplit2);
        this.gRegionContext.getPageStoreStats().addLogicPageCount(1);
        this.gRegionContext.getPageStoreStats().addLogicPageChainLen(2 - currentLogicPage.getCurrentPageChainIndex() - 1);
        this.gRegionContext.getPageStoreStats().addLogicPageChainCapacity(pageSpit1.getPageChainCapacity() + pageSpit2.getPageChainCapacity() - currentLogicPage.getPageChainCapacity());
        this.gRegionContext.getPageStoreStats().addLogicPageSize(pageSpit2.getPageSize() + pageSpit1.getPageSize() - currentLogicPage.getPageSize());
        this.gRegionContext.getPageStoreStats().addPageUsedMemory(this.gRegion, pageSpit2.getPageSize() + pageSpit1.getPageSize() - oldMemPageSize);
        this.gContext.getSupervisor().discardPage(this.gRegionContext, invalidPageAddressList);
    }

    @Override
    public void mergePage(PageIndexContext pageIndexContextFirst, PageIndexContext pageIndexContextSecond) {
        PageIndexContextHashImpl uPageIndexContextFirst = (PageIndexContextHashImpl)pageIndexContextFirst;
        int curIndexFirst = uPageIndexContextFirst.getCurIndex();
        if (pageIndexContextFirst != this.pageIndex.getLogicPage(curIndexFirst)) {
            return;
        }
    }

    private void doWriteDataToPage(PageIndexContext pageIndexContext, List<Tuple2<K, GSValue<V>>> dataSet, long version) {
        LogicChainedPage currentLogicPageID = pageIndexContext.getPageID();
        long newRequestCount = this.getRequestCount(dataSet);
        DataPage newDataPage = this.doCreateDataPage(version, dataSet, pageIndexContext.getPageIndexID());
        if (newDataPage == null) {
            LOG.warn("doWriteDataToPage write empty value");
        } else {
            PageAddress pageAddress = this.helpAddDataPage(currentLogicPageID, newRequestCount, newDataPage);
            if (this.cacheManager.getMemWaterMark(0) == WaterMark.High) {
                this.cacheManager.getEvictPolicy().addEvictablePage(this.gRegion, pageAddress);
            }
            int dataSize = newDataPage.getSize();
            currentLogicPageID.addPageSize(dataSize);
            this.gRegionContext.getPageStoreStats().addLogicPageSize(dataSize);
            this.gRegionContext.getPageStoreStats().addPageUsedMemory(this.gRegion, dataSize);
            this.gRegionContext.getPageStoreStats().addPageRequestCount(newRequestCount);
            this.gRegionContext.getPageStoreStats().addPage();
        }
        this.compactPage(pageIndexContext, version);
    }

    private PageAddress helpAddDataPage(LogicChainedPage currentLogicPageID, long newRequestCount, DataPage dataPage) {
        int oldChainCapacity = currentLogicPageID.getPageChainCapacity();
        PageAddress result = currentLogicPageID.createPage(newRequestCount, dataPage);
        int changeCapacity = currentLogicPageID.getPageChainCapacity() - oldChainCapacity;
        this.gRegionContext.getPageStoreStats().addLogicPageChainLen(1);
        this.gRegionContext.getPageStoreStats().addLogicPageChainCapacity(changeCapacity);
        return result;
    }

    @Override
    public void checkResource() {
        if (this.cacheManager.forbidIndexExpand()) {
            LOG.debug("cacheManager forbid index to expand.");
            return;
        }
        if (this.gRegionContext.getPageStoreStats().getLogicPageCount() == 0) {
            LOG.debug("no page here");
            return;
        }
        if (this.gRegionContext.getPageStoreStats().getLogicPageCount() * 2 <= this.gRegionContext.getPageStoreStats().getIndexCapacity()) {
            LOG.debug("page count {} * 2 less than index capacity {}, not to expand index", (Object)this.gRegionContext.getPageStoreStats().getLogicPageCount(), (Object)this.gRegionContext.getPageStoreStats().getIndexCapacity());
            return;
        }
        WaterMark waterMark = this.cacheManager.getIndexCapacityWaterMark();
        int spilledPageSizeThreshold = waterMark == WaterMark.High ? this.spilledPageSizeThresholdHigh : (waterMark == WaterMark.Low ? this.spilledPageSizeThresholdMiddle : this.spilledPageSizeThresholdLow);
        int averagePageSize = (int)this.gRegionContext.getPageStoreStats().getLogicPageSize() / this.gRegionContext.getPageStoreStats().getLogicPageCount();
        if (averagePageSize >= spilledPageSizeThreshold) {
            this.pageIndex.expand();
            LOG.info("averagePageSize {}, spilledPageSizeThreshold {}, to expand index up to {}", new Object[]{averagePageSize, spilledPageSizeThreshold, this.gRegionContext.getPageStoreStats().getIndexCapacity()});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void tryLaunchCompactionByRead(PageIndexContext pageIndexContext, LogicChainedPage logicPageID, Map<Integer, DataPage> fetchedDataPageMap) {
        boolean releaseFetchMap = true;
        try {
            if (logicPageID.getCurrentPageChainIndex() > this.inMemoryCompactionThreshold && fetchedDataPageMap.size() > 1 && logicPageID.getPageStatus().canCompaction()) {
                this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(1);
                if (this.cacheManager.getCacheStats().getRuningMinorCompactionByRead() > this.maxRunningMinorCompaction) {
                    this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(-1);
                } else {
                    if (!logicPageID.setPageStatus(PageStatus.Normal, PageStatus.Compacting)) {
                        this.gRegionContext.getPageStoreStats().addRuningMinorCompactionByRead(-1);
                        return;
                    }
                    int curChainIndex = logicPageID.getCurrentPageChainIndex();
                    int curPageIndex = pageIndexContext.getPageIndexID();
                    EventExecutor eventExecutor = this.gContext.getSupervisor().getCompactionExecutorGroup().next();
                    releaseFetchMap = false;
                    eventExecutor.submit(() -> {
                        try {
                            this.pageCompactHandler.doAsyncMinorCompactionByRead(logicPageID, curPageIndex, curChainIndex, fetchedDataPageMap);
                        }
                        catch (GeminiShutDownException e) {
                            LOG.debug("GeminiDB has shutdown!", (Throwable)e);
                        }
                        catch (Exception e) {
                            LOG.error("async minor compaction by read failed", (Throwable)e);
                        }
                        finally {
                            fetchedDataPageMap.values().forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
                        }
                    });
                }
            }
        }
        finally {
            if (releaseFetchMap) {
                fetchedDataPageMap.values().forEach(dataPage -> dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
            }
        }
    }

    protected DataPage doCompactPageForStructureValue(boolean isMajor, List<DataPage> canCompactPageListReversedOrder, long version, int logicPageId) {
        int index;
        ArrayList compactionListReversedOrder = new ArrayList();
        for (DataPage dataPage : canCompactPageListReversedOrder) {
            compactionListReversedOrder.add(dataPage.getGBinaryHashMap());
        }
        HashMap<BinaryKey, ArrayList> newMap = new HashMap<BinaryKey, ArrayList>(((GBinaryHashMap)compactionListReversedOrder.get(index)).keyCount());
        long compactionCount = 0L;
        StateFilter stateFilter = this.gRegionContext.getGContext().getStateFilter();
        for (index = compactionListReversedOrder.size() - 1; index >= 0; --index) {
            for (Map.Entry<BinaryKey, BinaryValue> entry : ((GBinaryHashMap)compactionListReversedOrder.get(index)).getBinaryMap().entrySet()) {
                if (isMajor && stateFilter != null && stateFilter.filter(this.gRegionContext, entry.getValue().getSeqID())) continue;
                if (entry.getValue().getgValueType() == GValueType.Delete) {
                    if (isMajor) {
                        newMap.remove(entry.getKey());
                        continue;
                    }
                    newMap.put(entry.getKey(), Lists.newArrayList((Object[])new BinaryValue[]{entry.getValue()}));
                    continue;
                }
                if (entry.getValue().getgValueType() == GValueType.PutMap || entry.getValue().getgValueType() == GValueType.PutList) {
                    newMap.put(entry.getKey(), Lists.newArrayList((Object[])new BinaryValue[]{entry.getValue()}));
                    continue;
                }
                if (newMap.containsKey(entry.getKey())) {
                    ((List)newMap.get(entry.getKey())).add(entry.getValue());
                    continue;
                }
                newMap.put(entry.getKey(), Lists.newArrayList((Object[])new BinaryValue[]{entry.getValue()}));
            }
            compactionCount += ((GBinaryHashMap)compactionListReversedOrder.get(index)).getCompactionCount();
        }
        HashMap<BinaryKey, BinaryValue> finalCompactedMap = new HashMap<BinaryKey, BinaryValue>(newMap.size());
        for (Map.Entry entry : newMap.entrySet()) {
            if (((List)entry.getValue()).size() == 0) {
                throw new GeminiRuntimeException("Internal BUG!");
            }
            BinaryValue compactedBinaryValue = ((List)entry.getValue()).size() == 1 && !isMajor ? (BinaryValue)((List)entry.getValue()).get(0) : this.doCompactValue((List)entry.getValue(), isMajor, version, logicPageId);
            finalCompactedMap.put((BinaryKey)entry.getKey(), compactedBinaryValue);
        }
        return this.doBuildDataPageFromGBinaryMap(isMajor, version, logicPageId, this.pageSerdeFlink.getKeySerde(), finalCompactedMap, compactionCount);
    }

    protected DataPage getDataPageAutoLoadIfNeed(LogicChainedPage logicPageID, int curIndex, Map<Integer, DataPage> fetchedDataPageMap) {
        PageAddress pageAddress = logicPageID.getPageAddress(curIndex);
        DataPage dataPage = pageAddress.getDataPage();
        if (dataPage == null) {
            this.cacheManager.getCacheStats().addPageCacheMissCount();
            dataPage = this.gContext.getSupervisor().getFetchPolicy().fetch(pageAddress, logicPageID, curIndex, this.gRegionContext, this.gRegionContext.getGContext().getGConfiguration().getEnablePrefetch(), true);
            fetchedDataPageMap.put(curIndex, dataPage);
            dataPage.addReferenceCount();
        } else {
            this.cacheManager.getCacheStats().addPageCacheHitCount();
        }
        return dataPage;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void allKeysIncludeDeleted(Set<K> allKeysIncludeDelete) {
        LogicChainedPage[] chains;
        for (LogicChainedPage logicChainedPage : chains = this.pageIndex.getPageIndex()) {
            int numPages;
            if (this.isNullPage(logicChainedPage)) continue;
            for (int i = numPages = logicChainedPage.getCurrentPageChainIndex(); i >= 0; --i) {
                PageAddress pageAddress = logicChainedPage.getPageAddress(i);
                DataPage dataPage = pageAddress.getDataPage();
                try {
                    if (dataPage == null) {
                        this.cacheManager.getCacheStats().addPageCacheMissCount();
                        dataPage = this.gContext.getSupervisor().getFetchPolicy().fetch(pageAddress, logicChainedPage, i, this.gRegionContext, this.gRegionContext.getGContext().getGConfiguration().getEnablePrefetch(), false);
                    } else {
                        this.cacheManager.getCacheStats().addPageCacheHitCount();
                    }
                    allKeysIncludeDelete.addAll(dataPage.getPOJOSet());
                    continue;
                }
                finally {
                    if (dataPage != null) {
                        dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                    }
                }
            }
        }
    }

    abstract long getRequestCount(List<Tuple2<K, GSValue<V>>> var1);

    abstract DataPage doCreateDataPage(long var1, List<Tuple2<K, GSValue<V>>> var3, int var4);

    @VisibleForTesting
    public abstract DataPage doCompactPage(boolean var1, List<DataPage> var2, long var3, int var5);

    abstract BinaryValue doCompactValue(List<BinaryValue> var1, boolean var2, long var3, int var5);

    protected abstract DataPage doBuildDataPageFromGBinaryMap(boolean var1, long var2, int var4, TypeSerializer<K> var5, Map<BinaryKey, BinaryValue> var6, long var7);

    protected boolean isNullPage(LogicChainedPage logicPageID) {
        return logicPageID == null || logicPageID.getCurrentPageChainIndex() == -1;
    }
}

