package org.apache.flink.runtime.state.keyed;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.HashPartitioner;
import org.apache.flink.api.common.typeutils.SerializationException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.BatchPutWrapper;
import org.apache.flink.runtime.state.GroupIterator;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateIteratorUtil;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.StorageIterator;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapStateStorage;
import org.apache.flink.types.Pair;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/keyed/KeyedValueStateImpl.class */
public final class KeyedValueStateImpl<K, V> implements KeyedValueState<K, V> {
    private final KeyedValueStateDescriptor<K, V> descriptor;
    private final StateStorage stateStorage;
    private TypeSerializer<K> keySerializer;
    private TypeSerializer<V> valueSerializer;
    private final byte[] stateNameByte;
    private final byte[] stateNameForSerializer;
    private AbstractInternalStateBackend internalStateBackend;
    private static final HashPartitioner partitioner = HashPartitioner.INSTANCE;
    private ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos();
    private DataOutputView outputView = new DataOutputViewStreamWrapper(this.outputStream);

    public KeyedValueStateImpl(AbstractInternalStateBackend abstractInternalStateBackend, KeyedValueStateDescriptor<K, V> keyedValueStateDescriptor, StateStorage stateStorage) {
        this.descriptor = (KeyedValueStateDescriptor) Preconditions.checkNotNull(keyedValueStateDescriptor);
        this.stateStorage = (StateStorage) Preconditions.checkNotNull(stateStorage);
        this.internalStateBackend = (AbstractInternalStateBackend) Preconditions.checkNotNull(abstractInternalStateBackend);
        this.keySerializer = keyedValueStateDescriptor.getKeySerializer();
        this.valueSerializer = keyedValueStateDescriptor.mo2664getValueSerializer();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            StringSerializer.INSTANCE.serialize(keyedValueStateDescriptor.getName(), new DataOutputViewStreamWrapper(byteArrayOutputStream));
            this.stateNameByte = byteArrayOutputStream.toByteArray();
            this.stateNameForSerializer = stateStorage.supportMultiColumnFamilies() ? null : this.stateNameByte;
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public KeyedValueStateDescriptor getDescriptor() {
        return this.descriptor;
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public boolean contains(K k) {
        if (k == null) {
            return false;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                return this.stateStorage.get(k) != null;
            }
            this.outputStream.reset();
            return this.stateStorage.get(StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, k, this.keySerializer, getKeyGroup(k), this.stateNameForSerializer)) != null;
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public V get(K k) {
        return getOrDefault(k, null);
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public V getOrDefault(K k, V v) {
        if (k == null) {
            return v;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                V v2 = (V) this.stateStorage.get(k);
                return v2 == null ? v : v2;
            }
            byte[] serializedValue = getSerializedValue(k, this.outputStream, this.outputView, this.keySerializer);
            return serializedValue == null ? v : (V) StateSerializerUtil.getDeserializeSingleValue(serializedValue, this.valueSerializer);
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Map<K, V> getAll(Collection<? extends K> collection) {
        if (collection == null || collection.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            HashMap hashMap = new HashMap();
            if (this.stateStorage.lazySerde()) {
                for (K k : collection) {
                    if (k != null) {
                        Object obj = this.stateStorage.get(k);
                        if (obj != null) {
                            hashMap.put(k, obj);
                        }
                    }
                }
            } else {
                for (K k2 : collection) {
                    if (k2 != null) {
                        this.outputStream.reset();
                        byte[] bArr = (byte[]) this.stateStorage.get(StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, k2, this.keySerializer, getKeyGroup(k2), this.stateNameForSerializer));
                        if (bArr != null) {
                            hashMap.put(k2, StateSerializerUtil.getDeserializeSingleValue(bArr, this.valueSerializer));
                        }
                    }
                }
            }
            return hashMap;
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void remove(K k) {
        if (k == null) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                this.stateStorage.remove(k);
            } else {
                this.outputStream.reset();
                this.stateStorage.remove(StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, k, this.keySerializer, getKeyGroup(k), this.stateNameForSerializer));
            }
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void removeAll(Collection<? extends K> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        Iterator<? extends K> it = collection.iterator();
        while (it.hasNext()) {
            remove(it.next());
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedValueState
    public void put(K k, V v) {
        Preconditions.checkNotNull(k);
        try {
            if (this.stateStorage.lazySerde()) {
                this.stateStorage.put(k, v);
            } else {
                this.outputStream.reset();
                byte[] serializedKeyForKeyedValueState = StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, k, this.keySerializer, getKeyGroup(k), this.stateNameForSerializer);
                this.outputStream.reset();
                this.stateStorage.put(serializedKeyForKeyedValueState, StateSerializerUtil.getSerializeSingleValue(this.outputStream, this.outputView, v, this.valueSerializer));
            }
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedValueState
    public void putAll(Map<? extends K, ? extends V> map) {
        if (map == null || map.isEmpty()) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
                    this.stateStorage.put(entry.getKey(), entry.getValue());
                }
            } else {
                BatchPutWrapper batchPutWrapper = this.stateStorage.getStorageInstance().getBatchPutWrapper();
                Throwable th = null;
                try {
                    try {
                        for (Map.Entry<? extends K, ? extends V> entry2 : map.entrySet()) {
                            K key = entry2.getKey();
                            this.outputStream.reset();
                            byte[] serializedKeyForKeyedValueState = StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, key, this.keySerializer, getKeyGroup(key), this.stateNameForSerializer);
                            this.outputStream.reset();
                            batchPutWrapper.put(serializedKeyForKeyedValueState, StateSerializerUtil.getSerializeSingleValue(this.outputStream, this.outputView, entry2.getValue(), this.valueSerializer));
                        }
                        if (batchPutWrapper != null) {
                            if (0 != 0) {
                                try {
                                    batchPutWrapper.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                batchPutWrapper.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } finally {
                }
            }
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Map<K, V> getAll() {
        try {
            HashMap hashMap = new HashMap();
            if (this.stateStorage.lazySerde()) {
                StorageIterator<K, V> it = this.stateStorage.iterator();
                while (it.hasNext()) {
                    Pair<K, V> next = it.next();
                    hashMap.put(next.getKey(), next.getValue());
                }
            } else if (this.stateStorage.supportMultiColumnFamilies() || this.internalStateBackend.getStateStorages().size() <= 1) {
                StorageIterator<K, V> it2 = this.stateStorage.iterator();
                while (it2.hasNext()) {
                    Pair<K, V> next2 = it2.next();
                    hashMap.put(StateSerializerUtil.getDeserializedKeyForKeyedValueState((byte[]) next2.getKey(), this.keySerializer, this.stateStorage.supportMultiColumnFamilies() ? 0 : this.stateNameByte.length), StateSerializerUtil.getDeserializeSingleValue((byte[]) next2.getValue(), this.valueSerializer));
                }
            } else {
                Iterator<Integer> it3 = this.internalStateBackend.getKeyGroupRange().iterator();
                while (it3.hasNext()) {
                    Integer next3 = it3.next();
                    this.outputStream.reset();
                    StateSerializerUtil.serializeGroupPrefix(this.outputStream, next3.intValue(), this.stateNameByte);
                    byte[] byteArray = this.outputStream.toByteArray();
                    this.outputStream.write(127);
                    StorageIterator<K, V> subIterator = this.stateStorage.subIterator(byteArray, this.outputStream.toByteArray());
                    while (subIterator.hasNext()) {
                        Pair<K, V> next4 = subIterator.next();
                        hashMap.put(StateSerializerUtil.getDeserializedKeyForKeyedValueState((byte[]) next4.getKey(), this.keySerializer, this.stateNameByte.length), StateSerializerUtil.getDeserializeSingleValue((byte[]) next4.getValue(), this.valueSerializer));
                    }
                }
            }
            return hashMap;
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public void removeAll() {
        if (this.stateStorage.lazySerde()) {
            ((HeapStateStorage) this.stateStorage).removeAll();
            return;
        }
        try {
            if (this.stateStorage.supportMultiColumnFamilies() || this.internalStateBackend.getStateStorages().size() <= 1) {
                StorageIterator<K, V> it = this.stateStorage.iterator();
                while (it.hasNext()) {
                    it.next();
                    it.remove();
                }
            } else {
                Iterator<Integer> it2 = this.internalStateBackend.getKeyGroupRange().iterator();
                while (it2.hasNext()) {
                    Integer next = it2.next();
                    this.outputStream.reset();
                    StateSerializerUtil.serializeGroupPrefix(this.outputStream, next.intValue(), this.stateNameByte);
                    byte[] byteArray = this.outputStream.toByteArray();
                    this.outputStream.write(127);
                    StorageIterator<K, V> subIterator = this.stateStorage.subIterator(byteArray, this.outputStream.toByteArray());
                    while (subIterator.hasNext()) {
                        subIterator.next();
                        subIterator.remove();
                    }
                }
            }
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public Iterable<K> keys() {
        return new Iterable<K>() { // from class: org.apache.flink.runtime.state.keyed.KeyedValueStateImpl.1
            @Override // java.lang.Iterable
            public Iterator<K> iterator() {
                try {
                    if (KeyedValueStateImpl.this.stateStorage.lazySerde()) {
                        final StorageIterator<K, V> it = KeyedValueStateImpl.this.stateStorage.iterator();
                        return new Iterator<K>() { // from class: org.apache.flink.runtime.state.keyed.KeyedValueStateImpl.1.1
                            @Override // java.util.Iterator
                            public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override // java.util.Iterator
                            public K next() {
                                return (K) ((Pair) it.next()).getKey();
                            }

                            @Override // java.util.Iterator
                            public void remove() {
                                it.remove();
                            }
                        };
                    }
                    if (KeyedValueStateImpl.this.stateStorage.supportMultiColumnFamilies() || KeyedValueStateImpl.this.internalStateBackend.getStateStorages().size() <= 1) {
                        return StateIteratorUtil.createKeyIterator(KeyedValueStateImpl.this.stateStorage.iterator(), KeyedValueStateImpl.this.keySerializer, KeyedValueStateImpl.this.stateStorage.supportMultiColumnFamilies() ? 0 : KeyedValueStateImpl.this.stateNameByte.length);
                    }
                    ArrayList arrayList = new ArrayList();
                    Iterator<Integer> it2 = KeyedValueStateImpl.this.internalStateBackend.getKeyGroupRange().iterator();
                    while (it2.hasNext()) {
                        Integer next = it2.next();
                        KeyedValueStateImpl.this.outputStream.reset();
                        StateSerializerUtil.serializeGroupPrefix(KeyedValueStateImpl.this.outputStream, next.intValue(), KeyedValueStateImpl.this.stateNameByte);
                        byte[] byteArray = KeyedValueStateImpl.this.outputStream.toByteArray();
                        KeyedValueStateImpl.this.outputStream.write(127);
                        arrayList.add(KeyedValueStateImpl.this.stateStorage.subIterator(byteArray, KeyedValueStateImpl.this.outputStream.toByteArray()));
                    }
                    return StateIteratorUtil.createKeyIterator(new GroupIterator(arrayList), KeyedValueStateImpl.this.keySerializer, KeyedValueStateImpl.this.stateNameByte.length);
                } catch (Exception e) {
                    throw new StateAccessException(e);
                }
            }
        };
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.keyed.KeyedValueState
    public <T> void transform(K k, T t, StateTransformationFunction<V, T> stateTransformationFunction) {
        try {
            if (this.stateStorage.lazySerde()) {
                ((HeapStateStorage) this.stateStorage).transform(k, t, stateTransformationFunction);
            } else {
                this.outputStream.reset();
                byte[] serializedKeyForKeyedValueState = StateSerializerUtil.getSerializedKeyForKeyedValueState(this.outputStream, this.outputView, k, this.keySerializer, getKeyGroup(k), this.stateNameForSerializer);
                byte[] bArr = (byte[]) this.stateStorage.get(serializedKeyForKeyedValueState);
                V apply = stateTransformationFunction.apply(bArr == null ? null : StateSerializerUtil.getDeserializeSingleValue(bArr, this.valueSerializer), t);
                this.outputStream.reset();
                this.valueSerializer.serialize(apply, this.outputView);
                this.stateStorage.put(serializedKeyForKeyedValueState, this.outputStream.toByteArray());
            }
        } catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public byte[] getSerializedValue(byte[] bArr, TypeSerializer<K> typeSerializer, TypeSerializer<V> typeSerializer2) throws Exception {
        Object obj = KvStateSerializer.deserializeKeyAndNamespace(bArr, typeSerializer, VoidNamespaceSerializer.INSTANCE).f0;
        if (!this.stateStorage.lazySerde()) {
            ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos();
            return getSerializedValue(obj, byteArrayOutputStreamWithPos, new DataOutputViewStreamWrapper(byteArrayOutputStreamWithPos), typeSerializer);
        }
        Object obj2 = get(obj);
        if (obj2 == null) {
            return null;
        }
        return KvStateSerializer.serializeValue(obj2, typeSerializer2);
    }

    @Override // org.apache.flink.runtime.state.keyed.KeyedState
    public StateStorage<K, V> getStateStorage() {
        return this.stateStorage;
    }

    private <K> int getKeyGroup(K k) {
        return partitioner.partition(k, this.internalStateBackend.getNumGroups());
    }

    private byte[] getSerializedValue(K k, ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos, DataOutputView dataOutputView, TypeSerializer<K> typeSerializer) throws Exception {
        byteArrayOutputStreamWithPos.reset();
        return (byte[]) this.stateStorage.get(StateSerializerUtil.getSerializedKeyForKeyedValueState(byteArrayOutputStreamWithPos, dataOutputView, k, typeSerializer, getKeyGroup(k), this.stateNameForSerializer));
    }
}
