package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.handler.PageHandler;
import org.apache.flink.runtime.state.gemini.engine.handler.PageKListHandlerImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GRegionKListImpl;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/memstore/WriteBufferKListHashImpl.class */
/**
 * Write buffer for key-to-list state.
 *
 * <p>All mutations are applied to a single {@link #active} segment. {@link #addFlushingSegment()}
 * appends the current active segment to the tail of {@link #snapshotQueue} and installs a fresh
 * one; {@link #pollFlushingSegment()} removes segments from the head of that queue. Lookups
 * consult the active segment first and then the queued segments from the most recently queued to
 * the oldest.
 *
 * @param <K> key type
 * @param <E> list element type
 */
public class WriteBufferKListHashImpl<K, E> extends AbstractWriteBuffer<K, List<GSValue<E>>> implements WriteBufferKList<K, E> {
    private static final Logger LOG = LoggerFactory.getLogger(WriteBufferKListHashImpl.class);

    /** Segment that receives every mutation issued through this buffer. */
    protected SegmentKList<K, E> active;

    /** Segments retired by {@link #addFlushingSegment()}; the oldest one sits at the head. */
    protected ConcurrentLinkedDeque<SegmentKList<K, E>> snapshotQueue;

    /**
     * Creates the buffer with an empty snapshot queue and a freshly allocated active segment.
     *
     * @param gRegion       forwarded to the superclass
     * @param eventExecutor forwarded to the superclass
     * @param pageStore     forwarded to the superclass
     */
    public WriteBufferKListHashImpl(GRegion gRegion, EventExecutor eventExecutor, PageStore pageStore) {
        super(gRegion, eventExecutor, pageStore);
        this.snapshotQueue = new ConcurrentLinkedDeque<>();
        this.active = allocateKListSegment();
    }

    /** Adds one element under {@code k} in the active segment, then runs the resource check. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void add(K k, E e) {
        this.active.add(k, e);
        checkResource();
    }

    /** Adds all elements under {@code k} in the active segment, then runs the resource check. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void addAll(K k, Collection<? extends E> collection) {
        this.active.addAll(k, collection);
        checkResource();
    }

    /**
     * Unsupported: single-element removal is rejected unconditionally.
     *
     * @throws GeminiRuntimeException always
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void remove(K k, E e) {
        throw new GeminiRuntimeException("not support remove element");
    }

    /** Delegates the bulk removal to the active segment, then runs the resource check. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void removeAll(K k, Collection<? extends E> collection) {
        this.active.removeAll(k, collection);
        checkResource();
    }

    /** Returns the segment currently receiving writes. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.AbstractWriteBuffer
    public Segment getActiveSegment() {
        return this.active;
    }

    /** Removes and returns the head (oldest) queued segment, or {@code null} if the queue is empty. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.AbstractWriteBuffer
    Segment pollFlushingSegment() {
        return this.snapshotQueue.poll();
    }

    /**
     * Retires the active segment: appends it to the snapshot queue, bumps the flushing-segment
     * counter by one and installs a newly allocated active segment.
     *
     * @return the segment that was active before the call
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.AbstractWriteBuffer
    Segment addFlushingSegment() {
        SegmentKList<K, E> retired = this.active;
        this.snapshotQueue.add(retired);
        this.gRegionContext.getWriteBufferStats().addFlushingSegmentCount(1);
        this.active = allocateKListSegment();
        return retired;
    }

    /**
     * Builds the page handler for a segment of this buffer.
     *
     * @param segment must be a {@code SegmentKListImpl}; it is cast without a check
     * @param z       passed through unchanged to the handler constructor
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.AbstractWriteBuffer
    PageHandler createPageHandler(Segment segment, boolean z) {
        return new PageKListHandlerImpl((GRegionKListImpl) this.gRegion, (SegmentKListImpl) segment, z);
    }

    /** Stores {@code list} under {@code k} in the active segment, then runs the resource check. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void put(K k, List<GSValue<E>> list) {
        this.active.put(k, list);
        checkResource();
    }

    /**
     * Looks up the buffered value for {@code k}.
     *
     * <p>An entry found in the active segment is returned as-is. Otherwise the queued segments are
     * scanned from the most recently queued to the oldest; every hit is collected, the scan stops
     * at the first {@code PutList} or {@code Delete} entry, and the collected entries are merged
     * into a new value. Each hit increments the entry's request count and the write-buffer hit
     * counter.
     *
     * @return the buffered value, or {@code null} when no segment holds the key
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer, org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferKMap
    public GSValueList<E> get(K k) {
        GSValueList<E> fromActive = this.active.get(k);
        if (fromActive != null) {
            recordKListHit(fromActive);
            // NOTE(review): the decompiled original also appended a non-PutList/non-Delete active
            // hit to a local list right before this unconditional return, so that append had no
            // effect. It may mean upstream intended to merge such entries with the queued
            // segments - confirm against the upstream source before changing behaviour here.
            return fromActive;
        }

        List<GSValueList<E>> hits = new ArrayList<>();
        Iterator<SegmentKList<K, E>> newestFirst = this.snapshotQueue.descendingIterator();
        while (newestFirst.hasNext()) {
            GSValueList<E> hit = newestFirst.next().get(k);
            if (hit == null) {
                continue;
            }
            recordKListHit(hit);
            hits.add(hit);
            // A PutList or Delete entry supersedes anything older, so stop scanning.
            if (hit.valueType == GValueType.PutList || hit.valueType == GValueType.Delete) {
                break;
            }
        }
        return hits.isEmpty() ? null : mergeGSValueList(hits);
    }

    /**
     * Folds several per-segment entries for one key into a single value.
     *
     * <p>The entries are applied from the last list position to the first. A {@code Delete} entry
     * empties the accumulated elements, a {@code PutList} entry replaces them, and any other
     * entry appends to them. The result type is {@code PutList} once a {@code Delete} or
     * {@code PutList} entry has been applied and {@code AddList} otherwise; the sequence id is
     * taken from the first list position.
     *
     * @param list entries in the order collected by {@link #get(Object)}, i.e. most recently
     *             queued segment first
     * @return the merged value, or {@code null} when {@code list} is {@code null} or empty
     */
    // The element collection stays raw because the declared type of GSValueList#value is not
    // visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private GSValueList<E> mergeGSValueList(List<GSValueList<E>> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        ArrayList merged = new ArrayList();
        GValueType mergedType = GValueType.AddList;
        long seqID = list.get(0).getSeqID();
        for (int i = list.size() - 1; i >= 0; i--) {
            GSValueList<E> entry = list.get(i);
            if (entry.getValueType() == GValueType.Delete) {
                merged.clear();
                mergedType = GValueType.PutList;
            } else if (entry.getValueType() == GValueType.PutList) {
                merged.clear();
                merged.addAll((Collection) entry.value);
                mergedType = GValueType.PutList;
            } else {
                merged.addAll((Collection) entry.value);
            }
        }
        return new GSValueList<>(merged, mergedType, seqID);
    }

    /** No-op in this implementation: {@code map} is left untouched. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void getAll(Map<K, GSValue<List<GSValue<E>>>> map) {
    }

    /**
     * Adds to {@code set} every key present in the active segment and in each queued segment,
     * regardless of the value type stored for it.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void allKeysIncludeDeleted(Set<K> set) {
        set.addAll(getActiveSegment().getData().keySet());
        Iterator<SegmentKList<K, E>> newestFirst = this.snapshotQueue.descendingIterator();
        while (newestFirst.hasNext()) {
            set.addAll(newestFirst.next().getData().keySet());
        }
    }

    /** Delegates the key removal to the active segment, then runs the resource check. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void removeKey(K k) {
        this.active.removeKey(k);
        checkResource();
    }

    /** Replaces the active segment with a newly allocated one; the snapshot queue is not touched. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void reset() {
        this.active = allocateKListSegment();
    }

    /**
     * Reports the value type buffered for {@code k}, checking the active segment first and then
     * the queued segments from the most recently queued to the oldest.
     *
     * @return the value type of the first entry found (a hit is recorded for it), or {@code null}
     *         after recording a write-buffer miss when no segment holds the key
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public GValueType contains(K k) {
        GSValueList<E> fromActive = this.active.get(k);
        if (fromActive != null) {
            recordKListHit(fromActive);
            return fromActive.valueType;
        }
        Iterator<SegmentKList<K, E>> newestFirst = this.snapshotQueue.descendingIterator();
        while (newestFirst.hasNext()) {
            GSValueList<E> hit = newestFirst.next().get(k);
            if (hit != null) {
                recordKListHit(hit);
                return hit.valueType;
            }
        }
        this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferMissCount();
        return null;
    }

    /** Allocates a segment with the next segment id and advances the id counter by one. */
    // SegmentKListImpl is instantiated raw because its generic signature is not visible from this
    // file; the original code did the same.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private SegmentKList<K, E> allocateKListSegment() {
        long id = this.segmentID;
        this.segmentID = id + 1;
        return new SegmentKListImpl(id, this.gRegionContext);
    }

    /** Increments the entry's request count and the cache statistics' write-buffer hit counter. */
    private void recordKListHit(GSValueList<E> hit) {
        hit.requestCount++;
        this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
    }

    // The compiler generates the erased bridge methods get(Object) and put(Object, Object)
    // automatically; declaring them by hand clashes with get(K) and put(K, List) above.
}
