package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.handler.PageHandler;
import org.apache.flink.runtime.state.gemini.engine.handler.PageKMapHandlerImpl;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.runtime.state.gemini.engine.page.PageStoreKMap;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for hash-based write buffers holding keyed-map state
 * ({@code K -> (MK -> MV)}).
 *
 * <p>Writes always go to the {@link #active} segment. Reads consult the active
 * segment first and then the segments in {@link #snapshotQueue}, walking the
 * queue from its tail to its head, so that tail-side segments take precedence
 * over head-side ones. Segments are handed out for flushing from the head of
 * the queue (see {@link #pollFlushingSegment()}).
 *
 * <p>This class never assigns {@link #active} itself and never enqueues into
 * {@link #snapshotQueue}; both are expected to be done by subclasses or the
 * parent class (not visible here).
 */
public abstract class AbstractWriteBufferKMapHashImpl<K, MK, MV> extends AbstractWriteBuffer<K, Map<MK, GSValue<MV>>> implements WriteBufferKMap<K, MK, MV> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteBufferKMapHashImpl.class);

    /** Segment that receives every write issued through this buffer. */
    protected SegmentKMap<K, MK, MV> active;

    /** Segments waiting to be flushed; polled from the head, read from the tail. */
    protected ConcurrentLinkedDeque<SegmentKMap<K, MK, MV>> snapshotQueue;

    /**
     * Creates the buffer with an empty snapshot queue. The active segment is
     * not created here.
     *
     * @param gRegion       region this buffer belongs to, forwarded to the parent
     * @param eventExecutor executor forwarded to the parent
     * @param pageStore     page store forwarded to the parent
     */
    public AbstractWriteBufferKMapHashImpl(GRegion gRegion, EventExecutor eventExecutor, PageStore pageStore) {
        super(gRegion, eventExecutor, pageStore);
        this.snapshotQueue = new ConcurrentLinkedDeque<>();
    }

    /** Installs a fresh {@link #active} segment; also used by {@link #reset()}. */
    abstract void initActive();

    /** Creates the empty map instance that {@link #get(Object)} merges segment entries into. */
    abstract Map<MK, GSValue<MV>> createPOJOMap();

    /**
     * Builds the handler that pages out the given segment.
     *
     * @param segment segment to flush; must be a {@code SegmentKMapImpl}
     * @param z       flag passed through unchanged to the handler
     * @return a new {@code PageKMapHandlerImpl} bound to the region's page store
     */
    @Override
    PageHandler createPageHandler(Segment segment, boolean z) {
        return new PageKMapHandlerImpl(this.gRegionContext, (PageStoreKMap) this.gRegion.getPageStore(), (SegmentKMapImpl) segment, z);
    }

    /** @return the segment currently receiving writes */
    @Override
    public Segment getActiveSegment() {
        return this.active;
    }

    /** @return the head of the snapshot queue, removed from it, or {@code null} if the queue is empty */
    @Override
    Segment pollFlushingSegment() {
        return this.snapshotQueue.poll();
    }

    /** Adds a single map entry for {@code k} to the active segment, then runs the resource check. */
    @Override
    public void add(K k, MK mk, MV mv) {
        this.active.add(k, mk, mv);
        checkResource();
    }

    /** Adds all entries of {@code map} for {@code k} to the active segment, then runs the resource check. */
    @Override
    public void add(K k, Map<MK, MV> map) {
        this.active.add(k, map);
        checkResource();
    }

    /** Removes map key {@code mk} of {@code k} in the active segment, then runs the resource check. */
    @Override
    public void remove(K k, MK mk) {
        this.active.remove(k, mk);
        checkResource();
    }

    /**
     * Looks up the state of map key {@code mk} under key {@code k}.
     *
     * <p>The active segment is checked first, then the snapshot segments from
     * tail to head. In each segment, an entry for {@code k} typed
     * {@link GValueType#Delete} answers {@code Delete} immediately; otherwise the
     * value type stored for {@code mk} is returned if {@code mk} is present.
     * A write-buffer hit is recorded when an answer is found, a miss otherwise.
     *
     * @return the value type found, or {@code null} if no segment answers
     */
    @Override
    public GValueType contains(K k, MK mk) {
        // NOTE(review): an entry that lacks mk does not stop the search even when it is a
        // PutMap, so an older segment may still answer — confirm this is intended.
        GSValueMap<MK, MV> activeEntry = this.active.get(k);
        if (activeEntry != null) {
            activeEntry.requestCount++;
            if (activeEntry.valueType == GValueType.Delete) {
                recordHit();
                return GValueType.Delete;
            }
            GSValue<MV> activeValue = activeEntry.getValue().get(mk);
            if (activeValue != null) {
                recordHit();
                return activeValue.valueType;
            }
        }
        Iterator<SegmentKMap<K, MK, MV>> snapshots = this.snapshotQueue.descendingIterator();
        while (snapshots.hasNext()) {
            GSValueMap<MK, MV> snapshotEntry = snapshots.next().get(k);
            if (snapshotEntry == null) {
                continue;
            }
            snapshotEntry.requestCount++;
            if (snapshotEntry.valueType == GValueType.Delete) {
                recordHit();
                return GValueType.Delete;
            }
            GSValue<MV> snapshotValue = snapshotEntry.getValue().get(mk);
            if (snapshotValue != null) {
                recordHit();
                return snapshotValue.valueType;
            }
        }
        recordMiss();
        return null;
    }

    /** Stores {@code map} for {@code k} in the active segment, then runs the resource check. */
    @Override
    public void put(K k, Map<MK, GSValue<MV>> map) {
        this.active.put(k, map);
        checkResource();
    }

    /**
     * Returns the value stored for map key {@code mk} under key {@code k}, taken
     * from the first segment (active, then snapshots tail to head) that has one.
     * Bumps the value's request counter and records a write-buffer hit; records a
     * miss when nothing is found.
     *
     * @return the stored value wrapper, or {@code null} if no segment has one
     */
    @Override
    public GSValue<MV> get(K k, MK mk) {
        GSValue<MV> found = this.active.get(k, mk);
        if (found == null) {
            Iterator<SegmentKMap<K, MK, MV>> snapshots = this.snapshotQueue.descendingIterator();
            while (found == null && snapshots.hasNext()) {
                found = snapshots.next().get(k, mk);
            }
        }
        if (found == null) {
            recordMiss();
            return null;
        }
        found.requestCount++;
        recordHit();
        return found;
    }

    /**
     * Returns the buffered map entry for {@code k}.
     *
     * <p>If the active segment holds a {@link GValueType#PutMap} or
     * {@link GValueType#Delete} entry, it fully determines the result and is
     * returned as is. Otherwise the active entry (if any) and the snapshot
     * entries are collected newest first, stopping at the first snapshot entry
     * typed {@code PutMap} or {@code Delete}, and merged.
     *
     * <p>Previously a partial active entry was returned on its own, which
     * silently ignored everything still sitting in the snapshot queue.
     *
     * @return the active entry itself when it is the only contributor, a merged
     *         copy when snapshot entries contribute, or {@code null} if no
     *         segment has an entry for {@code k}
     */
    @Override
    public GSValueMap<MK, MV> get(K k) {
        List<GSValueMap<MK, MV>> newestFirst = new ArrayList<>();
        GSValueMap<MK, MV> activeEntry = this.active.get(k);
        if (activeEntry != null) {
            activeEntry.requestCount++;
            recordHit();
            if (activeEntry.valueType == GValueType.PutMap || activeEntry.valueType == GValueType.Delete) {
                // A replacing entry hides whatever older segments contain.
                return activeEntry;
            }
            newestFirst.add(activeEntry);
        }
        Iterator<SegmentKMap<K, MK, MV>> snapshots = this.snapshotQueue.descendingIterator();
        while (snapshots.hasNext()) {
            GSValueMap<MK, MV> snapshotEntry = snapshots.next().get(k);
            if (snapshotEntry == null) {
                continue;
            }
            snapshotEntry.requestCount++;
            recordHit();
            newestFirst.add(snapshotEntry);
            if (snapshotEntry.valueType == GValueType.PutMap || snapshotEntry.valueType == GValueType.Delete) {
                break;
            }
        }
        if (newestFirst.isEmpty()) {
            // NOTE(review): unlike the other lookups, no write-buffer miss is recorded here;
            // kept as is — confirm whether that is deliberate.
            return null;
        }
        if (activeEntry != null && newestFirst.size() == 1) {
            // Only the active segment contributed: hand back the same instance as before.
            return activeEntry;
        }
        return mergeGSValueMap(newestFirst);
    }

    /** Intentionally does nothing in this implementation; {@code map} is left untouched. */
    @Override
    public void getAll(Map<K, GSValue<Map<MK, GSValue<MV>>>> map) {
    }

    /**
     * Adds to {@code set} every key present in the active segment and in all
     * snapshot segments, regardless of the entry's value type.
     */
    @Override
    public void allKeysIncludeDeleted(Set<K> set) {
        set.addAll(getActiveSegment().getData().keySet());
        Iterator<SegmentKMap<K, MK, MV>> snapshots = this.snapshotQueue.descendingIterator();
        while (snapshots.hasNext()) {
            set.addAll(snapshots.next().getData().keySet());
        }
    }

    /** Removes {@code k} in the active segment, then runs the resource check. */
    @Override
    public void removeKey(K k) {
        this.active.removeKey(k);
        checkResource();
    }

    /** Re-initialises the active segment; the snapshot queue is not touched. */
    @Override
    public void reset() {
        initActive();
    }

    /**
     * Reports the value type of the first entry found for {@code k} (active
     * segment, then snapshots tail to head). Bumps that entry's request counter
     * and records a write-buffer hit, or records a miss when there is none.
     *
     * @return the entry's value type, or {@code null} if no segment has the key
     */
    @Override
    public GValueType contains(K k) {
        GSValueMap<MK, MV> entry = findNewestEntry(k);
        if (entry == null) {
            recordMiss();
            return null;
        }
        entry.requestCount++;
        recordHit();
        return entry.valueType;
    }

    /** Returns the first entry for {@code k}, checking the active segment before the snapshots. */
    private GSValueMap<MK, MV> findNewestEntry(K k) {
        GSValueMap<MK, MV> entry = this.active.get(k);
        if (entry != null) {
            return entry;
        }
        Iterator<SegmentKMap<K, MK, MV>> snapshots = this.snapshotQueue.descendingIterator();
        while (snapshots.hasNext()) {
            entry = snapshots.next().get(k);
            if (entry != null) {
                return entry;
            }
        }
        return null;
    }

    /**
     * Collapses entries for one key into a single entry.
     *
     * @param list entries ordered newest first
     * @return {@code null} for a {@code null} or empty list; otherwise a new entry
     *         carrying the sequence id of the newest input. Its type is
     *         {@code PutMap} if any input was {@code PutMap} or {@code Delete},
     *         else {@code AddMap}.
     */
    private GSValueMap<MK, MV> mergeGSValueMap(List<GSValueMap<MK, MV>> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        Map<MK, GSValue<MV>> merged = createPOJOMap();
        GValueType mergedType = GValueType.AddMap;
        long seqID = list.get(0).getSeqID();
        // Apply oldest to newest so that newer entries overwrite older ones.
        for (int i = list.size() - 1; i >= 0; i--) {
            GSValueMap<MK, MV> entry = list.get(i);
            GValueType entryType = entry.getValueType();
            if (entryType == GValueType.Delete) {
                merged.clear();
                mergedType = GValueType.PutMap;
            } else if (entryType == GValueType.PutMap) {
                merged.clear();
                merged.putAll(entry.getValue());
                mergedType = GValueType.PutMap;
            } else {
                merged.putAll(entry.getValue());
            }
        }
        return new GSValueMap<>(merged, mergedType, seqID);
    }

    /** Records one write-buffer hit in the cache statistics. */
    private void recordHit() {
        this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
    }

    /** Records one write-buffer miss in the cache statistics. */
    private void recordMiss() {
        this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferMissCount();
    }
}
