/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.RocksDBFullSnapshotOperation;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.RocksDBIncrementalSnapshotOperation;
import org.apache.flink.contrib.streaming.state.RocksDBOrderedSetStore;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateStorage;
import org.apache.flink.contrib.streaming.state.RocksDBStorageInstance;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.io.async.StoppableCallbackCallable;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateSnapshot;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StorageIterator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TieBreakingPriorityComparator;
import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBInternalStateBackend
extends AbstractInternalStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBInternalStateBackend.class);
    private static final String MERGE_OPERATOR_NAME = "stringappendtest";
    private final DBOptions dbOptions;
    private final ColumnFamilyOptions columnOptions;
    private ColumnFamilyHandle defaultColumnFamilyHandle;
    private final ColumnFamilyDescriptor defaultColumnFamilyDescriptor;
    private final String defaultColumnFamilyName = "default";
    final ResourceGuard rocksDBResourceGuard;
    private RocksDB db;
    private WriteOptions writeOptions;
    private final Map<String, Tuple2<ColumnFamilyHandle, ColumnFamilyDescriptor>> columnFamilyHandles;
    private File instanceBasePath;
    private File instanceRocksDBPath;
    private final boolean enableIncrementalCheckpointing;
    private final int restoringThreadNum;
    final SortedMap<Long, Map<StateHandleID, Tuple2<String, StreamStateHandle>>> materializedSstFiles;
    long lastCompletedCheckpointId = -1L;
    final LocalRecoveryConfig localRecoveryConfig;
    private PriorityQueueSetFactory priorityQueueFactory;
    private RocksDBWriteBatchWrapper writeBatchWrapper;
    private final Map<String, RegisteredPriorityQueueStateBackendMetaInfo> pqInformations;
    private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;
    private final String operatorIdentifier;

    public RocksDBInternalStateBackend(ClassLoader userClassLoader, File instanceBasePath, DBOptions dbOptions, ColumnFamilyOptions columnOptions, int numberOfGroups, KeyGroupRange keyGroupRange, boolean enableIncrementalCheckpointing, int restoringThreadNum, LocalRecoveryConfig localRecoveryConfig, RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, TaskKvStateRegistry kvStateRegistry, ExecutionConfig executionConfig, String operatorIdentifier) throws IOException {
        super(numberOfGroups, keyGroupRange, userClassLoader, kvStateRegistry, executionConfig);
        this.dbOptions = Preconditions.checkNotNull(dbOptions);
        this.columnOptions = Preconditions.checkNotNull(columnOptions).setMergeOperatorName(MERGE_OPERATOR_NAME);
        this.defaultColumnFamilyDescriptor = new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions);
        this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
        this.instanceRocksDBPath = new File(instanceBasePath, "db");
        RocksDBInternalStateBackend.checkAndCreateDirectory(instanceBasePath);
        if (this.instanceRocksDBPath.exists()) {
            this.cleanInstanceBasePath();
        }
        this.rocksDBResourceGuard = new ResourceGuard();
        this.columnFamilyHandles = new HashMap<String, Tuple2<ColumnFamilyHandle, ColumnFamilyDescriptor>>();
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        this.materializedSstFiles = new TreeMap<Long, Map<StateHandleID, Tuple2<String, StreamStateHandle>>>();
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        this.restoringThreadNum = restoringThreadNum;
        this.snapshotStrategy = enableIncrementalCheckpointing ? new IncrementalSnapshotStrategy() : new FullSnapshotStrategy();
        this.localRecoveryConfig = localRecoveryConfig;
        switch (priorityQueueStateType) {
            case HEAP: {
                this.priorityQueueFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfGroups, 128);
                break;
            }
            case ROCKSDB: {
                this.priorityQueueFactory = new RocksDBPriorityQueueSetFactory();
                break;
            }
            default: {
                throw new FlinkRuntimeException("Not a valid priority queue set factory type.");
            }
        }
        this.pqInformations = new LinkedHashMap<String, RegisteredPriorityQueueStateBackendMetaInfo>();
        this.operatorIdentifier = operatorIdentifier;
    }

    protected void closeImpl() {
        this.rocksDBResourceGuard.close();
        if (this.db != null) {
            IOUtils.closeQuietly(this.writeBatchWrapper);
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamilyHandle);
            for (StateStorage stateStorage : this.getStateStorages().values()) {
                ((RocksDBStorageInstance)stateStorage.getStorageInstance()).close();
            }
            for (String pqName : this.pqInformations.keySet()) {
                IOUtils.closeQuietly((AutoCloseable)this.columnFamilyHandles.get((Object)pqName).f0);
            }
            IOUtils.closeQuietly((AutoCloseable)this.db);
            IOUtils.closeQuietly((AutoCloseable)this.writeOptions);
            IOUtils.closeQuietly((AutoCloseable)this.columnOptions);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            this.db = null;
            this.cleanInstanceBasePath();
        }
    }

    protected StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo descriptor) {
        StateStorage stateStorage = (StateStorage)this.stateStorages.get(descriptor.getName());
        if (stateStorage == null) {
            try {
                stateStorage = new RocksDBStateStorage(new RocksDBStorageInstance(this.db, this.getOrCreateColumnFamily(descriptor.getName()), this.writeOptions));
                this.stateStorages.put(descriptor.getName(), stateStorage);
            }
            catch (FlinkRuntimeException e2) {
                throw new StateAccessException((Throwable)e2);
            }
        }
        return stateStorage;
    }

    ColumnFamilyHandle getOrCreateColumnFamily(String handleName) {
        if (this.columnFamilyHandles.containsKey(handleName)) {
            return (ColumnFamilyHandle)this.columnFamilyHandles.get((Object)handleName).f0;
        }
        try {
            ColumnFamilyDescriptor descriptor = this.createColumnFamilyDescriptor(handleName);
            ColumnFamilyHandle handle = this.db.createColumnFamily(descriptor);
            this.columnFamilyHandles.put(handleName, Tuple2.of(handle, descriptor));
            return handle;
        }
        catch (RocksDBException e2) {
            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e2);
        }
    }

    ColumnFamilyDescriptor createColumnFamilyDescriptor(String cfName) {
        byte[] nameBytes = cfName.getBytes(ConfigConstants.DEFAULT_CHARSET);
        Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), "The chosen state name 'default' collides with the name of the default column family!");
        return new ColumnFamilyDescriptor(nameBytes, this.columnOptions);
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.priorityQueueFactory.create(stateName, byteOrderedElementSerializer);
    }

    public WriteOptions getWriteOptions() {
        return this.writeOptions;
    }

    public Map<String, RegisteredPriorityQueueStateBackendMetaInfo> getPriorityQueueInformations() {
        return this.pqInformations;
    }

    protected StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo stateMetaInfo) {
        StateStorage stateStorage = (StateStorage)this.stateStorages.get(stateMetaInfo.getName());
        if (stateStorage == null) {
            try {
                stateStorage = new RocksDBStateStorage(new RocksDBStorageInstance(this.db, this.getOrCreateColumnFamily(stateMetaInfo.getName()), this.writeOptions));
            }
            catch (FlinkRuntimeException e2) {
                throw new StateAccessException((Throwable)e2);
            }
            this.stateStorages.put(stateMetaInfo.getName(), stateStorage);
        }
        return stateStorage;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
        if (!this.enableIncrementalCheckpointing) {
            return;
        }
        SortedMap<Long, Map<StateHandleID, Tuple2<String, StreamStateHandle>>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            if (completedCheckpointId < this.lastCompletedCheckpointId) {
                return;
            }
            this.materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
            this.lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    public int numStateEntries() {
        int count = 0;
        List stateStorages = this.getKeyedStates().values().stream().map(KeyedState::getStateStorage).collect(Collectors.toList());
        stateStorages.addAll(this.getSubKeyedStates().values().stream().map(SubKeyedState::getStateStorage).collect(Collectors.toList()));
        for (StateStorage stateStorage : stateStorages) {
            try {
                StorageIterator iterator = stateStorage.iterator();
                Throwable throwable = null;
                try {
                    while (iterator.hasNext()) {
                        ++count;
                        iterator.next();
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (iterator == null) continue;
                    if (throwable != null) {
                        try {
                            iterator.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    iterator.close();
                }
            }
            catch (Exception e2) {
                throw new StateAccessException((Throwable)e2);
            }
        }
        return count;
    }

    private void cleanInstanceBasePath() {
        LOG.info("Deleting existing instance base directory {}.", (Object)this.instanceBasePath);
        try {
            FileUtils.deleteDirectory(this.instanceBasePath);
        }
        catch (IOException ex) {
            LOG.warn("Could not delete instance base path for RocksDB: " + this.instanceBasePath, (Throwable)ex);
        }
    }

    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        this.writeBatchWrapper.flush();
        return this.snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    public void restore(Collection<KeyedStateHandle> restoredSnapshots) throws Exception {
        LOG.info("Initializing RocksDB internal state backend.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from remote.", restoredSnapshots, (Object)this.restoringThreadNum);
        }
        this.restoredKvStateMetaInfos.clear();
        try {
            if (restoredSnapshots == null || restoredSnapshots.isEmpty()) {
                this.createDB();
                LOG.info("Successfully created RocksDB state backend at {}.", (Object)this.instanceRocksDBPath);
            } else {
                LOG.info("Restoring RocksDB internal state backend at {}.", (Object)this.instanceRocksDBPath);
                long startMillis = System.currentTimeMillis();
                KeyedStateHandle stateSnapshot = restoredSnapshots.iterator().next();
                if (stateSnapshot instanceof IncrementalKeyedStateSnapshot || stateSnapshot instanceof IncrementalLocalKeyedStateSnapshot) {
                    RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this, this.restoringThreadNum);
                    restoreOperation.restore(restoredSnapshots);
                } else {
                    RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
                    restoreOperation.restore(restoredSnapshots);
                }
                long endMillis = System.currentTimeMillis();
                LOG.info("Successfully restored RocksDB internal state-backend at {}, duration {} ms.", (Object)this.instanceRocksDBPath, (Object)(endMillis - startMillis));
            }
        }
        catch (Exception ex) {
            this.closeImpl();
            throw ex;
        }
    }

    File getInstanceBasePath() {
        if (this.instanceBasePath == null) {
            throw new IllegalStateException("RocksDBInternalStateBackend has not been initialized, it's illegal to get the instance base path.");
        }
        return this.instanceBasePath;
    }

    public File getInstanceRocksDBPath() {
        if (this.instanceRocksDBPath == null) {
            throw new IllegalStateException("RocksDBInternalStateBackend has not been initialized, it's illegal to get the instance DB path.");
        }
        return this.instanceRocksDBPath;
    }

    Path getLocalRestorePath(KeyGroupRange groupRange) {
        Preconditions.checkNotNull(this.instanceBasePath);
        String dirName = String.format("%s-%d-%d", "restore", groupRange.getStartKeyGroup(), groupRange.getEndKeyGroup());
        return new Path(this.instanceBasePath.getAbsolutePath(), dirName);
    }

    CloseableRegistry getCancelStreamRegistry() {
        return this.cancelStreamRegistry;
    }

    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.exists()) {
            if (!directory.isDirectory()) {
                throw new IOException("Not a directory: " + directory);
            }
        } else if (!directory.mkdirs()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
    }

    public RocksDB getDbInstance() {
        if (this.db == null) {
            try {
                this.createDB();
            }
            catch (IOException e2) {
                throw new RocksDBInitException(e2.getMessage());
            }
        }
        return this.db;
    }

    void takeDbSnapshot(String localCheckpointPath) throws RocksDBException {
        Checkpoint checkpoint = Checkpoint.create((RocksDB)this.db);
        checkpoint.createCheckpoint(localCheckpointPath);
    }

    void createDB() throws IOException {
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = Collections.singletonList(this.defaultColumnFamilyDescriptor);
        ArrayList stateColumnFamilyHandles = new ArrayList(1);
        try {
            this.db = RocksDB.open((DBOptions)this.dbOptions, (String)this.instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, stateColumnFamilyHandles);
            this.writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeOptions);
            this.defaultColumnFamilyHandle = (ColumnFamilyHandle)stateColumnFamilyHandles.remove(0);
        }
        catch (RocksDBException e2) {
            throw new IOException("Error while opening rocksDB instance at " + this.instanceRocksDBPath, e2);
        }
    }

    ColumnFamilyDescriptor getDefaultColumnFamilyDescriptor() {
        return this.defaultColumnFamilyDescriptor;
    }

    void createDBWithColumnFamily(List<ColumnFamilyDescriptor> descriptors, List<String> descriptorName) throws IOException {
        Preconditions.checkState(this.db == null, "Can not create db twice.");
        try {
            int size = descriptors.size();
            ArrayList handles = new ArrayList(size);
            this.db = RocksDB.open((DBOptions)this.dbOptions, (String)this.instanceRocksDBPath.getAbsolutePath(), descriptors, handles);
            this.writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeOptions);
            for (int i = 1; i < size; ++i) {
                this.columnFamilyHandles.put(descriptorName.get(i), Tuple2.of(handles.get(i), descriptors.get(i)));
            }
            this.defaultColumnFamilyHandle = (ColumnFamilyHandle)handles.remove(0);
        }
        catch (RocksDBException e2) {
            throw new IOException("Error while opening rocksDB instance at " + this.instanceBasePath, e2);
        }
    }

    Map<String, Tuple2<ColumnFamilyHandle, ColumnFamilyDescriptor>> getColumnFamilyHandles() {
        return this.columnFamilyHandles;
    }

    void registerAllStates() {
        for (RegisteredStateMetaInfo stateMetaInfo : this.registeredStateMetaInfos.values()) {
            if (stateMetaInfo.getStateType().isKeyedState()) {
                this.getOrCreateStateStorageForKeyedState(stateMetaInfo);
                continue;
            }
            this.getOrCreateStateStorageForSubKeyedState(stateMetaInfo);
        }
        for (RegisteredPriorityQueueStateBackendMetaInfo queueStateBackendMetaInfo : this.pqInformations.values()) {
            this.getOrCreateColumnFamily(queueStateBackendMetaInfo.getName());
        }
    }

    String getDefaultColumnFamilyName() {
        return "default";
    }

    public boolean requiresLegacySynchronousTimerSnapshots() {
        return this.priorityQueueFactory instanceof HeapPriorityQueueSetFactory;
    }

    class RocksDBPriorityQueueSetFactory
    implements PriorityQueueSetFactory {
        private static final int DEFAULT_CACHES_SIZE = 128;
        @Nonnull
        private final ByteArrayOutputStreamWithPos elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
        @Nonnull
        private final DataOutputViewStreamWrapper elementSerializationOutView = new DataOutputViewStreamWrapper(this.elementSerializationOutStream);

        RocksDBPriorityQueueSetFactory() {
        }

        @Nonnull
        public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, final @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
            PriorityComparator priorityComparator = PriorityComparator.forPriorityComparableObjects();
            RegisteredPriorityQueueStateBackendMetaInfo entry = (RegisteredPriorityQueueStateBackendMetaInfo)RocksDBInternalStateBackend.this.pqInformations.get(stateName);
            if (entry == null) {
                RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
                RocksDBInternalStateBackend.this.getOrCreateColumnFamily(stateName);
                RocksDBInternalStateBackend.this.pqInformations.put(stateName, metaInfo);
            }
            final ColumnFamilyHandle columnFamilyHandle = RocksDBInternalStateBackend.this.getOrCreateColumnFamily(stateName);
            final TieBreakingPriorityComparator tieBreakingComparator = new TieBreakingPriorityComparator(priorityComparator, byteOrderedElementSerializer, this.elementSerializationOutStream, this.elementSerializationOutView);
            return new KeyGroupPartitionedPriorityQueue(KeyExtractorFunction.forKeyedObjects(), priorityComparator, new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>(){

                @Nonnull
                public CachingInternalPriorityQueueSet<T> create(int keyGroupId, int numKeyGroups, @Nonnull PriorityComparator<T> elementPriorityComparator) {
                    TreeOrderedSetCache cache = new TreeOrderedSetCache((Comparator)tieBreakingComparator, 128);
                    RocksDBOrderedSetStore store = new RocksDBOrderedSetStore(keyGroupId, RocksDBInternalStateBackend.this.db, columnFamilyHandle, byteOrderedElementSerializer, RocksDBPriorityQueueSetFactory.this.elementSerializationOutStream, RocksDBPriorityQueueSetFactory.this.elementSerializationOutView, RocksDBInternalStateBackend.this.writeBatchWrapper);
                    return new CachingInternalPriorityQueueSet((CachingInternalPriorityQueueSet.OrderedSetCache)cache, store);
                }
            }, RocksDBInternalStateBackend.this.getKeyGroupRange(), RocksDBInternalStateBackend.this.getNumGroups());
        }
    }

    private class RocksDBInitException
    extends RuntimeException {
        private static final long serialVersionUID = 1L;

        RocksDBInitException(String message) {
            super("The rocksDB init failed with reported message: " + message);
        }
    }

    private class FullSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private FullSnapshotStrategy() {
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            long startTime = System.currentTimeMillis();
            if (RocksDBInternalStateBackend.this.registeredStateMetaInfos.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning empty snapshot.", (Object)timestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = RocksDBInternalStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() && !checkpointOptions.getCheckpointType().isSavepoint() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory, (LocalRecoveryDirectoryProvider)RocksDBInternalStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory);
            final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
            final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(RocksDBInternalStateBackend.this, checkpointId, supplier, snapshotCloseableRegistry);
            snapshotOperation.takeDBSnapShot();
            AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){

                protected void acquireResources() throws Exception {
                    RocksDBInternalStateBackend.this.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
                    snapshotOperation.openCheckpointStream();
                }

                protected void releaseResources() throws Exception {
                    this.closeLocalRegistry();
                    this.releaseSnapshotOperationResources();
                }

                private void releaseSnapshotOperationResources() {
                    snapshotOperation.releaseSnapshotResources();
                }

                protected void stopOperation() throws Exception {
                    this.closeLocalRegistry();
                }

                private void closeLocalRegistry() {
                    if (RocksDBInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
                        try {
                            snapshotCloseableRegistry.close();
                        }
                        catch (Exception ex) {
                            LOG.warn("Error closing local registry", (Throwable)ex);
                        }
                    }
                }

                @Nonnull
                public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long startTime = System.currentTimeMillis();
                    if (this.isStopped()) {
                        throw new IOException("RocksDB closed.");
                    }
                    snapshotOperation.writeDBSnapshot();
                    LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime});
                    return snapshotOperation.getKeyGroupStateSnapshot();
                }
            };
            LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", new Object[]{primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime});
            return AsyncStoppableTaskWithCallback.from((StoppableCallbackCallable)ioCallable);
        }
    }

    private class IncrementalSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;

        public IncrementalSnapshotStrategy() {
            this.savepointDelegate = new FullSnapshotStrategy();
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long checkpointTimestamp, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            SnapshotDirectory snapshotDirectory;
            if (checkpointOptions.getCheckpointType().isSavepoint()) {
                return this.savepointDelegate.performSnapshot(checkpointId, checkpointTimestamp, checkpointStreamFactory, checkpointOptions);
            }
            if (RocksDBInternalStateBackend.this.db == null) {
                throw new IOException("RocksDB closed.");
            }
            if (RocksDBInternalStateBackend.this.getKeyedStates().isEmpty() && RocksDBInternalStateBackend.this.getSubKeyedStates().isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning empty snapshot.", (Object)checkpointTimestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            if (RocksDBInternalStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled()) {
                LocalRecoveryDirectoryProvider directoryProvider = RocksDBInternalStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider();
                File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
                if (!directory.exists() && !directory.mkdirs()) {
                    throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory);
                }
                File rdbSnapshotDir = new File(directory, RocksDBInternalStateBackend.this.operatorIdentifier);
                if (rdbSnapshotDir.exists()) {
                    rdbSnapshotDir.delete();
                }
                Path path = new Path(rdbSnapshotDir.toURI());
                snapshotDirectory = SnapshotDirectory.permanent((Path)path);
            } else {
                Path path = new Path(RocksDBInternalStateBackend.this.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
                snapshotDirectory = SnapshotDirectory.temporary((Path)path);
            }
            final RocksDBIncrementalSnapshotOperation snapshotOperation = new RocksDBIncrementalSnapshotOperation(RocksDBInternalStateBackend.this, checkpointStreamFactory, snapshotDirectory, checkpointId);
            try {
                snapshotOperation.takeSnapshot();
            }
            catch (Exception e2) {
                snapshotOperation.stop();
                snapshotOperation.releaseResources(true);
                throw e2;
            }
            return new FutureTask<SnapshotResult<KeyedStateHandle>>(snapshotOperation::runSnapshot){

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    snapshotOperation.stop();
                    return super.cancel(mayInterruptIfRunning);
                }

                @Override
                protected void done() {
                    snapshotOperation.releaseResources(this.isCancelled());
                }
            };
        }
    }
}

