/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.SortedMapSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.InternalStateType;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.GTableDescription;
import org.apache.flink.runtime.state.gemini.engine.GeminiDB;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedListImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedSortedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableOneKeyImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedListImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedSortedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedValueImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KListTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KSortedMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KVTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKListTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKSortedMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKVTableDescription;
import org.apache.flink.runtime.state.gemini.engine.page.PKey2Serializer;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink2KeyImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlinkImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlinkListImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotResult;
import org.apache.flink.runtime.state.gemini.engine.utils.ThreadLocalTypeSerializer;
import org.apache.flink.runtime.state.gemini.internal.AbstractGeminiKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.internal.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.gemini.internal.GeminiKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.internal.GeminiLocalKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedListStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedMapStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedValueStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedListStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedMapStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedValueStateImpl;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapState;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GeminiInternalStateBackend
extends AbstractInternalStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiInternalStateBackend.class);
    private GeminiDB db;
    private Map<String, GTable> tables;
    private MetricGroup dbMetricGroup;
    private String operatorIdentifier;
    private GConfiguration gConfiguration;
    private final LocalRecoveryConfig localRecoveryConfig;

    public GeminiInternalStateBackend(int numberOfGroups, KeyGroupRange groups, ClassLoader userClassLoader, LocalRecoveryConfig localRecoveryConfig, TaskKvStateRegistry kvStateRegistry, String operatorIdentifier, ExecutionConfig executionConfig, GConfiguration gConfiguration, MetricGroup operatorMetricGroup) throws Exception {
        super(numberOfGroups, groups, userClassLoader, kvStateRegistry, executionConfig);
        this.localRecoveryConfig = (LocalRecoveryConfig)Preconditions.checkNotNull((Object)localRecoveryConfig);
        this.dbMetricGroup = operatorMetricGroup.addGroup("geminiDB");
        this.operatorIdentifier = (String)Preconditions.checkNotNull((Object)operatorIdentifier);
        this.gConfiguration = (GConfiguration)Preconditions.checkNotNull((Object)gConfiguration);
        this.tables = new HashMap<String, GTable>();
        LOG.info("GeminiInternalStateBackend is created for operator {}, with backend UUID {}", (Object)operatorIdentifier, (Object)gConfiguration.getBackendUID());
    }

    @Override
    protected void closeImpl() {
        if (this.db != null) {
            this.db.close();
            this.db = null;
        }
        this.tables.clear();
    }

    @Override
    protected StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo stateMetaInfo) {
        throw new UnsupportedOperationException();
    }

    @Override
    protected StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo stateMetaInfo) {
        throw new UnsupportedOperationException();
    }

    @Override
    public <K, V> KeyedValueState<K, V> createKeyedValueState(KeyedValueStateDescriptor<K, V> keyedStateDescriptor) throws Exception {
        String stateName = keyedStateDescriptor.getName();
        GeminiKeyedValueStateImpl<K, V> keyedState = (GeminiKeyedValueStateImpl<K, V>)this.keyedStates.get(stateName);
        if (keyedState == null) {
            this.tryRegisterStateMetaInfo(keyedStateDescriptor);
            KVTableDescription tableDescription = new KVTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlinkImpl.of(GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getValueSerializer())));
            GTableOneKeyImpl table = (GTableOneKeyImpl)this.getOrCreateTable(tableDescription);
            keyedState = new GeminiKeyedValueStateImpl<K, V>(keyedStateDescriptor, table);
            this.keyedStates.put(stateName, keyedState);
            this.tables.put(stateName, table);
        }
        return keyedState;
    }

    @Override
    public <K, E> KeyedListState<K, E> createKeyedListState(KeyedListStateDescriptor<K, E> keyedStateDescriptor) throws Exception {
        String stateName = keyedStateDescriptor.getName();
        GeminiKeyedListStateImpl<K, E> keyedState = (GeminiKeyedListStateImpl<K, E>)this.keyedStates.get(stateName);
        if (keyedState == null) {
            this.tryRegisterStateMetaInfo(keyedStateDescriptor);
            KListTableDescription tableDescription = new KListTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlinkListImpl.of(GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getElementSerializer())));
            GTableKeyedListImpl table = (GTableKeyedListImpl)this.getOrCreateTable(tableDescription);
            keyedState = new GeminiKeyedListStateImpl<K, E>(keyedStateDescriptor, table);
            this.keyedStates.put(stateName, keyedState);
            this.tables.put(stateName, table);
        }
        return keyedState;
    }

    @Override
    public <K, MK, MV> KeyedMapState<K, MK, MV> createKeyedMapState(KeyedMapStateDescriptor<K, MK, MV> keyedStateDescriptor) throws Exception {
        String stateName = keyedStateDescriptor.getName();
        GeminiKeyedMapStateImpl<K, MK, MV> keyedState = (GeminiKeyedMapStateImpl<K, MK, MV>)this.keyedStates.get(stateName);
        if (keyedState == null) {
            this.tryRegisterStateMetaInfo(keyedStateDescriptor);
            KMapTableDescription tableDescription = new KMapTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getMapKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getMapValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable()));
            GTableKeyedMapImpl table = (GTableKeyedMapImpl)this.getOrCreateTable(tableDescription);
            keyedState = new GeminiKeyedMapStateImpl<K, MK, MV>(keyedStateDescriptor, table);
            this.keyedStates.put(stateName, keyedState);
            this.tables.put(stateName, table);
        }
        return keyedState;
    }

    @Override
    public <K, MK, MV> KeyedSortedMapState<K, MK, MV> createKeyedSortedMapState(KeyedSortedMapStateDescriptor<K, MK, MV> keyedStateDescriptor) throws Exception {
        String stateName = keyedStateDescriptor.getName();
        GeminiKeyedSortedMapStateImpl<K, MK, MV> keyedSortedMapState = (GeminiKeyedSortedMapStateImpl<K, MK, MV>)this.keyedStates.get(stateName);
        if (keyedSortedMapState == null) {
            this.tryRegisterStateMetaInfo(keyedStateDescriptor);
            PageSerdeFlink2KeyImpl pageSerdeFlink2Key = PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getMapKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(keyedStateDescriptor.getMapValueSerializer()), keyedStateDescriptor.getMapKeyComparator(), this.db.getConfiguration().getComparatorType(), this.db.getConfiguration().isChecksumEnable());
            KSortedMapTableDescription tableDescription = new KSortedMapTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), pageSerdeFlink2Key);
            GTableKeyedSortedMapImpl table = (GTableKeyedSortedMapImpl)this.getOrCreateTable(tableDescription);
            keyedSortedMapState = new GeminiKeyedSortedMapStateImpl<K, MK, MV>(keyedStateDescriptor, table);
            this.keyedStates.put(stateName, keyedSortedMapState);
            this.tables.put(stateName, table);
        }
        return keyedSortedMapState;
    }

    @Override
    public <K, N, V> SubKeyedValueState<K, N, V> createSubKeyedValueState(SubKeyedValueStateDescriptor<K, N, V> subKeyedStateDescriptor) throws Exception {
        String stateName = subKeyedStateDescriptor.getName();
        GeminiSubKeyedValueStateImpl subKeyedValueState = (GeminiSubKeyedValueStateImpl)this.subKeyedStates.get(stateName);
        if (subKeyedValueState == null) {
            this.tryRegisterStateMetaInfo(subKeyedStateDescriptor);
            SubKVTableDescription tableDescription = new SubKVTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getNamespaceSerializer()), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable()));
            GTableSubKeyedValueImpl table = (GTableSubKeyedValueImpl)this.getOrCreateTable(tableDescription);
            subKeyedValueState = new GeminiSubKeyedValueStateImpl(subKeyedStateDescriptor, table);
            this.subKeyedStates.put(stateName, subKeyedValueState);
            this.tables.put(stateName, table);
        }
        return subKeyedValueState;
    }

    @Override
    public <K, N, E> SubKeyedListState<K, N, E> createSubKeyedListState(SubKeyedListStateDescriptor<K, N, E> subKeyedStateDescriptor) throws Exception {
        String stateName = subKeyedStateDescriptor.getName();
        GeminiSubKeyedListStateImpl<K, N, E> subKeyedListState = (GeminiSubKeyedListStateImpl<K, N, E>)this.subKeyedStates.get(stateName);
        if (subKeyedListState == null) {
            this.tryRegisterStateMetaInfo(subKeyedStateDescriptor);
            SubKListTableDescription tableDescription = new SubKListTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlinkListImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getNamespaceSerializer())), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getElementSerializer())));
            GTableSubKeyedListImpl table = (GTableSubKeyedListImpl)this.getOrCreateTable(tableDescription);
            subKeyedListState = new GeminiSubKeyedListStateImpl<K, N, E>(subKeyedStateDescriptor, table);
            this.subKeyedStates.put(stateName, subKeyedListState);
            this.tables.put(stateName, table);
        }
        return subKeyedListState;
    }

    @Override
    public <K, N, MK, MV> SubKeyedMapState<K, N, MK, MV> createSubKeyedMapState(SubKeyedMapStateDescriptor<K, N, MK, MV> subKeyedStateDescriptor) throws Exception {
        String stateName = subKeyedStateDescriptor.getName();
        GeminiSubKeyedMapStateImpl subKeyedMapState = (GeminiSubKeyedMapStateImpl)this.subKeyedStates.get(stateName);
        if (subKeyedMapState == null) {
            this.tryRegisterStateMetaInfo(subKeyedStateDescriptor);
            TypeSerializer keySerializer = subKeyedStateDescriptor.getKeySerializer();
            TypeSerializer namespaceSerializer = subKeyedStateDescriptor.getNamespaceSerializer();
            MapSerializer<MK, MV> mapSerializer = subKeyedStateDescriptor.getValueSerializer();
            PageSerdeFlink2KeyImpl pageSerdeFlink2Key = PageSerdeFlink2KeyImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(keySerializer), GeminiInternalStateBackend.getSafeSerializer(namespaceSerializer)), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable());
            SubKMapTableDescription tableDescription = new SubKMapTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), pageSerdeFlink2Key);
            GTableSubKeyedMapImpl table = (GTableSubKeyedMapImpl)this.getOrCreateTable(tableDescription);
            subKeyedMapState = new GeminiSubKeyedMapStateImpl(subKeyedStateDescriptor, table);
            this.subKeyedStates.put(stateName, subKeyedMapState);
            this.tables.put(stateName, table);
        }
        return subKeyedMapState;
    }

    @Override
    public <K, N, MK, MV> SubKeyedSortedMapState<K, N, MK, MV> createSubKeyedSortedMapState(SubKeyedSortedMapStateDescriptor<K, N, MK, MV> subKeyedStateDescriptor) throws Exception {
        String stateName = subKeyedStateDescriptor.getName();
        GeminiSubKeyedSortedMapStateImpl<K, N, MK, MV> subKeyedSortedMapState = (GeminiSubKeyedSortedMapStateImpl<K, N, MK, MV>)this.subKeyedStates.get(stateName);
        if (subKeyedSortedMapState == null) {
            this.tryRegisterStateMetaInfo(subKeyedStateDescriptor);
            PageSerdeFlink2KeyImpl pageSerdeFlink2Key = PageSerdeFlink2KeyImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getNamespaceSerializer())), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getMapKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(subKeyedStateDescriptor.getMapValueSerializer()), subKeyedStateDescriptor.getComparator(), this.db.getConfiguration().getComparatorType(), this.db.getConfiguration().isChecksumEnable());
            SubKSortedMapTableDescription tableDescription = new SubKSortedMapTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), pageSerdeFlink2Key);
            GTableSubKeyedSortedMapImpl table = (GTableSubKeyedSortedMapImpl)this.getOrCreateTable(tableDescription);
            subKeyedSortedMapState = new GeminiSubKeyedSortedMapStateImpl<K, N, MK, MV>(subKeyedStateDescriptor, table);
            this.subKeyedStates.put(stateName, subKeyedSortedMapState);
            this.tables.put(stateName, table);
        }
        return subKeyedSortedMapState;
    }

    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(final long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
        if (this.registeredStateMetaInfos.isEmpty()) {
            LOG.info("Snapshot done with empty states for {}/{}.", (Object)checkpointId, (Object)timestamp);
            return DoneFuture.of(SnapshotResult.empty());
        }
        LOG.info("Start to snapshot for {}/{}.", (Object)checkpointId, (Object)timestamp);
        long syncStartTime = System.currentTimeMillis();
        final ArrayList<StateMetaInfoSnapshot> keyedStateMetaSnapshots = new ArrayList<StateMetaInfoSnapshot>();
        final ArrayList<StateMetaInfoSnapshot> subKeyedStateMetaSnapshots = new ArrayList<StateMetaInfoSnapshot>();
        for (Map.Entry registeredStateMetaInfoEntry : this.registeredStateMetaInfos.entrySet()) {
            RegisteredStateMetaInfo stateMetaInfo = (RegisteredStateMetaInfo)registeredStateMetaInfoEntry.getValue();
            if (stateMetaInfo.getStateType().isKeyedState()) {
                keyedStateMetaSnapshots.add(stateMetaInfo.snapshot());
                continue;
            }
            subKeyedStateMetaSnapshots.add(stateMetaInfo.snapshot());
        }
        BackendSnapshotMeta backendSnapshotMeta = new BackendSnapshotMeta(checkpointId, timestamp, this.getLocalSnapshotDirectory(checkpointId));
        this.db.startSnapshot(backendSnapshotMeta);
        final SupplierWithException checkpointStreamSupplier = this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
        AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){
            CheckpointStreamWithResultProvider streamAndResultExtractor = null;

            @Override
            protected void acquireResources() throws Exception {
                this.streamAndResultExtractor = (CheckpointStreamWithResultProvider)checkpointStreamSupplier.get();
                GeminiInternalStateBackend.this.cancelStreamRegistry.registerCloseable((Closeable)this.streamAndResultExtractor);
            }

            @Override
            protected void releaseResources() {
                this.unregisterAndCloseStreamAndResultExtractor();
            }

            @Override
            protected void stopOperation() {
                this.unregisterAndCloseStreamAndResultExtractor();
            }

            private void unregisterAndCloseStreamAndResultExtractor() {
                if (GeminiInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                    IOUtils.closeQuietly((AutoCloseable)this.streamAndResultExtractor);
                    this.streamAndResultExtractor = null;
                }
            }

            @Override
            @Nonnull
            protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                long asyncStartTime = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)localStream));
                InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(keyedStateMetaSnapshots, subKeyedStateMetaSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, GeminiInternalStateBackend.this.keyGroupCompressionDecorator));
                serializationProxy.write((DataOutputView)outView);
                Future<DBSnapshotResult> future = GeminiInternalStateBackend.this.db.getSnapshotResult(checkpointId);
                DBSnapshotResult dbSnapshotResult = future.get();
                if (GeminiInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                    SnapshotResult<StreamStateHandle> streamSnapshotResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                    this.streamAndResultExtractor = null;
                    StreamStateHandle streamStateHandle = streamSnapshotResult.getJobManagerOwnedSnapshot();
                    DirectoryStateHandle directoryStateHandle = dbSnapshotResult.getDfsSnapshotDirectory().completeSnapshotAndGetHandle();
                    GeminiKeyedStateHandle snapshot = new GeminiKeyedStateHandle(checkpointId, GeminiInternalStateBackend.this.getKeyGroupRange(), streamStateHandle, GeminiInternalStateBackend.this.convertDirectoryStateHandleToStreamHandle(directoryStateHandle), dbSnapshotResult.getDfsSnapshotMeta());
                    LOG.info("Gemini backend snapshot (" + primaryStreamFactory + ", asynchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - asyncStartTime) + " ms.");
                    StreamStateHandle localStreamStateHandle = streamSnapshotResult.getTaskLocalSnapshot();
                    if (localStreamStateHandle != null) {
                        DirectoryStateHandle localDirectoryStateHandle = dbSnapshotResult.getLocalSnapshotDirectory().completeSnapshotAndGetHandle();
                        GeminiLocalKeyedStateHandle localSnapshot = new GeminiLocalKeyedStateHandle(checkpointId, GeminiInternalStateBackend.this.getKeyGroupRange(), localStreamStateHandle, GeminiInternalStateBackend.this.convertDirectoryStateHandleToStreamHandle(localDirectoryStateHandle), dbSnapshotResult.getLocalSnapshotMeta());
                        return SnapshotResult.withLocalState(snapshot, localSnapshot);
                    }
                    return SnapshotResult.of(snapshot);
                }
                throw new IOException("Stream already closed and cannot return a handle.");
            }
        };
        AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        LOG.info("Gemini backend snapshot (" + primaryStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
        return task;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore(Collection<KeyedStateHandle> restoredSnapshots) throws Exception {
        this.db = new GeminiDB("GeminiDB_" + this.operatorIdentifier, this.gConfiguration, this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getEndKeyGroup(), this.dbMetricGroup);
        if (restoredSnapshots != null && !restoredSnapshots.isEmpty()) {
            LOG.info("Initializing gemini internal state backend from snapshots {}.", restoredSnapshots);
            ArrayList<DBSnapshotMeta> restoredDBSnapshotMeta = new ArrayList<DBSnapshotMeta>();
            HashMap<String, GTable> restoredTables = new HashMap<String, GTable>();
            for (KeyedStateHandle rawSnapshot : restoredSnapshots) {
                Preconditions.checkState((boolean)(rawSnapshot instanceof AbstractGeminiKeyedStateHandle));
                AbstractGeminiKeyedStateHandle snapshot = (AbstractGeminiKeyedStateHandle)rawSnapshot;
                StreamStateHandle snapshotHandle = snapshot.getMetaStateHandle();
                if (snapshotHandle == null) continue;
                FSDataInputStream inputStream = snapshotHandle.openInputStream();
                this.cancelStreamRegistry.registerCloseable((Closeable)inputStream);
                try {
                    DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper((InputStream)inputStream);
                    InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(this.getUserClassLoader(), true);
                    serializationProxy.read((DataInputView)inputView);
                    List<StateMetaInfoSnapshot> keyedStateMetaInfos = serializationProxy.getKeyedStateMetaSnapshots();
                    for (int i = 0; i < keyedStateMetaInfos.size(); ++i) {
                        StateMetaInfoSnapshot keyedStateMetaSnapshot = keyedStateMetaInfos.get(i);
                        String stateName = keyedStateMetaSnapshot.getName();
                        this.restoredKvStateMetaInfos.put(stateName, keyedStateMetaSnapshot);
                        RegisteredStateMetaInfo keyedStateMetaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedStateMetaSnapshot);
                        this.registeredStateMetaInfos.put(stateName, keyedStateMetaInfo);
                        GTableDescription tableDescription = this.createGTableDescription(keyedStateMetaInfo);
                        GTable table = this.getOrCreateTable(tableDescription);
                        restoredTables.put(tableDescription.getTableName(), table);
                    }
                    List<StateMetaInfoSnapshot> subKeyedStateMetaSnapshots = serializationProxy.getSubKeyedStateMetaSnapshots();
                    for (int i = 0; i < subKeyedStateMetaSnapshots.size(); ++i) {
                        StateMetaInfoSnapshot subKeyedStateMetaSnapshot = subKeyedStateMetaSnapshots.get(i);
                        String stateName = subKeyedStateMetaSnapshot.getName();
                        RegisteredStateMetaInfo subKeyedStateMetaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedStateMetaSnapshot);
                        this.registeredStateMetaInfos.put(stateName, subKeyedStateMetaInfo);
                        this.restoredKvStateMetaInfos.put(stateName, subKeyedStateMetaSnapshot);
                        GTableDescription tableDescription = this.createGTableDescription(subKeyedStateMetaInfo);
                        GTable table = this.getOrCreateTable(tableDescription);
                        restoredTables.put(tableDescription.getTableName(), table);
                    }
                    restoredDBSnapshotMeta.add(snapshot.getDBSnapshotMeta());
                }
                finally {
                    if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) continue;
                    IOUtils.closeQuietly((AutoCloseable)inputStream);
                }
            }
            this.db.restoreFromSnapshot(restoredDBSnapshotMeta, restoredTables, this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getEndKeyGroup());
        }
        try {
            this.db.open();
        }
        catch (Exception e) {
            LOG.error("Failed to open GeminiDB, {}", (Throwable)e);
            throw e;
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.db.getGContext().getSupervisor().getSnapshotManager().notifySnapshotComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAbort(long checkpointId) {
        this.db.getGContext().getSupervisor().getSnapshotManager().notifySnapshotAbort(checkpointId);
    }

    @Override
    public void notifyCheckpointSubsume(long checkpointId) {
        this.db.getGContext().getSupervisor().getSnapshotManager().notifySnapshotSubsume(checkpointId);
    }

    @Override
    public int numStateEntries() {
        return 0;
    }

    private String getTableName(String stateName) {
        return stateName;
    }

    private GTableDescription createGTableDescription(RegisteredStateMetaInfo metaInfo) {
        InternalStateType stateType = metaInfo.getStateType();
        String stateName = metaInfo.getName();
        String tableName = this.getTableName(stateName);
        int startGroup = this.getKeyGroupRange().getStartKeyGroup();
        int numGroups = this.getKeyGroupRange().getNumberOfKeyGroups();
        int maxParallelism = this.getNumGroups();
        switch (stateType) {
            case KEYED_VALUE: {
                PageSerdeFlinkImpl pageSerdeFlink = PageSerdeFlinkImpl.of(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getValueSerializer()));
                return new KVTableDescription(tableName, startGroup, numGroups, maxParallelism, pageSerdeFlink);
            }
            case KEYED_MAP: {
                MapSerializer mapSerializer = (MapSerializer)metaInfo.getValueSerializer();
                PageSerdeFlink2KeyImpl pageSerdeFlink2Key = PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable());
                return new KMapTableDescription(tableName, startGroup, numGroups, maxParallelism, pageSerdeFlink2Key);
            }
            case KEYED_LIST: {
                ListSerializer listSerializer = (ListSerializer)metaInfo.getValueSerializer();
                PageSerdeFlinkImpl pageSerdeFlinkList = PageSerdeFlinkImpl.of(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(listSerializer.getElementSerializer()));
                return new KListTableDescription(tableName, startGroup, numGroups, maxParallelism, pageSerdeFlinkList);
            }
            case KEYED_SORTEDMAP: {
                SortedMapSerializer sortedMapSerializer = (SortedMapSerializer)metaInfo.getValueSerializer();
                PageSerdeFlink2KeyImpl sortedPageSerdeFlink2Key = PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(sortedMapSerializer.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(sortedMapSerializer.getValueSerializer()), sortedMapSerializer.getComparator(), this.db.getConfiguration().getComparatorType(), this.db.getConfiguration().isChecksumEnable());
                return new KSortedMapTableDescription(tableName, startGroup, numGroups, maxParallelism, sortedPageSerdeFlink2Key);
            }
            case SUBKEYED_LIST: {
                ListSerializer listSerializer = (ListSerializer)metaInfo.getValueSerializer();
                return new SubKListTableDescription(this.getTableName(stateName), this.getKeyGroupRange().getStartKeyGroup(), this.getKeyGroupRange().getNumberOfKeyGroups(), this.getNumGroups(), PageSerdeFlinkListImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getNamespaceSerializer())), GeminiInternalStateBackend.getSafeSerializer(listSerializer.getElementSerializer())));
            }
            case SUBKEYED_VALUE: {
                return new SubKVTableDescription(this.getTableName(stateName), startGroup, numGroups, maxParallelism, PageSerdeFlink2KeyImpl.of(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getNamespaceSerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable()));
            }
            case SUBKEYED_MAP: {
                MapSerializer mapSerializer = (MapSerializer)metaInfo.getValueSerializer();
                return new SubKMapTableDescription(this.getTableName(stateName), startGroup, numGroups, maxParallelism, PageSerdeFlink2KeyImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getNamespaceSerializer())), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(mapSerializer.getValueSerializer()), null, null, this.db.getConfiguration().isChecksumEnable()));
            }
            case SUBKEYED_SORTEDMAP: {
                SortedMapSerializer sortedMapSerializer = (SortedMapSerializer)metaInfo.getValueSerializer();
                return new SubKSortedMapTableDescription(this.getTableName(stateName), startGroup, numGroups, maxParallelism, PageSerdeFlink2KeyImpl.of(new PKey2Serializer(GeminiInternalStateBackend.getSafeSerializer(metaInfo.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(metaInfo.getNamespaceSerializer())), GeminiInternalStateBackend.getSafeSerializer(sortedMapSerializer.getKeySerializer()), GeminiInternalStateBackend.getSafeSerializer(sortedMapSerializer.getValueSerializer()), sortedMapSerializer.getComparator(), this.db.getConfiguration().getComparatorType(), this.db.getConfiguration().isChecksumEnable()));
            }
        }
        throw new RuntimeException("Unknown internal type");
    }

    private GTable getOrCreateTable(GTableDescription tableDescription) {
        String tableName = tableDescription.getTableName();
        GTable table = this.tables.get(tableName);
        if (table == null) {
            table = this.db.getTableOrCreate(tableDescription);
            this.tables.put(tableName, table);
        }
        return table;
    }

    private static <T> TypeSerializer<T> getSafeSerializer(TypeSerializer<T> serializer) {
        return new ThreadLocalTypeSerializer(serializer);
    }

    @Nullable
    private SnapshotDirectory getLocalSnapshotDirectory(long checkpointId) throws IOException {
        if (!this.localRecoveryConfig.isLocalRecoveryEnabled()) {
            return null;
        }
        LocalRecoveryDirectoryProvider directoryProvider = this.localRecoveryConfig.getLocalStateDirectoryProvider();
        File directory = new File(directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId), UUID.randomUUID().toString().replace("-", ""));
        if (directory.exists()) {
            FileUtils.deleteDirectory((File)directory);
        }
        if (!directory.mkdirs()) {
            throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory);
        }
        File dbSnapshotDir = new File(directory, "geminiDB");
        Path path = new Path(dbSnapshotDir.toURI());
        return SnapshotDirectory.permanent(path);
    }

    private DirectoryStreamStateHandle convertDirectoryStateHandleToStreamHandle(DirectoryStateHandle directoryStateHandle) {
        return new DirectoryStreamStateHandle(directoryStateHandle.getDirectory());
    }
}

