/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.internal;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.internal.AbstractStateTableSnapshot;
import org.apache.flink.runtime.state.heap.internal.StateTable;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StateTable} that stores its entries in plain hash maps, one per key group.
 *
 * <p>Each key-group map is indexed by key. When the table uses namespaces, the value stored for
 * a key is a nested map from namespace to state; otherwise the value is the state itself.
 *
 * @param <K> type of the keys
 * @param <N> type of the namespaces
 * @param <S> type of the stored state
 */
@Internal
public class NestedMapsStateTable<K, N, S>
extends StateTable<K, N, S> {
    private static final Logger LOG = LoggerFactory.getLogger(NestedMapsStateTable.class);

    private static final String NO_KEY_MESSAGE = "No key set. This method should not be called outside of a keyed context.";

    /**
     * One slot per key group; a slot stays {@code null} until the first write into that key
     * group. Map values are either states (no namespace) or nested namespace maps.
     */
    private final Map<Object, Object>[] state;

    /** First key group of the backend's range; subtracted from a key-group index to get a slot. */
    private final int keyGroupOffset;

    /** Number of groups reported by the backend, used when assigning keys to key groups. */
    private final int maxParallelism;

    /**
     * Creates an empty table sized to the key-group range of the given backend.
     *
     * @param internalStateBackend backend supplying the key-group range and the number of groups
     * @param stateMetaInfo meta information passed through to the superclass
     * @param usingNamespace whether entries are addressed by (key, namespace) or by key only
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public NestedMapsStateTable(AbstractInternalStateBackend internalStateBackend, RegisteredStateMetaInfo stateMetaInfo, boolean usingNamespace) {
        super(internalStateBackend, stateMetaInfo, usingNamespace);
        KeyGroupRange groups = internalStateBackend.getKeyGroupRange();
        this.keyGroupOffset = groups.getStartKeyGroup();
        this.maxParallelism = internalStateBackend.getNumGroups();
        // Generic arrays cannot be created directly, hence the raw creation plus cast.
        this.state = (Map<Object, Object>[]) new Map[groups.getNumberOfKeyGroups()];
    }

    /**
     * Returns the backing array of key-group maps (the live array, not a copy).
     */
    @VisibleForTesting
    @SuppressWarnings("rawtypes")
    public Map[] getState() {
        return this.state;
    }

    /**
     * Returns the map of the given key group.
     *
     * @param keyGroupIndex absolute key-group index
     * @return the key-group map, or {@code null} if the index lies outside this table's range or
     *     nothing has been written to that key group yet
     */
    @VisibleForTesting
    @SuppressWarnings("rawtypes")
    Map getMapForKeyGroup(int keyGroupIndex) {
        return this.lookupKeyGroupMap(keyGroupIndex);
    }

    /** Typed variant of {@link #getMapForKeyGroup(int)} used by the table itself. */
    private Map<Object, Object> lookupKeyGroupMap(int keyGroupIndex) {
        int pos = this.indexToOffset(keyGroupIndex);
        if (pos >= 0 && pos < this.state.length) {
            return this.state[pos];
        }
        return null;
    }

    /**
     * Installs the map for a key group.
     *
     * @throws IllegalArgumentException if the key group lies outside this table's range
     */
    private void setMapForKeyGroup(int keyGroupId, Map<Object, Object> map) {
        int pos = this.indexToOffset(keyGroupId);
        if (pos < 0 || pos >= this.state.length) {
            throw new IllegalArgumentException("Key group index " + keyGroupId + " is out of range of key group range [" + this.keyGroupOffset + ", " + (this.keyGroupOffset + this.state.length) + ").");
        }
        this.state[pos] = map;
    }

    /** Returns the key-group map for the index, creating and installing it when absent. */
    private Map<Object, Object> getOrCreateKeyGroupMap(int keyGroupIndex) {
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(keyGroupIndex);
        if (keyGroupMap == null) {
            keyGroupMap = new HashMap<>();
            this.setMapForKeyGroup(keyGroupIndex, keyGroupMap);
        }
        return keyGroupMap;
    }

    /** Returns the nested namespace map stored for the key, or {@code null} if there is none. */
    @SuppressWarnings("unchecked")
    private Map<N, S> namespaceMapFor(Map<Object, Object> keyGroupMap, K key) {
        // Only valid when the table uses namespaces: values are then always namespace maps.
        return (Map<N, S>) keyGroupMap.get(key);
    }

    /** Returns the nested namespace map for the key, creating and storing it when absent. */
    private Map<N, S> getOrCreateNamespaceMap(Map<Object, Object> keyGroupMap, K key) {
        Map<N, S> namespaceMap = this.namespaceMapFor(keyGroupMap, key);
        if (namespaceMap == null) {
            namespaceMap = new HashMap<>();
            keyGroupMap.put(key, namespaceMap);
        }
        return namespaceMap;
    }

    /** Translates an absolute key-group index into a slot of the backing array. */
    private int indexToOffset(int index) {
        return index - this.keyGroupOffset;
    }

    /**
     * Returns the total number of state entries over all key groups.
     */
    @Override
    public int size() {
        int count = 0;
        for (Map<Object, Object> keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            count += this.usingNamespace
                    ? NestedMapsStateTable.countMappingsInKeyGroupWithNamespace(keyGroupMap)
                    : NestedMapsStateTable.countMappingsInKeyGroup(keyGroupMap);
        }
        return count;
    }

    /**
     * Returns the state stored for the key (and namespace, if namespaces are used).
     *
     * @return the stored state, or {@code null} if there is no entry
     */
    @Override
    @SuppressWarnings("unchecked")
    public S get(K key, N namespace) {
        this.checkKeyNamespacePreconditions(key, namespace);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap == null) {
            return null;
        }
        if (!this.usingNamespace) {
            return (S) keyGroupMap.get(key);
        }
        Map<N, S> namespaceMap = this.namespaceMapFor(keyGroupMap, key);
        return namespaceMap == null ? null : namespaceMap.get(namespace);
    }

    /**
     * Stores the state for the key (and namespace), replacing any previous value.
     */
    @Override
    public void put(K key, N namespace, S state) {
        this.putAndGetOld(key, namespace, state);
    }

    /**
     * Stores the state for the key (and namespace) and returns the value it replaced.
     *
     * @return the previous state, or {@code null} if there was none
     * @throws IllegalArgumentException if the key's key group lies outside this table's range
     */
    @Override
    @SuppressWarnings("unchecked")
    public S putAndGetOld(K key, N namespace, S state) {
        this.checkKeyNamespacePreconditions(key, namespace);
        Map<Object, Object> keyGroupMap = this.getOrCreateKeyGroupMap(this.getKeyGroupIndex(key));
        if (this.usingNamespace) {
            return this.getOrCreateNamespaceMap(keyGroupMap, key).put(namespace, state);
        }
        return (S) keyGroupMap.put(key, state);
    }

    /**
     * Returns whether an entry exists for the key (and namespace, if namespaces are used).
     */
    @Override
    public boolean containsKey(K key, N namespace) {
        this.checkKeyNamespacePreconditions(key, namespace);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap == null) {
            return false;
        }
        if (!this.usingNamespace) {
            return keyGroupMap.containsKey(key);
        }
        Map<N, S> namespaceMap = this.namespaceMapFor(keyGroupMap, key);
        return namespaceMap != null && namespaceMap.containsKey(namespace);
    }

    /**
     * Removes the entry for the key (and namespace).
     *
     * @return {@code true} if a non-null state was removed
     */
    @Override
    public boolean remove(K key, N namespace) {
        return this.removeAndGetOld(key, namespace) != null;
    }

    /**
     * Removes the entry for the key (and namespace) and returns the removed state.
     *
     * <p>When namespaces are used and the key's namespace map is empty afterwards, the key itself
     * is dropped from its key-group map.
     *
     * @return the removed state, or {@code null} if there was no entry
     */
    @Override
    @SuppressWarnings("unchecked")
    public S removeAndGetOld(K key, N namespace) {
        this.checkKeyNamespacePreconditions(key, namespace);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap == null) {
            return null;
        }
        if (!this.usingNamespace) {
            return (S) keyGroupMap.remove(key);
        }
        Map<N, S> namespaceMap = this.namespaceMapFor(keyGroupMap, key);
        if (namespaceMap == null) {
            return null;
        }
        S removed = namespaceMap.remove(namespace);
        if (namespaceMap.isEmpty()) {
            keyGroupMap.remove(key);
        }
        return removed;
    }

    /**
     * Returns all (namespace, state) pairs stored for the key.
     *
     * @return the live nested map for the key (not a copy), or an empty map if the key has no
     *     entries
     * @throws UnsupportedOperationException if the table does not use namespaces
     */
    @Override
    public Map<N, S> getAll(K key) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        Preconditions.checkNotNull(key, NO_KEY_MESSAGE);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap == null) {
            return Collections.<N, S>emptyMap();
        }
        Map<N, S> result = this.namespaceMapFor(keyGroupMap, key);
        return result == null ? Collections.<N, S>emptyMap() : result;
    }

    /**
     * Removes everything stored under the key from its key-group map.
     */
    @Override
    public void removeAll(K key) {
        Preconditions.checkNotNull(key, NO_KEY_MESSAGE);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap != null) {
            keyGroupMap.remove(key);
        }
    }

    /**
     * Clears every key-group map that has been created; the maps themselves stay installed.
     */
    @Override
    public void removeAll() {
        for (Map<Object, Object> keyGroupMap : this.state) {
            if (keyGroupMap != null) {
                keyGroupMap.clear();
            }
        }
    }

    /** Computes the key group of the key via {@link KeyGroupRangeAssignment#assignToKeyGroup}. */
    private int getKeyGroupIndex(K key) {
        return KeyGroupRangeAssignment.assignToKeyGroup(key, this.maxParallelism);
    }

    /** Rejects a null key, and a null namespace when the table uses namespaces. */
    private void checkKeyNamespacePreconditions(K key, N namespace) {
        Preconditions.checkNotNull(key, NO_KEY_MESSAGE);
        if (this.usingNamespace) {
            Preconditions.checkNotNull(namespace, "Provided namespace is null.");
        }
    }

    /**
     * Replaces the state of the key (and namespace) with the result of applying the
     * transformation to the current state and the given value.
     *
     * <p>The key-group map and, if namespaces are used, the nested namespace map are created
     * before the transformation runs.
     *
     * @throws IllegalArgumentException if the key's key group lies outside this table's range
     * @throws Exception whatever the transformation function throws
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> void transform(K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        this.checkKeyNamespacePreconditions(key, namespace);
        Map<Object, Object> keyGroupMap = this.getOrCreateKeyGroupMap(this.getKeyGroupIndex(key));
        if (this.usingNamespace) {
            Map<N, S> namespaceMap = this.getOrCreateNamespaceMap(keyGroupMap, key);
            namespaceMap.put(namespace, transformation.apply(namespaceMap.get(namespace), value));
        } else {
            keyGroupMap.put(key, transformation.apply((S) keyGroupMap.get(key), value));
        }
    }

    /**
     * Returns the keys that have an entry in the given namespace; without namespaces, returns
     * all keys and ignores the argument.
     *
     * <p>With namespaces the matching keys are collected into a set before the stream is
     * returned; without namespaces the stream reads the key-group maps directly.
     */
    @Override
    @SuppressWarnings("unchecked")
    public Stream<K> getKeys(N namespace) {
        if (!this.usingNamespace) {
            return Arrays.stream(this.state)
                    .filter(Objects::nonNull)
                    .flatMap(keyGroupMap -> keyGroupMap.keySet().stream())
                    .map(key -> (K) key);
        }
        HashSet<K> keys = new HashSet<>();
        for (Map<Object, Object> keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            for (Map.Entry<Object, Object> entry : keyGroupMap.entrySet()) {
                Map<?, ?> namespaceMap = (Map<?, ?>) entry.getValue();
                if (namespaceMap != null && namespaceMap.containsKey(namespace)) {
                    keys.add((K) entry.getKey());
                }
            }
        }
        return keys.stream();
    }

    /**
     * Returns an iterator over all (key, state) entries of a table without namespaces.
     *
     * @throws UnsupportedOperationException if the table uses namespaces
     */
    @Override
    @SuppressWarnings("unchecked")
    public Iterator<Map.Entry<K, S>> entryIterator() {
        if (this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with no namespace");
        }
        // Without namespaces every key-group map is a Map<K, S>.
        return new MultipleMapIterator<>((Map<K, S>[]) (Map<?, ?>[]) this.state);
    }

    /**
     * Returns an iterator over the namespaces that hold state for the key.
     *
     * @throws UnsupportedOperationException if the table does not use namespaces
     */
    @Override
    public Iterator<N> namespaceIterator(K key) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        Preconditions.checkNotNull(key, NO_KEY_MESSAGE);
        Map<Object, Object> keyGroupMap = this.lookupKeyGroupMap(this.getKeyGroupIndex(key));
        if (keyGroupMap == null) {
            return Collections.<N>emptyIterator();
        }
        Map<N, S> namespaceMap = this.namespaceMapFor(keyGroupMap, key);
        return namespaceMap == null ? Collections.<N>emptyIterator() : namespaceMap.keySet().iterator();
    }

    /** Sums the sizes of all nested namespace maps of one key-group map, skipping null values. */
    private static int countMappingsInKeyGroupWithNamespace(Map<?, ?> keyGroupMap) {
        int count = 0;
        for (Object namespaceMap : keyGroupMap.values()) {
            if (namespaceMap != null) {
                count += ((Map<?, ?>) namespaceMap).size();
            }
        }
        return count;
    }

    /** Number of entries of a key-group map that stores states directly (no namespaces). */
    private static int countMappingsInKeyGroup(Map<?, ?> keyGroupMap) {
        return keyGroupMap.size();
    }

    /**
     * Returns the number of entries stored under the given namespace.
     *
     * <p>Key-group maps are indexed by key, with the namespace map nested underneath, so the
     * result is the number of keys whose namespace map contains the namespace.
     *
     * @throws IllegalStateException if the table does not use namespaces
     * @throws NullPointerException if the namespace is null
     */
    @Override
    public int sizeOfNamespace(Object namespace) {
        Preconditions.checkState(this.isUsingNamespace());
        Preconditions.checkNotNull(namespace);
        int count = 0;
        for (Map<Object, Object> keyGroupMap : this.state) {
            if (keyGroupMap == null) {
                continue;
            }
            for (Object namespaceMap : keyGroupMap.values()) {
                if (namespaceMap != null && ((Map<?, ?>) namespaceMap).containsKey(namespace)) {
                    ++count;
                }
            }
        }
        return count;
    }

    /**
     * Creates a snapshot object that writes this table's key-group maps.
     */
    @Override
    public NestedMapsStateTableSnapshot<K, N, S> createSnapshot() {
        return new NestedMapsStateTableSnapshot<>(this);
    }

    /**
     * Iterator that chains the entry iterators of an array of maps, skipping null and empty
     * maps.
     */
    private static class MultipleMapIterator<K, S>
    implements Iterator<Map.Entry<K, S>> {
        private final Map<K, S>[] state;

        /** Index of the next array slot to inspect once the current iterator is exhausted. */
        private int nextIndex;
        private Iterator<Map.Entry<K, S>> currentIterator;

        public MultipleMapIterator(Map<K, S>[] state) {
            this.state = Preconditions.checkNotNull(state);
            this.nextIndex = 0;
            this.currentIterator = Collections.emptyIterator();
        }

        @Override
        public boolean hasNext() {
            if (this.currentIterator.hasNext()) {
                return true;
            }
            // Advance to the next non-empty map, remembering where to resume afterwards.
            int index = this.nextIndex;
            while (index < this.state.length) {
                Map<K, S> map = this.state[index];
                ++index;
                if (map != null && !map.isEmpty()) {
                    this.currentIterator = map.entrySet().iterator();
                    this.nextIndex = index;
                    return true;
                }
            }
            this.nextIndex = index;
            return false;
        }

        @Override
        public Map.Entry<K, S> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.currentIterator.next();
        }
    }

    /**
     * Snapshot that serializes the mappings of the owning table key group by key group.
     */
    static class NestedMapsStateTableSnapshot<K, N, S>
    extends AbstractStateTableSnapshot<K, N, S, NestedMapsStateTable<K, N, S>> {
        NestedMapsStateTableSnapshot(NestedMapsStateTable<K, N, S> owningTable) {
            super(owningTable);
        }

        /**
         * Writes the number of mappings of the key group followed by the mappings themselves.
         *
         * <p>With namespaces each mapping is written as key, namespace, state; without
         * namespaces as key, state. A key group without a map is written as a count of 0.
         *
         * @param dov output to write to
         * @param keyGroupId absolute key-group index
         * @return the number of mappings written
         * @throws IOException if writing to the output fails
         */
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public int writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
            NestedMapsStateTable<K, N, S> table = (NestedMapsStateTable<K, N, S>) this.owningStateTable;
            Map<?, ?> keyGroupMap = table.getMapForKeyGroup(keyGroupId);
            if (keyGroupMap == null) {
                dov.writeInt(0);
                return 0;
            }
            // Entries are read through wildcard-typed maps, so the serializers are used raw.
            TypeSerializer keySerializer = table.getKeySerializer();
            TypeSerializer namespaceSerializer = table.getNamespaceSerializer();
            TypeSerializer stateSerializer = table.getStateSerializer();
            boolean usingNamespace = table.isUsingNamespace();
            int countMappings = usingNamespace
                    ? NestedMapsStateTable.countMappingsInKeyGroupWithNamespace(keyGroupMap)
                    : NestedMapsStateTable.countMappingsInKeyGroup(keyGroupMap);
            dov.writeInt(countMappings);
            for (Map.Entry<?, ?> entry : keyGroupMap.entrySet()) {
                Object key = entry.getKey();
                if (!usingNamespace) {
                    keySerializer.serialize(key, dov);
                    stateSerializer.serialize(entry.getValue(), dov);
                    continue;
                }
                Map<?, ?> namespaceMap = (Map<?, ?>) entry.getValue();
                if (namespaceMap == null) {
                    // Not counted above either, so the written count stays consistent.
                    continue;
                }
                for (Map.Entry<?, ?> namespaceEntry : namespaceMap.entrySet()) {
                    keySerializer.serialize(key, dov);
                    namespaceSerializer.serialize(namespaceEntry.getKey(), dov);
                    stateSerializer.serialize(namespaceEntry.getValue(), dov);
                }
            }
            return countMappings;
        }
    }
}

