/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.handler.PageHandler;
import org.apache.flink.runtime.state.gemini.engine.handler.PageKListHandlerImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GRegionKListImpl;
import org.apache.flink.runtime.state.gemini.engine.memstore.AbstractWriteBuffer;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValueList;
import org.apache.flink.runtime.state.gemini.engine.memstore.Segment;
import org.apache.flink.runtime.state.gemini.engine.memstore.SegmentKList;
import org.apache.flink.runtime.state.gemini.engine.memstore.SegmentKListImpl;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferKList;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteBufferKListHashImpl<K, E>
extends AbstractWriteBuffer<K, List<GSValue<E>>>
implements WriteBufferKList<K, E> {
    private static final Logger LOG = LoggerFactory.getLogger(WriteBufferKListHashImpl.class);
    protected SegmentKList<K, E> active;
    protected ConcurrentLinkedDeque<SegmentKList<K, E>> snapshotQueue = new ConcurrentLinkedDeque();

    public WriteBufferKListHashImpl(GRegion gRegion, EventExecutor eventExecutor, PageStore pageStore) {
        super(gRegion, eventExecutor, pageStore);
        this.active = new SegmentKListImpl(this.segmentID++, this.gRegionContext);
    }

    @Override
    public void add(K key, E element) {
        this.active.add(key, element);
        this.checkResource();
    }

    @Override
    public void addAll(K key, Collection<? extends E> elements) {
        this.active.addAll(key, elements);
        this.checkResource();
    }

    @Override
    public void remove(K key, E element) {
        throw new GeminiRuntimeException("not support remove element");
    }

    @Override
    public void removeAll(K key, Collection<? extends E> elements) {
        this.active.removeAll(key, elements);
        this.checkResource();
    }

    @Override
    public Segment getActiveSegment() {
        return this.active;
    }

    @Override
    Segment pollFlushingSegment() {
        return this.snapshotQueue.poll();
    }

    @Override
    Segment addFlushingSegment() {
        SegmentKList<K, E> result = this.active;
        this.snapshotQueue.add(this.active);
        this.gRegionContext.getWriteBufferStats().addFlushingSegmentCount(1);
        this.active = new SegmentKListImpl(this.segmentID++, this.gRegionContext);
        return result;
    }

    @Override
    PageHandler createPageHandler(Segment segment, boolean onlyEstimatedSize) {
        return new PageKListHandlerImpl((GRegionKListImpl)this.gRegion, (SegmentKListImpl)segment, onlyEstimatedSize);
    }

    @Override
    public void put(K key, List<GSValue<E>> value) {
        this.active.put(key, value);
        this.checkResource();
    }

    @Override
    public GSValueList<E> get(K key) {
        ArrayList<GSValueList<GSValue>> reverseOrderList = new ArrayList<GSValueList<GSValue>>();
        GSValue listValue = this.active.get((Object)key);
        if (listValue != null) {
            ++((GSValueList)listValue).requestCount;
            this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
            if (((GSValueList)listValue).valueType == GValueType.PutList) {
                return listValue;
            }
            if (((GSValueList)listValue).valueType == GValueType.Delete) {
                return listValue;
            }
            reverseOrderList.add((GSValueList<GSValue>)listValue);
        }
        Iterator<SegmentKList<K, E>> iterator = this.snapshotQueue.descendingIterator();
        while (iterator.hasNext()) {
            SegmentKList<K, E> inactive = iterator.next();
            listValue = inactive.get((Object)key);
            if (listValue == null) continue;
            ++((GSValueList)listValue).requestCount;
            this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
            reverseOrderList.add((GSValueList<GSValue>)listValue);
            if (((GSValueList)listValue).valueType != GValueType.PutList && ((GSValueList)listValue).valueType != GValueType.Delete) continue;
            break;
        }
        return reverseOrderList.size() == 0 ? null : this.mergeGSValueList(reverseOrderList);
    }

    private GSValueList<E> mergeGSValueList(List<GSValueList<E>> reverseOrderList) {
        if (reverseOrderList == null || reverseOrderList.size() == 0) {
            return null;
        }
        ArrayList newList = new ArrayList();
        GValueType finalGValueType = GValueType.AddList;
        long seqID = reverseOrderList.get(0).getSeqID();
        for (int index = reverseOrderList.size() - 1; index >= 0; --index) {
            GSValueList<E> currentList = reverseOrderList.get(index);
            if (currentList.getValueType() == GValueType.Delete) {
                newList.clear();
                finalGValueType = GValueType.PutList;
                continue;
            }
            if (currentList.getValueType() == GValueType.PutList) {
                newList.clear();
                newList.addAll((Collection)currentList.value);
                finalGValueType = GValueType.PutList;
                continue;
            }
            newList.addAll((Collection)currentList.value);
        }
        return new GSValueList(newList, finalGValueType, seqID);
    }

    @Override
    public void getAll(Map<K, GSValue<List<GSValue<E>>>> container) {
    }

    @Override
    public void allKeysIncludeDeleted(Set<K> container) {
        Segment activeMap = this.getActiveSegment();
        container.addAll(activeMap.getData().keySet());
        Iterator<SegmentKList<K, E>> iterator = this.snapshotQueue.descendingIterator();
        while (iterator.hasNext()) {
            SegmentKList<K, E> inactive = iterator.next();
            container.addAll(inactive.getData().keySet());
        }
    }

    @Override
    public void removeKey(K key) {
        this.active.removeKey(key);
        this.checkResource();
    }

    @Override
    public void reset() {
        this.active = new SegmentKListImpl(this.segmentID++, this.gRegionContext);
    }

    @Override
    public GValueType contains(K key) {
        GSValue listValue = this.active.get((Object)key);
        if (listValue != null) {
            ++((GSValueList)listValue).requestCount;
            this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
            return ((GSValueList)listValue).valueType;
        }
        Iterator<SegmentKList<K, E>> iterator = this.snapshotQueue.descendingIterator();
        while (iterator.hasNext()) {
            SegmentKList<K, E> inactive = iterator.next();
            listValue = inactive.get((Object)key);
            if (listValue == null) continue;
            ++((GSValueList)listValue).requestCount;
            this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferHitCount();
            return ((GSValueList)listValue).valueType;
        }
        this.gRegionContext.getGContext().getSupervisor().getCacheManager().getCacheStats().addWriteBufferMissCount();
        return null;
    }
}

