package org.apache.flink.runtime.state.gemini.keyed;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GRegionKListImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedListImpl;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/keyed/GeminiKeyedListStateImpl.class */
/**
 * {@link KeyedListState} implementation that keeps its data in a {@link GTableKeyedListImpl}.
 *
 * <p>Every per-key operation is forwarded to the region that the table returns for that key;
 * operations over the whole state walk all regions of the table.
 *
 * @param <K> type of the keys
 * @param <E> type of the list elements
 */
public final class GeminiKeyedListStateImpl<K, E> implements KeyedListState<K, E> {

    /** Byte written between two serialized elements by {@link #getSerializedValue}. */
    private static final char SERIALIZED_ELEMENT_SEPARATOR = ',';

    private final KeyedListStateDescriptor<K, E> descriptor;
    private final GTableKeyedListImpl<K, E> table;

    /**
     * Creates the state on top of the given table.
     *
     * @param keyedListStateDescriptor descriptor of this state, must not be null
     * @param gTableKeyedListImpl table holding the state data, must not be null
     * @throws NullPointerException if either argument is null
     */
    public GeminiKeyedListStateImpl(KeyedListStateDescriptor<K, E> keyedListStateDescriptor, GTableKeyedListImpl<K, E> gTableKeyedListImpl) {
        this.descriptor = Preconditions.checkNotNull(keyedListStateDescriptor);
        this.table = Preconditions.checkNotNull(gTableKeyedListImpl);
    }

    /** Returns the descriptor this state was created with. */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public KeyedListStateDescriptor getDescriptor() {
        return this.descriptor;
    }

    /**
     * Tells whether the region returns a non-null list for the key.
     *
     * @param k key to look up; {@code null} yields {@code false}
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public boolean contains(K k) {
        return getOrDefault(k, null) != null;
    }

    /**
     * Returns the list stored under the key.
     *
     * @param k key to look up
     * @return the list from the region, or {@code null} if the key is null or the region returns null
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public List<E> get(K k) {
        return getOrDefault(k, null);
    }

    /**
     * Returns the list stored under the key, falling back to the given default.
     *
     * @param k key to look up
     * @param list value returned when the key is null or the region returns null for it
     * @return the list from the region, or {@code list}
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public List<E> getOrDefault(K k, List<E> list) {
        if (k == null) {
            return list;
        }
        List<E> stored = getRegion(k).get(k);
        return stored != null ? stored : list;
    }

    /**
     * Looks up several keys at once.
     *
     * @param collection keys to look up; null keys are skipped
     * @return a map with one entry per key whose list is neither null nor empty; an empty map
     *     when {@code collection} is null or empty
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Map<K, List<E>> getAll(Collection<? extends K> collection) {
        if (collection == null || collection.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<K, List<E>> result = new HashMap<>();
        for (K key : collection) {
            if (key == null) {
                continue;
            }
            List<E> values = get(key);
            if (values != null && !values.isEmpty()) {
                result.put(key, values);
            }
        }
        return result;
    }

    /**
     * Adds one element under the key.
     *
     * @throws NullPointerException if the key or the element is null
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void add(K k, E e) {
        Preconditions.checkNotNull(k);
        Preconditions.checkNotNull(e, "You can not add null value to list state.");
        getRegion(k).add(k, e);
    }

    /**
     * Adds all given elements under the key. An empty collection is a no-op.
     *
     * @throws NullPointerException if the key or the collection is null
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void addAll(K k, Collection<? extends E> collection) {
        Preconditions.checkNotNull(k);
        Preconditions.checkNotNull(collection, "List of values to add cannot be null.");
        if (collection.isEmpty()) {
            return;
        }
        getRegion(k).addAll(k, collection);
    }

    /**
     * Calls {@link #addAll(Object, Collection)} for every entry of the map.
     * A null or empty map is a no-op.
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void addAll(Map<? extends K, ? extends Collection<? extends E>> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        for (Map.Entry<? extends K, ? extends Collection<? extends E>> entry : map.entrySet()) {
            addAll(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Puts a new single-element list under the key.
     *
     * @throws NullPointerException if the key or the element is null
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void put(K k, E e) {
        Preconditions.checkNotNull(k);
        Preconditions.checkNotNull(e, "You can not add null value to list state.");
        // A fresh ArrayList is handed over (not the fixed-size Arrays.asList view).
        List<E> single = new ArrayList<>(Arrays.asList(e));
        getRegion(k).put(k, single);
    }

    /**
     * Puts a copy of the given elements under the key.
     *
     * <p>An empty collection results in an empty list being put.
     *
     * @throws NullPointerException if the key, the collection, or any element is null
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void putAll(K k, Collection<? extends E> collection) {
        Preconditions.checkNotNull(k);
        // Fail with a clear message instead of an anonymous NPE from the loop below.
        Preconditions.checkNotNull(collection, "List of values to add cannot be null.");
        List<E> copy = new ArrayList<>(collection.size());
        for (E element : collection) {
            Preconditions.checkNotNull(element, "You cannot add null to a ListState.");
            copy.add(element);
        }
        getRegion(k).put(k, copy);
    }

    /**
     * Calls {@link #putAll(Object, Collection)} for every entry of the map.
     * A null or empty map is a no-op.
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public void putAll(Map<? extends K, ? extends Collection<? extends E>> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        for (Map.Entry<? extends K, ? extends Collection<? extends E>> entry : map.entrySet()) {
            putAll(entry.getKey(), entry.getValue());
        }
    }

    /** Removes the key from its region; a null key is ignored. */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void remove(K k) {
        if (k == null) {
            return;
        }
        getRegion(k).remove(k);
    }

    /**
     * Asks the region to remove the element from the key's list.
     *
     * @return {@code false} for a null key, {@code true} otherwise
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public boolean remove(K k, E e) {
        if (k == null) {
            return false;
        }
        // NOTE(review): the result does not reflect whether the element was actually present;
        // confirm against the KeyedListState contract.
        getRegion(k).remove(k, e);
        return true;
    }

    /** Calls {@link #remove(Object)} for every key; a null or empty collection is a no-op. */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void removeAll(Collection<? extends K> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        for (K key : collection) {
            remove(key);
        }
    }

    /**
     * Asks the region to remove the given elements from the key's list.
     *
     * @return {@code false} for a null key, {@code true} otherwise
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public boolean removeAll(K k, Collection<? extends E> collection) {
        if (k == null) {
            return false;
        }
        // NOTE(review): as with remove(K, E), the result only says the call was forwarded.
        getRegion(k).removeAll(k, collection);
        return true;
    }

    /**
     * Calls {@link #removeAll(Object, Collection)} for every entry of the map.
     *
     * @return {@code true} if at least one of those calls returned {@code true}; {@code false}
     *     for a null or empty map
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public boolean removeAll(Map<? extends K, ? extends Collection<? extends E>> map) {
        if (map == null || map.isEmpty()) {
            return false;
        }
        boolean anyForwarded = false;
        for (Map.Entry<? extends K, ? extends Collection<? extends E>> entry : map.entrySet()) {
            // The call is on the left so that every entry is processed.
            anyForwarded = removeAll(entry.getKey(), entry.getValue()) || anyForwarded;
        }
        return anyForwarded;
    }

    /**
     * Collects the content of all regions of the table into a new map.
     *
     * @return a new {@link HashMap} filled by each region's {@code getAll}
     */
    // The raw region type is kept on purpose: the parameter type of the region's getAll is
    // not visible from this class.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Map<K, List<E>> getAll() {
        Map<K, List<E>> result = new HashMap<>();
        Iterator<GRegion> regionIterator = this.table.regionIterator();
        while (regionIterator.hasNext()) {
            ((GRegionKListImpl) regionIterator.next()).getAll(result);
        }
        return result;
    }

    /** Calls {@code removeAll()} on every region of the table. */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void removeAll() {
        Iterator<GRegion> regionIterator = this.table.regionIterator();
        while (regionIterator.hasNext()) {
            ((GRegionKListImpl<?, ?>) regionIterator.next()).removeAll();
        }
    }

    /** Returns the key set of {@link #getAll()}. */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Iterable<K> keys() {
        return getAll().keySet();
    }

    /**
     * Forwards to the region's {@code poll} for the key.
     *
     * <p>NOTE(review): unlike {@link #get(Object)}, the key is not null-checked here.
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public E poll(K k) {
        return getRegion(k).poll(k);
    }

    /**
     * Forwards to the region's {@code peek} for the key.
     *
     * <p>NOTE(review): unlike {@link #get(Object)}, the key is not null-checked here.
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedListState
    public E peek(K k) {
        return getRegion(k).peek(k);
    }

    /**
     * Serializes the list stored under a serialized key.
     *
     * <p>The key is read from {@code bArr} with {@link KvStateSerializer#deserializeKeyAndNamespace}
     * using the void namespace. The elements are written with the descriptor's element
     * serializer, with a single {@code ','} byte between two consecutive elements.
     *
     * @param bArr serialized key and namespace
     * @param typeSerializer serializer used to read the key
     * @param typeSerializer2 not used by this implementation
     * @return the serialized elements, or {@code null} if {@link #get(Object)} returns null
     * @throws Exception if deserializing the key or serializing an element fails
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public byte[] getSerializedValue(byte[] bArr, TypeSerializer<K> typeSerializer, TypeSerializer<List<E>> typeSerializer2) throws Exception {
        K key = KvStateSerializer.deserializeKeyAndNamespace(bArr, typeSerializer, VoidNamespaceSerializer.INSTANCE).f0;
        List<E> values = get(key);
        if (values == null) {
            return null;
        }
        TypeSerializer<E> elementSerializer = this.descriptor.getElementSerializer();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes);
        // Iterate instead of calling get(i) so the cost does not depend on the list implementation.
        Iterator<E> elements = values.iterator();
        while (elements.hasNext()) {
            elementSerializer.serialize(elements.next(), out);
            if (elements.hasNext()) {
                out.writeByte(SERIALIZED_ELEMENT_SEPARATOR);
            }
        }
        out.flush();
        return bytes.toByteArray();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public StateStorage<K, List<E>> getStateStorage() {
        throw new UnsupportedOperationException();
    }

    /** Returns the region the table assigns to the key, viewed as a list region. */
    @SuppressWarnings("unchecked")
    private GRegionKListImpl<K, E> getRegion(K k) {
        return (GRegionKListImpl<K, E>) this.table.getRegion(k);
    }
}
