package org.apache.flink.runtime.state.gemini.engine.hashtable;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GRegionID;
import org.apache.flink.runtime.state.gemini.engine.GeminiKList;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.memstore.GSValue;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferKList;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferKListHashImpl;
import org.apache.flink.runtime.state.gemini.engine.page.GValueType;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageStoreHashKListImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageStoreKList;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/hashtable/GRegionKListImpl.class */
public class GRegionKListImpl<K, E> implements GeminiKList<K, E>, HashGRegion {
    // Engine-wide context, obtained from the region context in the constructors.
    private final GContext gContext;
    // Context of this region: supplies the region id, sequence ids and serializers.
    private final GRegionContext gRegionContext;
    // In-memory write buffer; consulted before the page store on reads.
    private WriteBufferKList<K, E> writeBuffer;
    // Page store backing this region.
    private PageStoreKList<K, E> pageStore;
    // Executor picked from the supervisor's region executor group for this region.
    private final EventExecutor regionEventExecutor;
    // When true, keys/elements handed to callers are copied through the serializers.
    private boolean readCopy;
    // Serializer used by copyKeyIfNeeded.
    private TypeSerializer<K> keySerializer;
    // Serializer used by copyElementIfNeeded.
    private TypeSerializer<E> elementSerializer;

    public GRegionKListImpl(GRegionContext gRegionContext, PageIndex pageIndex) {
        this.gRegionContext = (GRegionContext) Preconditions.checkNotNull(gRegionContext);
        this.gContext = gRegionContext.getGContext();
        this.regionEventExecutor = this.gContext.getSupervisor().getRegionExecutorGroup().next();
        gRegionContext.getPageStoreStats().setRegionExecutor(this.regionEventExecutor);
        this.gContext.getSupervisor().getCacheManager().addRegionEventExecutor(this.regionEventExecutor);
        this.pageStore = new PageStoreHashKListImpl(this, pageIndex, this.regionEventExecutor);
        this.writeBuffer = new WriteBufferKListHashImpl(this, this.regionEventExecutor, this.pageStore);
        this.readCopy = gRegionContext.getGContext().getGConfiguration().isReadCopy();
        this.keySerializer = gRegionContext.getPageSerdeFlink().getKeySerde();
        this.elementSerializer = gRegionContext.getPageSerdeFlink().getValueSerde();
    }

    public GRegionKListImpl(GRegionContext gRegionContext) {
        this.gRegionContext = (GRegionContext) Preconditions.checkNotNull(gRegionContext);
        this.gContext = gRegionContext.getGContext();
        this.regionEventExecutor = this.gContext.getSupervisor().getRegionExecutorGroup().next();
        gRegionContext.getPageStoreStats().setRegionExecutor(this.regionEventExecutor);
        this.gContext.getSupervisor().getCacheManager().addRegionEventExecutor(this.regionEventExecutor);
        this.pageStore = new PageStoreHashKListImpl(this, this.regionEventExecutor);
        this.writeBuffer = new WriteBufferKListHashImpl(this, this.regionEventExecutor, this.pageStore);
        this.readCopy = gRegionContext.getGContext().getGConfiguration().isReadCopy();
        this.keySerializer = gRegionContext.getPageSerdeFlink().getKeySerde();
        this.elementSerializer = gRegionContext.getPageSerdeFlink().getValueSerde();
    }

    /** Returns the region context this region was constructed with. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GRegion
    public GRegionContext getGRegionContext() {
        return gRegionContext;
    }

    /** Returns the region id reported by the region context. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GRegion
    public GRegionID getRegionId() {
        return gRegionContext.getRegionId();
    }

    /** Returns the write buffer owned by this region. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GRegion
    public WriteBufferKList<K, E> getWriteBuffer() {
        return writeBuffer;
    }

    /** Returns the page store owned by this region. */
    @Override // org.apache.flink.runtime.state.gemini.engine.GRegion
    public PageStoreKList<K, E> getPageStore() {
        return pageStore;
    }

    /** Returns the event executor assigned to this region at construction time. */
    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Executor
    public EventExecutor getExecutor() {
        return regionEventExecutor;
    }

    /**
     * Replaces the list stored under {@code k} with {@code list}.
     *
     * <p>Each element is wrapped as a {@code PutValue}. The first occurrence of an element
     * carries the sequence id drawn at the start of the call; every repeated (equal) element
     * draws a fresh sequence id from the region context.
     *
     * @param k key to write
     * @param list elements to store, in order; must not be {@code null}
     */
    public void put(K k, List<E> list) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        final long sharedSeqId = gRegionContext.getNextSeqID();
        final Set<E> seen = new HashSet<>();
        writeBuffer.put(
                k,
                list.stream()
                        .map(
                                element ->
                                        GSValue.of(
                                                element,
                                                GValueType.PutValue,
                                                // Set.add returns false for a repeated element.
                                                seen.add(element)
                                                        ? sharedSeqId
                                                        : gRegionContext.getNextSeqID()))
                        .collect(Collectors.toList()));
    }

    /**
     * Returns the list stored under {@code k}, with elements passed through
     * {@code copyElementIfNeeded} where the internal lookup applies it.
     *
     * @param k key to look up
     * @return the merged list, or {@code null} when the internal lookup yields nothing
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public List<E> get(K k) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        final boolean copyElements = true;
        return internalGet(k, copyElements);
    }

    /**
     * Returns the list stored under {@code k}, or {@code list} when {@code get(k)} returns
     * {@code null}.
     *
     * @param k key to look up
     * @param list fallback returned when nothing is stored for {@code k}
     * @return the stored list, or the fallback
     */
    public List<E> getOrDefault(K k, List<E> list) {
        this.gContext.checkDBStatus();
        // The key is passed as-is: it is a K, not a GRegionKListImpl, so no cast is needed.
        List<E> stored = get(k);
        return stored != null ? stored : list;
    }

    /**
     * Removes the whole list stored under {@code k} by delegating to the write buffer's
     * {@code removeKey}.
     *
     * @param k key to remove
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public void remove(K k) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        writeBuffer.removeKey(k);
    }

    /**
     * Collects every key of this region together with its list into a new {@link HashMap}.
     *
     * @return a new map filled by {@code getAll(Map)}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public Map<K, List<E>> getAll() {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        Map<K, List<E>> snapshot = new HashMap<>();
        getAll(snapshot);
        return snapshot;
    }

    /**
     * Adds every key of this region that currently resolves to a list into {@code map}.
     *
     * <p>Candidate keys are gathered from both the write buffer and the page store (including
     * keys marked deleted); each candidate is then resolved through {@code get}, and keys that
     * resolve to {@code null} are skipped. Keys are passed through {@code copyKeyIfNeeded}
     * before being stored in {@code map}.
     *
     * @param map destination map; existing entries for the same key are overwritten
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public void getAll(Map<K, List<E>> map) {
        this.gContext.checkDBStatus();
        this.gContext.incAccessNumber();
        // The set holds keys (K), not elements; typing it correctly removes the need for casts.
        Set<K> candidateKeys = new HashSet<>();
        this.writeBuffer.allKeysIncludeDeleted(candidateKeys);
        this.pageStore.allKeysIncludeDeleted(candidateKeys);
        for (K key : candidateKeys) {
            List<E> values = get(key);
            if (values != null) {
                map.put(copyKeyIfNeeded(key), values);
            }
        }
    }

    /**
     * Clears the region: records the region context's last sequence id as the remove-all
     * sequence id, then resets the write buffer.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public void removeAll() {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        final long lastSeqId = gRegionContext.getLastSeqID();
        gRegionContext.setRemoveAllSeqID(lastSeqId);
        writeBuffer.reset();
    }

    /**
     * Returns the keys of this region that currently resolve to a list.
     *
     * <p>Implemented via {@code getAll()}, so the lists are materialised as well.
     *
     * @return key set of the map produced by {@code getAll()}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public Iterable<K> keys() {
        gContext.checkDBStatus();
        Map<K, List<E>> everything = getAll();
        return everything.keySet();
    }

    /**
     * Tells whether a non-empty list is stored under {@code k}.
     *
     * @param k key to probe
     * @return {@code true} when the internal lookup yields a non-null, non-empty list
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public boolean contains(K k) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        // No element copies are needed just to test for presence.
        List<E> found = internalGet(k, false);
        return found != null && !found.isEmpty();
    }

    /**
     * Adds a single element to the list under {@code k} by delegating to the write buffer.
     *
     * @param k key whose list receives the element
     * @param e element to add
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void add(K k, E e) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        writeBuffer.add(k, e);
    }

    /**
     * Adds several elements to the list under {@code k} by delegating to the write buffer.
     *
     * @param k key whose list receives the elements
     * @param collection elements to add
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void addAll(K k, Collection<? extends E> collection) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        writeBuffer.addAll(k, collection);
    }

    /**
     * Removes one occurrence of {@code e} from the list stored under {@code k}.
     *
     * <p>The current list is read, the element is removed with {@link List#remove(Object)},
     * and the result is written back: the key is removed when the list becomes empty,
     * otherwise the remaining list is stored again via {@code put}. Nothing happens when no
     * list is stored for {@code k}.
     *
     * @param k key whose list is modified
     * @param e element to remove
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void remove(K k, E e) {
        this.gContext.checkDBStatus();
        // The key is a K; it must not be cast to the region type before the lookup.
        List<E> current = get(k);
        if (current == null) {
            return;
        }
        current.remove(e);
        if (current.isEmpty()) {
            remove(k);
        } else {
            put(k, current);
        }
    }

    /**
     * Removes every element contained in {@code collection} from the list stored under
     * {@code k}.
     *
     * <p>The current list is read, filtered with {@link List#removeAll(Collection)}, and the
     * result is written back: the key is removed when the list becomes empty, otherwise the
     * remaining list is stored again via {@code put}. Nothing happens when no list is stored
     * for {@code k}.
     *
     * @param k key whose list is modified
     * @param collection elements to remove
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GCommonKList
    public void removeAll(K k, Collection<? extends E> collection) {
        this.gContext.checkDBStatus();
        // The key is a K; it must not be cast to the region type before the lookup.
        List<E> current = get(k);
        if (current == null) {
            return;
        }
        current.removeAll(collection);
        if (current.isEmpty()) {
            remove(k);
        } else {
            put(k, current);
        }
    }

    /**
     * Removes and returns the first element of the list stored under {@code k}.
     *
     * @param k key whose list is polled
     * @return the former head (passed through {@code copyElementIfNeeded}), or {@code null}
     *     when the internal lookup yields no list
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GExternalKList
    public E poll(K k) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        List<E> elements = internalGet(k, false);
        if (elements != null) {
            E head = elements.remove(0);
            // Persist the removal through the regular single-element remove path.
            remove(k, head);
            return copyElementIfNeeded(head);
        }
        return null;
    }

    /**
     * Returns the first element of the list stored under {@code k} without removing it.
     *
     * @param k key whose list is inspected
     * @return the head (passed through {@code copyElementIfNeeded}), or {@code null} when the
     *     internal lookup yields no list
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.GExternalKList
    public E peek(K k) {
        gContext.checkDBStatus();
        gContext.incAccessNumber();
        List<E> elements = internalGet(k, false);
        if (elements != null) {
            return copyElementIfNeeded(elements.get(0));
        }
        return null;
    }

    /**
     * Convenience overload of the three-argument merge that never copies elements.
     *
     * @param list versioned values from the write buffer, may be {@code null}
     * @param list2 versioned values from the page store, may be {@code null}
     * @return the merged elements, or {@code null} when the merge is empty
     */
    @VisibleForTesting
    public List<E> mergeWriteBufferAndPageStore(List<GSValue<E>> list, List<GSValue<E>> list2) {
        final boolean copyElements = false;
        return mergeWriteBufferAndPageStore(list, list2, copyElements);
    }

    /**
     * Merges page-store values and write-buffer values into one element list.
     *
     * <p>Page-store values ({@code list2}) are applied first and never copied. Write-buffer
     * values ({@code list}) are applied afterwards; any write-buffer value that is equal to a
     * page-store value is skipped so it is not applied twice.
     *
     * @param list versioned values from the write buffer, may be {@code null}
     * @param list2 versioned values from the page store, may be {@code null}
     * @param z whether write-buffer elements are passed through {@code copyElementIfNeeded}
     * @return the merged elements, or {@code null} when the merge is empty
     */
    @VisibleForTesting
    public List<E> mergeWriteBufferAndPageStore(List<GSValue<E>> list, List<GSValue<E>> list2, boolean z) {
        List<E> merged = new LinkedList<>();
        updateListFromGSVList(merged, list2, null, false);

        Set<GSValue<E>> alreadyApplied = new HashSet<>();
        if (list2 != null) {
            alreadyApplied.addAll(list2);
        }
        updateListFromGSVList(merged, list, alreadyApplied, z);

        return merged.isEmpty() ? null : merged;
    }

    /**
     * Resolves the current list for {@code k} from the write buffer and the page store.
     *
     * <ul>
     *   <li>A {@code Delete} entry in the write buffer means the key has no list.
     *   <li>A {@code PutList} entry in the write buffer is authoritative; the page store is
     *       not consulted.
     *   <li>Otherwise the write-buffer values (if any) are merged with the page-store values.
     * </ul>
     *
     * @param k key to look up
     * @param z whether returned elements are passed through {@code copyElementIfNeeded}
     * @return the resolved list, or {@code null} when it is absent or empty
     */
    private List<E> internalGet(K k, boolean z) {
        GSValue<List<GSValue<E>>> buffered = this.writeBuffer.get(k);
        List<GSValue<E>> bufferedValues = null;
        if (buffered != null) {
            if (buffered.getValueType() == GValueType.Delete) {
                return null;
            }
            if (buffered.getValueType() == GValueType.PutList) {
                return getListFromCompcatedList(buffered.getValue(), z);
            }
            bufferedValues = buffered.getValue();
        }
        // The key is a K; it must not be cast to the page-store type before the lookup.
        List<E> merged = mergeWriteBufferAndPageStore(bufferedValues, this.pageStore.get(k), z);
        if (merged == null || merged.isEmpty()) {
            return null;
        }
        return merged;
    }

    /**
     * Turns a {@code PutList} value from the write buffer into a plain element list.
     *
     * @param list versioned values to replay
     * @param z whether elements are passed through {@code copyElementIfNeeded}
     * @return the resulting elements, or {@code null} when none remain
     */
    private List<E> getListFromCompcatedList(@Nonnull List<GSValue<E>> list, boolean z) {
        List<E> elements = new LinkedList<>();
        updateListFromGSVList(elements, list, null, z);
        return elements.isEmpty() ? null : elements;
    }

    /**
     * Replays versioned values onto {@code list}, in order.
     *
     * <p>Values rejected by {@code gRegionContext.filterState} and values contained in
     * {@code set} are skipped. A {@code Delete} value removes the first element of
     * {@code list} that equals its payload; a {@code PutValue} value appends its payload.
     *
     * @param list element list being built; modified in place
     * @param list2 versioned values to replay; {@code null} is treated as empty
     * @param set values to skip, or {@code null} to skip none
     * @param z whether appended elements are passed through {@code copyElementIfNeeded}
     * @throws GeminiRuntimeException if a value is neither {@code Delete} nor {@code PutValue}
     */
    private void updateListFromGSVList(List<E> list, List<GSValue<E>> list2, Set<GSValue<E>> set, boolean z) {
        if (list2 == null) {
            return;
        }
        for (GSValue<E> entry : list2) {
            if (this.gRegionContext.filterState(entry.getSeqID())) {
                continue;
            }
            if (set != null && set.contains(entry)) {
                continue;
            }
            if (entry.getValueType() == GValueType.Delete) {
                // Drop only the first matching element; later duplicates stay in place.
                Iterator<E> cursor = list.iterator();
                while (cursor.hasNext()) {
                    if (cursor.next().equals(entry.getValue())) {
                        cursor.remove();
                        break;
                    }
                }
                continue;
            }
            if (entry.getValueType() != GValueType.PutValue) {
                throw new GeminiRuntimeException("Unexpected type for list element: " + entry.getValueType());
            }
            E payload = entry.getValue();
            if (z) {
                list.add(copyElementIfNeeded(payload));
            } else {
                list.add(payload);
            }
        }
    }

    /**
     * Returns {@code k} itself, or a copy made by the key serializer when read-copy is on.
     *
     * @param k key to hand out
     * @return the key or its copy
     */
    private K copyKeyIfNeeded(K k) {
        if (!readCopy) {
            return k;
        }
        return keySerializer.copy(k);
    }

    /**
     * Returns {@code e} itself, or a copy made by the element serializer when read-copy is on.
     *
     * @param e element to hand out
     * @return the element or its copy
     */
    private E copyElementIfNeeded(E e) {
        if (!readCopy) {
            return e;
        }
        return elementSerializer.copy(e);
    }

    // Bridge method (marked bridge/synthetic by the decompiler) that forwards the erased
    // GeminiKV signature to getOrDefault(K, List<E>).
    // NOTE(review): the cast of the key to GRegionKListImpl<K, E> looks like a decompiler
    // type-inference artifact (see the JADX warning below) — confirm before relying on it.
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public /* bridge */ /* synthetic */ Object getOrDefault(Object obj, Object obj2) {
        return getOrDefault((GRegionKListImpl<K, E>) obj, (List) obj2);
    }

    // Bridge method (marked bridge/synthetic by the decompiler) that forwards the erased
    // GeminiKV signature to get(K).
    // NOTE(review): the cast of the key to GRegionKListImpl<K, E> looks like a decompiler
    // type-inference artifact (see the JADX warning below) — confirm before relying on it.
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public /* bridge */ /* synthetic */ Object get(Object obj) {
        return get((GRegionKListImpl<K, E>) obj);
    }

    // Bridge method (marked bridge/synthetic by the decompiler) that forwards the erased
    // GeminiKV signature to put(K, List<E>).
    // NOTE(review): the cast of the key to GRegionKListImpl<K, E> looks like a decompiler
    // type-inference artifact (see the JADX warning below) — confirm before relying on it.
    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.gemini.engine.GeminiKV
    public /* bridge */ /* synthetic */ void put(Object obj, Object obj2) {
        put((GRegionKListImpl<K, E>) obj, (List) obj2);
    }
}
