/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.page.bmap;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.DataPageHashSubPageImpl;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.AbstractGRoutingValue;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.BinaryValueForSplit;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.ByteBufferUtils;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBinaryHashMap;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GBufferAddressMapping;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GByteArrayOutputStreamWithPos;
import org.apache.flink.runtime.state.gemini.engine.page.bmap.GHashHeaderImpl;
import org.apache.flink.runtime.state.gemini.engine.page.compress.GCompressAlgorithm;
import org.apache.flink.runtime.state.gemini.engine.rm.Allocator;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.GUnPooledByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCountable;
import org.apache.flink.util.MathUtils;

/**
 * Static helpers that build a {@link GBinaryHashMap} from a list of map entries and, when the
 * list is large enough, split the entries into several sub-maps that are addressed through a
 * routing buffer.
 */
public class SplitHashMapValueHelper {

    /** Length in bytes of the fixed header placed at the start of every routing buffer. */
    private static final int ROUTING_HEADER_LENGTH = 16;

    /** Bytes reserved per entry in the key index table at the start of the key section. */
    private static final int KEY_INDEX_SLOT_BYTES = 4;

    /**
     * Builds a binary hash map for {@code keyValueList}, splitting it into sub-maps when
     * {@link #splitGSValueMap} decides that more than one sub-map is needed.
     *
     * <p>When the list is not split, the result is exactly what {@code GBinaryHashMap.of}
     * produces for the whole list. When it is split, every sub-map is built with
     * {@code GBinaryHashMap.of}, registered in {@code mapping}, and the returned map wraps a
     * routing buffer that holds the ids returned by {@code mapping}.
     *
     * <p>NOTE(review): if every sub-map reports a non-positive {@code bytesSize()}, the routing
     * buffer is {@code null} and is handed to the {@code GBinaryHashMap} constructor unchanged —
     * confirm that the constructor tolerates this.
     *
     * @param dataPageType page type forwarded to {@code GBinaryHashMap.of}
     * @param keyValueList entries to store
     * @param keySerializer serializer for the map keys
     * @param valueSerializer serializer for the map values
     * @param version forwarded to {@code GBinaryHashMap.of}
     * @param logicPageId forwarded to {@code GBinaryHashMap.of} and to the routing buffer builder
     * @param allocator allocator used for the sub-maps and for the routing buffer
     * @param compactionCount forwarded to {@code GBinaryHashMap.of}
     * @param gCompressAlgorithm forwarded to {@code GBinaryHashMap.of}
     * @param mapping registry that assigns an id to each sub-map page
     * @param mapSplitSubMapSize size budget per sub-map used when estimating the split count
     * @param mapSplitMinKeyNum lists with at most this many entries are never split
     * @return {@code GBinaryHashMap.EMPTY_G_BINARY_HASHMAP} for an empty list, otherwise the
     *     single or routing map described above
     */
    public static <MK, MV> GBinaryHashMap<MK> trySplit(DataPage.DataPageType dataPageType, List<Tuple2<MK, GSValue<MV>>> keyValueList, TypeSerializer<MK> keySerializer, TypeSerializer<MV> valueSerializer, long version, int logicPageId, Allocator allocator, long compactionCount, GCompressAlgorithm gCompressAlgorithm, GBufferAddressMapping mapping, int mapSplitSubMapSize, int mapSplitMinKeyNum) {
        if (keyValueList.size() == 0) {
            return GBinaryHashMap.EMPTY_G_BINARY_HASHMAP;
        }
        List<List<Tuple2<MK, GSValue<MV>>>> subMapList = SplitHashMapValueHelper.splitGSValueMap(keyValueList, keySerializer, valueSerializer, mapSplitMinKeyNum, mapSplitSubMapSize);
        if (subMapList.size() == 1) {
            return GBinaryHashMap.of(dataPageType, keyValueList, keySerializer, valueSerializer, version, logicPageId, allocator, compactionCount, gCompressAlgorithm);
        }
        int subMapCount = subMapList.size();
        int[] subMapIdList = new int[subMapCount];
        int subMapMaxSize = 0;
        for (int i = 0; i < subMapCount; ++i) {
            GBinaryHashMap<MK> subMap = GBinaryHashMap.of(dataPageType, subMapList.get(i), keySerializer, valueSerializer, version, logicPageId, allocator, compactionCount, gCompressAlgorithm);
            // Position i of the id array corresponds to bucket i produced by divideKeyValueList.
            subMapIdList[i] = mapping.putGByteBufferAddress(new DataPageHashSubPageImpl(subMap));
            subMapMaxSize = Math.max(subMap.bytesSize(), subMapMaxSize);
        }
        int splitMapIndexLen = 0;
        GHashHeaderImpl pageHelper = GHashHeaderImpl.getPageHelper(splitMapIndexLen);
        GByteBuffer gByteBuffer = SplitHashMapValueHelper.genRoutingValueForSplitHashMap(subMapIdArrayOrSelf(subMapIdList), logicPageId, allocator, subMapMaxSize);
        return new GBinaryHashMap<MK>(pageHelper, gByteBuffer, keySerializer);
    }

    /** Returns its argument; exists only to keep the call site above on one readable line. */
    private static int[] subMapIdArrayOrSelf(int[] subMapIdArray) {
        return subMapIdArray;
    }

    /**
     * Builds the routing buffer for a hash-split map: the routing key of sub-map {@code i} is the
     * integer {@code i} and its value is {@code subMapIdArray[i]}.
     *
     * @param subMapIdArray ids of the sub-maps, in bucket order
     * @param logicPageId forwarded to {@link #genRoutingBufferForSplitMap}
     * @param allocator allocator for the routing buffer
     * @param subMapMaxSize largest sub-map size, stored in the routing header
     * @return the routing buffer, or {@code null} when {@code subMapMaxSize <= 0} or the id array
     *     is empty
     */
    public static GByteBuffer genRoutingValueForSplitHashMap(int[] subMapIdArray, int logicPageId, Allocator allocator, int subMapMaxSize) {
        if (subMapMaxSize <= 0) {
            return null;
        }
        List<Integer> subMapIdList = new ArrayList<Integer>(subMapIdArray.length);
        List<Integer> hashIndexList = new ArrayList<Integer>(subMapIdArray.length);
        for (int i = 0; i < subMapIdArray.length; ++i) {
            subMapIdList.add(subMapIdArray[i]);
            hashIndexList.add(i);
        }
        return SplitHashMapValueHelper.genRoutingBufferForSplitMap(DataPage.DataPageType.KSplitHashRouting, subMapIdList, hashIndexList, subMapMaxSize, IntSerializer.INSTANCE, logicPageId, allocator);
    }

    /**
     * Serializes a routing table into a buffer obtained from {@code allocator}.
     *
     * <p>The buffer consists of three consecutive parts:
     * <ol>
     *   <li>a 16-byte header filled through the {@code AbstractGRoutingValue.writeHeader*}
     *       methods (routing type code, sub-map count, sub-map max size, and the offset of the
     *       value section, i.e. header length plus key section length);</li>
     *   <li>the key section: one 4-byte slot per sub-map, followed by the serialized routing
     *       keys. Slot {@code i} stores the position, relative to the start of the key section,
     *       immediately after serialized key {@code i};</li>
     *   <li>the value section: the sub-map ids written with {@link IntSerializer}, in list
     *       order.</li>
     * </ol>
     *
     * @param dataPageType routing type whose code is stored in the header
     * @param subMapIdList ids of the sub-maps
     * @param subMapIndexList routing key of each sub-map; read at the same indices as
     *     {@code subMapIdList}
     * @param subMapMaxSize value stored in the header's sub-map max size field
     * @param keySerializer serializer for the routing keys
     * @param logicPageId not used by this method
     * @param allocator allocator for the resulting buffer
     * @return the filled buffer, or {@code null} when {@code subMapIdList} is empty
     * @throws GeminiRuntimeException if serialization, allocation or copying fails; a buffer that
     *     was already allocated is released first
     */
    public static <K> GByteBuffer genRoutingBufferForSplitMap(DataPage.DataPageType dataPageType, List<Integer> subMapIdList, List<K> subMapIndexList, int subMapMaxSize, TypeSerializer<K> keySerializer, int logicPageId, Allocator allocator) {
        int subMapCount = subMapIdList.size();
        if (subMapCount <= 0) {
            return null;
        }
        byte[] header = new byte[ROUTING_HEADER_LENGTH];
        ByteBuffer headerBB = ByteBuffer.wrap(header);
        GByteArrayOutputStreamWithPos outputStreamForKey = new GByteArrayOutputStreamWithPos(1024);
        DataOutputViewStreamWrapper outputViewForKey = new DataOutputViewStreamWrapper(outputStreamForKey);
        // Serialized keys start right after the index table.
        int lastKeyPosition = subMapCount * KEY_INDEX_SLOT_BYTES;
        GByteArrayOutputStreamWithPos outputStreamForValue = new GByteArrayOutputStreamWithPos(1024);
        DataOutputViewStreamWrapper outputViewForValue = new DataOutputViewStreamWrapper(outputStreamForValue);
        int lastValuePosition = 0;
        // Declared as GByteBuffer (not the ReferenceCountable supertype) because the buffer is
        // both read through getByteBuffer() and returned to the caller.
        GByteBuffer gByteBuffer = null;
        try {
            for (int i = 0; i < subMapCount; ++i) {
                int subMapId = subMapIdList.get(i);
                // Append key i after the previously written keys ...
                outputStreamForKey.setPosition(lastKeyPosition);
                keySerializer.serialize(subMapIndexList.get(i), outputViewForKey);
                lastKeyPosition = outputStreamForKey.getPosition();
                // ... then jump back and record its end position in index slot i.
                outputStreamForKey.setPosition(i * KEY_INDEX_SLOT_BYTES);
                IntSerializer.INSTANCE.serialize(lastKeyPosition, outputViewForKey);
                IntSerializer.INSTANCE.serialize(subMapId, outputViewForValue);
                lastValuePosition = outputStreamForValue.getPosition();
            }
            // Leave both streams positioned at the end of their payload.
            outputStreamForKey.setPosition(lastKeyPosition);
            outputStreamForValue.setPosition(lastValuePosition);
            ByteBuffer keyBytes = ByteBuffer.wrap(outputStreamForKey.getBuf(), 0, lastKeyPosition);
            ByteBuffer valueBytes = ByteBuffer.wrap(outputStreamForValue.getBuf(), 0, lastValuePosition);
            AbstractGRoutingValue.writeHeaderRoutingType(headerBB, dataPageType.getCode());
            AbstractGRoutingValue.writeHeaderSubMapCount(headerBB, subMapCount);
            AbstractGRoutingValue.writeHeaderSubMapMaxSize(headerBB, subMapMaxSize);
            AbstractGRoutingValue.writeHeaderBaseValueOffset(headerBB, header.length + lastKeyPosition);
            int newBufferLen = header.length + lastKeyPosition + lastValuePosition;
            gByteBuffer = allocator.allocate(newBufferLen);
            ByteBufferUtils.copyFromArrayToBuffer(gByteBuffer.getByteBuffer(), 0, header, 0, header.length);
            ByteBufferUtils.copyFromBufferToBuffer(keyBytes, gByteBuffer.getByteBuffer(), 0, header.length, lastKeyPosition);
            ByteBufferUtils.copyFromBufferToBuffer(valueBytes, gByteBuffer.getByteBuffer(), 0, header.length + lastKeyPosition, lastValuePosition);
            return gByteBuffer;
        }
        catch (Exception e) {
            // Do not leak the allocated buffer when filling it fails.
            if (gByteBuffer != null) {
                gByteBuffer.release();
            }
            throw new GeminiRuntimeException("GBinaryHashMap get exception: " + e.getMessage(), e);
        }
    }

    /**
     * Creates a copy of a routing value whose sub-map ids are replaced by the ids that
     * {@code pageMapping.mergeMapping} returns for them.
     *
     * <p>The header and key section (the first {@code getGRoutingBaseValueOffset} bytes starting
     * at {@code binaryValue.getValueOffset()}) are copied unchanged; the value section is rewritten
     * with the new ids. The result is backed by a freshly allocated heap {@link ByteBuffer}.
     *
     * @param binaryValue routing value to copy
     * @param pageMapping mapping into which the ids of {@code binaryValue} are merged
     * @return a new buffer containing the original header and keys followed by the new ids
     * @throws GeminiRuntimeException if serializing or copying fails
     */
    public static GByteBuffer replaceBinaryValueIdList(BinaryValueForSplit binaryValue, GBufferAddressMapping pageMapping) {
        int[] oldIdList = AbstractGRoutingValue.getAllSubMapId(binaryValue);
        int[] newIdList = pageMapping.mergeMapping(binaryValue.getPageMapping(), oldIdList);
        GByteArrayOutputStreamWithPos outputStreamForValue = new GByteArrayOutputStreamWithPos(1024);
        DataOutputViewStreamWrapper outputViewForValue = new DataOutputViewStreamWrapper(outputStreamForValue);
        outputStreamForValue.setPosition(0);
        try {
            for (int newId : newIdList) {
                IntSerializer.INSTANCE.serialize(newId, outputViewForValue);
            }
            int lastValuePosition = outputStreamForValue.getPosition();
            int headAndKeyLen = AbstractGRoutingValue.getGRoutingBaseValueOffset(binaryValue);
            int newBufferLen = headAndKeyLen + lastValuePosition;
            GUnPooledByteBuffer gByteBuffer = new GUnPooledByteBuffer(ByteBuffer.allocate(newBufferLen));
            ByteBufferUtils.copyFromBufferToBuffer(binaryValue.getBb(), gByteBuffer.getByteBuffer(), binaryValue.getValueOffset(), 0, headAndKeyLen);
            ByteBuffer valueBytes = ByteBuffer.wrap(outputStreamForValue.getBuf(), 0, lastValuePosition);
            ByteBufferUtils.copyFromBufferToBuffer(valueBytes, gByteBuffer.getByteBuffer(), 0, headAndKeyLen, lastValuePosition);
            return gByteBuffer;
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("replaceBinaryValueIdList get exception: " + e.getMessage(), e);
        }
    }

    /**
     * Partitions {@code keyValueList} into sub-lists, one per sub-map.
     *
     * <p>The list is returned as a single partition when it has at most
     * {@code mapSplitMinKeyNum} entries, or when the estimated sub-map count rounded up to a
     * power of two is not greater than one.
     *
     * @return a list with exactly one element (the input list itself) when no split happens,
     *     otherwise the buckets produced by {@link #divideKeyValueList}
     */
    private static <MK, MV> List<List<Tuple2<MK, GSValue<MV>>>> splitGSValueMap(List<Tuple2<MK, GSValue<MV>>> keyValueList, TypeSerializer<MK> keySerializer, TypeSerializer<MV> valueSerializer, int mapSplitMinKeyNum, int mapSplitSubMapSize) {
        if (keyValueList.size() <= mapSplitMinKeyNum) {
            return Collections.singletonList(keyValueList);
        }
        int subMapNum = SplitHashMapValueHelper.getSplitNumBySampling(keyValueList, keySerializer, valueSerializer, mapSplitMinKeyNum, mapSplitSubMapSize);
        // A power-of-two count lets divideKeyValueList pick the bucket with a bit mask.
        int realSubMapNum = MathUtils.roundUpToPowerOfTwo(subMapNum);
        // "<= 1" rather than "== 1": a non-positive count must never reach divideKeyValueList,
        // where it would divide by zero or request a negative capacity.
        if (realSubMapNum <= 1) {
            return Collections.singletonList(keyValueList);
        }
        return SplitHashMapValueHelper.divideKeyValueList(keyValueList, realSubMapNum);
    }

    /**
     * Estimates how many sub-maps are needed so that each holds roughly
     * {@code mapSplitSubMapSize} serialized bytes.
     *
     * <p>Every {@code mapSplitMinKeyNum}-th entry (starting with the first) is serialized to
     * measure an average entry size; the number of entries per sub-map is the size budget
     * divided by that average (at least one), and the result is the entry count divided by the
     * entries per sub-map, rounded up.
     *
     * <p>Degenerate inputs are handled instead of failing or looping forever: a non-positive
     * {@code mapSplitMinKeyNum} samples every entry, and an empty list or entries that serialize
     * to zero bytes yield {@code 1} (no split needed).
     *
     * @param keyValueList entries to sample
     * @param keySerializer serializer for the map keys
     * @param valueSerializer serializer for the map values
     * @param mapSplitMinKeyNum sampling stride
     * @param mapSplitSubMapSize size budget per sub-map, in serialized bytes
     * @return the estimated number of sub-maps, at least {@code 1}
     * @throws GeminiRuntimeException if serializing a sampled entry fails
     */
    public static <MK, MV> int getSplitNumBySampling(List<Tuple2<MK, GSValue<MV>>> keyValueList, TypeSerializer<MK> keySerializer, TypeSerializer<MV> valueSerializer, int mapSplitMinKeyNum, int mapSplitSubMapSize) {
        int totalKeys = keyValueList.size();
        if (totalKeys == 0) {
            // Nothing to sample; avoids dividing by a zero sample count below.
            return 1;
        }
        GByteArrayOutputStreamWithPos outputStreamForSampling = new GByteArrayOutputStreamWithPos(1024);
        DataOutputViewStreamWrapper outputViewForSampling = new DataOutputViewStreamWrapper(outputStreamForSampling);
        outputStreamForSampling.setPosition(0);
        // A stride of zero would never advance and a negative one would index before the list
        // start, so fall back to sampling every entry.
        int samplingStepSize = Math.max(1, mapSplitMinKeyNum);
        int samplingNum = 0;
        try {
            for (int index = 0; index < totalKeys; index += samplingStepSize) {
                Tuple2<MK, GSValue<MV>> entry = keyValueList.get(index);
                keySerializer.serialize(entry.f0, outputViewForSampling);
                valueSerializer.serialize(entry.f1.getValue(), outputViewForSampling);
                ++samplingNum;
            }
        }
        catch (Exception e) {
            throw new GeminiRuntimeException("Exception occur when GBinaryHashMap splitGSValueMap" + e.getMessage(), e);
        }
        int avgSizePerKey = outputStreamForSampling.getPosition() / samplingNum;
        int keyNumPerSubMap;
        if (mapSplitSubMapSize < avgSizePerKey) {
            // A single entry already exceeds the budget: one entry per sub-map.
            keyNumPerSubMap = 1;
        } else if (avgSizePerKey == 0) {
            // Entries occupy no measurable space, so they all fit into one sub-map.
            return 1;
        } else {
            keyNumPerSubMap = mapSplitSubMapSize / avgSizePerKey;
        }
        // Ceiling division of totalKeys by keyNumPerSubMap.
        return totalKeys / keyNumPerSubMap + (totalKeys % keyNumPerSubMap == 0 ? 0 : 1);
    }

    /**
     * Distributes the entries over {@code subMapNum} buckets, choosing the bucket of an entry as
     * {@code key.hashCode() & (subMapNum - 1)}.
     *
     * @param keyValueList entries to distribute
     * @param subMapNum number of buckets; the mask above only addresses every bucket when this
     *     is a power of two
     * @return the buckets, in bucket-index order; buckets may be empty
     */
    private static <MK, MV> List<List<Tuple2<MK, GSValue<MV>>>> divideKeyValueList(List<Tuple2<MK, GSValue<MV>>> keyValueList, int subMapNum) {
        List<List<Tuple2<MK, GSValue<MV>>>> subMapList = new ArrayList<List<Tuple2<MK, GSValue<MV>>>>(subMapNum);
        int initialCapacity = keyValueList.size() / subMapNum + 1;
        for (int i = 0; i < subMapNum; ++i) {
            subMapList.add(new ArrayList<Tuple2<MK, GSValue<MV>>>(initialCapacity));
        }
        int bucketMask = subMapNum - 1;
        for (Tuple2<MK, GSValue<MV>> entry : keyValueList) {
            subMapList.get(entry.f0.hashCode() & bucketMask).add(entry);
        }
        return subMapList;
    }
}

