/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

public class RocksDBListState<K, N, V>
extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
implements InternalListState<K, N, V> {
    private final TypeSerializer<V> elementSerializer;
    private static final byte DELIMITER = 44;

    public RocksDBListState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<List<V>> valueSerializer, List<V> defaultValue, TypeSerializer<V> elementSerializer, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.elementSerializer = elementSerializer;
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<List<V>> getValueSerializer() {
        return this.valueSerializer;
    }

    public Iterable<V> get() {
        try {
            this.writeCurrentKeyWithGroupAndNamespace();
            byte[] key = this.keySerializationStream.toByteArray();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, key);
            if (valueBytes == null) {
                return null;
            }
            ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
            ArrayList<V> result = new ArrayList<V>();
            while (in.available() > 0) {
                result.add(this.elementSerializer.deserialize(in));
                if (in.available() <= 0) continue;
                in.readByte();
            }
            return result;
        }
        catch (IOException | RocksDBException e2) {
            throw new RuntimeException("Error while retrieving data from RocksDB", e2);
        }
    }

    public void add(V value) throws IOException {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
        try {
            this.writeCurrentKeyWithGroupAndNamespace();
            byte[] key = this.keySerializationStream.toByteArray();
            this.keySerializationStream.reset();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(this.keySerializationStream);
            this.elementSerializer.serialize(value, out);
            this.backend.db.merge(this.columnFamily, this.writeOptions, key, this.keySerializationStream.toByteArray());
        }
        catch (Exception e2) {
            throw new RuntimeException("Error while adding data to RocksDB", e2);
        }
    }

    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        Object key = this.backend.getCurrentKey();
        int keyGroup = this.backend.getCurrentKeyGroupIndex();
        try {
            this.writeKeyWithGroupAndNamespace(keyGroup, key, target, this.keySerializationStream, this.keySerializationDataOutputView);
            byte[] targetKey = this.keySerializationStream.toByteArray();
            for (N source : sources) {
                if (source == null) continue;
                this.writeKeyWithGroupAndNamespace(keyGroup, key, source, this.keySerializationStream, this.keySerializationDataOutputView);
                byte[] sourceKey = this.keySerializationStream.toByteArray();
                byte[] valueBytes = this.backend.db.get(this.columnFamily, sourceKey);
                this.backend.db.delete(this.columnFamily, sourceKey);
                if (valueBytes == null) continue;
                this.backend.db.merge(this.columnFamily, this.writeOptions, targetKey, valueBytes);
            }
        }
        catch (Exception e2) {
            throw new Exception("Error while merging state in RocksDB", e2);
        }
    }

    public void update(List<V> values) throws Exception {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        this.clear();
        if (!values.isEmpty()) {
            try {
                this.writeCurrentKeyWithGroupAndNamespace();
                byte[] key = this.keySerializationStream.toByteArray();
                byte[] premerge = this.getPreMergedValue(values);
                if (premerge == null) {
                    throw new IOException("Failed pre-merge values in update()");
                }
                this.backend.db.put(this.columnFamily, this.writeOptions, key, premerge);
            }
            catch (IOException | RocksDBException e2) {
                throw new RuntimeException("Error while updating data to RocksDB", e2);
            }
        }
    }

    public void addAll(List<V> values) throws Exception {
        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
        if (!values.isEmpty()) {
            try {
                this.writeCurrentKeyWithGroupAndNamespace();
                byte[] key = this.keySerializationStream.toByteArray();
                byte[] premerge = this.getPreMergedValue(values);
                if (premerge == null) {
                    throw new IOException("Failed pre-merge values in addAll()");
                }
                this.backend.db.merge(this.columnFamily, this.writeOptions, key, premerge);
            }
            catch (IOException | RocksDBException e2) {
                throw new RuntimeException("Error while updating data to RocksDB", e2);
            }
        }
    }

    private byte[] getPreMergedValue(List<V> values) throws IOException {
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(this.keySerializationStream);
        this.keySerializationStream.reset();
        boolean first = true;
        for (V value : values) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            if (first) {
                first = false;
            } else {
                this.keySerializationStream.write(44);
            }
            this.elementSerializer.serialize(value, out);
        }
        return this.keySerializationStream.toByteArray();
    }
}

