package org.apache.flink.runtime.state.heap.internal;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/heap/internal/NestedMapsStateTable.class */
public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(NestedMapsStateTable.class);
    /**
     * One map per key group of the backend's key-group range; the slot for a key group is
     * {@code keyGroup - keyGroupOffset}. Slots start out {@code null} and are filled lazily on the
     * first write. Each map is {@code key -> state} when namespaces are not used, and
     * {@code key -> (namespace -> state)} when they are.
     */
    private final Map[] state;
    /** Start key group of the backend's key-group range; subtracted from a key group to get its slot in {@link #state}. */
    private final int keyGroupOffset;
    /** Number of key groups reported by the backend; passed to the key-group assignment when hashing keys. */
    private final int maxParallelism;

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/internal/NestedMapsStateTable$MultipleMapIterator.class */
    /**
     * Iterates over the entries of several maps as if they were one sequence. Null and empty
     * maps in the array are skipped. Removal is not supported (inherits the default
     * {@link Iterator#remove()}).
     */
    private static class MultipleMapIterator<K, S> implements Iterator<Map.Entry<K, S>> {
        /** The maps to walk through, in array order; individual slots may be null. */
        private final Map<K, S>[] maps;
        /** Index of the next array slot that has not been inspected yet. */
        private int cursor = 0;
        /** Iterator over the map currently being drained. */
        private Iterator<Map.Entry<K, S>> active = Collections.emptyIterator();

        public MultipleMapIterator(Map<K, S>[] mapArr) {
            Preconditions.checkNotNull(mapArr);
            this.maps = mapArr;
        }

        /**
         * Returns true if another entry is available, advancing to the next non-empty map when
         * the current one is exhausted.
         */
        @Override
        public boolean hasNext() {
            if (this.active.hasNext()) {
                return true;
            }
            while (this.cursor < this.maps.length) {
                final Map<K, S> candidate = this.maps[this.cursor++];
                if (candidate == null || candidate.isEmpty()) {
                    continue;
                }
                this.active = candidate.entrySet().iterator();
                return true;
            }
            return false;
        }

        /**
         * Returns the next entry.
         *
         * @throws NoSuchElementException if all maps have been exhausted
         */
        @Override
        public Map.Entry<K, S> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return this.active.next();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/internal/NestedMapsStateTable$NestedMapsStateTableSnapshot.class */
    /**
     * Snapshot view over a {@link NestedMapsStateTable} that serializes the mappings of a single
     * key group directly from the owning table's maps.
     */
    public static class NestedMapsStateTableSnapshot<K, N, S> extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>> {
        NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> nestedMapsStateTable) {
            super(nestedMapsStateTable);
        }

        /**
         * Writes all mappings of the given key group to the output view.
         *
         * <p>The number of mappings is written first as an int (0 if the key group has no map).
         * With namespaces, each mapping is written as key, namespace, state; without namespaces,
         * as key, state.
         *
         * @param dataOutputView target to serialize into
         * @param i the key group whose mappings are written
         * @return the number of mappings written
         * @throws IOException if writing to the output view fails
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public int writeMappingsInKeyGroup(DataOutputView dataOutputView, int i) throws IOException {
            final NestedMapsStateTable table = (NestedMapsStateTable) this.owningStateTable;
            final Map keyGroupMap = table.getMapForKeyGroup(i);
            if (keyGroupMap == null) {
                dataOutputView.writeInt(0);
                return 0;
            }

            final TypeSerializer keySerializer = table.getKeySerializer();
            final TypeSerializer namespaceSerializer = table.getNamespaceSerializer();
            final TypeSerializer stateSerializer = table.getStateSerializer();

            if (!table.isUsingNamespace()) {
                final int flatCount = NestedMapsStateTable.countMappingsInKeyGroup(keyGroupMap);
                dataOutputView.writeInt(flatCount);
                for (Object rawEntry : keyGroupMap.entrySet()) {
                    final Map.Entry keyToState = (Map.Entry) rawEntry;
                    keySerializer.serialize(keyToState.getKey(), dataOutputView);
                    stateSerializer.serialize(keyToState.getValue(), dataOutputView);
                }
                return flatCount;
            }

            final int nestedCount = NestedMapsStateTable.countMappingsInKeyGroupWithNamespace(keyGroupMap);
            dataOutputView.writeInt(nestedCount);
            for (Object rawOuter : keyGroupMap.entrySet()) {
                final Map.Entry keyToNamespaces = (Map.Entry) rawOuter;
                final Object key = keyToNamespaces.getKey();
                // The key is repeated for every namespace so each record is self-contained.
                for (Object rawInner : ((Map) keyToNamespaces.getValue()).entrySet()) {
                    final Map.Entry namespaceToState = (Map.Entry) rawInner;
                    keySerializer.serialize(key, dataOutputView);
                    namespaceSerializer.serialize(namespaceToState.getKey(), dataOutputView);
                    stateSerializer.serialize(namespaceToState.getValue(), dataOutputView);
                }
            }
            return nestedCount;
        }
    }

    /**
     * Creates an empty table for the key-group range of the given backend. One (initially null)
     * slot is allocated per key group in that range.
     *
     * @param abstractInternalStateBackend backend supplying the key-group range and number of groups
     * @param registeredStateMetaInfo meta information forwarded to the base class
     * @param z flag forwarded to the base class
     */
    public NestedMapsStateTable(AbstractInternalStateBackend abstractInternalStateBackend, RegisteredStateMetaInfo registeredStateMetaInfo, boolean z) {
        super(abstractInternalStateBackend, registeredStateMetaInfo, z);
        final KeyGroupRange range = abstractInternalStateBackend.getKeyGroupRange();
        final int firstKeyGroup = range.getStartKeyGroup();
        this.keyGroupOffset = firstKeyGroup;
        this.maxParallelism = abstractInternalStateBackend.getNumGroups();
        final int slots = range.getNumberOfKeyGroups();
        this.state = new Map[slots];
    }

    /**
     * Exposes the internal per-key-group map array (the array itself, not a copy).
     *
     * @return the backing array; slots may be null
     */
    @VisibleForTesting
    public Map[] getState() {
        final Map[] keyGroupMaps = this.state;
        return keyGroupMaps;
    }

    /**
     * Looks up the map stored for a key group.
     *
     * @param i the key group index
     * @return the map for that key group, or null if the key group lies outside this table's
     *     range or no map has been created for it yet
     */
    @VisibleForTesting
    Map getMapForKeyGroup(int i) {
        final int slot = indexToOffset(i);
        final boolean inRange = slot >= 0 && slot < this.state.length;
        return inRange ? this.state[slot] : null;
    }

    /**
     * Stores the map for a key group.
     *
     * @param i the key group index
     * @param map the map to store in that key group's slot
     * @throws IllegalArgumentException if the key group lies outside this table's key-group range
     */
    private void setMapForKeyGroup(int i, Map map) {
        final int slot = indexToOffset(i);
        // Validate up front (as getMapForKeyGroup does) instead of catching
        // ArrayIndexOutOfBoundsException and discarding it.
        if (slot < 0 || slot >= this.state.length) {
            throw new IllegalArgumentException("Key group index " + i + " is out of range of key group range [" + this.keyGroupOffset + ", " + (this.keyGroupOffset + this.state.length) + ").");
        }
        this.state[slot] = map;
    }

    /**
     * Translates a key group index into a slot of the state array by subtracting the first key
     * group of this table's range. The result is not range-checked.
     */
    private int indexToOffset(int i) {
        final int slot = i - this.keyGroupOffset;
        return slot;
    }

    /**
     * Counts all state entries in the table: (key, namespace) pairs when namespaces are used,
     * keys otherwise. Null slots and null nested maps contribute nothing.
     *
     * @return the total number of entries
     */
    @Override
    public int size() {
        final boolean nested = this.usingNamespace;
        int total = 0;
        for (Map keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            if (!nested) {
                total += keyGroupMap.size();
                continue;
            }
            for (Object value : keyGroupMap.values()) {
                if (value != null) {
                    total += ((Map) value).size();
                }
            }
        }
        return total;
    }

    /**
     * Returns the state stored for the given key (and namespace, when namespaces are used).
     *
     * @param k the key; must not be null
     * @param n the namespace; must not be null when namespaces are used
     * @return the stored state, or null if there is none
     */
    @Override
    @SuppressWarnings("unchecked")
    public S get(K k, N n) {
        checkKeyNamespacePreconditions(k, n);
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return null;
        }
        if (!this.usingNamespace) {
            return (S) keyGroupMap.get(k);
        }
        final Map namespaceMap = (Map) keyGroupMap.get(k);
        return namespaceMap == null ? null : (S) namespaceMap.get(n);
    }

    /**
     * Stores the state for the given key (and namespace), discarding any previous value.
     * Delegates to {@link #putAndGetOld(Object, Object, Object)}.
     */
    @Override
    public void put(K k, N n, S s) {
        this.putAndGetOld(k, n, s);
    }

    /**
     * Stores the state for the given key (and namespace) and returns the value it replaces.
     * Missing key-group and per-key namespace maps are created on demand.
     *
     * @param k the key; must not be null
     * @param n the namespace; must not be null when namespaces are used
     * @param s the state to store
     * @return the previously stored state, or null if there was none
     */
    @Override
    @SuppressWarnings("unchecked")
    public S putAndGetOld(K k, N n, S s) {
        checkKeyNamespacePreconditions(k, n);
        final int keyGroup = getKeyGroupIndex(k);
        Map keyGroupMap = getMapForKeyGroup(keyGroup);
        if (keyGroupMap == null) {
            keyGroupMap = new HashMap();
            setMapForKeyGroup(keyGroup, keyGroupMap);
        }
        if (!this.usingNamespace) {
            return (S) keyGroupMap.put(k, s);
        }
        Map namespaceMap = (Map) keyGroupMap.get(k);
        if (namespaceMap == null) {
            namespaceMap = new HashMap();
            keyGroupMap.put(k, namespaceMap);
        }
        return (S) namespaceMap.put(n, s);
    }

    /**
     * Tells whether a mapping exists for the given key (and namespace, when namespaces are used).
     *
     * @param k the key; must not be null
     * @param n the namespace; must not be null when namespaces are used
     * @return true if a mapping is present
     */
    @Override
    public boolean containsKey(K k, N n) {
        checkKeyNamespacePreconditions(k, n);
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return false;
        }
        if (!this.usingNamespace) {
            return keyGroupMap.containsKey(k);
        }
        final Map namespaceMap = (Map) keyGroupMap.get(k);
        if (namespaceMap == null) {
            return false;
        }
        return namespaceMap.containsKey(n);
    }

    /**
     * Removes the mapping for the given key (and namespace).
     *
     * @return true if {@link #removeAndGetOld(Object, Object)} returned a non-null state
     */
    @Override
    public boolean remove(K k, N n) {
        final S previous = removeAndGetOld(k, n);
        return previous != null;
    }

    /**
     * Removes the mapping for the given key (and namespace) and returns the removed state.
     *
     * <p>When namespaces are used and the removed mapping was the key's last namespace, the
     * key's now-empty namespace map is dropped from the key-group map so empty maps do not
     * accumulate.
     *
     * @param k the key; must not be null
     * @param n the namespace; must not be null when namespaces are used
     * @return the removed state, or null if there was no mapping
     */
    @Override
    @SuppressWarnings("unchecked")
    public S removeAndGetOld(K k, N n) {
        checkKeyNamespacePreconditions(k, n);
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return null;
        }
        if (!this.usingNamespace) {
            return (S) keyGroupMap.remove(k);
        }
        final Map namespaceMap = (Map) keyGroupMap.get(k);
        if (namespaceMap == null) {
            return null;
        }
        final S removed = (S) namespaceMap.remove(n);
        // Clean up on the key-group map, not the namespace map: removing the key from the
        // (already empty) namespace map would be a no-op and leak the empty map.
        if (namespaceMap.isEmpty()) {
            keyGroupMap.remove(k);
        }
        return removed;
    }

    /**
     * Returns all namespace-to-state mappings of the given key. Only valid when namespaces are
     * used. When mappings exist, the table's internal map for the key is returned directly
     * (not a copy).
     *
     * @param k the key; must not be null
     * @return the key's namespace map, or an empty map if the key has no mappings
     * @throws UnsupportedOperationException if this table does not use namespaces
     */
    @Override
    public Map<N, S> getAll(K k) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        Preconditions.checkNotNull(k, "No key set. This method should not be called outside of a keyed context.");
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return Collections.emptyMap();
        }
        final Map<N, S> namespaceMap = (Map) keyGroupMap.get(k);
        if (namespaceMap == null) {
            return Collections.emptyMap();
        }
        return namespaceMap;
    }

    /**
     * Removes everything stored under the given key from its key-group map.
     *
     * @param k the key; must not be null
     */
    @Override
    public void removeAll(K k) {
        Preconditions.checkNotNull(k, "No key set. This method should not be called outside of a keyed context.");
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return;
        }
        keyGroupMap.remove(k);
    }

    /**
     * Clears every existing key-group map. The maps themselves stay in their slots; only their
     * contents are removed.
     */
    @Override
    public void removeAll() {
        for (Map keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            keyGroupMap.clear();
        }
    }

    /**
     * Computes the key group of a key via {@link KeyGroupRangeAssignment#assignToKeyGroup},
     * using this table's number of key groups.
     */
    private int getKeyGroupIndex(K k) {
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(k, this.maxParallelism);
        return keyGroup;
    }

    /**
     * Validates the arguments of a keyed access: the key must never be null, and the namespace
     * must not be null when this table uses namespaces.
     */
    private void checkKeyNamespacePreconditions(K k, N n) {
        Preconditions.checkNotNull(k, "No key set. This method should not be called outside of a keyed context.");
        if (!this.usingNamespace) {
            return;
        }
        Preconditions.checkNotNull(n, "Provided namespace is null.");
    }

    /**
     * Replaces the state of the given key (and namespace) with the result of applying the
     * transformation function to the current state (null if absent) and the given value.
     * Missing key-group and per-key namespace maps are created before the function is applied.
     *
     * @param k the key; must not be null
     * @param n the namespace; must not be null when namespaces are used
     * @param t the value handed to the transformation function
     * @param stateTransformationFunction computes the new state from the old state and {@code t}
     * @throws Exception if the transformation function throws
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> void transform(K k, N n, T t, StateTransformationFunction<S, T> stateTransformationFunction) throws Exception {
        checkKeyNamespacePreconditions(k, n);
        final int keyGroup = getKeyGroupIndex(k);
        Map keyGroupMap = getMapForKeyGroup(keyGroup);
        if (keyGroupMap == null) {
            keyGroupMap = new HashMap();
            setMapForKeyGroup(keyGroup, keyGroupMap);
        }

        // Pick the map and lookup key that actually hold the state value.
        final Map target;
        final Object targetKey;
        if (this.usingNamespace) {
            Map namespaceMap = (Map) keyGroupMap.get(k);
            if (namespaceMap == null) {
                namespaceMap = new HashMap();
                keyGroupMap.put(k, namespaceMap);
            }
            target = namespaceMap;
            targetKey = n;
        } else {
            target = keyGroupMap;
            targetKey = k;
        }

        final S previous = (S) target.get(targetKey);
        target.put(targetKey, stateTransformationFunction.apply(previous, t));
    }

    /**
     * Streams the keys of this table.
     *
     * <p>Without namespaces, the stream covers the keys of every existing key-group map and the
     * argument is ignored. With namespaces, the keys whose namespace map contains {@code n} are
     * first collected into a set and that set is streamed.
     *
     * @param n the namespace to filter by when namespaces are used
     * @return a stream of matching keys
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Stream<K> getKeys(N n) {
        if (!this.usingNamespace) {
            return Arrays.stream(this.state)
                    .filter(Objects::nonNull)
                    .flatMap(keyGroupMap -> keyGroupMap.keySet().stream());
        }
        final HashSet<K> matchingKeys = new HashSet<>();
        for (Map keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            for (Object rawEntry : keyGroupMap.entrySet()) {
                final Map.Entry keyToNamespaces = (Map.Entry) rawEntry;
                if (((Map) keyToNamespaces.getValue()).containsKey(n)) {
                    matchingKeys.add((K) keyToNamespaces.getKey());
                }
            }
        }
        return matchingKeys.stream();
    }

    /**
     * Returns an iterator over all key-to-state entries across the key-group maps. Only valid
     * when namespaces are not used.
     *
     * @throws UnsupportedOperationException if this table uses namespaces
     */
    @Override
    public Iterator<Map.Entry<K, S>> entryIterator() {
        if (!this.usingNamespace) {
            return new MultipleMapIterator(this.state);
        }
        throw new UnsupportedOperationException("This method should be called with no namespace");
    }

    /**
     * Returns an iterator over the namespaces that hold state for the given key. Only valid when
     * namespaces are used. The iterator is taken from the key set of the table's internal map.
     *
     * @param k the key; must not be null
     * @return an iterator over the key's namespaces; empty if the key has no mappings
     * @throws UnsupportedOperationException if this table does not use namespaces
     */
    @Override
    public Iterator<N> namespaceIterator(K k) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        Preconditions.checkNotNull(k, "No key set. This method should not be called outside of a keyed context.");
        final Map keyGroupMap = getMapForKeyGroup(getKeyGroupIndex(k));
        if (keyGroupMap == null) {
            return Collections.emptyIterator();
        }
        final Map namespaceMap = (Map) keyGroupMap.get(k);
        if (namespaceMap == null) {
            return Collections.emptyIterator();
        }
        return namespaceMap.keySet().iterator();
    }

    /**
     * Sums the sizes of all nested maps in a key-group map, i.e. the number of
     * (key, namespace) mappings it holds.
     *
     * @param map a key-group map whose values are namespace maps; values must not be null
     * @return the total number of nested entries
     */
    public static int countMappingsInKeyGroupWithNamespace(Map<?, Map<?, ?>> map) {
        int total = 0;
        for (Map<?, ?> namespaceMap : map.values()) {
            total += namespaceMap.size();
        }
        return total;
    }

    /**
     * Returns the number of mappings in a key-group map that is not nested by namespace.
     *
     * @param map a key-group map; must not be null
     * @return the size of the map
     */
    public static int countMappingsInKeyGroup(Map map) {
        final int mappings = map.size();
        return mappings;
    }

    /**
     * Counts the entries stored under the given namespace, i.e. the number of keys whose
     * namespace map contains it.
     *
     * <p>The key-group maps are keyed by key, not by namespace, so the namespace has to be
     * looked up inside each key's namespace map rather than in the key-group map itself.
     *
     * @param obj the namespace; must not be null
     * @return the number of keys that have state in the namespace
     * @throws IllegalStateException if this table does not use namespaces
     */
    @Override
    public int sizeOfNamespace(Object obj) {
        Preconditions.checkState(isUsingNamespace());
        Preconditions.checkNotNull(obj);
        int count = 0;
        for (Map keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            for (Object value : keyGroupMap.values()) {
                final Map namespaceMap = (Map) value;
                if (namespaceMap != null && namespaceMap.containsKey(obj)) {
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Creates a snapshot object bound to this table.
     *
     * @return a new {@link NestedMapsStateTableSnapshot} whose owning table is this instance
     */
    @Override
    public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
        final NestedMapsStateTableSnapshot<K, N, S> snapshot = new NestedMapsStateTableSnapshot<>(this);
        return snapshot;
    }
}
