/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.NoSuchElementException;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

/**
 * An ordered set store that keeps its elements as keys in a RocksDB column family.
 *
 * <p>Every element is serialized with {@code byteOrderProducingSerializer}, prefixed with the
 * bytes written by {@link StateSerializerUtil#writeGroup} for this store's key-group id, and
 * written as a RocksDB key with a one-byte dummy value. Iteration seeks to the key-group prefix
 * and walks forward while keys still carry that prefix.
 *
 * <p>NOTE(review): the serializer's name suggests that the byte order of serialized elements
 * defines the element order; that property is not enforced in this class — confirm with callers.
 *
 * <p>Writes go through the supplied {@link RocksDBWriteBatchWrapper}; the batch is flushed before
 * an iterator is created so that the iterator is opened after pending writes were handed over.
 *
 * @param <T> type of the stored elements.
 */
public class RocksDBOrderedSetStore<T>
implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {

    /** Value stored under every key; only the key carries information. */
    private static final byte[] DUMMY_BYTES = new byte[]{0};

    /** Database that holds the column family and is used to open iterators. */
    @Nonnull
    private final RocksDB db;

    /** Column family in which all elements of this store are written. */
    @Nonnull
    private final ColumnFamilyHandle columnFamilyHandle;

    /** Serializer used to convert elements to and from their key bytes. */
    @Nonnull
    private final TypeSerializer<T> byteOrderProducingSerializer;

    /** Write batch through which all puts and removes are issued. */
    @Nonnull
    private final RocksDBWriteBatchWrapper batchWrapper;

    /** Key prefix for this store, derived from the key-group id in the constructor. */
    @Nonnull
    private final byte[] groupPrefixBytes;

    /** Scratch buffer that is reset and reused for every serialization. */
    @Nonnull
    private final ByteArrayOutputStreamWithPos outputStream;

    /**
     * View used to write into the scratch buffer.
     * NOTE(review): presumably wraps {@link #outputStream}; this class relies on that — confirm
     * at the call site.
     */
    @Nonnull
    private final DataOutputViewStreamWrapper outputView;

    /**
     * Creates a store for a single key-group.
     *
     * @param keyGroupId id whose serialized form becomes the key prefix of this store.
     * @param db database used to create iterators.
     * @param columnFamilyHandle column family that receives the elements.
     * @param byteOrderProducingSerializer serializer for the elements.
     * @param outputStream reusable buffer for serialization; it is reset by this class.
     * @param outputView view that is written to during serialization.
     * @param batchWrapper write batch used for puts and removes.
     * @throws FlinkRuntimeException if the key-group prefix cannot be written.
     */
    public RocksDBOrderedSetStore(@Nonnegative int keyGroupId, @Nonnull RocksDB db, @Nonnull ColumnFamilyHandle columnFamilyHandle, @Nonnull TypeSerializer<T> byteOrderProducingSerializer, @Nonnull ByteArrayOutputStreamWithPos outputStream, @Nonnull DataOutputViewStreamWrapper outputView, @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
        this.db = db;
        this.columnFamilyHandle = columnFamilyHandle;
        this.byteOrderProducingSerializer = byteOrderProducingSerializer;
        this.outputStream = outputStream;
        this.outputView = outputView;
        this.batchWrapper = batchWrapper;
        // Must run after outputStream/outputView are assigned, because it writes through them.
        this.groupPrefixBytes = this.createKeyGroupBytes(keyGroupId);
    }

    /**
     * Serializes the key-group id into a fresh byte array using the shared scratch buffer.
     *
     * @param keyGroupId id to serialize.
     * @return copy of the bytes written for the key-group id.
     * @throws FlinkRuntimeException if writing fails.
     */
    private byte[] createKeyGroupBytes(int keyGroupId) {
        this.outputStream.reset();
        try {
            StateSerializerUtil.writeGroup((DataOutputView) this.outputView, keyGroupId);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not write key-group bytes.", e);
        }
        return this.outputStream.toByteArray();
    }

    /**
     * Adds the element by putting its prefixed, serialized form into the write batch.
     *
     * @param element element to add.
     * @throws FlinkRuntimeException if serialization or the put fails.
     */
    public void add(@Nonnull T element) {
        byte[] elementBytes = this.serializeElement(element);
        try {
            this.batchWrapper.put(this.columnFamilyHandle, elementBytes, DUMMY_BYTES);
        } catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
        }
    }

    /**
     * Removes the element by issuing a remove for its prefixed, serialized form to the write batch.
     *
     * @param element element to remove.
     * @throws FlinkRuntimeException if serialization or the remove fails.
     */
    public void remove(@Nonnull T element) {
        byte[] elementBytes = this.serializeElement(element);
        try {
            this.batchWrapper.remove(this.columnFamilyHandle, elementBytes);
        } catch (RocksDBException e) {
            throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
        }
    }

    /**
     * Counts the elements of this store by iterating over all of them.
     *
     * @return number of elements returned by {@link #orderedIterator()}.
     */
    public int size() {
        int count = 0;
        try (RocksToJavaIteratorAdapter iterator = this.orderedIterator()) {
            while (iterator.hasNext()) {
                iterator.next();
                ++count;
            }
        }
        return count;
    }

    /**
     * Flushes pending writes and opens an iterator positioned at the first key of this key-group.
     *
     * <p>The returned iterator holds a RocksDB iterator and must be closed by the caller.
     *
     * @return iterator over the elements whose keys start with this store's prefix.
     * @throws FlinkRuntimeException if flushing or initializing the iterator fails.
     */
    @Nonnull
    public RocksToJavaIteratorAdapter orderedIterator() {
        // Flush first so the iterator is created after batched writes were handed to the wrapper's flush.
        this.flushWriteBatch();
        return new RocksToJavaIteratorAdapter(new RocksIteratorWrapper(this.db.newIterator(this.columnFamilyHandle)));
    }

    /**
     * Flushes the write batch, converting the checked RocksDB exception into an unchecked one.
     *
     * @throws FlinkRuntimeException if the flush fails.
     */
    private void flushWriteBatch() {
        try {
            this.batchWrapper.flush();
        } catch (RocksDBException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Checks whether {@code bytes} starts with {@code prefixBytes}.
     *
     * @param bytes array to inspect.
     * @param prefixBytes expected leading bytes.
     * @return {@code true} if every byte of the prefix matches the corresponding byte of
     *     {@code bytes}; {@code false} otherwise, including when {@code bytes} is shorter than
     *     the prefix.
     */
    private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
        // A shorter array can never carry the prefix; without this guard the loop below
        // would index past the end of bytes.
        if (bytes.length < prefixBytes.length) {
            return false;
        }
        for (int i = 0; i < prefixBytes.length; ++i) {
            if (bytes[i] != prefixBytes[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Serializes the element into key bytes: key-group prefix followed by the serialized element.
     *
     * @param element element to serialize.
     * @return copy of the bytes written to the scratch buffer.
     * @throws FlinkRuntimeException if serialization fails.
     */
    private byte[] serializeElement(T element) {
        try {
            this.outputStream.reset();
            this.outputView.write(this.groupPrefixBytes);
            this.byteOrderProducingSerializer.serialize(element, this.outputView);
            return this.outputStream.toByteArray();
        } catch (IOException e) {
            throw new FlinkRuntimeException("Error while serializing the element.", e);
        }
    }

    /**
     * Deserializes an element from key bytes, skipping the key-group prefix.
     *
     * @param bytes key bytes as produced by {@link #serializeElement}.
     * @return the deserialized element.
     * @throws FlinkRuntimeException if deserialization fails.
     */
    private T deserializeElement(byte[] bytes) {
        try {
            ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
            DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
            inputView.skipBytes(this.groupPrefixBytes.length);
            return this.byteOrderProducingSerializer.deserialize(inputView);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Error while deserializing the element.", e);
        }
    }

    /**
     * Adapts a RocksDB iterator to a Java iterator over the elements of the enclosing store.
     *
     * <p>The adapter always holds the next element to return (or {@code null} when exhausted),
     * so {@link #hasNext()} is a plain null check.
     */
    private class RocksToJavaIteratorAdapter
    implements CloseableIterator<T> {

        /** Underlying RocksDB iterator; closed by {@link #close()}. */
        @Nonnull
        private final RocksIteratorWrapper iterator;

        /** Element that the next call to {@link #next()} returns; {@code null} when exhausted. */
        @Nullable
        private T currentElement;

        /**
         * Positions the iterator at the key-group prefix and pre-loads the first element.
         *
         * @param iterator RocksDB iterator to adapt; it is closed if initialization fails.
         * @throws FlinkRuntimeException if seeking or reading the first element fails.
         */
        private RocksToJavaIteratorAdapter(RocksIteratorWrapper iterator) {
            this.iterator = iterator;
            try {
                iterator.seek(RocksDBOrderedSetStore.this.groupPrefixBytes);
                this.deserializeNextElementIfAvailable();
            } catch (Exception ex) {
                // The caller never receives this object on failure, so release the iterator here.
                iterator.close();
                throw new FlinkRuntimeException("Could not initialize ordered iterator.", ex);
            }
        }

        /** Closes the underlying RocksDB iterator. */
        @Override
        public void close() {
            this.iterator.close();
        }

        /** @return {@code true} if a pre-loaded element is available. */
        @Override
        public boolean hasNext() {
            return this.currentElement != null;
        }

        /**
         * Returns the pre-loaded element and advances to the following one.
         *
         * @return the next element.
         * @throws NoSuchElementException if no element is left.
         */
        @Override
        public T next() {
            final T returnElement = this.currentElement;
            if (returnElement == null) {
                throw new NoSuchElementException("Iterator has no more elements!");
            }
            this.iterator.next();
            this.deserializeNextElementIfAvailable();
            return returnElement;
        }

        /**
         * Loads the element at the iterator's current position into {@link #currentElement}, or
         * sets it to {@code null} if the iterator is invalid or the key no longer carries this
         * store's prefix.
         */
        private void deserializeNextElementIfAvailable() {
            T nextElement = null;
            if (this.iterator.isValid()) {
                final byte[] elementBytes = this.iterator.key();
                if (RocksDBOrderedSetStore.isPrefixWith(elementBytes, RocksDBOrderedSetStore.this.groupPrefixBytes)) {
                    nextElement = RocksDBOrderedSetStore.this.deserializeElement(elementBytes);
                }
            }
            this.currentElement = nextElement;
        }
    }
}

