/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.AbstractRocksDBState;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

public class RocksDBAggregatingState<K, N, T, ACC, R>
extends AbstractRocksDBState<K, N, ACC, AggregatingState<T, R>>
implements InternalAggregatingState<K, N, T, ACC, R> {
    private final AggregateFunction<T, ACC, R> aggFunction;

    public RocksDBAggregatingState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<ACC> valueSerializer, ACC defaultValue, AggregateFunction<T, ACC, R> aggFunction, RocksDBKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        this.aggFunction = aggFunction;
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<ACC> getValueSerializer() {
        return this.valueSerializer;
    }

    public R get() throws IOException {
        try {
            this.writeCurrentKeyWithGroupAndNamespace();
            byte[] key = this.keySerializationStream.toByteArray();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, key);
            if (valueBytes == null) {
                return null;
            }
            Object accumulator = this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
            return this.aggFunction.getResult(accumulator);
        }
        catch (IOException | RocksDBException e2) {
            throw new IOException("Error while retrieving value from RocksDB", e2);
        }
    }

    public void add(T value) throws IOException {
        try {
            this.writeCurrentKeyWithGroupAndNamespace();
            byte[] key = this.keySerializationStream.toByteArray();
            this.keySerializationStream.reset();
            byte[] valueBytes = this.backend.db.get(this.columnFamily, key);
            ACC accumulator = valueBytes == null ? this.aggFunction.createAccumulator() : this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
            accumulator = this.aggFunction.add(value, accumulator);
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(this.keySerializationStream);
            this.valueSerializer.serialize(accumulator, out);
            this.backend.db.put(this.columnFamily, this.writeOptions, key, this.keySerializationStream.toByteArray());
        }
        catch (IOException | RocksDBException e2) {
            throw new IOException("Error while adding value to RocksDB", e2);
        }
    }

    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        Object key = this.backend.getCurrentKey();
        int keyGroup = this.backend.getCurrentKeyGroupIndex();
        try {
            Object current = null;
            for (N source : sources) {
                if (source == null) continue;
                this.writeKeyWithGroupAndNamespace(keyGroup, key, source, this.keySerializationStream, this.keySerializationDataOutputView);
                byte[] sourceKey = this.keySerializationStream.toByteArray();
                byte[] valueBytes = this.backend.db.get(this.columnFamily, sourceKey);
                this.backend.db.delete(this.columnFamily, sourceKey);
                if (valueBytes == null) continue;
                Object value = this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
                if (current != null) {
                    current = this.aggFunction.merge(current, value);
                    continue;
                }
                current = value;
            }
            if (current != null) {
                this.writeKeyWithGroupAndNamespace(keyGroup, key, target, this.keySerializationStream, this.keySerializationDataOutputView);
                byte[] targetKey = this.keySerializationStream.toByteArray();
                byte[] targetValueBytes = this.backend.db.get(this.columnFamily, targetKey);
                if (targetValueBytes != null) {
                    Object value = this.valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
                    current = this.aggFunction.merge(current, value);
                }
                this.keySerializationStream.reset();
                this.valueSerializer.serialize(current, this.keySerializationDataOutputView);
                this.backend.db.put(this.columnFamily, this.writeOptions, targetKey, this.keySerializationStream.toByteArray());
            }
        }
        catch (Exception e2) {
            throw new Exception("Error while merging state in RocksDB", e2);
        }
    }
}

