/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.page;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiShutDownException;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.page.AbstractHashPageStore;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageKMap;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageKMapImpl;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexContext;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink2Key;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink2KeyImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageStoreKMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryKey;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.ByteBufferDataInputView;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinaryHashMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinarySplitHashMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBufferAddressMapping;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;

public class PageStoreHashKMapImpl<K, MK, MV>
extends AbstractHashPageStore<K, Map<MK, GSValue<MV>>>
implements PageStoreKMap<K, MK, MV> {
    protected final PageSerdeFlink2Key<K, MK, MV> pageSerdeFlink2Key;
    protected final boolean dataPageAllowMapSplit;
    protected final int mapSplitSizeThreshold;
    protected final int mapSplitSubMapSize;

    public PageStoreHashKMapImpl(GRegion gRegion, EventExecutor eventExecutor) {
        this(gRegion, null, eventExecutor);
    }

    public PageStoreHashKMapImpl(GRegion gRegion, PageIndex pageIndex, EventExecutor eventExecutor) {
        this(DataPage.DataPageType.KHashMap, gRegion, pageIndex, eventExecutor);
    }

    protected PageStoreHashKMapImpl(DataPage.DataPageType dataPageType, GRegion gRegion, PageIndex pageIndex, EventExecutor eventExecutor) {
        super(dataPageType, gRegion, pageIndex, eventExecutor);
        Preconditions.checkArgument((boolean)dataPageType.isKMapType(), (Object)"The data page type should be a KMap type.");
        this.pageSerdeFlink2Key = (PageSerdeFlink2KeyImpl)this.gRegionContext.getPageSerdeFlink();
        this.dataPageAllowMapSplit = gRegion.getGRegionContext().getGContext().getGConfiguration().getAllowMapSplit();
        this.mapSplitSizeThreshold = gRegion.getGRegionContext().getGContext().getGConfiguration().getMapSplitSizeThreshold();
        this.mapSplitSubMapSize = gRegion.getGRegionContext().getGContext().getGConfiguration().getMapSplitSubMapSize();
    }

    @Override
    public boolean contains(K key) {
        Object result = this.get((Object)key);
        return this.get((Object)key) != null && !result.isEmpty();
    }

    @Override
    long getRequestCount(List<Tuple2<K, GSValue<Map<MK, GSValue<MV>>>>> dataSet) {
        return dataSet.stream().map(value -> ((GSValue)value.f1).getRequestCount()).reduce(0, (a, b) -> a + b).intValue();
    }

    @Override
    DataPage<K, Map<MK, GSValue<MV>>> createDataPage(long version, List<Tuple2<K, GSValue<Map<MK, GSValue<MV>>>>> dataSet, int logicPageId) {
        GBinaryHashMap gBinaryHashMap = GBinaryHashMap.of(this.getDataPageType(), dataSet, this.pageSerdeFlink.getKeySerde(), this.pageSerdeFlink2Key.getMapValueTypeSerializer(), version, logicPageId, this.gContext.getSupervisor().getAllocator(), 1L, this.gRegionContext.getGContext().getInPageGCompressAlgorithm());
        return gBinaryHashMap == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP ? null : new DataPageKMapImpl(gBinaryHashMap, this.pageSerdeFlink2Key.getKey2Serde(), this.pageSerdeFlink2Key.getValueSerde(), this.pageSerdeFlink2Key.getMapValueTypeSerializer());
    }

    @Override
    public Map<MK, GSValue<MV>> get(K key) {
        PageIndexContext pageIndexContext = this.pageIndex.getPageIndexContext(key, false);
        LogicalPageChain logicPageID = pageIndexContext.getLogicalPageChain();
        if (this.isNullPage(logicPageID)) {
            return null;
        }
        Map<MK, GSValue<MV>> finalResult = null;
        int curIndex = logicPageID.getCurrentPageChainIndex();
        ArrayList<BinaryValue> binaryValueReversedOrderList = new ArrayList<BinaryValue>();
        HashMap<Integer, DataPage> fetchedDataPageMap = new HashMap<Integer, DataPage>();
        ArrayList<DataPage> needToRelease = new ArrayList<DataPage>();
        while (curIndex >= 0 && this.gContext.isDBNormal()) {
            DataPage dataPage = this.getDataPageAutoLoadIfNeed(key, logicPageID, pageIndexContext.getPageIndexID(), curIndex, fetchedDataPageMap);
            if (dataPage == null) {
                --curIndex;
                continue;
            }
            needToRelease.add(dataPage);
            if (!(dataPage instanceof DataPageKMap)) {
                needToRelease.forEach(page -> page.release());
                fetchedDataPageMap.values().forEach(page -> page.release());
                throw new IllegalArgumentException("Internal BUG, error page");
            }
            dataPage.addRequestCount(this.cacheManager.getCurrentTickTime(), 1);
            this.gRegionContext.getPageStoreStats().addPageRequestCount(1L);
            BinaryValue binaryValue = dataPage.getBinaryValue(key);
            if (binaryValue != null) {
                if (binaryValue.getGValueType() == GValueType.Delete) break;
                binaryValueReversedOrderList.add(binaryValue);
                if (binaryValue.getGValueType() == GValueType.PutMap) break;
            }
            --curIndex;
        }
        if (!this.gContext.isDBNormal()) {
            needToRelease.forEach(page -> page.release());
            fetchedDataPageMap.values().forEach(page -> page.release());
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        finalResult = binaryValueReversedOrderList.size() == 0 ? null : this.doCompactValueToPOJO(binaryValueReversedOrderList);
        needToRelease.forEach(page -> page.release());
        this.tryLaunchCompactionByRead(pageIndexContext, logicPageID, fetchedDataPageMap);
        return finalResult;
    }

    @Override
    public MV get(K key, MK mapKey) {
        PageIndexContext pageIndexContext = this.pageIndex.getPageIndexContext(key, false);
        LogicalPageChain logicPageID = pageIndexContext.getLogicalPageChain();
        if (this.isNullPage(logicPageID)) {
            return null;
        }
        int curIndex = logicPageID.getCurrentPageChainIndex();
        MV finalResult = null;
        HashMap<Integer, DataPage> fetchedDataPageMap = new HashMap<Integer, DataPage>(curIndex);
        ArrayList<DataPage> needToRelease = new ArrayList<DataPage>();
        while (curIndex >= 0 && this.gContext.isDBNormal()) {
            DataPage dataPage = this.getDataPageAutoLoadIfNeed(key, logicPageID, pageIndexContext.getPageIndexID(), curIndex, fetchedDataPageMap);
            if (dataPage == null) {
                --curIndex;
                continue;
            }
            needToRelease.add(dataPage);
            if (!(dataPage instanceof DataPageKMap)) {
                needToRelease.forEach(page -> page.release());
                fetchedDataPageMap.values().forEach(page -> page.release());
                throw new IllegalArgumentException("Internal BUG, error page");
            }
            dataPage.addRequestCount(this.cacheManager.getCurrentTickTime(), 1);
            this.gRegionContext.getPageStoreStats().addPageRequestCount(1L);
            GSValue result = ((DataPageKMap)dataPage).get(key, mapKey);
            if (result != null) {
                if (result.getValueType() == GValueType.Delete || this.gRegionContext.filterState(result.getSeqID())) break;
                finalResult = result.getValue();
                break;
            }
            --curIndex;
        }
        needToRelease.forEach(page -> page.release());
        if (!this.gContext.isDBNormal()) {
            fetchedDataPageMap.values().forEach(page -> page.release());
            throw new GeminiShutDownException("DB is in abnormal status " + this.gContext.getDBStatus().name());
        }
        this.tryLaunchCompactionByRead(pageIndexContext, logicPageID, fetchedDataPageMap);
        return finalResult;
    }

    @Override
    public void getAll(Map<K, GSValue<Map<MK, GSValue<MV>>>> container) {
    }

    @Override
    public boolean contains(K key, MK mapKey) {
        return this.get(key, mapKey) != null;
    }

    @Override
    public DataPage doCompactPage(PageIndexContext pageIndexContext, boolean isMajor, List<DataPage> canCompactPageListReversedOrder, long version, int logicPageId) {
        if (canCompactPageListReversedOrder == null || canCompactPageListReversedOrder.size() == 0) {
            throw new GeminiRuntimeException("Internal BUG");
        }
        return this.doCompactPageForStructureValue(pageIndexContext, isMajor, canCompactPageListReversedOrder, version, logicPageId);
    }

    @Override
    protected boolean isAllowSubPage() {
        return this.dataPageAllowMapSplit;
    }

    @Override
    BinaryValue doCompactValue(List<BinaryValue> binaryValueList, boolean isMajor, long version, int logicPageId, GBufferAddressMapping pageMapping) {
        return DataPageKMapImpl.doCompactionMapValue(binaryValueList, this.pageSerdeFlink2Key.getKey2Serde(), this.pageSerdeFlink2Key.getValueSerde(), isMajor, version, logicPageId, this.gContext.getSupervisor().getDefaultAllocator(), this.gContext.getStateFilter(), this.gRegionContext, pageMapping, this.dataPageAllowMapSplit, this.mapSplitSizeThreshold, this.mapSplitSubMapSize);
    }

    Map<MK, GSValue<MV>> doCompactValueToPOJO(List<BinaryValue> binaryValueReversedOrderList) {
        Map<BinaryKey, BinaryValue> binaryValueMap = DataPageKMapImpl.doCompactValueToBinaryMap(binaryValueReversedOrderList, this.pageSerdeFlink2Key.getKey2Serde());
        HashMap<MK, GSValue<MV>> result = new HashMap<MK, GSValue<MV>>(binaryValueMap.size());
        for (Map.Entry<BinaryKey, BinaryValue> entry : binaryValueMap.entrySet()) {
            if (entry.getValue() == null || entry.getValue().getGValueType() == GValueType.Delete) continue;
            result.put(this.getMKeyFromBinary(entry.getKey()), this.getMValueFromBinary(entry.getValue()));
        }
        return result;
    }

    protected GSValue<MV> getMValueFromBinary(BinaryValue binaryValue) {
        if (binaryValue == null) {
            return null;
        }
        if (binaryValue.getGValueType() == GValueType.Delete) {
            return new GSValue<Object>(null, GValueType.Delete, binaryValue.getSeqID());
        }
        try {
            ByteBufferDataInputView byteBufferDataInputView = new ByteBufferDataInputView(binaryValue.getBb(), binaryValue.getValueOffset(), binaryValue.getValueLen());
            Object value = this.pageSerdeFlink2Key.getValueSerde().deserialize((DataInputView)byteBufferDataInputView);
            return new GSValue<Object>(value, binaryValue.getGValueType(), binaryValue.getSeqID());
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("Exception: " + e.getMessage(), e);
        }
    }

    protected MK getMKeyFromBinary(BinaryKey key) {
        if (key == null) {
            throw new GeminiRuntimeException("key can't be null");
        }
        try {
            ByteBufferDataInputView byteBufferDataInputView = new ByteBufferDataInputView(key.getBb(), key.getKeyOffset(), key.getKeyLen());
            return (MK)this.pageSerdeFlink2Key.getKey2Serde().deserialize((DataInputView)byteBufferDataInputView);
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("Exception: " + e.getMessage(), e);
        }
    }

    @Override
    protected DataPage doBuildDataPageFromGBinaryMap(boolean isMajor, long version, int logicPageId, TypeSerializer<K> keySerde, Map<BinaryKey, BinaryValue> finalCompactedMap, long compactionCount, GBufferAddressMapping pageMapping) {
        GBinaryHashMap gBinaryHashMap = GBinaryHashMap.ofBinaryMap(this.getDataPageType(), isMajor, version, logicPageId, this.pageSerdeFlink.getKeySerde(), this.gContext.getSupervisor().getAllocator(), finalCompactedMap, compactionCount, this.gContext.getStateFilter(), this.gRegionContext);
        if (gBinaryHashMap != GBinaryHashMap.EMPTY_G_BINARY_HASHMAP && pageMapping != null && !pageMapping.isEmpty()) {
            gBinaryHashMap = new GBinarySplitHashMap(gBinaryHashMap, pageMapping);
        }
        return gBinaryHashMap == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP ? null : new DataPageKMapImpl(gBinaryHashMap, this.pageSerdeFlink2Key.getKey2Serde(), this.pageSerdeFlink2Key.getValueSerde(), this.pageSerdeFlink2Key.getMapValueTypeSerializer());
    }

    @Override
    DataPage<K, Map<MK, GSValue<MV>>> boxDataPage(PageAddress pageAddress, GByteBuffer byteBuffer, int logicPageChainIndex, int logicPageChainHashCode) {
        this.checkDataPageTypeToBox(byteBuffer);
        GBinaryHashMap gBinaryHashMap = new GBinaryHashMap(byteBuffer, this.pageSerdeFlink.getKeySerde(), pageAddress instanceof PageAddressCompositeImpl ? ((PageAddressCompositeImpl)pageAddress).getMainPageAddress().getChecksum() : pageAddress.getChecksum());
        return gBinaryHashMap == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP ? null : new DataPageKMapImpl(pageAddress.toBoxGBinaryHashMap(gBinaryHashMap, this.gRegionContext, logicPageChainIndex, logicPageChainHashCode), this.pageSerdeFlink2Key.getKey2Serde(), this.pageSerdeFlink2Key.getValueSerde(), this.pageSerdeFlink2Key.getMapValueTypeSerializer());
    }
}

