package org.apache.flink.runtime.state.gemini.engine.page;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiShutDownException;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryKey;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinaryHashMap;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/page/PageStoreHashKVImpl.class */
public class PageStoreHashKVImpl<K, V> extends AbstractHashPageStore<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(PageStoreHashKVImpl.class);

    public PageStoreHashKVImpl(GRegion gRegion, EventExecutor eventExecutor) {
        super(gRegion, eventExecutor);
    }

    public PageStoreHashKVImpl(GRegion gRegion, PageIndex pageIndex, EventExecutor eventExecutor) {
        super(gRegion, pageIndex, eventExecutor);
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.PageStore, org.apache.flink.runtime.state.gemini.engine.page.PageStoreKList
    public V get(K k) {
        PageIndexContext pageIndexContext = this.pageIndex.getPageIndexContext(k, false);
        LogicChainedPage pageID = pageIndexContext.getPageID();
        if (isNullPage(pageID)) {
            return null;
        }
        int currentPageChainIndex = pageID.getCurrentPageChainIndex();
        V v = null;
        HashMap hashMap = new HashMap();
        while (true) {
            if (currentPageChainIndex < 0 || !this.gContext.isDBNormal()) {
                break;
            }
            DataPage dataPageAutoLoadIfNeed = getDataPageAutoLoadIfNeed(pageID, currentPageChainIndex, hashMap);
            pageID.getPageAddress(currentPageChainIndex).addRequestCount(1);
            this.gRegionContext.getPageStoreStats().addPageRequestCount(1L);
            GSValue<V> gSValue = dataPageAutoLoadIfNeed.get(k);
            dataPageAutoLoadIfNeed.delReferenceCount(ReferenceCount.ReleaseType.Normal);
            if (gSValue == null) {
                currentPageChainIndex--;
            } else if (gSValue.getValueType() != GValueType.Delete && !this.gRegionContext.filterState(gSValue.getSeqID())) {
                v = gSValue.getValue();
            }
        }
        if (!this.gContext.isDBNormal()) {
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        tryLaunchCompactionByRead(pageIndexContext, pageID, hashMap);
        return v;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.PageStore
    public void getAll(Map<K, GSValue<V>> map) {
        for (LogicChainedPage logicChainedPage : this.pageIndex.getPageIndex()) {
            if (!isNullPage(logicChainedPage)) {
                for (int currentPageChainIndex = logicChainedPage.getCurrentPageChainIndex(); currentPageChainIndex >= 0; currentPageChainIndex--) {
                    PageAddress pageAddress = logicChainedPage.getPageAddress(currentPageChainIndex);
                    DataPage dataPage = pageAddress.getDataPage();
                    if (dataPage == null) {
                        try {
                            this.cacheManager.getCacheStats().addPageCacheMissCount();
                            dataPage = this.gContext.getSupervisor().getFetchPolicy().fetch(pageAddress, logicChainedPage, currentPageChainIndex, this.gRegionContext, this.gRegionContext.getGContext().getGConfiguration().getEnablePrefetch(), false);
                        } finally {
                            if (dataPage != null) {
                                dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                            }
                        }
                    } else {
                        this.cacheManager.getCacheStats().addPageCacheHitCount();
                    }
                    for (Map.Entry<K, GSValue<V>> entry : dataPage.getPOJOMap().entrySet()) {
                        if (!this.gRegionContext.filterState(entry.getValue().getSeqID())) {
                            map.putIfAbsent(entry.getKey(), entry.getValue());
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore
    public DataPage doCompactPage(boolean z, List<DataPage> list, long j, int i) {
        Map<BinaryKey, BinaryValue> binaryMap;
        if (list == null || list.size() == 0) {
            throw new GeminiRuntimeException("Interal BUG");
        }
        ArrayList arrayList = new ArrayList();
        Iterator<DataPage> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getGBinaryHashMap());
        }
        int size = arrayList.size() - 1;
        long j2 = 0;
        if (this.gContext.hasTtl()) {
            binaryMap = new HashMap();
        } else {
            binaryMap = ((GBinaryHashMap) arrayList.get(size)).getBinaryMap();
            j2 = 0 + ((GBinaryHashMap) arrayList.get(size)).getCompactionCount();
            size--;
        }
        while (size >= 0) {
            binaryMap.putAll(((GBinaryHashMap) arrayList.get(size)).getBinaryMap());
            j2 += ((GBinaryHashMap) arrayList.get(size)).getCompactionCount();
            size--;
        }
        GBinaryHashMap ofBinaryList = GBinaryHashMap.ofBinaryList(DataPage.DataPageType.KV, z, j, i, this.pageSerdeFlink.getKeySerde(), this.gContext.getSupervisor().getAllocator(), binaryMap, j2, this.gContext.getStateFilter(), this.gRegionContext);
        if (ofBinaryList == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP) {
            return null;
        }
        return new DataPageKVImpl(ofBinaryList, this.pageSerdeFlink.getValueSerde());
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore
    BinaryValue doCompactValue(List<BinaryValue> list, boolean z, long j, int i) {
        throw new GeminiRuntimeException("Internal Bug");
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore
    protected DataPage doBuildDataPageFromGBinaryMap(boolean z, long j, int i, TypeSerializer<K> typeSerializer, Map<BinaryKey, BinaryValue> map, long j2) {
        throw new GeminiRuntimeException("Internal Bug");
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore
    long getRequestCount(List<Tuple2<K, GSValue<V>>> list) {
        return ((Integer) list.stream().map(tuple2 -> {
            return Integer.valueOf(((GSValue) tuple2.f1).getRequestCount());
        }).reduce(0, (num, num2) -> {
            return Integer.valueOf(num.intValue() + num2.intValue());
        })).intValue();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore
    DataPage doCreateDataPage(long j, List<Tuple2<K, GSValue<V>>> list, int i) {
        GBinaryHashMap of = GBinaryHashMap.of(DataPage.DataPageType.KV, list, this.pageSerdeFlink.getKeySerde(), this.pageSerdeFlink.getValueSerde(), j, i, this.gContext.getSupervisor().getAllocator(), 1L, this.gContext.getInPageGCompressAlgorithm());
        if (of == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP) {
            return null;
        }
        return new DataPageKVImpl(of, this.pageSerdeFlink.getValueSerde());
    }
}
