/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.subkeyed;

import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.HashPartitioner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.BatchPutWrapper;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StorageInstance;
import org.apache.flink.runtime.state.StorageIterator;
import org.apache.flink.runtime.state.heap.HeapStateStorage;
import org.apache.flink.runtime.state.heap.internal.StateTable;
import org.apache.flink.runtime.state.subkeyed.AbstractSubKeyedMapState;
import org.apache.flink.types.Pair;
import org.apache.flink.util.Preconditions;

abstract class AbstractSubKeyedMapStateImpl<K, N, MK, MV, M extends Map<MK, MV>>
implements AbstractSubKeyedMapState<K, N, MK, MV, M> {
    /**
     * Backing storage. When {@code lazySerde()} returns true this class casts it to
     * {@code HeapStateStorage} and works on live map objects; otherwise it is accessed
     * with serialized {@code byte[]} keys and values.
     */
    protected final StateStorage stateStorage;
    // Neither assigned nor read in this class; presumably set and used by subclasses — TODO confirm.
    protected byte[] stateNameByte;
    /** State-name bytes handed to {@code StateSerializerUtil} when building serialized keys and prefixes. */
    protected byte[] stateNameForSerialize;
    /** Length handed to {@code StateSerializerUtil} when decoding keys, namespaces and map keys from stored key bytes. */
    protected int serializedStateNameLength;
    // The four serializers below are not assigned in this class; presumably initialized by subclasses — TODO confirm.
    /** Serializer for the primary key {@code K}. */
    protected TypeSerializer<K> keySerializer;
    /** Serializer for the user map key {@code MK}. */
    protected TypeSerializer<MK> mapKeySerializer;
    /** Serializer for the user map value {@code MV}. */
    protected TypeSerializer<MV> mapValueSerializer;
    /** Serializer for the namespace {@code N}. */
    protected TypeSerializer<N> namespaceSerializer;
    /** Owning backend; queried for the number of key groups and the key-group range. */
    private AbstractInternalStateBackend internalStateBackend;
    /** Reusable byte buffer; methods reset it before building a serialized key or prefix. */
    protected ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos();
    /** Data-output view writing into {@link #outputStream}. */
    protected DataOutputView outputView = new DataOutputViewStreamWrapper((OutputStream)this.outputStream);
    /** Partitioner used by {@code getKeyGroup} to map a key to its key group. */
    protected final HashPartitioner partitioner = HashPartitioner.INSTANCE;

    /**
     * Creates the state implementation on top of the given backend and storage.
     *
     * @param backend the owning state backend; must not be null
     * @param stateStorage the storage holding this state's entries; must not be null
     * @throws NullPointerException if either argument is null
     */
    AbstractSubKeyedMapStateImpl(AbstractInternalStateBackend backend, StateStorage stateStorage) {
        Preconditions.checkNotNull(backend);
        Preconditions.checkNotNull(stateStorage);
        this.internalStateBackend = backend;
        this.stateStorage = stateStorage;
    }

    /**
     * Creates a new map instance of the concrete type {@code M}.
     *
     * <p>This class uses it to materialize query results and to create the per-key map on
     * the first insert; callers here treat the returned map as initially empty.
     *
     * @return a fresh map instance
     */
    abstract M createMap();

    /**
     * Checks whether a mapping for {@code mapKey} exists under the given key and namespace.
     *
     * @param key the primary key; {@code null} yields {@code false}
     * @param namespace the namespace; {@code null} yields {@code false}
     * @param mapKey the user map key to look up
     * @return {@code true} if a value is stored for the map key
     * @throws StateAccessException if the underlying storage access or serialization fails
     */
    @Override
    public boolean contains(K key, N namespace, MK mapKey) {
        if (key == null || namespace == null) {
            return false;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                // Heap path: look the key up in the live map object.
                M map = this.get(key, namespace);
                return map != null && map.containsKey(mapKey);
            }
            // Serialized path: probe the storage with the fully serialized composite key.
            this.outputStream.reset();
            byte[] serializedKey = StateSerializerUtil.getSerializedKeyForSubKeyedMapState(
                this.outputStream,
                this.outputView,
                key,
                this.keySerializer,
                namespace,
                this.namespaceSerializer,
                mapKey,
                this.mapKeySerializer,
                this.getKeyGroup(key),
                this.stateNameForSerialize);
            return this.stateStorage.get(serializedKey) != null;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Checks whether any mapping exists under the given key and namespace.
     *
     * @param key the primary key; {@code null} yields {@code false}
     * @param namespace the namespace; {@code null} yields {@code false}
     * @return {@code true} if at least one map entry is stored
     */
    @Override
    public boolean contains(K key, N namespace) {
        if (key == null || namespace == null) {
            return false;
        }
        if (this.stateStorage.lazySerde()) {
            // get(...) returns null for a missing or empty map on the heap path.
            return this.get(key, namespace) != null;
        }
        // Serialized path: the state exists iff the prefix scan yields at least one entry.
        return this.iterator(key, namespace).hasNext();
    }

    /**
     * Returns the value stored for {@code mapKey} under the given key and namespace.
     *
     * @return the stored value, or {@code null} if there is none
     */
    @Override
    public MV get(K key, N namespace, MK mapKey) {
        final MV absent = null;
        return getOrDefault(key, namespace, mapKey, absent);
    }

    /**
     * Returns the value stored for {@code mapKey} under the given key and namespace, or
     * {@code defaultMapValue} when the key or namespace is null or no value is stored.
     *
     * @throws StateAccessException if the underlying storage access or (de)serialization fails
     */
    @Override
    public MV getOrDefault(K key, N namespace, MK mapKey, MV defaultMapValue) {
        if (key == null || namespace == null) {
            return defaultMapValue;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                // Serialized path: single point lookup with the composite key.
                this.outputStream.reset();
                byte[] keyBytes = StateSerializerUtil.getSerializedKeyForSubKeyedMapState(
                    this.outputStream,
                    this.outputView,
                    key,
                    this.keySerializer,
                    namespace,
                    this.namespaceSerializer,
                    mapKey,
                    this.mapKeySerializer,
                    this.getKeyGroup(key),
                    this.stateNameForSerialize);
                byte[] valueBytes = (byte[])this.stateStorage.get(keyBytes);
                if (valueBytes != null) {
                    return StateSerializerUtil.getDeserializeSingleValue(valueBytes, this.mapValueSerializer);
                }
                return defaultMapValue;
            }
            // Heap path: read from the live map stored for (key, namespace).
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            Map liveMap = (Map)heap.get(key);
            if (liveMap == null) {
                return defaultMapValue;
            }
            Object stored = liveMap.get(mapKey);
            if (stored == null) {
                return defaultMapValue;
            }
            return (MV)stored;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns the whole map stored under the given key and namespace.
     *
     * @return the stored map, or {@code null} if there is none
     */
    @Override
    public M get(K key, N namespace) {
        final M absent = null;
        return getOrDefault(key, namespace, absent);
    }

    /**
     * Returns the whole map stored under the given key and namespace, or {@code defaultMap}
     * when the key or namespace is null or the stored map is missing or empty.
     *
     * <p>On the heap path the live stored map is returned; on the serialized path a new map
     * is built from a prefix scan.
     *
     * @throws StateAccessException if the underlying storage access or deserialization fails
     */
    @Override
    public M getOrDefault(K key, N namespace, M defaultMap) {
        if (key == null || namespace == null) {
            return defaultMap;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
                heap.setCurrentNamespace(namespace);
                Map stored = (Map)heap.get(key);
                if (stored != null && !stored.isEmpty()) {
                    return (M)stored;
                }
                return defaultMap;
            }
            // Serialized path: copy every entry under the (key, namespace) prefix into a fresh map.
            Iterator<Map.Entry<MK, MV>> scan = this.iterator(key, namespace);
            M collected = this.createMap();
            while (scan.hasNext()) {
                Map.Entry<MK, MV> current = scan.next();
                collected.put(current.getKey(), current.getValue());
            }
            if (collected.isEmpty()) {
                return defaultMap;
            }
            return collected;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns the mappings for the requested map keys under the given key and namespace.
     *
     * <p>Null map keys in {@code mapKeys} are skipped on both storage paths, and map keys
     * without a stored value are omitted from the result.
     *
     * @param key the primary key
     * @param namespace the namespace
     * @param mapKeys the user map keys to fetch
     * @return a new map with the found mappings, or {@code null} if any argument is null,
     *     {@code mapKeys} is empty, or nothing was found
     * @throws StateAccessException if the underlying storage access or (de)serialization fails
     */
    @Override
    public M getAll(K key, N namespace, Collection<? extends MK> mapKeys) {
        if (key == null || namespace == null || mapKeys == null || mapKeys.isEmpty()) {
            return null;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                M map = this.get(key, namespace);
                if (map == null) {
                    return null;
                }
                // Allocated lazily so that "nothing found" is reported as null.
                M results = null;
                for (MK mapKey : mapKeys) {
                    // Skip null keys exactly like the serialized path below; this also avoids
                    // a NullPointerException from map types that reject null lookups.
                    if (mapKey == null) {
                        continue;
                    }
                    MV value = map.get(mapKey);
                    if (value == null) {
                        continue;
                    }
                    if (results == null) {
                        results = this.createMap();
                    }
                    results.put(mapKey, value);
                }
                return results;
            }
            M result = this.createMap();
            // Serialize the (key, namespace) prefix once; each map key is appended after it.
            this.outputStream.reset();
            StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                this.outputStream,
                this.outputView,
                key,
                this.keySerializer,
                namespace,
                this.namespaceSerializer,
                this.getKeyGroup(key),
                this.stateNameForSerialize);
            int prefixLength = this.outputStream.getPosition();
            for (MK mapKey : mapKeys) {
                if (mapKey == null) {
                    continue;
                }
                // Rewind to the end of the prefix so the previous map key is overwritten.
                this.outputStream.setPosition(prefixLength);
                StateSerializerUtil.serializeItemWithKeyPrefix(this.outputView, mapKey, this.mapKeySerializer);
                byte[] byteValue = (byte[])this.stateStorage.get(this.outputStream.toByteArray());
                if (byteValue == null) {
                    continue;
                }
                MV value = StateSerializerUtil.getDeserializeSingleValue(byteValue, this.mapValueSerializer);
                result.put(mapKey, value);
            }
            return result.isEmpty() ? null : result;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns all maps stored under the given key, grouped by namespace.
     *
     * @param key the primary key; {@code null} yields an empty map
     * @return namespace-to-map view of everything stored for the key; on the serialized path
     *     this is a newly built map
     * @throws StateAccessException if the underlying storage access or deserialization fails
     */
    @Override
    public Map<N, M> getAll(K key) {
        if (key == null) {
            return Collections.emptyMap();
        }
        try {
            if (this.stateStorage.lazySerde()) {
                return ((HeapStateStorage)this.stateStorage).getAll(key);
            }
            // A null namespace produces a prefix that covers every namespace of this key.
            this.outputStream.reset();
            byte[] prefix = StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                this.outputStream,
                this.outputView,
                key,
                this.keySerializer,
                null,
                this.namespaceSerializer,
                this.getKeyGroup(key),
                this.stateNameForSerialize);
            StorageIterator iterator = this.stateStorage.prefixIterator(prefix);
            Map<N, M> result = new HashMap<>();
            while (iterator.hasNext()) {
                Pair pair = (Pair)iterator.next();
                byte[] rawKey = (byte[])pair.getKey();
                N namespace = StateSerializerUtil.getDeserializedNamespcae(
                    rawKey, this.keySerializer, this.namespaceSerializer, this.serializedStateNameLength);
                MK mapKey = StateSerializerUtil.getDeserializedMapKeyForSubKeyedMapState(
                    rawKey, this.keySerializer, this.namespaceSerializer, this.mapKeySerializer, this.serializedStateNameLength);
                MV mapValue = StateSerializerUtil.getDeserializeSingleValue((byte[])pair.getValue(), this.mapValueSerializer);
                // Create the per-namespace map on first sight of that namespace.
                M innerMap = result.get(namespace);
                if (innerMap == null) {
                    innerMap = this.createMap();
                    result.put(namespace, innerMap);
                }
                innerMap.put(mapKey, mapValue);
            }
            return result;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Stores {@code mapValue} for {@code mapKey} under the given key and namespace,
     * replacing any previous value.
     *
     * @throws NullPointerException if any argument is null
     * @throws StateAccessException if the underlying storage access or serialization fails
     */
    @Override
    public void add(K key, N namespace, MK mapKey, MV mapValue) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        Preconditions.checkNotNull(mapKey);
        Preconditions.checkNotNull(mapValue);
        try {
            if (!this.stateStorage.lazySerde()) {
                // Serialized path: build the composite key, then reuse the shared buffer for the value.
                this.outputStream.reset();
                byte[] keyBytes = StateSerializerUtil.getSerializedKeyForSubKeyedMapState(
                    this.outputStream,
                    this.outputView,
                    key,
                    this.keySerializer,
                    namespace,
                    this.namespaceSerializer,
                    mapKey,
                    this.mapKeySerializer,
                    this.getKeyGroup(key),
                    this.stateNameForSerialize);
                this.outputStream.reset();
                this.mapValueSerializer.serialize(mapValue, this.outputView);
                byte[] valueBytes = this.outputStream.toByteArray();
                this.stateStorage.put(keyBytes, valueBytes);
                return;
            }
            // Heap path: create and register the per-key map on first insert, then mutate it in place.
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            Map liveMap = (Map)heap.get(key);
            if (liveMap == null) {
                liveMap = this.createMap();
                heap.put(key, liveMap);
            }
            liveMap.put(mapKey, mapValue);
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Stores all given mappings under the given key and namespace.
     *
     * <p>A null or empty {@code mappings} argument is a no-op. On the serialized path the
     * entries are written through a {@code BatchPutWrapper}, and null map keys or values are
     * rejected.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is null
     * @throws StateAccessException if the underlying storage access or serialization fails,
     *     including a null map key or value on the serialized path
     */
    @Override
    public void addAll(K key, N namespace, Map<? extends MK, ? extends MV> mappings) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        if (mappings == null || mappings.isEmpty()) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                // Heap path: create and register the per-key map on first insert.
                HeapStateStorage heapStateStorage = (HeapStateStorage)this.stateStorage;
                heapStateStorage.setCurrentNamespace(namespace);
                Map map = (Map)heapStateStorage.get(key);
                if (map == null) {
                    map = this.createMap();
                    heapStateStorage.put(key, map);
                }
                // NOTE(review): unlike the serialized path, null keys/values are not rejected here — confirm intended.
                map.putAll(mappings);
                return;
            }
            StorageInstance instance = this.stateStorage.getStorageInstance();
            try (BatchPutWrapper batchPutWrapper = instance.getBatchPutWrapper()) {
                // Serialize the (key, namespace) prefix once; each map key is appended after it.
                this.outputStream.reset();
                StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                    this.outputStream,
                    this.outputView,
                    key,
                    this.keySerializer,
                    namespace,
                    this.namespaceSerializer,
                    this.getKeyGroup(key),
                    this.stateNameForSerialize);
                int prefixLength = this.outputStream.getPosition();
                // Values use their own buffer so the key prefix in the shared buffer survives.
                ByteArrayOutputStreamWithPos valueOutputStream = new ByteArrayOutputStreamWithPos();
                DataOutputViewStreamWrapper valueOutputView = new DataOutputViewStreamWrapper(valueOutputStream);
                for (Map.Entry<? extends MK, ? extends MV> entry : mappings.entrySet()) {
                    Preconditions.checkNotNull(entry.getKey(), "Can not insert null key to mapstate");
                    Preconditions.checkNotNull(entry.getValue(), "Can not insert null value to mapstate");
                    this.outputStream.setPosition(prefixLength);
                    StateSerializerUtil.serializeItemWithKeyPrefix(this.outputView, entry.getKey(), this.mapKeySerializer);
                    byte[] byteKey = this.outputStream.toByteArray();
                    valueOutputStream.reset();
                    this.mapValueSerializer.serialize(entry.getValue(), valueOutputView);
                    byte[] byteValue = valueOutputStream.toByteArray();
                    batchPutWrapper.put(byteKey, byteValue);
                }
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes the whole map stored under the given key and namespace.
     * A null key or namespace is a no-op.
     *
     * @throws StateAccessException if the underlying storage access fails
     */
    @Override
    public void remove(K key, N namespace) {
        if (key == null || namespace == null) {
            return;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                // Serialized path: delete every entry found by the (key, namespace) prefix scan.
                Iterator<Map.Entry<MK, MV>> scan = this.iterator(key, namespace);
                while (scan.hasNext()) {
                    scan.next();
                    scan.remove();
                }
                return;
            }
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            heap.remove(key);
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes the mapping for {@code mapKey} under the given key and namespace.
     * A null key or namespace is a no-op.
     *
     * <p>On the heap path the per-key map itself is removed once it becomes empty.
     *
     * @throws StateAccessException if the underlying storage access or serialization fails
     */
    @Override
    public void remove(K key, N namespace, MK mapKey) {
        if (key == null || namespace == null) {
            return;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                this.outputStream.reset();
                byte[] keyBytes = StateSerializerUtil.getSerializedKeyForSubKeyedMapState(
                    this.outputStream,
                    this.outputView,
                    key,
                    this.keySerializer,
                    namespace,
                    this.namespaceSerializer,
                    mapKey,
                    this.mapKeySerializer,
                    this.getKeyGroup(key),
                    this.stateNameForSerialize);
                this.stateStorage.remove(keyBytes);
                return;
            }
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            Map liveMap = (Map)heap.get(key);
            if (liveMap == null) {
                return;
            }
            liveMap.remove(mapKey);
            if (liveMap.isEmpty()) {
                heap.remove(key);
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes the mappings for all given map keys under the given key and namespace.
     *
     * <p>A null key, null namespace, or null/empty {@code mapKeys} is a no-op. On the heap
     * path the per-key map itself is removed once it becomes empty.
     *
     * @throws StateAccessException if the underlying storage access or serialization fails
     */
    @Override
    public void removeAll(K key, N namespace, Collection<? extends MK> mapKeys) {
        // A null collection is treated like an empty one, consistent with getAll(key, namespace, mapKeys).
        if (key == null || namespace == null || mapKeys == null || mapKeys.isEmpty()) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                HeapStateStorage heapStateStorage = (HeapStateStorage)this.stateStorage;
                heapStateStorage.setCurrentNamespace(namespace);
                Map map = (Map)heapStateStorage.get(key);
                if (map != null) {
                    for (MK mk : mapKeys) {
                        map.remove(mk);
                    }
                    if (map.isEmpty()) {
                        heapStateStorage.remove(key);
                    }
                }
            } else {
                // Serialize the (key, namespace) prefix once; each map key is appended after it.
                this.outputStream.reset();
                StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                    this.outputStream,
                    this.outputView,
                    key,
                    this.keySerializer,
                    namespace,
                    this.namespaceSerializer,
                    this.getKeyGroup(key),
                    this.stateNameForSerialize);
                int prefixLength = this.outputStream.getPosition();
                for (MK mapKey : mapKeys) {
                    // Rewind to the end of the prefix so the previous map key is overwritten.
                    this.outputStream.setPosition(prefixLength);
                    StateSerializerUtil.serializeItemWithKeyPrefix(this.outputView, mapKey, this.mapKeySerializer);
                    this.stateStorage.remove(this.outputStream.toByteArray());
                }
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes everything stored under the given key, across all namespaces.
     * A null key is a no-op.
     *
     * @throws StateAccessException if the serialized-path storage access fails
     */
    @Override
    public void removeAll(K key) {
        if (key == null) {
            return;
        }
        if (this.stateStorage.lazySerde()) {
            ((HeapStateStorage)this.stateStorage).removeAll(key);
            return;
        }
        try {
            // A null namespace produces a prefix that covers every namespace of this key.
            this.outputStream.reset();
            byte[] keyPrefix = StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                this.outputStream,
                this.outputView,
                key,
                this.keySerializer,
                null,
                this.namespaceSerializer,
                this.getKeyGroup(key),
                this.stateNameForSerialize);
            StorageIterator scan = this.stateStorage.prefixIterator(keyPrefix);
            while (scan.hasNext()) {
                Pair stored = (Pair)scan.next();
                this.stateStorage.remove(stored.getKey());
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterator over the map entries stored under the given key and namespace.
     *
     * <p>On the heap path this iterates the live stored map (or nothing if there is none);
     * on the serialized path entries are deserialized from a prefix scan.
     *
     * @throws NullPointerException if {@code key} is null
     * @throws StateAccessException if the underlying storage access or serialization fails
     */
    @Override
    public Iterator<Map.Entry<MK, MV>> iterator(K key, N namespace) {
        Preconditions.checkNotNull(key);
        try {
            if (!this.stateStorage.lazySerde()) {
                return this.getDeSerializedIterator(
                    key,
                    namespace,
                    this.outputStream,
                    this.outputView,
                    this.keySerializer,
                    this.namespaceSerializer,
                    this.mapKeySerializer,
                    this.mapValueSerializer);
            }
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            Map liveMap = (Map)heap.get(key);
            if (liveMap == null) {
                return Collections.emptyIterator();
            }
            return liveMap.entrySet().iterator();
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterable over the map entries stored under the given key and namespace.
     *
     * <p>On the heap path this is the live entry set (or an empty set if nothing is stored);
     * on the serialized path each call to {@code iterator()} starts a new storage scan.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is null
     * @throws StateAccessException if the heap-path storage access fails
     */
    @Override
    public Iterable<Map.Entry<MK, MV>> entries(final K key, final N namespace) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        try {
            if (!this.stateStorage.lazySerde()) {
                // Deferred: the scan is only opened when the caller asks for an iterator.
                return () -> AbstractSubKeyedMapStateImpl.this.iterator(key, namespace);
            }
            HeapStateStorage heap = (HeapStateStorage)this.stateStorage;
            heap.setCurrentNamespace(namespace);
            Map liveMap = (Map)heap.get(key);
            if (liveMap == null) {
                return Collections.emptySet();
            }
            return liveMap.entrySet();
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterable over the map keys stored under the given key and namespace.
     *
     * <p>On the heap path this is the live key set (or an empty set if nothing is stored);
     * on the serialized path each call to {@code iterator()} starts a new storage scan.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is null
     * @throws StateAccessException if the heap-path storage access fails
     */
    @Override
    public Iterable<MK> keys(final K key, final N namespace) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        try {
            if (this.stateStorage.lazySerde()) {
                HeapStateStorage heapStateStorage = (HeapStateStorage)this.stateStorage;
                heapStateStorage.setCurrentNamespace(namespace);
                Map map = (Map)heapStateStorage.get(key);
                return map == null ? Collections.emptySet() : map.keySet();
            }
            return () -> {
                // Typed entry iterator so getKey() yields MK without a cast.
                final Iterator<Map.Entry<MK, MV>> innerIter =
                    AbstractSubKeyedMapStateImpl.this.iterator(key, namespace);
                return new Iterator<MK>() {

                    @Override
                    public boolean hasNext() {
                        return innerIter.hasNext();
                    }

                    @Override
                    public MK next() {
                        return innerIter.next().getKey();
                    }

                    @Override
                    public void remove() {
                        innerIter.remove();
                    }
                };
            };
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterable over the map values stored under the given key and namespace.
     *
     * <p>On the heap path this is the live values view (or an empty set if nothing is stored);
     * on the serialized path each call to {@code iterator()} starts a new storage scan.
     *
     * @throws NullPointerException if {@code key} or {@code namespace} is null
     * @throws StateAccessException if the heap-path storage access fails
     */
    @Override
    public Iterable<MV> values(final K key, final N namespace) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);
        try {
            if (this.stateStorage.lazySerde()) {
                HeapStateStorage heapStateStorage = (HeapStateStorage)this.stateStorage;
                heapStateStorage.setCurrentNamespace(namespace);
                Map map = (Map)heapStateStorage.get(key);
                return map == null ? Collections.emptySet() : map.values();
            }
            return () -> {
                // Typed entry iterator so getValue() yields MV without a cast.
                final Iterator<Map.Entry<MK, MV>> innerIter =
                    AbstractSubKeyedMapStateImpl.this.iterator(key, namespace);
                return new Iterator<MV>() {

                    @Override
                    public boolean hasNext() {
                        return innerIter.hasNext();
                    }

                    @Override
                    public MV next() {
                        return innerIter.next().getValue();
                    }

                    @Override
                    public void remove() {
                        innerIter.remove();
                    }
                };
            };
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterator over the namespaces that hold state for the given key.
     *
     * <p>On the serialized path the namespaces are collected eagerly into a set from a prefix
     * scan; removing through the returned iterator also removes the state stored under the
     * removed namespace.
     *
     * @throws NullPointerException if {@code key} is null
     * @throws StateAccessException if the serialized-path scan or deserialization fails
     */
    @Override
    public Iterator<N> iterator(final K key) {
        Preconditions.checkNotNull(key);
        if (this.stateStorage.lazySerde()) {
            return ((HeapStateStorage)this.stateStorage).namespaceIterator(key);
        }
        final Set<N> namespaceSet = new HashSet<>();
        // Single try/catch: failures are wrapped exactly once in StateAccessException.
        try {
            // A null namespace produces a prefix that covers every namespace of this key.
            this.outputStream.reset();
            byte[] prefix = StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
                this.outputStream,
                this.outputView,
                key,
                this.keySerializer,
                null,
                this.namespaceSerializer,
                this.getKeyGroup(key),
                this.stateNameForSerialize);
            StorageIterator iterator = this.stateStorage.prefixIterator(prefix);
            while (iterator.hasNext()) {
                byte[] rawKey = (byte[])((Pair)iterator.next()).getKey();
                N namespace = StateSerializerUtil.getDeserializedNamespcae(
                    rawKey, this.keySerializer, this.namespaceSerializer, this.serializedStateNameLength);
                namespaceSet.add(namespace);
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
        final Iterator<N> namespaceIterator = namespaceSet.iterator();
        return new Iterator<N>() {
            // Last namespace handed out by next(); null until next() has been called.
            private N namespace = null;

            @Override
            public boolean hasNext() {
                return namespaceIterator.hasNext();
            }

            @Override
            public N next() {
                this.namespace = namespaceIterator.next();
                return this.namespace;
            }

            @Override
            public void remove() {
                if (this.namespace == null) {
                    throw new IllegalStateException();
                }
                namespaceIterator.remove();
                // Also drop the state stored under the removed namespace.
                AbstractSubKeyedMapStateImpl.this.remove(key, this.namespace);
            }
        };
    }

    /**
     * Returns the primary keys that hold state in the given namespace.
     *
     * <p>On the serialized path every key group in the backend's key-group range is scanned
     * and entries are filtered by namespace.
     *
     * @param namespace the namespace to look up; must not be null
     * @return the matching keys, or {@code null} if there are none
     * @throws NullPointerException if {@code namespace} is null
     * @throws StateAccessException if the serialized-path scan or deserialization fails
     */
    @Override
    public Iterable<K> keys(N namespace) {
        Preconditions.checkNotNull(namespace, "Do not support null as namespace.");
        Set<K> keys;
        if (this.stateStorage.lazySerde()) {
            StateTable stateTable = ((HeapStateStorage)this.stateStorage).getStateTable();
            @SuppressWarnings("unchecked")
            Set<K> heapKeys = (Set<K>)stateTable.getKeys(namespace).collect(Collectors.toSet());
            keys = heapKeys;
        } else {
            keys = new HashSet<>();
            try {
                for (int group : this.internalStateBackend.getKeyGroupRange()) {
                    this.outputStream.reset();
                    StateSerializerUtil.serializeGroupPrefix(this.outputStream, group, this.stateNameForSerialize);
                    StorageIterator iterator = this.stateStorage.prefixIterator(this.outputStream.toByteArray());
                    while (iterator.hasNext()) {
                        Pair pair = (Pair)iterator.next();
                        Pair<K, N> keyAndNamespace = StateSerializerUtil.getDeserializedKeyAndNamespace(
                            (byte[])pair.getKey(), this.keySerializer, this.namespaceSerializer, this.serializedStateNameLength);
                        if (!namespace.equals(keyAndNamespace.getValue())) {
                            continue;
                        }
                        keys.add(keyAndNamespace.getKey());
                    }
                }
            }
            catch (Exception e) {
                throw new StateAccessException(e);
            }
        }
        // Existing contract: an empty result is reported as null rather than an empty iterable.
        return keys.isEmpty() ? null : keys;
    }

    /**
     * Serializes the map stored under a serialized key-and-namespace pair, using the
     * serializers supplied by the caller instead of this instance's own serializers.
     *
     * @param serializedKeyAndNamespace the serialized key and namespace to look up
     * @param safeKeySerializer serializer used to decode (and, on the serialized path, re-encode) the key
     * @param safeNamespaceSerializer serializer used to decode (and re-encode) the namespace
     * @param safeValueSerializer map serializer; must be a {@code MapSerializer}, from which
     *     the map key and map value serializers are taken
     * @return the serialized map, or {@code null} if no (non-empty) map is stored
     * @throws Exception if deserialization, storage access or serialization fails
     */
    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<M> safeValueSerializer) throws Exception {
        Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
            serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
        K key = keyAndNamespace.f0;
        N namespace = keyAndNamespace.f1;
        @SuppressWarnings("unchecked")
        MapSerializer<MK, MV> mapSerializer = (MapSerializer<MK, MV>)safeValueSerializer;
        TypeSerializer<MK> dupUserKeySerializer = mapSerializer.getKeySerializer();
        TypeSerializer<MV> dupUserValueSerializer = mapSerializer.getValueSerializer();
        M result;
        if (this.stateStorage.lazySerde()) {
            result = this.get(key, namespace);
            if (result == null) {
                return null;
            }
        } else {
            // Private buffers: the shared outputStream/outputView fields are not touched here.
            ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
            Iterator<Map.Entry<MK, MV>> iterator = this.getDeSerializedIterator(
                key, namespace, baos, view, safeKeySerializer, safeNamespaceSerializer, dupUserKeySerializer, dupUserValueSerializer);
            result = this.createMap();
            while (iterator.hasNext()) {
                Map.Entry<MK, MV> entry = iterator.next();
                // Entries deserialize on every accessor call, so read each side only once.
                MK mapKey = entry.getKey();
                MV mapValue = entry.getValue();
                if (mapKey == null || mapValue == null) {
                    continue;
                }
                result.put(mapKey, mapValue);
            }
            if (result.isEmpty()) {
                return null;
            }
        }
        return KvStateSerializer.serializeMap(result.entrySet(), dupUserKeySerializer, dupUserValueSerializer);
    }

    /**
     * Builds an iterator over the entries stored under the serialized (key, namespace) prefix.
     *
     * <p>Each returned entry deserializes its key and value on every accessor call, and
     * {@code setValue} writes the newly serialized value back through the underlying pair.
     * Every entry keeps a reference to its own storage pair, so an entry obtained from an
     * earlier {@code next()} call is unaffected by later iteration.
     *
     * @param key the primary key
     * @param namespace the namespace
     * @param outputStream buffer used to build the prefix; it is reset first
     * @param outputView view writing into {@code outputStream}
     * @param keySerializer serializer for the primary key
     * @param namespaceSerializer serializer for the namespace
     * @param dupUserKeySerializer serializer for the map key
     * @param dupUserValueSerializer serializer for the map value
     * @return an iterator over the matching entries; {@code remove()} delegates to the storage iterator
     * @throws Exception if building the prefix or opening the storage iterator fails
     */
    private Iterator<Map.Entry<MK, MV>> getDeSerializedIterator(K key, N namespace, ByteArrayOutputStreamWithPos outputStream, DataOutputView outputView, final TypeSerializer<K> keySerializer, final TypeSerializer<N> namespaceSerializer, final TypeSerializer<MK> dupUserKeySerializer, final TypeSerializer<MV> dupUserValueSerializer) throws Exception {
        outputStream.reset();
        byte[] prefix = StateSerializerUtil.getSerializedPrefixKeyForSubKeyedState(
            outputStream,
            outputView,
            key,
            keySerializer,
            namespace,
            namespaceSerializer,
            this.getKeyGroup(key),
            this.stateNameForSerialize);
        final StorageIterator iterator = this.stateStorage.prefixIterator(prefix);
        return new Iterator<Map.Entry<MK, MV>>() {

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public void remove() {
                iterator.remove();
            }

            @Override
            public Map.Entry<MK, MV> next() {
                // Captured per entry (not kept in a shared field) so earlier entries stay stable.
                @SuppressWarnings("unchecked")
                final Pair<byte[], byte[]> current = (Pair<byte[], byte[]>)iterator.next();
                return new Map.Entry<MK, MV>() {

                    @Override
                    public MK getKey() {
                        try {
                            return StateSerializerUtil.getDeserializedMapKeyForSubKeyedMapState(
                                current.getKey(),
                                keySerializer,
                                namespaceSerializer,
                                dupUserKeySerializer,
                                AbstractSubKeyedMapStateImpl.this.serializedStateNameLength);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }

                    @Override
                    public MV getValue() {
                        try {
                            return StateSerializerUtil.getDeserializeSingleValue(current.getValue(), dupUserValueSerializer);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }

                    @Override
                    public MV setValue(MV value) {
                        Preconditions.checkNotNull(value);
                        try {
                            ByteArrayOutputStreamWithPos valueOutputStream = new ByteArrayOutputStreamWithPos();
                            DataOutputViewStreamWrapper valueOutputView = new DataOutputViewStreamWrapper(valueOutputStream);
                            dupUserValueSerializer.serialize(value, valueOutputView);
                            // The pair hands back the previous serialized value, which is decoded for the caller.
                            byte[] previous = current.setValue(valueOutputStream.toByteArray());
                            return StateSerializerUtil.getDeserializeSingleValue(previous, dupUserValueSerializer);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }
                };
            }
        };
    }

    /**
     * Returns the storage backing this state.
     *
     * @return the storage instance passed to the constructor
     */
    @Override
    public StateStorage<K, M> getStateStorage() {
        final StateStorage<K, M> storage = this.stateStorage;
        return storage;
    }

    /**
     * Computes the key group of a key by partitioning it over the backend's number of groups.
     *
     * @param key the key to partition
     * @return the key-group index produced by the partitioner
     */
    protected <K> int getKeyGroup(K key) {
        final int numGroups = this.internalStateBackend.getNumGroups();
        return this.partitioner.partition(key, numGroups);
    }
}

