/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.keyed;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.HashPartitioner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.BatchPutWrapper;
import org.apache.flink.runtime.state.GroupIterator;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateIteratorUtil;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StorageInstance;
import org.apache.flink.runtime.state.StorageIterator;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapStateStorage;
import org.apache.flink.runtime.state.keyed.AbstractKeyedMapState;
import org.apache.flink.types.Pair;
import org.apache.flink.util.Preconditions;

abstract class AbstractKeyedMapStateImpl<K, MK, MV, M extends Map<MK, MV>>
implements AbstractKeyedMapState<K, MK, MV, M> {
    /** Storage holding this state's entries; the constructor rejects null. */
    protected final StateStorage stateStorage;
    /**
     * State-name bytes passed to {@code StateSerializerUtil.serializeGroupPrefix}; their length is
     * also used as the key offset when decoding in the per-key-group scan paths.
     * Not assigned in this class -- presumably set by subclasses (TODO confirm).
     */
    protected byte[] stateNameByte;
    /**
     * State-name bytes handed to the {@code StateSerializerUtil} key/prefix serialization helpers.
     * Not assigned in this class -- presumably set by subclasses (TODO confirm).
     */
    protected byte[] stateNameForSerialize;
    /**
     * Length passed to the {@code StateSerializerUtil} key-decoding helpers when reading back
     * keys produced with {@link #stateNameForSerialize}. Not assigned in this class.
     */
    protected int serializedStateNameLength;
    /** Serializer for the state key {@code K}. Not assigned in this class. */
    protected TypeSerializer<K> keySerializer;
    /** Serializer for the user map key {@code MK}. Not assigned in this class. */
    protected TypeSerializer<MK> mapKeySerializer;
    /** Serializer for the user map value {@code MV}. Not assigned in this class. */
    protected TypeSerializer<MV> mapValueSerializer;
    /** Backend queried for the number of groups, the key-group range and the state storages. */
    private AbstractInternalStateBackend internalStateBackend;
    /** Reusable buffer for serialized keys/values; it is reset before each use in this class. */
    protected ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos();
    /** {@link DataOutputView} wrapper writing into {@link #outputStream}. */
    protected DataOutputView outputView = new DataOutputViewStreamWrapper((OutputStream)this.outputStream);
    /** Partitioner used by {@code getKeyGroup} to map a key to its group. */
    protected static final HashPartitioner PARTITIONER = HashPartitioner.INSTANCE;

    /**
     * Creates the state on top of the given backend and storage.
     *
     * @param internalStateBackend backend used for group and storage lookups; must not be null
     * @param stateStorage storage that holds the entries of this state; must not be null
     */
    AbstractKeyedMapStateImpl(AbstractInternalStateBackend internalStateBackend, StateStorage stateStorage) {
        Preconditions.checkNotNull(internalStateBackend);
        Preconditions.checkNotNull(stateStorage);
        this.internalStateBackend = internalStateBackend;
        this.stateStorage = stateStorage;
    }

    /**
     * Creates a map instance of type {@code M}. This class uses it both for result maps it
     * fills and returns, and for the per-key map it stores when a key has no map yet.
     *
     * @return the map to populate; callers in this class expect it to be empty and mutable
     */
    abstract M createMap();

    /**
     * Tells whether any mapping is stored under the given key.
     *
     * @param key the state key; {@code null} is reported as absent
     * @return {@code true} if at least one mapping exists for {@code key}
     */
    @Override
    public boolean contains(K key) {
        if (key == null) {
            return false;
        }
        // Lazy storages keep whole maps; get() already maps "empty" to null.
        return this.stateStorage.lazySerde()
                ? this.get(key) != null
                : this.iterator(key).hasNext();
    }

    /**
     * Tells whether a mapping for {@code mapKey} is stored under {@code key}.
     *
     * @param key the state key; {@code null} is reported as absent
     * @param mapKey the user map key; {@code null} is reported as absent
     * @return {@code true} if the mapping exists
     * @throws StateAccessException if the serialized lookup fails
     */
    @Override
    public boolean contains(K key, MK mapKey) {
        if (key == null || mapKey == null) {
            return false;
        }
        if (!this.stateStorage.lazySerde()) {
            try {
                this.outputStream.reset();
                byte[] storageKey = StateSerializerUtil.getSerializedKeyForKeyedMapState(this.outputStream, this.outputView, key, this.keySerializer, mapKey, this.mapKeySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
                return this.stateStorage.get(storageKey) != null;
            }
            catch (Exception e) {
                throw new StateAccessException(e);
            }
        }
        M userMap = this.get(key);
        if (userMap == null) {
            return false;
        }
        return userMap.containsKey(mapKey);
    }

    /**
     * Returns the map stored under {@code key}, or {@code null} when there is none.
     *
     * @param key the state key
     * @return the stored map, or {@code null}
     */
    @Override
    public M get(K key) {
        final M noDefault = null;
        return this.getOrDefault(key, noDefault);
    }

    /**
     * Returns the map stored under {@code key}, or {@code defaultValue} if the key is
     * {@code null} or has no (non-empty) map.
     *
     * @param key the state key
     * @param defaultValue value returned when nothing is stored
     * @return the stored map or {@code defaultValue}
     * @throws StateAccessException if reading the storage fails
     */
    @Override
    public M getOrDefault(K key, M defaultValue) {
        if (key == null) {
            return defaultValue;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                // Serialized storages keep one record per map entry: rebuild the map from them.
                Iterator<Map.Entry<MK, MV>> records = this.iterator(key);
                M rebuilt = this.createMap();
                while (records.hasNext()) {
                    Map.Entry<MK, MV> record = records.next();
                    MK mapKey = record.getKey();
                    if (mapKey == null) {
                        continue;
                    }
                    MV mapValue = record.getValue();
                    if (mapValue == null) {
                        continue;
                    }
                    rebuilt.put(mapKey, mapValue);
                }
                if (rebuilt.isEmpty()) {
                    return defaultValue;
                }
                return rebuilt;
            }
            Map stored = (Map)this.stateStorage.get(key);
            if (stored == null || stored.isEmpty()) {
                return defaultValue;
            }
            return (M)stored;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns the value mapped to {@code mapKey} under {@code key}, or {@code null} if absent.
     *
     * @param key the state key
     * @param mapKey the user map key
     * @return the stored value, or {@code null}
     */
    @Override
    public MV get(K key, MK mapKey) {
        final MV noDefault = null;
        return this.getOrDefault(key, mapKey, noDefault);
    }

    /**
     * Returns the value mapped to {@code mapKey} under {@code key}, or
     * {@code defaultMapValue} when either key is {@code null} or no value is stored.
     *
     * @param key the state key
     * @param mapKey the user map key
     * @param defaultMapValue value returned when nothing is stored
     * @return the stored value or {@code defaultMapValue}
     * @throws StateAccessException if reading the storage fails
     */
    @Override
    public MV getOrDefault(K key, MK mapKey, MV defaultMapValue) {
        if (key == null || mapKey == null) {
            return defaultMapValue;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                this.outputStream.reset();
                byte[] storageKey = StateSerializerUtil.getSerializedKeyForKeyedMapState(this.outputStream, this.outputView, key, this.keySerializer, mapKey, this.mapKeySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
                byte[] storedBytes = (byte[])this.stateStorage.get(storageKey);
                if (storedBytes == null) {
                    return defaultMapValue;
                }
                return StateSerializerUtil.getDeserializeSingleValue(storedBytes, this.mapValueSerializer);
            }
            Map stored = (Map)this.stateStorage.get(key);
            if (stored == null) {
                return defaultMapValue;
            }
            Object found = stored.get(mapKey);
            if (found == null) {
                return defaultMapValue;
            }
            return (MV)found;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns the maps stored under the given keys. Null keys and keys without a
     * non-empty map are left out of the result.
     *
     * @param keys the state keys to look up; may be {@code null}
     * @return key to stored map, possibly empty, never {@code null}
     */
    @Override
    public Map<K, M> getAll(Collection<? extends K> keys) {
        if (keys == null || keys.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<K, M> found = new HashMap<K, M>();
        for (K key : keys) {
            if (key == null) {
                continue;
            }
            M stored = this.get(key);
            if (stored != null && !stored.isEmpty()) {
                found.put(key, stored);
            }
        }
        return found;
    }

    /**
     * Returns the mappings under {@code key} for the requested map keys. Null map keys and
     * map keys without a value are left out.
     *
     * @param key the state key
     * @param mapKeys the user map keys to look up
     * @return a map from {@link #createMap()} holding the found mappings; empty if the key or
     *     the collection is {@code null}/empty
     */
    @Override
    public M getAll(K key, Collection<? extends MK> mapKeys) {
        M selected = this.createMap();
        if (key == null || mapKeys == null || mapKeys.isEmpty()) {
            return selected;
        }
        if (this.stateStorage.lazySerde()) {
            // One storage read, then pick the requested entries from the in-memory map.
            M stored = this.get(key);
            if (stored == null) {
                return selected;
            }
            for (MK mapKey : mapKeys) {
                if (mapKey == null) {
                    continue;
                }
                MV mapValue = stored.get(mapKey);
                if (mapValue != null) {
                    selected.put(mapKey, mapValue);
                }
            }
            return selected;
        }
        for (MK mapKey : mapKeys) {
            if (mapKey == null) {
                continue;
            }
            MV mapValue = this.get(key, mapKey);
            if (mapValue != null) {
                selected.put(mapKey, mapValue);
            }
        }
        return selected;
    }

    /**
     * Returns, for every state key in {@code map}, the mappings found for the requested
     * user map keys. State keys for which nothing is found are left out of the result.
     *
     * <p>Each key is resolved with a single call to {@link #getAll(Object, Collection)}, which
     * already handles both lazy and serialized storages. This avoids looking every entry up
     * twice on serialized storages and makes null handling the same for both storage kinds:
     * a {@code null} state key, a {@code null} collection and {@code null} map keys are skipped.
     *
     * @param map state key to the user map keys to look up; may be {@code null}
     * @return state key to found mappings, possibly empty, never {@code null}
     * @throws StateAccessException if reading the storage fails
     */
    @Override
    public Map<K, M> getAll(Map<K, ? extends Collection<? extends MK>> map) {
        if (map == null || map.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<K, M> results = new HashMap<K, M>();
        for (Map.Entry<K, ? extends Collection<? extends MK>> entry : map.entrySet()) {
            K key = entry.getKey();
            M selected = this.getAll(key, entry.getValue());
            if (!selected.isEmpty()) {
                results.put(key, selected);
            }
        }
        return results;
    }

    /**
     * Stores the mapping {@code mapKey -> mapValue} under {@code key}.
     *
     * @param key the state key; must not be null
     * @param mapKey the user map key
     * @param mapValue the user map value
     * @throws StateAccessException if writing to the storage fails
     */
    @Override
    public void add(K key, MK mapKey, MV mapValue) {
        Preconditions.checkNotNull(key);
        try {
            if (!this.stateStorage.lazySerde()) {
                this.outputStream.reset();
                byte[] storageKey = StateSerializerUtil.getSerializedKeyForKeyedMapState(this.outputStream, this.outputView, key, this.keySerializer, mapKey, this.mapKeySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
                // The key bytes are already copied out, so the shared buffer can be reused for the value.
                this.outputStream.reset();
                this.mapValueSerializer.serialize(mapValue, this.outputView);
                byte[] storageValue = this.outputStream.toByteArray();
                this.stateStorage.put(storageKey, storageValue);
                return;
            }
            Map userMap = (Map)this.stateStorage.get(key);
            if (userMap == null) {
                userMap = this.createMap();
                this.stateStorage.put(key, userMap);
            }
            userMap.put(mapKey, mapValue);
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Stores all given mappings under {@code key}. Does nothing for a {@code null} or empty
     * {@code mappings}.
     *
     * <p>On serialized storages the entries are written through the storage instance's batch
     * wrapper, and null map keys or values are rejected.
     *
     * @param key the state key; must not be null
     * @param mappings the mappings to store
     * @throws StateAccessException if writing to the storage fails
     */
    @Override
    public void addAll(K key, Map<? extends MK, ? extends MV> mappings) {
        Preconditions.checkNotNull(key);
        if (mappings == null || mappings.isEmpty()) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                Map userMap = (Map)this.stateStorage.get(key);
                if (userMap == null) {
                    userMap = this.createMap();
                    this.stateStorage.put(key, userMap);
                }
                userMap.putAll(mappings);
                return;
            }
            StorageInstance instance = this.stateStorage.getStorageInstance();
            try (BatchPutWrapper batch = instance.getBatchPutWrapper()) {
                // Serialize the shared key prefix once; each entry only appends its map key after it.
                this.outputStream.reset();
                StateSerializerUtil.getSerializedPrefixKeyForKeyedMapState(this.outputStream, this.outputView, key, this.keySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
                int prefixLength = this.outputStream.getPosition();
                ByteArrayOutputStreamWithPos valueBuffer = new ByteArrayOutputStreamWithPos();
                DataOutputViewStreamWrapper valueView = new DataOutputViewStreamWrapper((OutputStream)valueBuffer);
                for (Map.Entry<? extends MK, ? extends MV> mapping : mappings.entrySet()) {
                    Preconditions.checkNotNull(mapping.getKey(), (String)"Can not insert null key to mapstate");
                    Preconditions.checkNotNull(mapping.getValue(), (String)"Can not insert null value to mapstate");
                    // Rewind to just after the prefix before appending this entry's map key.
                    this.outputStream.setPosition(prefixLength);
                    StateSerializerUtil.serializeItemWithKeyPrefix(this.outputView, mapping.getKey(), this.mapKeySerializer);
                    byte[] storageKey = this.outputStream.toByteArray();
                    valueBuffer.reset();
                    this.mapValueSerializer.serialize(mapping.getValue(), (DataOutputView)valueView);
                    byte[] storageValue = valueBuffer.toByteArray();
                    batch.put(storageKey, storageValue);
                }
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Stores the given mappings for every state key in {@code map}, one key at a time via
     * {@link #addAll(Object, Map)}.
     *
     * @param map state key to the mappings to store; {@code null} or empty is a no-op
     */
    @Override
    public void addAll(Map<? extends K, ? extends Map<? extends MK, ? extends MV>> map) {
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<? extends K, ? extends Map<? extends MK, ? extends MV>> perKey : map.entrySet()) {
                K key = perKey.getKey();
                Map<? extends MK, ? extends MV> mappings = perKey.getValue();
                this.addAll(key, mappings);
            }
        }
    }

    /**
     * Removes every mapping stored under {@code key}.
     *
     * @param key the state key; {@code null} is a no-op
     * @throws StateAccessException if the storage access fails
     */
    @Override
    public void remove(K key) {
        if (key == null) {
            return;
        }
        try {
            if (this.stateStorage.lazySerde()) {
                this.stateStorage.remove(key);
                return;
            }
            // Serialized storages keep one record per entry: remove them one by one.
            for (Iterator<Map.Entry<MK, MV>> records = this.iterator(key); records.hasNext(); ) {
                Map.Entry<MK, MV> record = records.next();
                this.remove(key, record.getKey());
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes the mapping for {@code mapKey} under {@code key}. On lazy storages the whole
     * key is removed once its map becomes empty.
     *
     * @param key the state key; {@code null} is a no-op
     * @param mapKey the user map key to remove
     * @throws StateAccessException if the storage access fails
     */
    @Override
    public void remove(K key, MK mapKey) {
        if (key == null) {
            return;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                this.outputStream.reset();
                byte[] storageKey = StateSerializerUtil.getSerializedKeyForKeyedMapState(this.outputStream, this.outputView, key, this.keySerializer, mapKey, this.mapKeySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
                this.stateStorage.remove(storageKey);
                return;
            }
            Map userMap = (Map)this.stateStorage.get(key);
            if (userMap != null) {
                userMap.remove(mapKey);
                if (userMap.isEmpty()) {
                    this.remove(key);
                }
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes all mappings of every given state key.
     *
     * @param keys the state keys to clear; {@code null} or empty is a no-op
     */
    @Override
    public void removeAll(Collection<? extends K> keys) {
        if (keys != null && !keys.isEmpty()) {
            for (K stateKey : keys) {
                this.remove(stateKey);
            }
        }
    }

    /**
     * Removes the mappings for the given map keys under {@code key}. On lazy storages the
     * whole key is removed once its map becomes empty; on serialized storages null map keys
     * are skipped.
     *
     * @param key the state key
     * @param mapKeys the user map keys to remove
     * @throws StateAccessException if the storage access fails
     */
    @Override
    public void removeAll(K key, Collection<? extends MK> mapKeys) {
        if (key == null || mapKeys == null || mapKeys.isEmpty()) {
            return;
        }
        try {
            if (!this.stateStorage.lazySerde()) {
                for (MK mapKey : mapKeys) {
                    if (mapKey != null) {
                        this.remove(key, mapKey);
                    }
                }
                return;
            }
            Map userMap = (Map)this.stateStorage.get(key);
            if (userMap == null) {
                return;
            }
            for (MK mapKey : mapKeys) {
                userMap.remove(mapKey);
            }
            if (userMap.isEmpty()) {
                this.remove(key);
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes, for every state key in {@code map}, the mappings of the listed map keys by
     * delegating to {@link #removeAll(Object, Collection)}.
     *
     * @param map state key to the user map keys to remove; {@code null} or empty is a no-op
     */
    @Override
    public void removeAll(Map<? extends K, ? extends Collection<? extends MK>> map) {
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<? extends K, ? extends Collection<? extends MK>> perKey : map.entrySet()) {
                K key = perKey.getKey();
                Collection<? extends MK> mapKeys = perKey.getValue();
                this.removeAll(key, mapKeys);
            }
        }
    }

    /**
     * Returns an iterator over the mappings stored under {@code key}.
     *
     * @param key the state key; must not be null
     * @return iterator over the entries; empty if nothing is stored
     * @throws StateAccessException if the serialized iterator cannot be created
     */
    @Override
    public Iterator<Map.Entry<MK, MV>> iterator(K key) {
        Preconditions.checkNotNull(key);
        if (!this.stateStorage.lazySerde()) {
            try {
                return this.getDeSerializedIterator(key, this.outputStream, this.outputView, this.keySerializer, this.mapKeySerializer, this.mapValueSerializer);
            }
            catch (Exception e) {
                throw new StateAccessException(e);
            }
        }
        M stored = this.get(key);
        if (stored == null) {
            return Collections.emptyIterator();
        }
        return stored.entrySet().iterator();
    }

    /**
     * Builds an iterator over the serialized records stored under {@code key}, exposing each
     * record as a {@link Map.Entry} that deserializes its key and value on access.
     *
     * <p>Every returned entry is bound to the storage pair it was created from. Previously the
     * entries read a field that the iterator overwrote on each {@code next()} call, so an entry
     * kept after advancing the iterator reported the following record's key and value, and
     * {@code setValue} wrote to the wrong record.
     *
     * @param key the state key whose records are iterated
     * @param outputStream buffer used to serialize the key prefix; it is reset first
     * @param outputView view writing into {@code outputStream}
     * @param safeKeySerializer serializer for the state key
     * @param safeMapKeySerializer serializer for the user map key
     * @param safeMapValueSerializer serializer for the user map value
     * @return iterator over the entries stored under {@code key}
     * @throws Exception if serializing the prefix or opening the storage iterator fails
     */
    private Iterator<Map.Entry<MK, MV>> getDeSerializedIterator(K key, ByteArrayOutputStreamWithPos outputStream, DataOutputView outputView, final TypeSerializer<K> safeKeySerializer, final TypeSerializer<MK> safeMapKeySerializer, final TypeSerializer<MV> safeMapValueSerializer) throws Exception {
        outputStream.reset();
        byte[] keyPrefix = StateSerializerUtil.getSerializedPrefixKeyForKeyedMapState(outputStream, outputView, key, safeKeySerializer, this.getKeyGroup(key), this.stateNameForSerialize);
        final StorageIterator iterator = this.stateStorage.prefixIterator(keyPrefix);
        return new Iterator<Map.Entry<MK, MV>>(){

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public Map.Entry<MK, MV> next() {
                // Capture the pair in a final local so the entry never aliases a later record.
                final Pair<byte[], byte[]> current = (Pair)iterator.next();
                return new Map.Entry<MK, MV>(){

                    @Override
                    public MK getKey() {
                        try {
                            return StateSerializerUtil.getDeserializedMapKeyForKeyedMapState((byte[])current.getKey(), safeKeySerializer, safeMapKeySerializer, AbstractKeyedMapStateImpl.this.serializedStateNameLength);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }

                    @Override
                    public MV getValue() {
                        try {
                            return StateSerializerUtil.getDeserializeSingleValue((byte[])current.getValue(), safeMapValueSerializer);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }

                    @Override
                    public MV setValue(MV value) {
                        try {
                            // A private buffer is used so the state's shared output stream is left untouched.
                            ByteArrayOutputStreamWithPos valueOutputStream = new ByteArrayOutputStreamWithPos();
                            DataOutputViewStreamWrapper valueOutputView = new DataOutputViewStreamWrapper((OutputStream)valueOutputStream);
                            safeMapValueSerializer.serialize(value, (DataOutputView)valueOutputView);
                            // Pair.setValue returns the previous bytes, which are deserialized as the old value.
                            return StateSerializerUtil.getDeserializeSingleValue((byte[])current.setValue(valueOutputStream.toByteArray()), safeMapValueSerializer);
                        }
                        catch (Exception e) {
                            throw new StateAccessException(e);
                        }
                    }
                };
            }

            @Override
            public void remove() {
                iterator.remove();
            }
        };
    }

    /**
     * Returns the mappings stored under {@code key} as an iterable.
     *
     * <p>On lazy storages the map is looked up immediately; on serialized storages a fresh
     * storage iterator is opened on each {@code iterator()} call.
     *
     * @param key the state key
     * @return iterable over the entries of {@code key}
     */
    @Override
    public Iterable<Map.Entry<MK, MV>> entries(final K key) {
        if (!this.stateStorage.lazySerde()) {
            return new Iterable<Map.Entry<MK, MV>>(){

                @Override
                public Iterator<Map.Entry<MK, MV>> iterator() {
                    // The state's own iterator already offers hasNext/next/remove.
                    return AbstractKeyedMapStateImpl.this.iterator(key);
                }
            };
        }
        M stored = this.get(key);
        if (stored == null) {
            return Collections.emptySet();
        }
        return stored.entrySet();
    }

    /**
     * Returns the user map keys stored under {@code key} as an iterable.
     *
     * @param key the state key
     * @return iterable over the map keys of {@code key}
     */
    @Override
    public Iterable<MK> mapKeys(final K key) {
        if (!this.stateStorage.lazySerde()) {
            return new Iterable<MK>(){

                @Override
                public Iterator<MK> iterator() {
                    final Iterator<Map.Entry<MK, MV>> entryIter = AbstractKeyedMapStateImpl.this.iterator(key);
                    return new Iterator<MK>(){

                        @Override
                        public boolean hasNext() {
                            return entryIter.hasNext();
                        }

                        @Override
                        public MK next() {
                            Map.Entry<MK, MV> entry = entryIter.next();
                            return entry.getKey();
                        }

                        @Override
                        public void remove() {
                            entryIter.remove();
                        }
                    };
                }
            };
        }
        M stored = this.get(key);
        if (stored == null) {
            return Collections.emptySet();
        }
        return stored.keySet();
    }

    /**
     * Returns the user map values stored under {@code key} as an iterable.
     *
     * @param key the state key
     * @return iterable over the map values of {@code key}
     */
    @Override
    public Iterable<MV> mapValues(final K key) {
        if (!this.stateStorage.lazySerde()) {
            return new Iterable<MV>(){

                @Override
                public Iterator<MV> iterator() {
                    final Iterator<Map.Entry<MK, MV>> entryIter = AbstractKeyedMapStateImpl.this.iterator(key);
                    return new Iterator<MV>(){

                        @Override
                        public boolean hasNext() {
                            return entryIter.hasNext();
                        }

                        @Override
                        public MV next() {
                            Map.Entry<MK, MV> entry = entryIter.next();
                            return entry.getValue();
                        }

                        @Override
                        public void remove() {
                            entryIter.remove();
                        }
                    };
                }
            };
        }
        M stored = this.get(key);
        if (stored == null) {
            return Collections.emptySet();
        }
        return stored.values();
    }

    /**
     * Returns every state key of this state together with its map.
     *
     * <p>Lazy storages are copied pair by pair. Serialized storages are decoded record by
     * record, either through one range scan per key group or through a single full scan.
     *
     * @return state key to map, possibly empty
     * @throws StateAccessException if reading or decoding the storage fails
     */
    @Override
    public Map<K, M> getAll() {
        try {
            Map<K, M> everything = new HashMap<K, M>();
            if (this.stateStorage.lazySerde()) {
                for (StorageIterator stored = this.stateStorage.iterator(); stored.hasNext(); ) {
                    Pair keyAndMap = (Pair)stored.next();
                    everything.put((K)keyAndMap.getKey(), (M)keyAndMap.getValue());
                }
                return everything;
            }
            boolean scanPerGroup = !this.stateStorage.supportMultiColumnFamilies() && this.internalStateBackend.getStateStorages().size() > 1;
            if (!scanPerGroup) {
                StorageIterator<byte[], byte[]> allRecords = this.stateStorage.iterator();
                this.iteratorToMap(allRecords, everything, this.serializedStateNameLength);
                return everything;
            }
            for (Integer group : this.internalStateBackend.getKeyGroupRange()) {
                this.outputStream.reset();
                StateSerializerUtil.serializeGroupPrefix(this.outputStream, group, this.stateNameByte);
                byte[] rangeStart = this.outputStream.toByteArray();
                // The group prefix followed by byte 127 is the range end handed to subIterator.
                this.outputStream.write(127);
                byte[] rangeEnd = this.outputStream.toByteArray();
                StorageIterator<byte[], byte[]> groupRecords = this.stateStorage.subIterator(rangeStart, rangeEnd);
                this.iteratorToMap(groupRecords, everything, this.stateNameByte.length);
            }
            return everything;
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Removes every entry of this state.
     *
     * <p>Lazy storages are cleared through {@code HeapStateStorage.removeAll()}. Serialized
     * storages are drained with iterator removal, either through one range scan per key group
     * or through a single full scan.
     *
     * @throws StateAccessException if iterating or removing fails on a serialized storage
     */
    @Override
    public void removeAll() {
        if (this.stateStorage.lazySerde()) {
            ((HeapStateStorage)this.stateStorage).removeAll();
            return;
        }
        try {
            boolean scanPerGroup = !this.stateStorage.supportMultiColumnFamilies() && this.internalStateBackend.getStateStorages().size() > 1;
            if (!scanPerGroup) {
                for (StorageIterator allRecords = this.stateStorage.iterator(); allRecords.hasNext(); ) {
                    allRecords.next();
                    allRecords.remove();
                }
                return;
            }
            for (Integer group : this.internalStateBackend.getKeyGroupRange()) {
                this.outputStream.reset();
                StateSerializerUtil.serializeGroupPrefix(this.outputStream, group, this.stateNameByte);
                byte[] rangeStart = this.outputStream.toByteArray();
                // The group prefix followed by byte 127 is the range end handed to subIterator.
                this.outputStream.write(127);
                byte[] rangeEnd = this.outputStream.toByteArray();
                for (StorageIterator groupRecords = this.stateStorage.subIterator(rangeStart, rangeEnd); groupRecords.hasNext(); ) {
                    groupRecords.next();
                    groupRecords.remove();
                }
            }
        }
        catch (Exception e) {
            throw new StateAccessException(e);
        }
    }

    /**
     * Returns an iterable over the state keys of this state.
     *
     * <p>The underlying storage iterator is opened on each {@code iterator()} call. Lazy
     * storages yield the key of each stored pair; serialized storages go through
     * {@code StateIteratorUtil.createKeyIterator}, fed either by one range scan per key group
     * or by a single full scan.
     *
     * @return iterable over the state keys; its {@code iterator()} throws
     *     {@link StateAccessException} if the storage iterator cannot be created
     */
    @Override
    public Iterable<K> keys() {
        return new Iterable<K>(){

            @Override
            public Iterator<K> iterator() {
                final AbstractKeyedMapStateImpl<K, MK, MV, M> state = AbstractKeyedMapStateImpl.this;
                try {
                    if (state.stateStorage.lazySerde()) {
                        final StorageIterator storedPairs = state.stateStorage.iterator();
                        return new Iterator<K>(){

                            @Override
                            public boolean hasNext() {
                                return storedPairs.hasNext();
                            }

                            @Override
                            public K next() {
                                Pair keyAndMap = (Pair)storedPairs.next();
                                return (K)keyAndMap.getKey();
                            }

                            @Override
                            public void remove() {
                                storedPairs.remove();
                            }
                        };
                    }
                    boolean scanPerGroup = !state.stateStorage.supportMultiColumnFamilies() && state.internalStateBackend.getStateStorages().size() > 1;
                    if (!scanPerGroup) {
                        StorageIterator<byte[], byte[]> allRecords = state.stateStorage.iterator();
                        return StateIteratorUtil.createKeyIterator(allRecords, state.keySerializer, state.serializedStateNameLength);
                    }
                    ArrayList<Iterator<Pair<byte[], byte[]>>> perGroup = new ArrayList<Iterator<Pair<byte[], byte[]>>>();
                    for (Integer group : state.internalStateBackend.getKeyGroupRange()) {
                        state.outputStream.reset();
                        StateSerializerUtil.serializeGroupPrefix(state.outputStream, group, state.stateNameByte);
                        byte[] rangeStart = state.outputStream.toByteArray();
                        // The group prefix followed by byte 127 is the range end handed to subIterator.
                        state.outputStream.write(127);
                        byte[] rangeEnd = state.outputStream.toByteArray();
                        StorageIterator groupRecords = state.stateStorage.subIterator(rangeStart, rangeEnd);
                        perGroup.add(groupRecords);
                    }
                    GroupIterator chained = new GroupIterator(perGroup);
                    return StateIteratorUtil.createKeyIterator(chained, state.keySerializer, state.stateNameByte.length);
                }
                catch (Exception e) {
                    throw new StateAccessException(e);
                }
            }
        };
    }

    /**
     * Serializes the map stored under the key encoded in {@code serializedKeyAndNamespace},
     * using the caller-supplied serializers instead of this state's own.
     *
     * @param serializedKeyAndNamespace serialized key and (void) namespace
     * @param safeKeySerializer serializer used to decode the state key
     * @param safeValueSerializer serializer for the map; it is cast to {@code MapSerializer}
     *     to obtain the map key and map value serializers
     * @return the serialized map, or {@code null} if nothing is stored for the key
     * @throws Exception if deserializing the key or (de)serializing the entries fails
     */
    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<M> safeValueSerializer) throws Exception {
        K key = (K)KvStateSerializer.deserializeKeyAndNamespace((byte[])serializedKeyAndNamespace, safeKeySerializer, (TypeSerializer)VoidNamespaceSerializer.INSTANCE).f0;
        MapSerializer mapSerializer = (MapSerializer)safeValueSerializer;
        TypeSerializer userKeySerializer = mapSerializer.getKeySerializer();
        TypeSerializer userValueSerializer = mapSerializer.getValueSerializer();
        M userMap;
        if (this.stateStorage.lazySerde()) {
            userMap = this.get(key);
            if (userMap == null) {
                return null;
            }
        } else {
            // Use private buffers rather than the state's shared output stream.
            ByteArrayOutputStreamWithPos prefixBuffer = new ByteArrayOutputStreamWithPos();
            DataOutputViewStreamWrapper prefixView = new DataOutputViewStreamWrapper((OutputStream)prefixBuffer);
            Iterator<Map.Entry<MK, MV>> records = this.getDeSerializedIterator(key, prefixBuffer, (DataOutputView)prefixView, safeKeySerializer, userKeySerializer, userValueSerializer);
            userMap = this.createMap();
            while (records.hasNext()) {
                Map.Entry<MK, MV> record = records.next();
                MK mapKey = record.getKey();
                if (mapKey == null) {
                    continue;
                }
                MV mapValue = record.getValue();
                if (mapValue == null) {
                    continue;
                }
                userMap.put(mapKey, mapValue);
            }
            if (userMap.isEmpty()) {
                return null;
            }
        }
        return KvStateSerializer.serializeMap(userMap.entrySet(), (TypeSerializer)userKeySerializer, (TypeSerializer)userValueSerializer);
    }

    /**
     * Returns the storage backing this state.
     *
     * @return the storage passed to the constructor
     */
    @Override
    public StateStorage<K, M> getStateStorage() {
        StateStorage<K, M> storage = this.stateStorage;
        return storage;
    }

    /**
     * Computes the group of {@code key} with {@link #PARTITIONER} and the backend's number
     * of groups.
     *
     * @param key the key to partition
     * @return the group index reported by the partitioner
     */
    protected <K> int getKeyGroup(K key) {
        int numGroups = this.internalStateBackend.getNumGroups();
        return PARTITIONER.partition(key, numGroups);
    }

    /**
     * Drains {@code iterator}, decoding each serialized record into its state key, map key
     * and map value, and groups the mappings per state key in {@code result}.
     *
     * @param iterator iterator over serialized key/value pairs
     * @param result map that receives the decoded mappings; per-key maps are created on demand
     * @param stateNameByteLength length passed to the key-decoding helpers
     * @throws IOException if decoding a record fails
     */
    private void iteratorToMap(StorageIterator<byte[], byte[]> iterator, Map<K, M> result, int stateNameByteLength) throws IOException {
        while (iterator.hasNext()) {
            Pair record = (Pair)iterator.next();
            byte[] rawKey = (byte[])record.getKey();
            K stateKey = StateSerializerUtil.getDeserializedKeyForKeyedMapState(rawKey, this.keySerializer, stateNameByteLength);
            MK mapKey = StateSerializerUtil.getDeserializedMapKeyForKeyedMapState(rawKey, this.keySerializer, this.mapKeySerializer, stateNameByteLength);
            MV mapValue = StateSerializerUtil.getDeserializeSingleValue((byte[])record.getValue(), this.mapValueSerializer);
            M perKey = result.get(stateKey);
            if (perKey == null) {
                perKey = this.createMap();
                result.put(stateKey, perKey);
            }
            perKey.put(mapKey, mapValue);
        }
    }
}

