/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.RocksDBAggregatingState;
import org.apache.flink.contrib.streaming.state.RocksDBFoldingState;
import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
import org.apache.flink.contrib.streaming.state.RocksDBListState;
import org.apache.flink.contrib.streaming.state.RocksDBMapState;
import org.apache.flink.contrib.streaming.state.RocksDBReducingState;
import org.apache.flink.contrib.streaming.state.RocksDBValueState;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.io.async.StoppableCallbackCallable;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public class RocksDBKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
    public static final String MERGE_OPERATOR_NAME = "stringappendtest";
    private static final String SST_FILE_SUFFIX = ".sst";
    private final String operatorIdentifier;
    private final ColumnFamilyOptions columnOptions;
    private final DBOptions dbOptions;
    private final File instanceBasePath;
    private final File instanceRocksDBPath;
    private final ResourceGuard rocksDBResourceGuard;
    protected RocksDB db;
    private ColumnFamilyHandle defaultColumnFamily;
    private final WriteOptions writeOptions;
    private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
    private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
    private final int keyGroupPrefixBytes;
    private final boolean enableIncrementalCheckpointing;
    private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
    private long lastCompletedCheckpointId = -1L;
    private UUID backendUID;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> snapshotStrategy;

    public RocksDBKeyedStateBackend(String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, DBOptions dbOptions, ColumnFamilyOptions columnFamilyOptions, TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
        this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        this.rocksDBResourceGuard = new ResourceGuard();
        this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions).setMergeOperatorName(MERGE_OPERATOR_NAME);
        this.dbOptions = Preconditions.checkNotNull(dbOptions);
        this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
        this.instanceRocksDBPath = new File(instanceBasePath, "db");
        RocksDBKeyedStateBackend.checkAndCreateDirectory(instanceBasePath);
        if (this.instanceRocksDBPath.exists()) {
            this.cleanInstanceBasePath();
        }
        this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
        this.keyGroupPrefixBytes = this.getNumberOfKeyGroups() > 128 ? 2 : 1;
        this.kvStateInformation = new LinkedHashMap();
        this.restoredKvStateMetaInfos = new HashMap();
        this.materializedSstFiles = new TreeMap<Long, Set<StateHandleID>>();
        this.backendUID = UUID.randomUUID();
        this.snapshotStrategy = enableIncrementalCheckpointing ? new IncrementalSnapshotStrategy() : new FullSnapshotStrategy();
        this.writeOptions = new WriteOptions().setDisableWAL(true);
        LOG.debug("Setting initial keyed backend uid for operator {} to {}.", (Object)this.operatorIdentifier, (Object)this.backendUID);
    }

    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.exists()) {
            if (!directory.isDirectory()) {
                throw new IOException("Not a directory: " + directory);
            }
        } else if (!directory.mkdirs()) {
            throw new IOException(String.format("Could not create RocksDB data directory at %s.", directory));
        }
    }

    public <N> Stream<K> getKeys(String state, N namespace) {
        byte[] nameSpaceBytes;
        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = this.kvStateInformation.get(state);
        if (columnInfo == null) {
            return Stream.empty();
        }
        TypeSerializer namespaceSerializer = ((RegisteredKeyedBackendStateMetaInfo)columnInfo.f1).getNamespaceSerializer();
        ByteArrayOutputStreamWithPos namespaceOutputStream = new ByteArrayOutputStreamWithPos(8);
        boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(this.keySerializer, namespaceSerializer);
        try {
            RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, namespaceOutputStream, new DataOutputViewStreamWrapper(namespaceOutputStream), ambiguousKeyPossible);
            nameSpaceBytes = namespaceOutputStream.toByteArray();
        }
        catch (IOException ex) {
            throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
        }
        RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)columnInfo.f0);
        iterator.seekToFirst();
        RocksIteratorForKeysWrapper iteratorWrapper = new RocksIteratorForKeysWrapper(iterator, state, this.keySerializer, this.keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);
        Stream targetStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iteratorWrapper, 16), false);
        return (Stream)targetStream.onClose(iteratorWrapper::close);
    }

    @VisibleForTesting
    ColumnFamilyHandle getColumnFamilyHandle(String state) {
        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnInfo = this.kvStateInformation.get(state);
        return columnInfo != null ? (ColumnFamilyHandle)columnInfo.f0 : null;
    }

    public void dispose() {
        super.dispose();
        this.rocksDBResourceGuard.close();
        if (this.db != null) {
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamily);
            for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData : this.kvStateInformation.values()) {
                IOUtils.closeQuietly((AutoCloseable)columnMetaData.f0);
            }
            IOUtils.closeQuietly((AutoCloseable)this.db);
            this.db = null;
            IOUtils.closeQuietly((AutoCloseable)this.columnOptions);
            IOUtils.closeQuietly((AutoCloseable)this.dbOptions);
            IOUtils.closeQuietly((AutoCloseable)this.writeOptions);
            this.kvStateInformation.clear();
            this.restoredKvStateMetaInfos.clear();
            this.cleanInstanceBasePath();
        }
    }

    private void cleanInstanceBasePath() {
        LOG.info("Deleting existing instance base directory {}.", (Object)this.instanceBasePath);
        try {
            FileUtils.deleteDirectory(this.instanceBasePath);
        }
        catch (IOException ex) {
            LOG.warn("Could not delete instance base path for RocksDB: " + this.instanceBasePath, (Throwable)ex);
        }
    }

    public int getKeyGroupPrefixBytes() {
        return this.keyGroupPrefixBytes;
    }

    public WriteOptions getWriteOptions() {
        return this.writeOptions;
    }

    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        return this.snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
        LOG.info("Initializing RocksDB keyed state backend.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
        }
        this.kvStateInformation.clear();
        this.restoredKvStateMetaInfos.clear();
        try {
            if (restoreState == null || restoreState.isEmpty()) {
                this.createDB();
            } else {
                KeyedStateHandle firstStateHandle = restoreState.iterator().next();
                if (firstStateHandle instanceof IncrementalKeyedStateHandle || firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
                    RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
                    restoreOperation.restore(restoreState);
                } else {
                    RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
                    restoreOperation.doRestore(restoreState);
                }
            }
        }
        catch (Exception ex) {
            this.dispose();
            throw ex;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long completedCheckpointId) {
        if (!this.enableIncrementalCheckpointing) {
            return;
        }
        SortedMap<Long, Set<StateHandleID>> sortedMap = this.materializedSstFiles;
        synchronized (sortedMap) {
            if (completedCheckpointId < this.lastCompletedCheckpointId) {
                return;
            }
            this.materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
            this.lastCompletedCheckpointId = completedCheckpointId;
        }
    }

    private void createDB() throws IOException {
        ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(1);
        this.db = this.openDB(this.instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
        this.defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.get(0);
    }

    private RocksDB openDB(String path, List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
        RocksDB dbRef;
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(1 + stateColumnFamilyDescriptors.size());
        columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, this.columnOptions));
        columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
        try {
            dbRef = RocksDB.open((DBOptions)Preconditions.checkNotNull(this.dbOptions), (String)Preconditions.checkNotNull(path), columnFamilyDescriptors, stateColumnFamilyHandles);
        }
        catch (RocksDBException e2) {
            throw new IOException("Error while opening RocksDB instance.", e2);
        }
        Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), "Not all requested column family handles have been created");
        return dbRef;
    }

    private <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tryRegisterKvStateInformation(StateDescriptor<?, S> stateDesc, TypeSerializer<N> namespaceSerializer) throws StateMigrationException, IOException {
        RegisteredKeyedBackendStateMetaInfo newMetaInfo;
        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = this.kvStateInformation.get(stateDesc.getName());
        if (stateInfo != null) {
            RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfoSnapshot = this.restoredKvStateMetaInfos.get(stateDesc.getName());
            Preconditions.checkState(restoredMetaInfoSnapshot != null, "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            newMetaInfo = RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(restoredMetaInfoSnapshot, namespaceSerializer, stateDesc);
            stateInfo.f1 = newMetaInfo;
        } else {
            String stateName = stateDesc.getName();
            newMetaInfo = new RegisteredKeyedBackendStateMetaInfo(stateDesc.getType(), stateName, namespaceSerializer, stateDesc.getSerializer());
            ColumnFamilyHandle columnFamily = this.createColumnFamily(stateName);
            stateInfo = Tuple2.of(columnFamily, newMetaInfo);
            this.kvStateInformation.put(stateDesc.getName(), stateInfo);
        }
        return Tuple2.of(stateInfo.f0, newMetaInfo);
    }

    private ColumnFamilyHandle createColumnFamily(String stateName) throws IOException {
        byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
        Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), "The chosen state name 'default' collides with the name of the default column family!");
        ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, this.columnOptions);
        try {
            return this.db.createColumnFamily(columnDescriptor);
        }
        catch (RocksDBException e2) {
            throw new IOException("Error creating ColumnFamilyHandle.", e2);
        }
    }

    protected <N, T> InternalValueState<K, N, T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
        Tuple2 registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBValueState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), stateDesc.getDefaultValue(), this);
    }

    protected <N, T> InternalListState<K, N, T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        Tuple2 registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBListState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), (List)stateDesc.getDefaultValue(), stateDesc.getElementSerializer(), this);
    }

    protected <N, T> InternalReducingState<K, N, T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        Tuple2 registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBReducingState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), stateDesc.getDefaultValue(), stateDesc.getReduceFunction(), this);
    }

    protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
        Tuple2 registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBAggregatingState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), stateDesc.getDefaultValue(), stateDesc.getAggregateFunction(), this);
    }

    protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBFoldingState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), stateDesc.getDefaultValue(), stateDesc.getFoldFunction(), this);
    }

    protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK, UV> stateDesc) throws Exception {
        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, UV>> registerResult = this.tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        return new RocksDBMapState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyedBackendStateMetaInfo)registerResult.f1).getStateSerializer(), (Map)stateDesc.getDefaultValue(), this);
    }

    public File getInstanceBasePath() {
        return this.instanceBasePath;
    }

    public boolean supportsAsynchronousSnapshots() {
        return true;
    }

    @VisibleForTesting
    public int numStateEntries() {
        int count = 0;
        for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : this.kvStateInformation.values()) {
            RocksIteratorWrapper rocksIterator = RocksDBKeyedStateBackend.getRocksIterator(this.db, (ColumnFamilyHandle)column.f0);
            Throwable throwable = null;
            try {
                rocksIterator.seekToFirst();
                while (rocksIterator.isValid()) {
                    ++count;
                    rocksIterator.next();
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (rocksIterator == null) continue;
                if (throwable != null) {
                    try {
                        rocksIterator.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                rocksIterator.close();
            }
        }
        return count;
    }

    public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
        return new RocksIteratorWrapper(db.newIterator());
    }

    public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
        return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
    }

    public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
        return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
    }

    private static final class RocksDBIncrementalSnapshotOperation<K> {
        private final RocksDBKeyedStateBackend<K> stateBackend;
        private final CheckpointStreamFactory checkpointStreamFactory;
        private final long checkpointId;
        private Set<StateHandleID> baseSstFiles;
        private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList();
        private SnapshotDirectory localBackupDirectory;
        private final CloseableRegistry closeableRegistry = new CloseableRegistry();
        private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<StateHandleID, StreamStateHandle>();
        private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<StateHandleID, StreamStateHandle>();
        private final ResourceGuard.Lease dbLease;
        private SnapshotResult<StreamStateHandle> metaStateHandle = null;

        private RocksDBIncrementalSnapshotOperation(RocksDBKeyedStateBackend<K> stateBackend, CheckpointStreamFactory checkpointStreamFactory, SnapshotDirectory localBackupDirectory, long checkpointId) throws IOException {
            this.stateBackend = stateBackend;
            this.checkpointStreamFactory = checkpointStreamFactory;
            this.checkpointId = checkpointId;
            this.dbLease = ((RocksDBKeyedStateBackend)this.stateBackend).rocksDBResourceGuard.acquireResource();
            this.localBackupDirectory = localBackupDirectory;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private StreamStateHandle materializeStateData(Path filePath) throws Exception {
            InputStream inputStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
            try {
                int numBytes;
                byte[] buffer = new byte[8192];
                FileSystem backupFileSystem = this.localBackupDirectory.getFileSystem();
                inputStream = backupFileSystem.open(filePath);
                this.closeableRegistry.registerCloseable(inputStream);
                outputStream = this.checkpointStreamFactory.createCheckpointStateOutputStream(this.checkpointId, CheckpointedStateScope.SHARED);
                this.closeableRegistry.registerCloseable(outputStream);
                while ((numBytes = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, numBytes);
                }
                StreamStateHandle result = null;
                if (this.closeableRegistry.unregisterCloseable(outputStream)) {
                    result = outputStream.closeAndGetHandle();
                    outputStream = null;
                }
                StreamStateHandle streamStateHandle = result;
                return streamStateHandle;
            }
            finally {
                if (this.closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
                if (this.closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }

        @Nonnull
        private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
            LocalRecoveryConfig localRecoveryConfig = ((RocksDBKeyedStateBackend)this.stateBackend).localRecoveryConfig;
            CheckpointStreamWithResultProvider streamWithResultProvider = localRecoveryConfig.isLocalRecoveryEnabled() ? CheckpointStreamWithResultProvider.createDuplicatingStream((long)this.checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory, (LocalRecoveryDirectoryProvider)localRecoveryConfig.getLocalStateDirectoryProvider()) : CheckpointStreamWithResultProvider.createSimpleStream((long)this.checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)this.checkpointStreamFactory);
            try {
                this.closeableRegistry.registerCloseable(streamWithResultProvider);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.stateBackend).keySerializer, this.stateMetaInfoSnapshots, false);
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)streamWithResultProvider.getCheckpointOutputStream());
                serializationProxy.write((DataOutputView)out);
                if (this.closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                    SnapshotResult result = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                    streamWithResultProvider = null;
                    SnapshotResult snapshotResult = result;
                    return snapshotResult;
                }
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            finally {
                if (streamWithResultProvider != null && this.closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                    IOUtils.closeQuietly((AutoCloseable)streamWithResultProvider);
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void takeSnapshot() throws Exception {
            long lastCompletedCheckpoint;
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                lastCompletedCheckpoint = ((RocksDBKeyedStateBackend)this.stateBackend).lastCompletedCheckpointId;
                this.baseSstFiles = (Set)((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.get(lastCompletedCheckpoint);
            }
            LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) files as base: {}.", new Object[]{this.checkpointId, lastCompletedCheckpoint, this.baseSstFiles});
            for (Map.Entry entry : ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.entrySet()) {
                this.stateMetaInfoSnapshots.add(((RegisteredKeyedBackendStateMetaInfo)((Tuple2)entry.getValue()).f1).snapshot());
            }
            LOG.trace("Local RocksDB checkpoint goes to backup path {}.", (Object)this.localBackupDirectory);
            if (this.localBackupDirectory.exists()) {
                throw new IllegalStateException("Unexpected existence of the backup directory.");
            }
            Throwable throwable = null;
            try (Checkpoint checkpoint = Checkpoint.create((RocksDB)this.stateBackend.db);){
                checkpoint.createCheckpoint(this.localBackupDirectory.getDirectory().getPath());
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                throw throwable2;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Nonnull
        SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable(this.closeableRegistry);
            this.metaStateHandle = this.materializeMetaData();
            Preconditions.checkNotNull(this.metaStateHandle, "Metadata was not properly created.");
            Preconditions.checkNotNull(this.metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
            Preconditions.checkState(this.localBackupDirectory.exists());
            FileStatus[] fileStatuses = this.localBackupDirectory.listStatus();
            if (fileStatuses != null) {
                for (FileStatus fileStatus : fileStatuses) {
                    Path filePath = fileStatus.getPath();
                    String fileName = filePath.getName();
                    StateHandleID stateHandleID = new StateHandleID(fileName);
                    if (fileName.endsWith(RocksDBKeyedStateBackend.SST_FILE_SUFFIX)) {
                        boolean existsAlready;
                        boolean bl = existsAlready = this.baseSstFiles != null && this.baseSstFiles.contains(stateHandleID);
                        if (existsAlready) {
                            this.sstFiles.put(stateHandleID, (StreamStateHandle)new PlaceholderStreamStateHandle());
                            continue;
                        }
                        this.sstFiles.put(stateHandleID, this.materializeStateData(filePath));
                        continue;
                    }
                    StreamStateHandle fileHandle = this.materializeStateData(filePath);
                    this.miscFiles.put(stateHandleID, fileHandle);
                }
            }
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.put(this.checkpointId, this.sstFiles.keySet());
            }
            IncrementalKeyedStateHandle incrementalKeyedStateHandle = new IncrementalKeyedStateHandle(((RocksDBKeyedStateBackend)this.stateBackend).backendUID, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange, this.checkpointId, this.sstFiles, this.miscFiles, (StreamStateHandle)this.metaStateHandle.getJobManagerOwnedSnapshot());
            StreamStateHandle taskLocalSnapshotMetaDataStateHandle = (StreamStateHandle)this.metaStateHandle.getTaskLocalSnapshot();
            DirectoryStateHandle directoryStateHandle = null;
            try {
                directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
            }
            catch (IOException ex) {
                Exception collector = ex;
                try {
                    taskLocalSnapshotMetaDataStateHandle.discardState();
                }
                catch (Exception discardEx) {
                    collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
                }
                LOG.warn("Problem with local state snapshot.", (Throwable)collector);
            }
            if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
                IncrementalLocalKeyedStateHandle localDirKeyedStateHandle = new IncrementalLocalKeyedStateHandle(((RocksDBKeyedStateBackend)this.stateBackend).backendUID, this.checkpointId, directoryStateHandle, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange, taskLocalSnapshotMetaDataStateHandle, this.sstFiles.keySet());
                return SnapshotResult.withLocalState((StateObject)incrementalKeyedStateHandle, (StateObject)localDirKeyedStateHandle);
            }
            return SnapshotResult.of((StateObject)incrementalKeyedStateHandle);
        }

        void stop() {
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(this.closeableRegistry)) {
                try {
                    this.closeableRegistry.close();
                }
                catch (IOException e2) {
                    LOG.warn("Could not properly close io streams.", (Throwable)e2);
                }
            }
        }

        void releaseResources(boolean canceled) {
            this.dbLease.close();
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(this.closeableRegistry)) {
                try {
                    this.closeableRegistry.close();
                }
                catch (IOException e2) {
                    LOG.warn("Exception on closing registry.", (Throwable)e2);
                }
            }
            try {
                if (this.localBackupDirectory.exists()) {
                    LOG.trace("Running cleanup for local RocksDB backup directory {}.", (Object)this.localBackupDirectory);
                    boolean cleanupOk = this.localBackupDirectory.cleanup();
                    if (!cleanupOk) {
                        LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                    }
                }
            }
            catch (IOException e3) {
                LOG.warn("Could not properly cleanup local RocksDB backup directory.", (Throwable)e3);
            }
            if (canceled) {
                ArrayList<Object> statesToDiscard = new ArrayList<Object>(1 + this.miscFiles.size() + this.sstFiles.size());
                statesToDiscard.add(this.metaStateHandle);
                statesToDiscard.addAll(this.miscFiles.values());
                statesToDiscard.addAll(this.sstFiles.values());
                try {
                    StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
                }
                catch (Exception e4) {
                    LOG.warn("Could not properly discard states.", (Throwable)e4);
                }
                if (this.localBackupDirectory.isSnapshotCompleted()) {
                    try {
                        DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                        if (directoryStateHandle != null) {
                            directoryStateHandle.discardState();
                        }
                    }
                    catch (Exception e5) {
                        LOG.warn("Could not properly discard local state.", (Throwable)e5);
                    }
                }
            }
        }
    }

    @VisibleForTesting
    static class RocksDBFullSnapshotOperation<K>
    extends AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> {
        static final int FIRST_BIT_IN_BYTE_MASK = 128;
        static final int END_OF_KEY_GROUP_MARK = 65535;
        private final RocksDBKeyedStateBackend<K> stateBackend;
        private final KeyGroupRangeOffsets keyGroupRangeOffsets;
        private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
        private final CloseableRegistry snapshotCloseableRegistry;
        private final ResourceGuard.Lease dbLease;
        private Snapshot snapshot;
        private ReadOptions readOptions;
        private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
        private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
        private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators;
        private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
        private DataOutputView outputView;

        RocksDBFullSnapshotOperation(RocksDBKeyedStateBackend<K> stateBackend, SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, CloseableRegistry registry) throws IOException {
            this.stateBackend = stateBackend;
            this.checkpointStreamSupplier = checkpointStreamSupplier;
            this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(((RocksDBKeyedStateBackend)stateBackend).keyGroupRange);
            this.snapshotCloseableRegistry = registry;
            this.dbLease = ((RocksDBKeyedStateBackend)this.stateBackend).rocksDBResourceGuard.acquireResource();
        }

        public void takeDBSnapShot() {
            Preconditions.checkArgument(this.snapshot == null, "Only one ongoing snapshot allowed!");
            this.stateMetaInfoSnapshots = new ArrayList(((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.size());
            this.copiedColumnFamilyHandles = new ArrayList<ColumnFamilyHandle>(((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.size());
            for (Tuple2 tuple2 : ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.values()) {
                this.stateMetaInfoSnapshots.add(((RegisteredKeyedBackendStateMetaInfo)tuple2.f1).snapshot());
                this.copiedColumnFamilyHandles.add((ColumnFamilyHandle)tuple2.f0);
            }
            this.snapshot = this.stateBackend.db.getSnapshot();
        }

        public void openCheckpointStream() throws Exception {
            Preconditions.checkArgument(this.checkpointStreamWithResultProvider == null, "Output stream for snapshot is already set.");
            this.checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();
            this.snapshotCloseableRegistry.registerCloseable(this.checkpointStreamWithResultProvider);
            this.outputView = new DataOutputViewStreamWrapper((OutputStream)this.checkpointStreamWithResultProvider.getCheckpointOutputStream());
        }

        public void writeDBSnapshot() throws IOException, InterruptedException, RocksDBException {
            if (null == this.snapshot) {
                throw new IOException("No snapshot available. Might be released due to cancellation.");
            }
            Preconditions.checkNotNull(this.checkpointStreamWithResultProvider, "No output stream to write snapshot.");
            this.writeKVStateMetaData();
            this.writeKVStateData();
        }

        @Nonnull
        public SnapshotResult<KeyedStateHandle> getSnapshotResultStateHandle() throws IOException {
            if (this.snapshotCloseableRegistry.unregisterCloseable(this.checkpointStreamWithResultProvider)) {
                SnapshotResult res = this.checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                this.checkpointStreamWithResultProvider = null;
                return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult((SnapshotResult)res, (KeyGroupRangeOffsets)this.keyGroupRangeOffsets);
            }
            return SnapshotResult.empty();
        }

        public void releaseSnapshotResources() {
            this.checkpointStreamWithResultProvider = null;
            if (null != this.kvStateIterators) {
                for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : this.kvStateIterators) {
                    IOUtils.closeQuietly((AutoCloseable)kvStateIterator.f0);
                }
                this.kvStateIterators = null;
            }
            if (null != this.snapshot) {
                if (null != this.stateBackend.db) {
                    this.stateBackend.db.releaseSnapshot(this.snapshot);
                }
                IOUtils.closeQuietly((AutoCloseable)this.snapshot);
                this.snapshot = null;
            }
            if (null != this.readOptions) {
                IOUtils.closeQuietly((AutoCloseable)this.readOptions);
                this.readOptions = null;
            }
            this.dbLease.close();
        }

        private void writeKVStateMetaData() throws IOException {
            this.kvStateIterators = new ArrayList<Tuple2<RocksIteratorWrapper, Integer>>(this.copiedColumnFamilyHandles.size());
            int kvStateId = 0;
            this.readOptions = new ReadOptions();
            this.readOptions.setSnapshot(this.snapshot);
            for (ColumnFamilyHandle columnFamilyHandle : this.copiedColumnFamilyHandles) {
                this.kvStateIterators.add(new Tuple2<RocksIteratorWrapper, Integer>(RocksDBKeyedStateBackend.getRocksIterator(this.stateBackend.db, columnFamilyHandle, this.readOptions), kvStateId));
                ++kvStateId;
            }
            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.stateBackend.getKeySerializer(), this.stateMetaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator));
            serializationProxy.write(this.outputView);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeKVStateData() throws IOException, InterruptedException, RocksDBException {
            byte[] previousKey = null;
            byte[] previousValue = null;
            DataOutputViewStreamWrapper kgOutView = null;
            OutputStream kgOutStream = null;
            CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = this.checkpointStreamWithResultProvider.getCheckpointOutputStream();
            try {
                try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(this.kvStateIterators, ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes);){
                    this.kvStateIterators = null;
                    if (mergeIterator.isValid()) {
                        this.keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                        kgOutStream = ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator.decorateWithCompression((OutputStream)checkpointOutputStream);
                        kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                        kgOutView.writeShort(mergeIterator.kvStateId());
                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }
                    while (mergeIterator.isValid()) {
                        assert (!RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(previousKey));
                        if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
                            RocksDBFullSnapshotOperation.checkInterrupted();
                            RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(previousKey);
                        }
                        this.writeKeyValuePair(previousKey, previousValue, kgOutView);
                        if (mergeIterator.isNewKeyGroup()) {
                            kgOutView.writeShort(65535);
                            kgOutStream.close();
                            this.keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                            kgOutStream = ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupCompressionDecorator.decorateWithCompression((OutputStream)checkpointOutputStream);
                            kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        } else if (mergeIterator.isNewKeyValueState()) {
                            kgOutView.writeShort(mergeIterator.kvStateId());
                        }
                        previousKey = mergeIterator.key();
                        previousValue = mergeIterator.value();
                        mergeIterator.next();
                    }
                }
                if (previousKey != null) {
                    assert (!RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(previousKey));
                    RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(previousKey);
                    this.writeKeyValuePair(previousKey, previousValue, kgOutView);
                    kgOutView.writeShort(65535);
                    kgOutStream.close();
                    kgOutStream = null;
                }
            }
            finally {
                IOUtils.closeQuietly(kgOutStream);
            }
        }

        private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
            BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
            BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
        }

        static void setMetaDataFollowsFlagInKey(byte[] key) {
            key[0] = (byte)(key[0] | 0x80);
        }

        static void clearMetaDataFollowsFlag(byte[] key) {
            key[0] = (byte)(key[0] & 0xFFFFFF7F);
        }

        static boolean hasMetaDataFollowsFlag(byte[] key) {
            return 0 != (key[0] & 0x80);
        }

        private static void checkInterrupted() throws InterruptedException {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("RocksDB snapshot interrupted.");
            }
        }

        protected void acquireResources() throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable(this.snapshotCloseableRegistry);
            this.openCheckpointStream();
        }

        protected void releaseResources() {
            this.closeLocalRegistry();
            this.releaseSnapshotOperationResources();
        }

        private void releaseSnapshotOperationResources() {
            this.releaseSnapshotResources();
        }

        protected void stopOperation() {
            this.closeLocalRegistry();
        }

        private void closeLocalRegistry() {
            if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(this.snapshotCloseableRegistry)) {
                try {
                    this.snapshotCloseableRegistry.close();
                }
                catch (Exception ex) {
                    LOG.warn("Error closing local registry", (Throwable)ex);
                }
            }
        }

        @Nonnull
        public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
            long startTime = System.currentTimeMillis();
            if (this.isStopped()) {
                throw new IOException("RocksDB closed.");
            }
            this.writeDBSnapshot();
            LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{this.checkpointStreamSupplier, Thread.currentThread(), System.currentTimeMillis() - startTime});
            return this.getSnapshotResultStateHandle();
        }
    }

    private class IncrementalSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> savepointDelegate;

        public IncrementalSnapshotStrategy() {
            this.savepointDelegate = new FullSnapshotStrategy();
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long checkpointTimestamp, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            SnapshotDirectory snapshotDirectory;
            if (CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()) {
                return this.savepointDelegate.performSnapshot(checkpointId, checkpointTimestamp, checkpointStreamFactory, checkpointOptions);
            }
            if (RocksDBKeyedStateBackend.this.db == null) {
                throw new IOException("RocksDB closed.");
            }
            if (RocksDBKeyedStateBackend.this.kvStateInformation.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)checkpointTimestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            if (RocksDBKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled()) {
                LocalRecoveryDirectoryProvider directoryProvider = RocksDBKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider();
                File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId);
                if (directory.exists()) {
                    FileUtils.deleteDirectory(directory);
                }
                if (!directory.mkdirs()) {
                    throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory);
                }
                File rdbSnapshotDir = new File(directory, "rocks_db");
                Path path = new Path(rdbSnapshotDir.toURI());
                snapshotDirectory = SnapshotDirectory.permanent((Path)path);
            } else {
                Path path = new Path(RocksDBKeyedStateBackend.this.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
                snapshotDirectory = SnapshotDirectory.temporary((Path)path);
            }
            final RocksDBIncrementalSnapshotOperation snapshotOperation = new RocksDBIncrementalSnapshotOperation(RocksDBKeyedStateBackend.this, checkpointStreamFactory, snapshotDirectory, checkpointId);
            try {
                snapshotOperation.takeSnapshot();
            }
            catch (Exception e2) {
                snapshotOperation.stop();
                snapshotOperation.releaseResources(true);
                throw e2;
            }
            return new FutureTask<SnapshotResult<KeyedStateHandle>>(snapshotOperation::runSnapshot){

                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    snapshotOperation.stop();
                    return super.cancel(mayInterruptIfRunning);
                }

                @Override
                protected void done() {
                    snapshotOperation.releaseResources(this.isCancelled());
                }
            };
        }
    }

    private class FullSnapshotStrategy
    implements SnapshotStrategy<SnapshotResult<KeyedStateHandle>> {
        private FullSnapshotStrategy() {
        }

        public RunnableFuture<SnapshotResult<KeyedStateHandle>> performSnapshot(long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
            long startTime = System.currentTimeMillis();
            if (RocksDBKeyedStateBackend.this.kvStateInformation.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", (Object)timestamp);
                }
                return DoneFuture.of((Object)SnapshotResult.empty());
            }
            SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplier = RocksDBKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() && CheckpointType.SAVEPOINT != checkpointOptions.getCheckpointType() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory, (LocalRecoveryDirectoryProvider)RocksDBKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream((long)checkpointId, (CheckpointedStateScope)CheckpointedStateScope.EXCLUSIVE, (CheckpointStreamFactory)primaryStreamFactory);
            final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
            final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(RocksDBKeyedStateBackend.this, supplier, snapshotCloseableRegistry);
            snapshotOperation.takeDBSnapShot();
            AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){

                protected void acquireResources() throws Exception {
                    RocksDBKeyedStateBackend.this.cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
                    snapshotOperation.openCheckpointStream();
                }

                protected void releaseResources() throws Exception {
                    this.closeLocalRegistry();
                    this.releaseSnapshotOperationResources();
                }

                private void releaseSnapshotOperationResources() {
                    snapshotOperation.releaseSnapshotResources();
                }

                protected void stopOperation() throws Exception {
                    this.closeLocalRegistry();
                }

                private void closeLocalRegistry() {
                    if (RocksDBKeyedStateBackend.this.cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
                        try {
                            snapshotCloseableRegistry.close();
                        }
                        catch (Exception ex) {
                            LOG.warn("Error closing local registry", (Throwable)ex);
                        }
                    }
                }

                @Nonnull
                public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                    long startTime = System.currentTimeMillis();
                    if (this.isStopped()) {
                        throw new IOException("RocksDB closed.");
                    }
                    snapshotOperation.writeDBSnapshot();
                    LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime});
                    return snapshotOperation.getSnapshotResultStateHandle();
                }
            };
            LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", new Object[]{primaryStreamFactory, Thread.currentThread(), System.currentTimeMillis() - startTime});
            return AsyncStoppableTaskWithCallback.from((StoppableCallbackCallable)ioCallable);
        }
    }

    static class RocksIteratorForKeysWrapper<K>
    implements Iterator<K>,
    AutoCloseable {
        private final RocksIteratorWrapper iterator;
        private final String state;
        private final TypeSerializer<K> keySerializer;
        private final int keyGroupPrefixBytes;
        private final byte[] namespaceBytes;
        private final boolean ambiguousKeyPossible;
        private K nextKey;

        RocksIteratorForKeysWrapper(RocksIteratorWrapper iterator, String state, TypeSerializer<K> keySerializer, int keyGroupPrefixBytes, boolean ambiguousKeyPossible, byte[] namespaceBytes) {
            this.iterator = Preconditions.checkNotNull(iterator);
            this.state = Preconditions.checkNotNull(state);
            this.keySerializer = Preconditions.checkNotNull(keySerializer);
            this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
            this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
            this.nextKey = null;
            this.ambiguousKeyPossible = ambiguousKeyPossible;
        }

        @Override
        public boolean hasNext() {
            try {
                while (this.nextKey == null && this.iterator.isValid()) {
                    byte[] key = this.iterator.key();
                    if (this.isMatchingNameSpace(key)) {
                        ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(key, this.keyGroupPrefixBytes, key.length - this.keyGroupPrefixBytes);
                        DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
                        K value = RocksDBKeySerializationUtils.readKey(this.keySerializer, inputStream, dataInput, this.ambiguousKeyPossible);
                        this.nextKey = value;
                    }
                    this.iterator.next();
                }
            }
            catch (Exception e2) {
                throw new FlinkRuntimeException("Failed to access state [" + this.state + "]", e2);
            }
            return this.nextKey != null;
        }

        @Override
        public K next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Failed to access state [" + this.state + "]");
            }
            K tmpKey = this.nextKey;
            this.nextKey = null;
            return tmpKey;
        }

        private boolean isMatchingNameSpace(@Nonnull byte[] key) {
            int namespaceBytesLength = this.namespaceBytes.length;
            int basicLength = namespaceBytesLength + this.keyGroupPrefixBytes;
            if (key.length >= basicLength) {
                for (int i = 1; i <= namespaceBytesLength; ++i) {
                    if (key[key.length - i] == this.namespaceBytes[namespaceBytesLength - i]) continue;
                    return false;
                }
                return true;
            }
            return false;
        }

        @Override
        public void close() {
            this.iterator.close();
        }
    }

    private static final class MergeIterator
    implements AutoCloseable {
        private final RocksIteratorWrapper iterator;
        private byte[] currentKey;
        private final int kvStateId;

        MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
            this.iterator = Preconditions.checkNotNull(iterator);
            this.currentKey = iterator.key();
            this.kvStateId = kvStateId;
        }

        public byte[] getCurrentKey() {
            return this.currentKey;
        }

        public void setCurrentKey(byte[] currentKey) {
            this.currentKey = currentKey;
        }

        public RocksIteratorWrapper getIterator() {
            return this.iterator;
        }

        public int getKvStateId() {
            return this.kvStateId;
        }

        @Override
        public void close() {
            IOUtils.closeQuietly(this.iterator);
        }

        static /* synthetic */ byte[] access$3002(MergeIterator x0, byte[] x1) {
            x0.currentKey = x1;
            return x1;
        }
    }

    @VisibleForTesting
    static final class RocksDBMergeIterator
    implements AutoCloseable {
        private final PriorityQueue<MergeIterator> heap;
        private final int keyGroupPrefixByteCount;
        private boolean newKeyGroup;
        private boolean newKVState;
        private boolean valid;
        private MergeIterator currentSubIterator;
        private static final List<Comparator<MergeIterator>> COMPARATORS;

        RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, int keyGroupPrefixByteCount) throws RocksDBException {
            Preconditions.checkNotNull(kvStateIterators);
            Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
            this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
            Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
            if (kvStateIterators.size() > 0) {
                PriorityQueue<MergeIterator> iteratorPriorityQueue = new PriorityQueue<MergeIterator>(kvStateIterators.size(), iteratorComparator);
                for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
                    RocksIteratorWrapper rocksIterator = (RocksIteratorWrapper)rocksIteratorWithKVStateId.f0;
                    rocksIterator.seekToFirst();
                    if (rocksIterator.isValid()) {
                        iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, (Integer)rocksIteratorWithKVStateId.f1));
                        continue;
                    }
                    IOUtils.closeQuietly(rocksIterator);
                }
                kvStateIterators.clear();
                this.heap = iteratorPriorityQueue;
                this.valid = !this.heap.isEmpty();
                this.currentSubIterator = this.heap.poll();
            } else {
                this.heap = null;
                this.valid = false;
            }
            this.newKeyGroup = true;
            this.newKVState = true;
        }

        public void next() throws RocksDBException {
            this.newKeyGroup = false;
            this.newKVState = false;
            RocksIteratorWrapper rocksIterator = this.currentSubIterator.getIterator();
            rocksIterator.next();
            byte[] oldKey = this.currentSubIterator.getCurrentKey();
            if (rocksIterator.isValid()) {
                MergeIterator.access$3002(this.currentSubIterator, rocksIterator.key());
                if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.getCurrentKey())) {
                    this.heap.offer(this.currentSubIterator);
                    this.currentSubIterator = this.heap.poll();
                    this.newKVState = this.currentSubIterator.getIterator() != rocksIterator;
                    this.detectNewKeyGroup(oldKey);
                }
            } else {
                IOUtils.closeQuietly(rocksIterator);
                if (this.heap.isEmpty()) {
                    this.currentSubIterator = null;
                    this.valid = false;
                } else {
                    this.currentSubIterator = this.heap.poll();
                    this.newKVState = true;
                    this.detectNewKeyGroup(oldKey);
                }
            }
        }

        private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
            return 0 != RocksDBMergeIterator.compareKeyGroupsForByteArrays(a, b, this.keyGroupPrefixByteCount);
        }

        private void detectNewKeyGroup(byte[] oldKey) {
            if (this.isDifferentKeyGroup(oldKey, this.currentSubIterator.currentKey)) {
                this.newKeyGroup = true;
            }
        }

        public int keyGroup() {
            int result = 0;
            for (int i = 0; i < this.keyGroupPrefixByteCount; ++i) {
                result <<= 8;
                result |= this.currentSubIterator.currentKey[i] & 0xFF;
            }
            return result;
        }

        public byte[] key() {
            return this.currentSubIterator.getCurrentKey();
        }

        public byte[] value() {
            return this.currentSubIterator.getIterator().value();
        }

        public int kvStateId() {
            return this.currentSubIterator.getKvStateId();
        }

        public boolean isNewKeyValueState() {
            return this.newKVState;
        }

        public boolean isNewKeyGroup() {
            return this.newKeyGroup;
        }

        public boolean isValid() {
            return this.valid;
        }

        private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
            for (int i = 0; i < len; ++i) {
                int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
                if (diff == 0) continue;
                return diff;
            }
            return 0;
        }

        @Override
        public void close() {
            IOUtils.closeQuietly(this.currentSubIterator);
            this.currentSubIterator = null;
            IOUtils.closeAllQuietly(this.heap);
            this.heap.clear();
        }

        static {
            int maxBytes = 2;
            COMPARATORS = new ArrayList<Comparator<MergeIterator>>(maxBytes);
            for (int i = 0; i < maxBytes; ++i) {
                final int currentBytes = i + 1;
                COMPARATORS.add(new Comparator<MergeIterator>(){

                    @Override
                    public int compare(MergeIterator o1, MergeIterator o2) {
                        int arrayCmpRes = RocksDBMergeIterator.compareKeyGroupsForByteArrays(o1.currentKey, o2.currentKey, currentBytes);
                        return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
                    }
                });
            }
        }
    }

    private static class RocksDBIncrementalRestoreOperation<T> {
        private final RocksDBKeyedStateBackend<T> stateBackend;

        private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
            this.stateBackend = stateBackend;
        }

        void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
            boolean hasExtraKeys;
            boolean bl = hasExtraKeys = restoreStateHandles.size() > 1 || !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange);
            if (hasExtraKeys) {
                ((RocksDBKeyedStateBackend)this.stateBackend).createDB();
            }
            for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
                if (rawStateHandle instanceof IncrementalKeyedStateHandle) {
                    this.restoreInstance((IncrementalKeyedStateHandle)rawStateHandle, hasExtraKeys);
                    continue;
                }
                if (rawStateHandle instanceof IncrementalLocalKeyedStateHandle) {
                    Preconditions.checkState(!hasExtraKeys, "Cannot recover from local state after rescaling.");
                    this.restoreInstance((IncrementalLocalKeyedStateHandle)rawStateHandle);
                    continue;
                }
                throw new IllegalStateException("Unexpected state handle type, expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void restoreInstance(IncrementalKeyedStateHandle restoreStateHandle, boolean hasExtraKeys) throws Exception {
            Path temporaryRestoreInstancePath = new Path(((RocksDBKeyedStateBackend)this.stateBackend).instanceBasePath.getAbsolutePath(), UUID.randomUUID().toString());
            try {
                this.transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
                List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = this.readMetaData(restoreStateHandle.getMetaStateHandle());
                List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
                if (hasExtraKeys) {
                    this.restoreKeyGroupsShardWithTemporaryHelperInstance(temporaryRestoreInstancePath, columnFamilyDescriptors, stateMetaInfoSnapshots);
                } else {
                    IncrementalLocalKeyedStateHandle localKeyedStateHandle = new IncrementalLocalKeyedStateHandle(restoreStateHandle.getBackendIdentifier(), restoreStateHandle.getCheckpointId(), new DirectoryStateHandle(temporaryRestoreInstancePath), restoreStateHandle.getKeyGroupRange(), restoreStateHandle.getMetaStateHandle(), restoreStateHandle.getSharedState().keySet());
                    this.restoreLocalStateIntoFullInstance(localKeyedStateHandle, columnFamilyDescriptors, stateMetaInfoSnapshots);
                }
            }
            finally {
                FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
                if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
                    restoreFileSystem.delete(temporaryRestoreInstancePath, true);
                }
            }
        }

        private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
            List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = this.readMetaData(localKeyedStateHandle.getMetaDataState());
            List<ColumnFamilyDescriptor> columnFamilyDescriptors = this.createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
            this.restoreLocalStateIntoFullInstance(localKeyedStateHandle, columnFamilyDescriptors, stateMetaInfoSnapshots);
        }

        private List<ColumnFamilyDescriptor> createAndRegisterColumnFamilyDescriptors(List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
            ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(1 + stateMetaInfoSnapshots.size());
            for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
                ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), ((RocksDBKeyedStateBackend)this.stateBackend).columnOptions);
                columnFamilyDescriptors.add(columnFamilyDescriptor);
                ((RocksDBKeyedStateBackend)this.stateBackend).restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
            }
            return columnFamilyDescriptors;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void restoreLocalStateIntoFullInstance(IncrementalLocalKeyedStateHandle restoreStateHandle, List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception {
            ((RocksDBKeyedStateBackend)this.stateBackend).backendUID = restoreStateHandle.getBackendIdentifier();
            LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", (Object)((RocksDBKeyedStateBackend)this.stateBackend).operatorIdentifier, (Object)((RocksDBKeyedStateBackend)this.stateBackend).backendUID);
            if (!((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.mkdirs()) {
                throw new IOException("Could not create RocksDB data directory.");
            }
            Path restoreSourcePath = restoreStateHandle.getDirectoryStateHandle().getDirectory();
            this.restoreInstanceDirectoryFromPath(restoreSourcePath);
            ArrayList columnFamilyHandles = new ArrayList(1 + columnFamilyDescriptors.size());
            this.stateBackend.db = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getAbsolutePath(), columnFamilyDescriptors, columnFamilyHandles);
            ((RocksDBKeyedStateBackend)this.stateBackend).defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.remove(0);
            for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
                RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
                ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle)columnFamilyHandles.get(i);
                RegisteredKeyedBackendStateMetaInfo stateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(stateMetaInfoSnapshot.getStateType(), stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot.getNamespaceSerializer(), stateMetaInfoSnapshot.getStateSerializer());
                ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.put(stateMetaInfoSnapshot.getName(), new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>(columnFamilyHandle, stateMetaInfo));
            }
            SortedMap sortedMap = ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles;
            synchronized (sortedMap) {
                ((RocksDBKeyedStateBackend)this.stateBackend).materializedSstFiles.put(restoreStateHandle.getCheckpointId(), restoreStateHandle.getSharedStateHandleIDs());
            }
            ((RocksDBKeyedStateBackend)this.stateBackend).lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
        }

        private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
            FileSystem fileSystem = source.getFileSystem();
            FileStatus[] fileStatuses = fileSystem.listStatus(source);
            if (fileStatuses == null) {
                throw new IOException("Cannot list file statues. Directory " + source + " does not exist.");
            }
            for (FileStatus fileStatus : fileStatuses) {
                Path filePath = fileStatus.getPath();
                String fileName = filePath.getName();
                File restoreFile = new File(source.getPath(), fileName);
                File targetFile = new File(((RocksDBKeyedStateBackend)this.stateBackend).instanceRocksDBPath.getPath(), fileName);
                if (fileName.endsWith(RocksDBKeyedStateBackend.SST_FILE_SUFFIX)) {
                    Files.createLink(targetFile.toPath(), restoreFile.toPath());
                    continue;
                }
                Files.copy(restoreFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(StreamStateHandle metaStateHandle) throws Exception {
            FSDataInputStream inputStream = null;
            try {
                inputStream = metaStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable(inputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.stateBackend).userCodeClassLoader, false);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(inputStream);
                serializationProxy.read((DataInputView)in);
                if (CompatibilityUtil.resolveCompatibilityResult(serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), ((RocksDBKeyedStateBackend)this.stateBackend).keySerializer).isRequiresMigration()) {
                    throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                }
                List list = serializationProxy.getStateMetaInfoSnapshots();
                return list;
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            }
        }

        private void transferAllStateDataToDirectory(IncrementalKeyedStateHandle restoreStateHandle, Path dest) throws IOException {
            Map sstFiles = restoreStateHandle.getSharedState();
            Map miscFiles = restoreStateHandle.getPrivateState();
            this.transferAllDataFromStateHandles(sstFiles, dest);
            this.transferAllDataFromStateHandles(miscFiles, dest);
        }

        private void transferAllDataFromStateHandles(Map<StateHandleID, StreamStateHandle> stateHandleMap, Path restoreInstancePath) throws IOException {
            for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
                StateHandleID stateHandleID = entry.getKey();
                StreamStateHandle remoteFileHandle = entry.getValue();
                this.copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void copyStateDataHandleData(Path restoreFilePath, StreamStateHandle remoteFileHandle) throws IOException {
            FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
            FSDataInputStream inputStream = null;
            FSDataOutputStream outputStream = null;
            try {
                int numBytes;
                inputStream = remoteFileHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable(inputStream);
                outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
                ((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.registerCloseable(outputStream);
                byte[] buffer = new byte[8192];
                while ((numBytes = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, numBytes);
                }
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
                if (((RocksDBKeyedStateBackend)this.stateBackend).cancelStreamRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void restoreKeyGroupsShardWithTemporaryHelperInstance(Path restoreInstancePath, List<ColumnFamilyDescriptor> columnFamilyDescriptors, List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) throws Exception {
            ArrayList columnFamilyHandles = new ArrayList(1 + columnFamilyDescriptors.size());
            try (RocksDB restoreDb = ((RocksDBKeyedStateBackend)this.stateBackend).openDB(restoreInstancePath.getPath(), columnFamilyDescriptors, columnFamilyHandles);
                 RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.db);){
                ColumnFamilyHandle defaultColumnFamily = (ColumnFamilyHandle)columnFamilyHandles.remove(0);
                Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size());
                try {
                    for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
                        ColumnFamilyHandle columnFamilyHandle = (ColumnFamilyHandle)columnFamilyHandles.get(i);
                        ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
                        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo> registeredStateMetaInfoEntry = (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>)((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.get(stateMetaInfoSnapshot.getName());
                        if (null == registeredStateMetaInfoEntry) {
                            RegisteredKeyedBackendStateMetaInfo stateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(stateMetaInfoSnapshot.getStateType(), stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot.getNamespaceSerializer(), stateMetaInfoSnapshot.getStateSerializer());
                            registeredStateMetaInfoEntry = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>(this.stateBackend.db.createColumnFamily(columnFamilyDescriptor), stateMetaInfo);
                            ((RocksDBKeyedStateBackend)this.stateBackend).kvStateInformation.put(stateMetaInfoSnapshot.getName(), registeredStateMetaInfoEntry);
                        }
                        ColumnFamilyHandle targetColumnFamilyHandle = (ColumnFamilyHandle)registeredStateMetaInfoEntry.f0;
                        try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(restoreDb, columnFamilyHandle);){
                            int startKeyGroup = this.stateBackend.getKeyGroupRange().getStartKeyGroup();
                            byte[] startKeyGroupPrefixBytes = new byte[((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes];
                            for (int j2 = 0; j2 < ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes; ++j2) {
                                startKeyGroupPrefixBytes[j2] = (byte)(startKeyGroup >>> (((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes - j2 - 1) * 8);
                            }
                            iterator.seek(startKeyGroupPrefixBytes);
                            while (iterator.isValid()) {
                                int keyGroup = 0;
                                for (int j3 = 0; j3 < ((RocksDBKeyedStateBackend)this.stateBackend).keyGroupPrefixBytes; ++j3) {
                                    keyGroup = (keyGroup << 8) + iterator.key()[j3];
                                }
                                if (((RocksDBKeyedStateBackend)this.stateBackend).keyGroupRange.contains(keyGroup)) {
                                    writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
                                }
                                iterator.next();
                            }
                            continue;
                        }
                    }
                }
                finally {
                    IOUtils.closeQuietly((AutoCloseable)defaultColumnFamily);
                    for (ColumnFamilyHandle flinkColumnFamilyHandle : columnFamilyHandles) {
                        IOUtils.closeQuietly((AutoCloseable)flinkColumnFamilyHandle);
                    }
                }
            }
        }
    }

    private static final class RocksDBFullRestoreOperation<K> {
        private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
        private KeyGroupsStateHandle currentKeyGroupsStateHandle;
        private FSDataInputStream currentStateHandleInStream;
        private DataInputView currentStateHandleInView;
        private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
        private StreamCompressionDecorator keygroupStreamCompressionDecorator;

        public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
            this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
        }

        public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) throws IOException, StateMigrationException, RocksDBException {
            ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).createDB();
            for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                if (keyedStateHandle == null) continue;
                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
                }
                this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
                this.restoreKeyGroupsInStateHandle();
            }
        }

        private void restoreKeyGroupsInStateHandle() throws IOException, StateMigrationException, RocksDBException {
            try {
                this.currentStateHandleInStream = this.currentKeyGroupsStateHandle.openInputStream();
                ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.registerCloseable(this.currentStateHandleInStream);
                this.currentStateHandleInView = new DataInputViewStreamWrapper(this.currentStateHandleInStream);
                this.restoreKVStateMetaData();
                this.restoreKVStateData();
            }
            finally {
                if (((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).cancelStreamRegistry.unregisterCloseable(this.currentStateHandleInStream)) {
                    IOUtils.closeQuietly(this.currentStateHandleInStream);
                }
            }
        }

        private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).userCodeClassLoader, false);
            serializationProxy.read(this.currentStateHandleInView);
            if (CompatibilityUtil.resolveCompatibilityResult(serializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, serializationProxy.getKeySerializerConfigSnapshot(), ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).keySerializer).isRequiresMigration()) {
                throw new StateMigrationException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
            }
            this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
            List restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
            this.currentStateHandleKVStateColumnFamilies = new ArrayList<ColumnFamilyHandle>(restoredMetaInfos.size());
            for (RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo : restoredMetaInfos) {
                Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo> registeredColumn = (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>)((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.get(restoredMetaInfo.getName());
                if (registeredColumn == null) {
                    byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
                    ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(nameBytes, ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).columnOptions);
                    RegisteredKeyedBackendStateMetaInfo stateMetaInfo = new RegisteredKeyedBackendStateMetaInfo(restoredMetaInfo.getStateType(), restoredMetaInfo.getName(), restoredMetaInfo.getNamespaceSerializer(), restoredMetaInfo.getStateSerializer());
                    ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    ColumnFamilyHandle columnFamily = this.rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
                    registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo>(columnFamily, stateMetaInfo);
                    ((RocksDBKeyedStateBackend)this.rocksDBKeyedStateBackend).kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
                }
                this.currentStateHandleKVStateColumnFamilies.add((ColumnFamilyHandle)registeredColumn.f0);
            }
        }

        private void restoreKVStateData() throws IOException, RocksDBException {
            try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.rocksDBKeyedStateBackend.db);){
                for (Tuple2 keyGroupOffset : this.currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
                    int keyGroup = (Integer)keyGroupOffset.f0;
                    Preconditions.checkState(this.rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), "The key group must belong to the backend");
                    long offset = (Long)keyGroupOffset.f1;
                    if (0L == offset) continue;
                    this.currentStateHandleInStream.seek(offset);
                    InputStream compressedKgIn = this.keygroupStreamCompressionDecorator.decorateWithCompression((InputStream)this.currentStateHandleInStream);
                    Throwable throwable = null;
                    try {
                        DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                        int kvStateId = compressedKgInputView.readShort();
                        ColumnFamilyHandle handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                        boolean keyGroupHasMoreKeys = true;
                        while (keyGroupHasMoreKeys) {
                            byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
                            byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
                            if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
                                RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
                                writeBatchWrapper.put(handle, key, value);
                                kvStateId = 0xFFFF & compressedKgInputView.readShort();
                                if (65535 == kvStateId) {
                                    keyGroupHasMoreKeys = false;
                                    continue;
                                }
                                handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                                continue;
                            }
                            writeBatchWrapper.put(handle, key, value);
                        }
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (compressedKgIn == null) continue;
                        if (throwable != null) {
                            try {
                                compressedKgIn.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        compressedKgIn.close();
                    }
                }
            }
        }
    }
}

