/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBMapState<K, N, UK, UV>
extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
    private final TypeSerializer<UK> userKeySerializer;
    private final TypeSerializer<UV> userValueSerializer;

    public RocksDBMapState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<Map<UK, UV>> valueSerializer, Map<UK, UV> defaultValue, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        Preconditions.checkState(valueSerializer instanceof MapSerializer, "Unexpected serializer type.");
        MapSerializer castedMapSerializer = (MapSerializer)valueSerializer;
        this.userKeySerializer = castedMapSerializer.getKeySerializer();
        this.userValueSerializer = castedMapSerializer.getValueSerializer();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return this.valueSerializer;
    }

    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes == null ? null : (UV)this.deserializeUserValue(rawValueBytes);
    }

    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.serializeUserValue(userValue);
        this.backend.db.put(this.columnFamily, this.writeOptions, rawKeyBytes, rawValueBytes);
    }

    public void putAll(Map<UK, UV> map2) throws IOException, RocksDBException {
        if (map2 == null) {
            return;
        }
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.backend.db, this.writeOptions);){
            for (Map.Entry<UK, UV> entry : map2.entrySet()) {
                byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
                byte[] rawValueBytes = this.serializeUserValue(entry.getValue());
                writeBatchWrapper.put(this.columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    public void remove(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        this.backend.db.delete(this.columnFamily, this.writeOptions, rawKeyBytes);
    }

    public boolean contains(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes = this.serializeUserKeyWithCurrentKeyAndNamespace(userKey);
        byte[] rawValueBytes = this.backend.db.get(this.columnFamily, rawKeyBytes);
        return rawValueBytes != null;
    }

    public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
        final Iterator<Map.Entry<UK, UV>> iterator = this.iterator();
        if (!iterator.hasNext()) {
            return null;
        }
        return new Iterable<Map.Entry<UK, UV>>(){

            @Override
            public Iterator<Map.Entry<UK, UV>> iterator() {
                return iterator;
            }
        };
    }

    public Iterable<UK> keys() throws IOException, RocksDBException {
        final byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return new Iterable<UK>(){

            @Override
            public Iterator<UK> iterator() {
                return new RocksDBMapIterator<UK>(RocksDBMapState.this.backend.db, prefixBytes, RocksDBMapState.this.userKeySerializer, RocksDBMapState.this.userValueSerializer){

                    @Override
                    public UK next() {
                        RocksDBMapEntry entry = this.nextEntry();
                        return entry == null ? null : (Object)entry.getKey();
                    }
                };
            }
        };
    }

    public Iterable<UV> values() throws IOException, RocksDBException {
        final byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return new Iterable<UV>(){

            @Override
            public Iterator<UV> iterator() {
                return new RocksDBMapIterator<UV>(RocksDBMapState.this.backend.db, prefixBytes, RocksDBMapState.this.userKeySerializer, RocksDBMapState.this.userValueSerializer){

                    @Override
                    public UV next() {
                        RocksDBMapEntry entry = this.nextEntry();
                        return entry == null ? null : (Object)entry.getValue();
                    }
                };
            }
        };
    }

    public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
        byte[] prefixBytes = this.serializeCurrentKeyAndNamespace();
        return new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, prefixBytes, (TypeSerializer)this.userKeySerializer, (TypeSerializer)this.userValueSerializer){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
    }

    @Override
    public void clear() {
        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(this.backend.db, this.columnFamily);
             WriteBatch writeBatch = new WriteBatch(128);){
            byte[] keyBytes;
            byte[] keyPrefixBytes = this.serializeCurrentKeyAndNamespace();
            iterator.seek(keyPrefixBytes);
            while (iterator.isValid() && this.startWithKeyPrefix(keyPrefixBytes, keyBytes = iterator.key())) {
                writeBatch.remove(this.columnFamily, keyBytes);
                iterator.next();
            }
            this.backend.db.write(this.writeOptions, writeBatch);
        }
        catch (Exception e2) {
            LOG.warn("Error while cleaning the state.", (Throwable)e2);
        }
    }

    @Override
    public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<Map<UK, UV>> safeValueSerializer) throws Exception {
        Preconditions.checkNotNull(serializedKeyAndNamespace);
        Preconditions.checkNotNull(safeKeySerializer);
        Preconditions.checkNotNull(safeNamespaceSerializer);
        Preconditions.checkNotNull(safeValueSerializer);
        Tuple2 keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace((byte[])serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, (int)this.backend.getNumberOfKeyGroups());
        ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
        this.writeKeyWithGroupAndNamespace(keyGroup, keyAndNamespace.f0, safeKeySerializer, keyAndNamespace.f1, safeNamespaceSerializer, outputStream, outputView);
        byte[] keyPrefixBytes = outputStream.toByteArray();
        MapSerializer serializer = (MapSerializer)safeValueSerializer;
        TypeSerializer dupUserKeySerializer = serializer.getKeySerializer();
        TypeSerializer dupUserValueSerializer = serializer.getValueSerializer();
        final RocksDBMapIterator iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(this.backend.db, keyPrefixBytes, dupUserKeySerializer, dupUserValueSerializer){

            @Override
            public Map.Entry<UK, UV> next() {
                return this.nextEntry();
            }
        };
        if (!iterator.hasNext()) {
            return null;
        }
        return KvStateSerializer.serializeMap((Iterable)new Iterable<Map.Entry<UK, UV>>(){

            @Override
            public Iterator<Map.Entry<UK, UV>> iterator() {
                return iterator;
            }
        }, dupUserKeySerializer, dupUserValueSerializer);
    }

    private byte[] serializeCurrentKeyAndNamespace() throws IOException {
        this.writeCurrentKeyWithGroupAndNamespace();
        return this.keySerializationStream.toByteArray();
    }

    private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
        this.serializeCurrentKeyAndNamespace();
        this.userKeySerializer.serialize(userKey, this.keySerializationDataOutputView);
        return this.keySerializationStream.toByteArray();
    }

    private byte[] serializeUserValue(UV userValue) throws IOException {
        return this.serializeUserValue(userValue, this.userValueSerializer);
    }

    private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
        return this.deserializeUserValue(rawValueBytes, this.userValueSerializer);
    }

    private byte[] serializeUserValue(UV userValue, TypeSerializer<UV> valueSerializer) throws IOException {
        this.keySerializationStream.reset();
        if (userValue == null) {
            this.keySerializationDataOutputView.writeBoolean(true);
        } else {
            this.keySerializationDataOutputView.writeBoolean(false);
            valueSerializer.serialize(userValue, this.keySerializationDataOutputView);
        }
        return this.keySerializationStream.toByteArray();
    }

    private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException {
        ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
        in.skipBytes(userKeyOffset);
        return keySerializer.deserialize(in);
    }

    private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer<UV> valueSerializer) throws IOException {
        ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
        DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
        boolean isNull = in.readBoolean();
        return isNull ? null : (UV)valueSerializer.deserialize(in);
    }

    private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
        if (rawKeyBytes.length < keyPrefixBytes.length) {
            return false;
        }
        int i = keyPrefixBytes.length;
        while (--i >= this.backend.getKeyGroupPrefixBytes()) {
            if (rawKeyBytes[i] == keyPrefixBytes[i]) continue;
            return false;
        }
        return true;
    }

    private abstract class RocksDBMapIterator<T>
    implements Iterator<T> {
        private static final int CACHE_SIZE_LIMIT = 128;
        private final RocksDB db;
        private final byte[] keyPrefixBytes;
        private boolean expired = false;
        private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList();
        private int cacheIndex = 0;
        private final TypeSerializer<UK> keySerializer;
        private final TypeSerializer<UV> valueSerializer;

        RocksDBMapIterator(RocksDB db, byte[] keyPrefixBytes, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            this.db = db;
            this.keyPrefixBytes = keyPrefixBytes;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        @Override
        public boolean hasNext() {
            this.loadCache();
            return this.cacheIndex < this.cacheEntries.size();
        }

        @Override
        public void remove() {
            if (this.cacheIndex == 0 || this.cacheIndex > this.cacheEntries.size()) {
                throw new IllegalStateException("The remove operation must be called after a valid next operation.");
            }
            RocksDBMapEntry lastEntry = this.cacheEntries.get(this.cacheIndex - 1);
            lastEntry.remove();
        }

        final RocksDBMapEntry nextEntry() {
            this.loadCache();
            if (this.cacheIndex == this.cacheEntries.size()) {
                if (!this.expired) {
                    throw new IllegalStateException();
                }
                return null;
            }
            RocksDBMapEntry entry = this.cacheEntries.get(this.cacheIndex);
            ++this.cacheIndex;
            return entry;
        }

        private void loadCache() {
            if (this.cacheIndex > this.cacheEntries.size()) {
                throw new IllegalStateException();
            }
            if (this.cacheIndex < this.cacheEntries.size() || this.expired) {
                return;
            }
            try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, RocksDBMapState.this.columnFamily);){
                RocksDBMapEntry lastEntry = this.cacheEntries.size() == 0 ? null : this.cacheEntries.get(this.cacheEntries.size() - 1);
                byte[] startBytes = lastEntry == null ? this.keyPrefixBytes : lastEntry.rawKeyBytes;
                this.cacheEntries.clear();
                this.cacheIndex = 0;
                iterator.seek(startBytes);
                if (lastEntry != null && !lastEntry.deleted) {
                    iterator.next();
                }
                while (true) {
                    if (!iterator.isValid() || !RocksDBMapState.this.startWithKeyPrefix(this.keyPrefixBytes, iterator.key())) {
                        this.expired = true;
                        break;
                    }
                    if (this.cacheEntries.size() >= 128) {
                        break;
                    }
                    RocksDBMapEntry entry = new RocksDBMapEntry(this.db, this.keyPrefixBytes.length, iterator.key(), iterator.value(), this.keySerializer, this.valueSerializer);
                    this.cacheEntries.add(entry);
                    iterator.next();
                }
            }
        }
    }

    private class RocksDBMapEntry
    implements Map.Entry<UK, UV> {
        private final RocksDB db;
        private final byte[] rawKeyBytes;
        private byte[] rawValueBytes;
        private boolean deleted;
        private UK userKey;
        private UV userValue;
        private final int userKeyOffset;
        private TypeSerializer<UK> keySerializer;
        private TypeSerializer<UV> valueSerializer;

        RocksDBMapEntry(@Nonnull RocksDB db, @Nonnull int userKeyOffset, @Nonnull byte[] rawKeyBytes, @Nonnull byte[] rawValueBytes, @Nonnull TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            this.db = db;
            this.userKeyOffset = userKeyOffset;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.rawKeyBytes = rawKeyBytes;
            this.rawValueBytes = rawValueBytes;
            this.deleted = false;
        }

        public void remove() {
            this.deleted = true;
            this.rawValueBytes = null;
            try {
                this.db.delete(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes);
            }
            catch (RocksDBException e2) {
                throw new RuntimeException("Error while removing data from RocksDB.", e2);
            }
        }

        @Override
        public UK getKey() {
            if (this.userKey == null) {
                try {
                    this.userKey = RocksDBMapState.this.deserializeUserKey(this.userKeyOffset, this.rawKeyBytes, this.keySerializer);
                }
                catch (IOException e2) {
                    throw new RuntimeException("Error while deserializing the user key.", e2);
                }
            }
            return this.userKey;
        }

        @Override
        public UV getValue() {
            if (this.deleted) {
                return null;
            }
            if (this.userValue == null) {
                try {
                    this.userValue = RocksDBMapState.this.deserializeUserValue(this.rawValueBytes, this.valueSerializer);
                }
                catch (IOException e2) {
                    throw new RuntimeException("Error while deserializing the user value.", e2);
                }
            }
            return this.userValue;
        }

        @Override
        public UV setValue(UV value) {
            if (this.deleted) {
                throw new IllegalStateException("The value has already been deleted.");
            }
            Object oldValue = this.getValue();
            try {
                this.userValue = value;
                this.rawValueBytes = RocksDBMapState.this.serializeUserValue(value, this.valueSerializer);
                this.db.put(RocksDBMapState.this.columnFamily, RocksDBMapState.this.writeOptions, this.rawKeyBytes, this.rawValueBytes);
            }
            catch (IOException | RocksDBException e2) {
                throw new RuntimeException("Error while putting data into RocksDB.", e2);
            }
            return oldValue;
        }
    }
}

