/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.page;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filter.StateFilter;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValueMap;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValueMapEntry;
import org.apache.flink.runtime.state.gemini.engine.page.AbstractGMapValueTypeSerializer;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageKMapImpl;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageKSortedMap;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageSortedSubPageImpl;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink2Key;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryKey;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValueForSplit;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValueImpl;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.ByteBufferDataInputView;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinaryHashMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinarySortedMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinarySplitHashMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBufferAddressMapping;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GComparator;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GSortedRoutingValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.SplitHashMapValueHelper;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.SplitSortedMapValueHelper;
import org.apache.flink.runtime.state.gemini.engine.rm.Allocator;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.util.Preconditions;

public class DataPageKSortedMapImpl<K, MK, MV>
extends DataPageKMapImpl<K, MK, MV>
implements DataPageKSortedMap<K, MK, MV> {
    private final GComparator<MK> gComparator;

    public DataPageKSortedMapImpl(GBinaryHashMap<K> gBinaryHashMap, TypeSerializer<MK> mkTypeSerializer, TypeSerializer<MV> mvTypeSerializer, AbstractGMapValueTypeSerializer<MK, MV> gSortedMapValueTypeSerialiZer, GComparator<MK> gComparator) {
        super(gBinaryHashMap, mkTypeSerializer, mvTypeSerializer, gSortedMapValueTypeSerialiZer);
        this.gComparator = gComparator;
    }

    protected GBinarySortedMap<MK> getBinaryMap(GByteBuffer valueBB) {
        return new GBinarySortedMap<MK>(valueBB.getByteBuffer(), this.mkTypeSerializer, this.gComparator);
    }

    @Override
    public GSValueMapEntry<MK, MV> firstEntry(K key) {
        return this.getFirstOrLastEntry(key, true);
    }

    @Override
    public GSValueMapEntry<MK, MV> lastEntry(K key) {
        return this.getFirstOrLastEntry(key, false);
    }

    private GSValueMapEntry<MK, MV> getFirstOrLastEntry(K key, boolean first) {
        try {
            Object mkey;
            GBinarySortedMap<MK> sortedMap;
            BinaryValue binaryValue = this.gBinaryHashMap.get(key);
            if (binaryValue == null) {
                return null;
            }
            GValueType mapType = binaryValue.getGValueType();
            if (binaryValue.getGValueType() == GValueType.Delete) {
                return new GSValueMapEntry(null, mapType, binaryValue.getSeqID());
            }
            if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                Preconditions.checkState((boolean)(this.gBinaryHashMap instanceof GBinarySplitHashMap), (Object)"Internal bug.");
                sortedMap = GSortedRoutingValue.getFirstOrLastSubGBinarySortedMap(binaryValue, this.mkTypeSerializer, this.gComparator, first);
            } else {
                sortedMap = this.getBinaryMap(DataPageKSortedMapImpl.getDuplicateBB(binaryValue));
            }
            Object MK = mkey = first ? sortedMap.firstKey() : sortedMap.lastKey();
            if (mkey == null) {
                return new GSValueMapEntry(null, mapType, binaryValue.getSeqID());
            }
            GSValue gsValue = this.getForMapBinaryValue(sortedMap, mkey);
            return new GSValueMapEntry(mkey, gsValue, mapType, binaryValue.getSeqID());
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("get exception: " + e.getMessage(), e);
        }
    }

    @Override
    public GSValueMap<MK, MV> head(K key, MK endMapKey) {
        return this.subMap(key, null, endMapKey);
    }

    @Override
    public GSValueMap<MK, MV> tail(K key, MK startMapKey) {
        return this.subMap(key, startMapKey, null);
    }

    @Override
    public GSValueMap<MK, MV> subMap(K key, MK startMapKey, MK endMapKey) {
        try {
            SortedMap<MK, BinaryValue> sortedMap;
            BinaryValue binaryValue = this.gBinaryHashMap.get(key);
            if (binaryValue == null) {
                return null;
            }
            GValueType mapType = binaryValue.getGValueType();
            if (binaryValue.getGValueType() == GValueType.Delete) {
                return new GSValueMap(null, GValueType.Delete, binaryValue.getSeqID());
            }
            if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                List<GBinarySortedMap<MK>> subMapList = GSortedRoutingValue.getAllSubGBinarySortedMap(binaryValue, this.mkTypeSerializer, this.gComparator);
                sortedMap = new TreeMap<MK, BinaryValue>(this.gComparator.getJDKCompactor());
                for (GBinarySortedMap<MK> subMap : subMapList) {
                    sortedMap.putAll(subMap.subMap(startMapKey, endMapKey));
                }
            } else {
                Map gBinarySortedMap = this.getBinaryMap(DataPageKSortedMapImpl.getDuplicateBB(binaryValue));
                sortedMap = ((GBinarySortedMap)gBinarySortedMap).subMap(startMapKey, endMapKey);
            }
            if (sortedMap == null || sortedMap.size() == 0) {
                return new GSValueMap(null, mapType, binaryValue.getSeqID());
            }
            SortedMap result = Maps.transformEntries(sortedMap, (mk, mv) -> this.getForBinaryValue((BinaryValue)mv));
            return new GSValueMap(result, mapType, binaryValue.getSeqID());
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("get exception: " + e.getMessage(), e);
        }
    }

    @Override
    public Map<MK, BinaryValue> getBinaryMapByBinaryValue(BinaryValue binaryValue, MK mapKey) {
        GBinarySortedMap<MK> binaryMap;
        if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
            Preconditions.checkState((boolean)(this.gBinaryHashMap instanceof GBinarySplitHashMap), (Object)"Internal bug.");
            binaryMap = GSortedRoutingValue.getSubGBinarySortedMap(mapKey, binaryValue, this.mkTypeSerializer, this.gComparator);
        } else {
            binaryMap = this.getBinaryMap(DataPageKSortedMapImpl.getDuplicateBB(binaryValue));
        }
        return binaryMap;
    }

    @Override
    public Map<MK, GSValue<MV>> getMap(BinaryValue binaryValue) throws IOException {
        Map value = new HashMap();
        if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
            List<GBinarySortedMap<MK>> subMapList = GSortedRoutingValue.getAllSubGBinarySortedMap(binaryValue, this.mkTypeSerializer, this.gComparator);
            for (GBinarySortedMap<MK> subMap : subMapList) {
                ByteBufferDataInputView byteBufferDataInputView = new ByteBufferDataInputView(subMap.getData(), 0, subMap.bytesSize());
                value.putAll((Map)this.valueTypeSerializer.deserialize((DataInputView)byteBufferDataInputView));
            }
        } else {
            ByteBufferDataInputView byteBufferDataInputView = new ByteBufferDataInputView(binaryValue.getBb(), binaryValue.getValueOffset(), binaryValue.getValueLen());
            value = (Map)this.valueTypeSerializer.deserialize((DataInputView)byteBufferDataInputView);
        }
        return value;
    }

    @Override
    public DataPage.DataPageType getDataPageType() {
        return DataPage.DataPageType.KSortedMap;
    }

    @VisibleForTesting
    public static <MK, MV> BinaryValue doCompactionSortedMapValue(List<BinaryValue> valueByOrder, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, boolean isMajor, long version, int logicPageId, Allocator allocator, @Nullable StateFilter stateFilter, @Nullable GRegionContext gRegionContext, GBufferAddressMapping pageMapping, boolean allowMapSplit, int mapSplitSizeThreshold, int mapSplitSubMapSize) {
        if (allowMapSplit) {
            boolean needSplit = false;
            for (BinaryValue binaryValue : valueByOrder) {
                if (binaryValue.getGValueType() == GValueType.Delete || !GSortedRoutingValue.isGSortedRoutingValue(binaryValue) && binaryValue.getValueLen() <= mapSplitSizeThreshold) continue;
                needSplit = true;
                break;
            }
            if (needSplit) {
                return DataPageKSortedMapImpl.doCompactionSortedMapValueSplit(valueByOrder, mkTypeSerializer, gComparator, isMajor, version, logicPageId, allocator, stateFilter, gRegionContext, pageMapping, mapSplitSubMapSize);
            }
        }
        return DataPageKSortedMapImpl.doCompactionSortedMapValueNormal(valueByOrder, mkTypeSerializer, gComparator, isMajor, version, logicPageId, allocator, stateFilter, gRegionContext);
    }

    @VisibleForTesting
    public static <MK, MV> BinaryValue doCompactionSortedMapValueSplit(List<BinaryValue> valueByOrder, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, boolean isMajor, long version, int logicPageId, Allocator allocator, @Nullable StateFilter stateFilter, @Nullable GRegionContext gRegionContext, GBufferAddressMapping pageMapping, int mapSplitSubMapSize) {
        if (valueByOrder.size() == 1 && !isMajor) {
            BinaryValue binaryValue = valueByOrder.get(0);
            if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                GByteBuffer gByteBuffer = SplitHashMapValueHelper.replaceBinaryValueIdList((BinaryValueForSplit)binaryValue, pageMapping);
                return new BinaryValueImpl(gByteBuffer.getByteBuffer(), binaryValue.getGValueType(), binaryValue.getSeqID(), 0, gByteBuffer.capacity());
            }
            return valueByOrder.get(0);
        }
        ArrayList<BinaryValue> listByOrder = new ArrayList<BinaryValue>();
        long seqID = 0L;
        GValueType firstValueType = null;
        int maxSplitValueIndex = -1;
        int maxSplitPartNum = 0;
        for (BinaryValue binaryValue : valueByOrder) {
            int subMapCount;
            if (binaryValue.getGValueType() == GValueType.Delete) {
                firstValueType = GValueType.Delete;
                continue;
            }
            seqID = Math.max(seqID, binaryValue.getSeqID());
            listByOrder.add(binaryValue);
            if (firstValueType == null) {
                firstValueType = binaryValue.getGValueType();
            }
            if ((subMapCount = GSortedRoutingValue.isGSortedRoutingValue(binaryValue) ? GSortedRoutingValue.getSubMapCount(binaryValue) : binaryValue.getValueLen() / mapSplitSubMapSize + (binaryValue.getValueLen() % mapSplitSubMapSize == 0 ? 0 : 1)) <= maxSplitPartNum) continue;
            maxSplitPartNum = subMapCount;
            maxSplitValueIndex = listByOrder.size() - 1;
        }
        return DataPageKSortedMapImpl.compactionSplitBinaryValueList(listByOrder, mkTypeSerializer, gComparator, maxSplitValueIndex, isMajor, version, logicPageId, allocator, stateFilter, gRegionContext, firstValueType, seqID, pageMapping, mapSplitSubMapSize);
    }

    private static <MK> BinaryValue doCompactionSortedMapValueNormal(List<BinaryValue> valueByOrder, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, boolean isMajor, long version, int logicPageId, Allocator allocator, @Nullable StateFilter stateFilter, @Nullable GRegionContext gRegionContext) {
        try {
            GBinarySortedMap<MK> gBinarySortedMap;
            if (valueByOrder.size() == 1 && !isMajor) {
                return valueByOrder.get(0);
            }
            ArrayList<GBinarySortedMap<MK>> listByOrder = new ArrayList<GBinarySortedMap<MK>>();
            long seqID = 0L;
            GValueType firstValueType = null;
            for (BinaryValue binaryValue : valueByOrder) {
                if (binaryValue.getGValueType() == GValueType.Delete) {
                    firstValueType = GValueType.Delete;
                    continue;
                }
                GBinarySortedMap<MK> mapValue = new GBinarySortedMap<MK>(DataPageKSortedMapImpl.getDuplicateBB(binaryValue).getByteBuffer(), mkTypeSerializer, gComparator);
                seqID = Math.max(seqID, binaryValue.getSeqID());
                listByOrder.add(mapValue);
                if (firstValueType != null) continue;
                firstValueType = binaryValue.getGValueType();
            }
            if (listByOrder.size() == 0) {
                gBinarySortedMap = GBinarySortedMap.EMPTY_G_BINARY_SORTEDMAP;
            } else {
                int index = 0;
                Map<BinaryKey, BinaryValue> newMap = ((GBinarySortedMap)listByOrder.get(index)).getBinaryMap();
                long compactionCount = ((GBinarySortedMap)listByOrder.get(index)).getCompactionCount();
                ++index;
                while (index < listByOrder.size()) {
                    newMap.putAll(((GBinarySortedMap)listByOrder.get(index)).getBinaryMap());
                    compactionCount += ((GBinarySortedMap)listByOrder.get(index)).getCompactionCount();
                    ++index;
                }
                gBinarySortedMap = GBinarySortedMap.ofBinaryList(DataPage.DataPageType.KV, isMajor, version, logicPageId, mkTypeSerializer, gComparator, allocator, newMap, compactionCount, stateFilter, gRegionContext);
            }
            ByteBuffer bb = gBinarySortedMap == GBinarySortedMap.EMPTY_G_BINARY_SORTEDMAP ? null : gBinarySortedMap.getData();
            GValueType gValueType = DataPageKSortedMapImpl.judgeFinalValueType(bb, firstValueType, isMajor);
            return new BinaryValueImpl(bb, gValueType, seqID, 0, gBinarySortedMap.bytesSize());
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("Internal BUG " + e.getMessage(), e);
        }
    }

    private static <MK, MV> BinaryValue compactionSplitBinaryValueList(List<BinaryValue> listByOrder, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, int maxSplitValueIndex, boolean isMajor, long version, int logicPageId, Allocator allocator, @Nullable StateFilter stateFilter, @Nullable GRegionContext gRegionContext, GValueType firstValueType, long seqID, GBufferAddressMapping pageMapping, int mapSplitSubMapSize) {
        GByteBuffer finalByteBuffer;
        if (listByOrder.size() == 0) {
            finalByteBuffer = null;
        } else {
            Preconditions.checkState((maxSplitValueIndex >= 0 && maxSplitValueIndex < listByOrder.size() ? 1 : 0) != 0, (Object)"Internal Bug.");
            Tuple2[] buckets = new Tuple2[listByOrder.size()];
            for (int i = 0; i < listByOrder.size(); ++i) {
                buckets[i] = Tuple2.of(new TreeMap(gComparator.getJDKBinaryCompactor()), (Object)0);
            }
            List<BinaryKey> baseKeyIndex = DataPageKSortedMapImpl.getBaseKeyIndexList(listByOrder, maxSplitValueIndex, mkTypeSerializer, gComparator, mapSplitSubMapSize);
            ArrayList keyIndexList = new ArrayList(baseKeyIndex.size());
            ArrayList<Integer> subMapIdList = new ArrayList<Integer>(baseKeyIndex.size());
            int maxSubMapSize = 0;
            for (int part = 0; part < baseKeyIndex.size(); ++part) {
                int subListMaxSize = DataPageKSortedMapImpl.mergeSubList(listByOrder, mkTypeSerializer, gComparator, isMajor, version, logicPageId, allocator, stateFilter, gRegionContext, pageMapping, mapSplitSubMapSize, part, buckets, baseKeyIndex, maxSplitValueIndex, subMapIdList, keyIndexList, maxSubMapSize);
                maxSubMapSize = Math.max(maxSubMapSize, subListMaxSize);
            }
            if (subMapIdList.size() <= 0) {
                return new BinaryValueImpl(null, GValueType.Delete, seqID, 0, 0);
            }
            finalByteBuffer = subMapIdList.size() == 1 ? pageMapping.pollGByteBuffer() : SplitHashMapValueHelper.genRoutingBufferForSplitMap(DataPage.DataPageType.KSplitSortedRouting, subMapIdList, keyIndexList, maxSubMapSize, mkTypeSerializer, logicPageId, allocator);
        }
        ByteBuffer bb = finalByteBuffer == null ? null : finalByteBuffer.getByteBuffer();
        GValueType gValueType = DataPageKSortedMapImpl.judgeFinalValueType(bb, firstValueType, isMajor);
        return new BinaryValueImpl(bb, gValueType, seqID, 0, bb == null ? 0 : bb.capacity());
    }

    private static <MK> int mergeSubList(List<BinaryValue> listByOrder, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, boolean isMajor, long version, int logicPageId, Allocator allocator, @Nullable StateFilter stateFilter, @Nullable GRegionContext gRegionContext, GBufferAddressMapping pageMapping, int mapSplitSubMapSize, int part, Tuple2<SortedMap<BinaryKey, BinaryValue>, Integer>[] buckets, List<BinaryKey> baseKeyIndex, int maxSplitValueIndex, List<Integer> subMapIdList, List<MK> keyIndexList, int maxSubMapSize) {
        SortedMap<BinaryKey, BinaryValue> mergeMap = new TreeMap<BinaryKey, BinaryValue>(gComparator.getJDKBinaryCompactor());
        long compactionCount = 0L;
        PageAddress tmpPageAddress = null;
        BinaryValue tmpBinaryValue = null;
        for (int listIndex = 0; listIndex < listByOrder.size(); ++listIndex) {
            SortedMap bucket = (SortedMap)buckets[listIndex].f0;
            int nextPartIndex = (Integer)buckets[listIndex].f1;
            BinaryValue binaryValue = listByOrder.get(listIndex);
            if (nextPartIndex == 0 && !GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                GBinarySortedMap<MK> gBinarySortedMap = new GBinarySortedMap<MK>(binaryValue.getValueLen() != 0 ? DataPageKSortedMapImpl.getDuplicateBB(binaryValue).getByteBuffer() : null, mkTypeSerializer, gComparator);
                bucket.putAll(gBinarySortedMap.getSortedBinaryMap());
                compactionCount += gBinarySortedMap.getCompactionCount();
                buckets[listIndex] = Tuple2.of((Object)buckets[listIndex].f0, (Object)(++nextPartIndex));
            }
            if (!bucket.isEmpty() && gComparator.compare((BinaryKey)bucket.lastKey(), baseKeyIndex.get(part)) >= 0) {
                Map.Entry entry;
                if (part == baseKeyIndex.size() - 1) {
                    mergeMap.putAll(bucket);
                    bucket.clear();
                    continue;
                }
                Iterator iterator = bucket.entrySet().iterator();
                while (iterator.hasNext() && gComparator.compare((BinaryKey)(entry = iterator.next()).getKey(), baseKeyIndex.get(part)) <= 0) {
                    mergeMap.put((BinaryKey)entry.getKey(), (BinaryValue)entry.getValue());
                    iterator.remove();
                }
                continue;
            }
            if (!bucket.isEmpty()) {
                mergeMap.putAll(bucket);
                bucket.clear();
            }
            int maxPartNum = 1;
            if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                maxPartNum = GSortedRoutingValue.getSubMapCount(binaryValue);
            }
            while (nextPartIndex < maxPartNum) {
                if (listIndex == maxSplitValueIndex && tmpPageAddress == null && mergeMap.isEmpty()) {
                    if (gComparator.compare(GSortedRoutingValue.getKeyIndexBySlot(binaryValue, part), baseKeyIndex.get(part)) != 0) {
                        throw new GeminiRuntimeException("Internal bug");
                    }
                    tmpPageAddress = GSortedRoutingValue.getSubMapPageAddress(nextPartIndex++, binaryValue);
                    tmpBinaryValue = binaryValue;
                    break;
                }
                GBinarySortedMap<Object> nextSubSortedMap = GSortedRoutingValue.getSubGBinarySortedMapWithKey(null, nextPartIndex++, binaryValue, mkTypeSerializer, gComparator);
                compactionCount += nextSubSortedMap.getCompactionCount();
                if (part == baseKeyIndex.size() - 1) {
                    mergeMap.putAll(nextSubSortedMap.getSortedBinaryMap());
                    continue;
                }
                if (gComparator.compare((MK)nextSubSortedMap.lastKey(), baseKeyIndex.get(part)) < 0) {
                    mergeMap.putAll(nextSubSortedMap.getSortedBinaryMap());
                    continue;
                }
                Iterator<Map.Entry<BinaryKey, BinaryValue>> iterator = nextSubSortedMap.getSortedBinaryMap().entrySet().iterator();
                TreeMap<BinaryKey, BinaryValue> newBucket = new TreeMap<BinaryKey, BinaryValue>(gComparator.getJDKBinaryCompactor());
                while (iterator.hasNext()) {
                    Map.Entry<BinaryKey, BinaryValue> entry = iterator.next();
                    if (gComparator.compare(entry.getKey(), baseKeyIndex.get(part)) <= 0) {
                        mergeMap.put(entry.getKey(), entry.getValue());
                        continue;
                    }
                    newBucket.put(entry.getKey(), entry.getValue());
                }
                buckets[listIndex] = Tuple2.of(newBucket, (Object)nextPartIndex);
                break;
            }
            buckets[listIndex] = Tuple2.of((Object)buckets[listIndex].f0, (Object)nextPartIndex);
        }
        if (tmpPageAddress != null) {
            if (mergeMap.isEmpty()) {
                int subMapId = pageMapping.putGByteBufferAddress(tmpPageAddress);
                subMapIdList.add(subMapId);
                keyIndexList.add(GSortedRoutingValue.getKeyIndexBySlot(tmpBinaryValue, part, mkTypeSerializer));
                maxSubMapSize = Math.max(maxSubMapSize, tmpPageAddress.getDataLen());
                return maxSubMapSize;
            }
            mergeMap = DataPageKSortedMapImpl.mergeTmpSubMapIntoCompactionMap(mergeMap, gComparator, tmpPageAddress, tmpBinaryValue, mkTypeSerializer);
        }
        if (mergeMap.isEmpty()) {
            return 0;
        }
        List<SortedMap<BinaryKey, BinaryValue>> subMapList = SplitSortedMapValueHelper.divideSortedMap(mergeMap, mapSplitSubMapSize);
        for (SortedMap<BinaryKey, BinaryValue> subMap : subMapList) {
            GBinarySortedMap<MK> finalSubMap = GBinarySortedMap.ofBinaryList(DataPage.DataPageType.KV, isMajor, version, logicPageId, mkTypeSerializer, gComparator, allocator, subMap, compactionCount, stateFilter, gRegionContext);
            if (finalSubMap == GBinarySortedMap.EMPTY_G_BINARY_SORTEDMAP) continue;
            int subMapId = pageMapping.putGByteBufferAddress(new DataPageSortedSubPageImpl(finalSubMap));
            subMapIdList.add(subMapId);
            keyIndexList.add(finalSubMap.lastKey());
            maxSubMapSize = Math.max(finalSubMap.bytesSize(), maxSubMapSize);
        }
        return maxSubMapSize;
    }

    private static <MK> SortedMap<BinaryKey, BinaryValue> mergeTmpSubMapIntoCompactionMap(SortedMap<BinaryKey, BinaryValue> mergeMap, GComparator<MK> gComparator, PageAddress tmpPageAddress, BinaryValue tmpBinaryValue, TypeSerializer<MK> mkTypeSerializer) {
        TreeMap<BinaryKey, BinaryValue> tmpMap = new TreeMap<BinaryKey, BinaryValue>(gComparator.getJDKBinaryCompactor());
        GBinarySortedMap<MK> gBinarySortedMap = new GBinarySortedMap<MK>(tmpBinaryValue.getPageMapping().getGByteBuffer(tmpPageAddress, null).getByteBuffer(), mkTypeSerializer, gComparator);
        tmpMap.putAll(gBinarySortedMap.getSortedBinaryMap());
        tmpMap.putAll(mergeMap);
        mergeMap = tmpMap;
        return mergeMap;
    }

    private static <MK> List<BinaryKey> getBaseKeyIndexList(List<BinaryValue> listByOrder, int maxSplitValueIndex, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator, int mapSplitSubMapSize) {
        List<BinaryKey> baseKeyIndex;
        BinaryValue baseBinaryValue = listByOrder.get(maxSplitValueIndex);
        if (GSortedRoutingValue.isGSortedRoutingValue(baseBinaryValue)) {
            baseKeyIndex = GSortedRoutingValue.getKeyIndexArray(baseBinaryValue);
        } else {
            GBinarySortedMap<MK> baseMap = new GBinarySortedMap<MK>(DataPageKSortedMapImpl.getDuplicateBB(baseBinaryValue).getByteBuffer(), mkTypeSerializer, gComparator);
            List<SortedMap<BinaryKey, BinaryValue>> subMapList = SplitSortedMapValueHelper.divideSortedMap(baseMap.getSortedBinaryMap(), mapSplitSubMapSize);
            baseKeyIndex = new ArrayList<BinaryKey>();
            subMapList.forEach(subMap -> baseKeyIndex.add((BinaryKey)subMap.lastKey()));
        }
        return baseKeyIndex;
    }

    public static <MK> Map<BinaryKey, BinaryValue> doCompactValueToBinaryMap(List<BinaryValue> binaryValueReversedOrderList, TypeSerializer<MK> mkTypeSerializer, GComparator<MK> gComparator) {
        try {
            HashMap<BinaryKey, BinaryValue> newMap = new HashMap<BinaryKey, BinaryValue>();
            for (int i = binaryValueReversedOrderList.size() - 1; i >= 0; --i) {
                BinaryValue binaryValue = binaryValueReversedOrderList.get(i);
                if (GSortedRoutingValue.isGSortedRoutingValue(binaryValue)) {
                    List<GBinarySortedMap<MK>> subMapList = GSortedRoutingValue.getAllSubGBinarySortedMap(binaryValue, mkTypeSerializer, gComparator);
                    for (GBinarySortedMap<MK> subMap : subMapList) {
                        newMap.putAll(subMap.getBinaryMap());
                    }
                    continue;
                }
                GByteBuffer gByteBuffer = DataPageKSortedMapImpl.getDuplicateBB(binaryValue);
                GBinarySortedMap<MK> mapValue = new GBinarySortedMap<MK>(gByteBuffer == null ? null : gByteBuffer.getByteBuffer(), mkTypeSerializer, gComparator);
                newMap.putAll(mapValue.getBinaryMap());
            }
            return newMap;
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("Internal BUG " + e.getMessage(), e);
        }
    }

    @VisibleForTesting
    public static <K, MK, MV> DataPageKSortedMapImpl<K, MK, MV> readKSortedMapPageFrom(PageSerdeFlink2Key<K, MK, MV> pageSerdeFlink, GByteBuffer dataPage, int crc) {
        GBinaryHashMap gBinaryHashMap = new GBinaryHashMap(dataPage, pageSerdeFlink.getKeySerde(), crc);
        return new DataPageKSortedMapImpl(gBinaryHashMap, pageSerdeFlink.getKey2Serde(), pageSerdeFlink.getValueSerde(), pageSerdeFlink.getMapValueTypeSerializer(), pageSerdeFlink.getMapComparator());
    }

    @Override
    public Tuple2<DataPage, DataPage> getSplitDataByGBinaryMap(GBinaryHashMap<K> gBinaryHashMap1, GBinaryHashMap<K> gBinaryHashMap2) {
        DataPageKSortedMapImpl<K, MK, MV> dataPage1 = gBinaryHashMap1 == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP ? null : new DataPageKSortedMapImpl<K, MK, MV>(gBinaryHashMap1, this.mkTypeSerializer, this.mvTypeSerializer, (AbstractGMapValueTypeSerializer)this.valueTypeSerializer, this.gComparator);
        DataPageKSortedMapImpl<K, MK, MV> dataPage2 = gBinaryHashMap2 == GBinaryHashMap.EMPTY_G_BINARY_HASHMAP ? null : new DataPageKSortedMapImpl<K, MK, MV>(gBinaryHashMap2, this.mkTypeSerializer, this.mvTypeSerializer, (AbstractGMapValueTypeSerializer)this.valueTypeSerializer, this.gComparator);
        return Tuple2.of((Object)dataPage1, (Object)dataPage2);
    }
}

