/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValueList;
import org.apache.flink.runtime.state.gemini.engine.memstore.Segment;
import org.apache.flink.runtime.state.gemini.engine.memstore.SegmentKList;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;

public class SegmentKListImpl<K, E>
implements SegmentKList<K, E> {
    private final long segmentID;
    private final GRegionContext gRegionContext;
    private final long version;
    private final Map<K, GSValueList<E>> dataMap;
    private int recordCount = 0;
    private boolean writeCopy;
    private TypeSerializer<K> keySerializer;
    private TypeSerializer<E> elementSerializer;

    public SegmentKListImpl(long segmentID, GRegionContext gRegionContext) {
        this.segmentID = segmentID;
        this.gRegionContext = gRegionContext;
        this.dataMap = new HashMap<K, GSValueList<E>>();
        this.version = gRegionContext.getGContext().getCurVersion();
        this.writeCopy = gRegionContext.getGContext().getGConfiguration().isWriteCopy();
        this.keySerializer = gRegionContext.getPageSerdeFlink().getKeySerde();
        this.elementSerializer = gRegionContext.getPageSerdeFlink().getValueSerde();
    }

    public SegmentKListImpl(long segmentID, GRegionContext gRegionContext, Map<K, GSValueList<E>> dataMap) {
        this.segmentID = segmentID;
        this.gRegionContext = gRegionContext;
        this.dataMap = dataMap;
        this.version = gRegionContext.getGContext().getCurVersion();
        this.writeCopy = gRegionContext.getGContext().getGConfiguration().isWriteCopy();
        this.keySerializer = gRegionContext.getPageSerdeFlink().getKeySerde();
        this.elementSerializer = gRegionContext.getPageSerdeFlink().getValueSerde();
    }

    @Override
    public void add(K key, E element) {
        GSValueList<E> gValueList = this.getOrCreateList(key);
        long seqID = this.gRegionContext.getNextSeqId();
        this.internalAdd(element, gValueList, seqID);
        this.updateListSeqID(gValueList, seqID);
    }

    private void internalAdd(E element, GSValueList<E> gValueList, long seqID) {
        ((List)gValueList.value).add(GSValue.of(this.copyElementIfNeeded(element), GValueType.PutValue, seqID));
        ++this.recordCount;
        this.gRegionContext.getWriteBufferStats().addTotalRecordCount(1);
    }

    @Override
    public void addAll(K key, Collection<? extends E> elements) {
        GSValueList<E> gValueList = this.getOrCreateList(key);
        long seqID = this.gRegionContext.getNextSeqId();
        HashSet<E> duplicatedElementChecker = new HashSet<E>();
        for (E element : elements) {
            if (!duplicatedElementChecker.add(element)) {
                seqID = this.gRegionContext.getNextSeqId();
            }
            this.internalAdd(element, gValueList, seqID);
        }
        this.updateListSeqID(gValueList, seqID);
    }

    @Override
    public void remove(K key, E element) {
        throw new GeminiRuntimeException("not support remove element");
    }

    @Override
    public void removeAll(K key, Collection<? extends E> elements) {
        elements.forEach(v -> this.remove(key, v));
    }

    @Override
    public void put(K key, List<GSValue<E>> value) {
        GSValueList<E> glist = this.createPutList();
        long seqID = 0L;
        for (GSValue<E> gsValue : value) {
            seqID = Math.max(seqID, gsValue.getSeqID());
            gsValue.setValue(this.copyElementIfNeeded(gsValue.getValue()));
            glist.getValue().add(gsValue);
        }
        GSValueList<E> old = this.dataMap.put(this.copyKeyIfNeeded(key), glist);
        this.updateListSeqID(glist, seqID);
        int oldSize = old == null || old.getValue() == null ? 0 : old.getValue().size();
        int delta = value.size() - oldSize;
        this.recordCount += delta;
        this.gRegionContext.getWriteBufferStats().addTotalRecordCount(delta);
    }

    @Override
    public GSValueList<E> get(K key) {
        return this.dataMap.get(key);
    }

    @Override
    public void removeKey(K key) {
        GSValueList<E> gsValueList = this.createDeleteList();
        long seqID = this.gRegionContext.getNextSeqId();
        this.dataMap.put(this.copyKeyIfNeeded(key), gsValueList);
        this.updateListSeqID(gsValueList, seqID);
    }

    @Override
    public long getSegmentID() {
        return this.segmentID;
    }

    @Override
    public int getRecordCount() {
        return this.recordCount;
    }

    @Override
    public long getVersion() {
        return this.version;
    }

    @Override
    public Segment<K, List<GSValue<E>>> copySegment() {
        HashMap<K, GSValueList<E>> copyMap = new HashMap<K, GSValueList<E>>();
        for (Map.Entry<K, GSValueList<E>> entry : this.dataMap.entrySet()) {
            copyMap.put(entry.getKey(), entry.getValue().copyGSValueList());
        }
        return new SegmentKListImpl(-1L, this.gRegionContext, copyMap);
    }

    @Override
    public Map<K, GSValueList<E>> getData() {
        return this.dataMap;
    }

    private GSValueList<E> getOrCreateList(K key) {
        GSValueList<E> gValueList = this.dataMap.get(key);
        if (gValueList == null || gValueList.getSeqID() < this.gRegionContext.getRemoveAllSeqId()) {
            gValueList = this.createAddList();
            this.dataMap.put(this.copyKeyIfNeeded(key), gValueList);
        } else if (gValueList.getValueType() == GValueType.Delete) {
            gValueList = this.createPutList();
            this.dataMap.put(this.copyKeyIfNeeded(key), gValueList);
        }
        return gValueList;
    }

    private void updateListSeqID(GSValueList<E> gsValueList, long seqID) {
        gsValueList.setSeqID(seqID);
    }

    private GSValueList<E> createAddList() {
        return GSValueList.of(new ArrayList(), GValueType.AddList, 0L);
    }

    private GSValueList<E> createPutList() {
        return GSValueList.of(new ArrayList(), GValueType.PutList, 0L);
    }

    private GSValueList<E> createDeleteList() {
        return GSValueList.of(null, GValueType.Delete, 0L);
    }

    private K copyKeyIfNeeded(K key) {
        return (K)(this.writeCopy ? this.keySerializer.copy(key) : key);
    }

    private E copyElementIfNeeded(E element) {
        return (E)(this.writeCopy ? this.elementSerializer.copy(element) : element);
    }
}

