package org.apache.flink.runtime.state.gemini;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.SortedMapSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.InternalStateType;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.GTableDescription;
import org.apache.flink.runtime.state.gemini.engine.GeminiDB;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedListImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableKeyedSortedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableOneKeyImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedListImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedSortedMapImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.GTableSubKeyedValueImpl;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KListTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KSortedMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.KVTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKListTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKSortedMapTableDescription;
import org.apache.flink.runtime.state.gemini.engine.hashtable.SubKVTableDescription;
import org.apache.flink.runtime.state.gemini.engine.page.PKey2Serializer;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlink2KeyImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlinkImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageSerdeFlinkListImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotResult;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.utils.ThreadLocalTypeSerializer;
import org.apache.flink.runtime.state.gemini.internal.AbstractGeminiKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.internal.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.gemini.internal.GeminiKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.internal.GeminiLocalKeyedStateHandle;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedListStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedMapStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.gemini.keyed.GeminiKeyedValueStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedListStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedMapStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.gemini.subkeyed.GeminiSubKeyedValueStateImpl;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapState;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/GeminiInternalStateBackend.class */
public class GeminiInternalStateBackend extends AbstractInternalStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiInternalStateBackend.class);
    // Underlying GeminiDB instance; created in restore() and closed/nulled in closeImpl().
    private GeminiDB db;
    // Tables backing the states created through the create*State methods, keyed by state name.
    private Map<String, GTable> tables;
    // The "geminiDB" sub-group of the metric group handed to the constructor; passed to GeminiDB.
    private MetricGroup dbMetricGroup;
    // Operator identifier; used as the suffix of the GeminiDB name ("GeminiDB_" + identifier).
    private String operatorIdentifier;
    // Configuration handed to GeminiDB when it is created in restore().
    private GConfiguration gConfiguration;
    // Consulted in snapshot() to decide between a duplicating and a simple checkpoint stream.
    private final LocalRecoveryConfig localRecoveryConfig;

    /**
     * Creates the backend and records its configuration.
     *
     * <p>The GeminiDB instance itself is not created here; that happens in
     * {@link #restore(Collection)}.
     *
     * @param i forwarded to the super constructor (presumably the total number of key groups; confirm against AbstractInternalStateBackend)
     * @param keyGroupRange forwarded to the super constructor
     * @param classLoader forwarded to the super constructor
     * @param localRecoveryConfig local recovery settings, must not be null
     * @param taskKvStateRegistry forwarded to the super constructor
     * @param str operator identifier, must not be null
     * @param executionConfig forwarded to the super constructor
     * @param gConfiguration Gemini configuration, must not be null
     * @param metricGroup parent metric group; a "geminiDB" sub-group is added to it
     * @throws Exception declared by the signature; the visible body only throws on null arguments
     */
    public GeminiInternalStateBackend(int i, KeyGroupRange keyGroupRange, ClassLoader classLoader, LocalRecoveryConfig localRecoveryConfig, TaskKvStateRegistry taskKvStateRegistry, String str, ExecutionConfig executionConfig, GConfiguration gConfiguration, MetricGroup metricGroup) throws Exception {
        super(i, keyGroupRange, classLoader, taskKvStateRegistry, executionConfig);
        this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
        this.dbMetricGroup = metricGroup.addGroup("geminiDB");
        this.operatorIdentifier = Preconditions.checkNotNull(str);
        this.gConfiguration = Preconditions.checkNotNull(gConfiguration);
        this.tables = new HashMap<>();
        LOG.info(
            "GeminiInternalStateBackend is created for operator {}, with backend UUID {}",
            this.operatorIdentifier,
            this.gConfiguration.getBackendUID());
    }

    /**
     * Closes the GeminiDB instance if one exists, drops the reference to it,
     * and forgets all tables tracked by this backend.
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    protected void closeImpl() {
        final GeminiDB openedDb = this.db;
        if (openedDb != null) {
            openedDb.close();
            this.db = null;
        }
        this.tables.clear();
    }

    /**
     * Not supported by this backend: keyed states are created directly on top of
     * GTables by the {@code createKeyed*State} methods, not through a {@link StateStorage}.
     *
     * @param registeredStateMetaInfo meta information of the state (unused)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    protected StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo) {
        throw new UnsupportedOperationException(
            "GeminiInternalStateBackend does not provide a StateStorage for keyed states.");
    }

    /**
     * Not supported by this backend: sub-keyed states are created directly on top of
     * GTables by the {@code createSubKeyed*State} methods, not through a {@link StateStorage}.
     *
     * @param registeredStateMetaInfo meta information of the state (unused)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    protected StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo) {
        throw new UnsupportedOperationException(
            "GeminiInternalStateBackend does not provide a StateStorage for sub-keyed states.");
    }

    /**
     * Returns the keyed value state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * @param keyedValueStateDescriptor descriptor supplying the state name and the key/value serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.keyed.KeyedStateBinder
    public <K, V> KeyedValueState<K, V> createKeyedValueState(KeyedValueStateDescriptor<K, V> keyedValueStateDescriptor) throws Exception {
        final String stateName = keyedValueStateDescriptor.getName();
        final KeyedValueState<K, V> cached = (KeyedValueState) this.keyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(keyedValueStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final KVTableDescription description = new KVTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlinkImpl.of(
                getSafeSerializer(keyedValueStateDescriptor.getKeySerializer()),
                getSafeSerializer(keyedValueStateDescriptor.mo2744getValueSerializer())));
        final GTableOneKeyImpl table = (GTableOneKeyImpl) getOrCreateTable(description);

        final KeyedValueState<K, V> created = new GeminiKeyedValueStateImpl(keyedValueStateDescriptor, table);
        this.keyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the keyed list state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * @param keyedListStateDescriptor descriptor supplying the state name and the key/element serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.keyed.KeyedStateBinder
    public <K, E> KeyedListState<K, E> createKeyedListState(KeyedListStateDescriptor<K, E> keyedListStateDescriptor) throws Exception {
        final String stateName = keyedListStateDescriptor.getName();
        final KeyedListState<K, E> cached = (KeyedListState) this.keyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(keyedListStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final KListTableDescription description = new KListTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlinkListImpl.of(
                getSafeSerializer(keyedListStateDescriptor.getKeySerializer()),
                getSafeSerializer(keyedListStateDescriptor.getElementSerializer())));
        final GTableKeyedListImpl table = (GTableKeyedListImpl) getOrCreateTable(description);

        final KeyedListState<K, E> created = new GeminiKeyedListStateImpl(keyedListStateDescriptor, table);
        this.keyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the keyed map state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * <p>Creating a new state reads the checksum setting from the GeminiDB configuration,
     * so the DB must already exist (it is created in {@code restore}).
     *
     * @param keyedMapStateDescriptor descriptor supplying the state name and the key/map-key/map-value serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.keyed.KeyedStateBinder
    public <K, MK, MV> KeyedMapState<K, MK, MV> createKeyedMapState(KeyedMapStateDescriptor<K, MK, MV> keyedMapStateDescriptor) throws Exception {
        final String stateName = keyedMapStateDescriptor.getName();
        final KeyedMapState<K, MK, MV> cached = (KeyedMapState) this.keyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(keyedMapStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final KMapTableDescription description = new KMapTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlink2KeyImpl.of(
                getSafeSerializer(keyedMapStateDescriptor.getKeySerializer()),
                getSafeSerializer(keyedMapStateDescriptor.getMapKeySerializer()),
                getSafeSerializer(keyedMapStateDescriptor.getMapValueSerializer()),
                null,
                null,
                this.db.getConfiguration().isChecksumEnable()));
        final GTableKeyedMapImpl table = (GTableKeyedMapImpl) getOrCreateTable(description);

        final KeyedMapState<K, MK, MV> created = new GeminiKeyedMapStateImpl(keyedMapStateDescriptor, table);
        this.keyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the keyed sorted-map state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * <p>Creating a new state reads the comparator type and checksum setting from the
     * GeminiDB configuration, so the DB must already exist (it is created in {@code restore}).
     *
     * @param keyedSortedMapStateDescriptor descriptor supplying the state name, serializers and map-key comparator
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.keyed.KeyedStateBinder
    public <K, MK, MV> KeyedSortedMapState<K, MK, MV> createKeyedSortedMapState(KeyedSortedMapStateDescriptor<K, MK, MV> keyedSortedMapStateDescriptor) throws Exception {
        final String stateName = keyedSortedMapStateDescriptor.getName();
        final KeyedSortedMapState<K, MK, MV> cached = (KeyedSortedMapState) this.keyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(keyedSortedMapStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final KSortedMapTableDescription description = new KSortedMapTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlink2KeyImpl.of(
                getSafeSerializer(keyedSortedMapStateDescriptor.getKeySerializer()),
                getSafeSerializer(keyedSortedMapStateDescriptor.getMapKeySerializer()),
                getSafeSerializer(keyedSortedMapStateDescriptor.getMapValueSerializer()),
                keyedSortedMapStateDescriptor.getMapKeyComparator(),
                this.db.getConfiguration().getComparatorType(),
                this.db.getConfiguration().isChecksumEnable()));
        final GTableKeyedSortedMapImpl table = (GTableKeyedSortedMapImpl) getOrCreateTable(description);

        final KeyedSortedMapState<K, MK, MV> created = new GeminiKeyedSortedMapStateImpl(keyedSortedMapStateDescriptor, table);
        this.keyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the sub-keyed value state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * <p>Creating a new state reads the checksum setting from the GeminiDB configuration,
     * so the DB must already exist (it is created in {@code restore}).
     *
     * @param subKeyedValueStateDescriptor descriptor supplying the state name and the key/namespace/value serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.subkeyed.SubKeyedStateBinder
    public <K, N, V> SubKeyedValueState<K, N, V> createSubKeyedValueState(SubKeyedValueStateDescriptor<K, N, V> subKeyedValueStateDescriptor) throws Exception {
        final String stateName = subKeyedValueStateDescriptor.getName();
        final SubKeyedValueState<K, N, V> cached = (SubKeyedValueState) this.subKeyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(subKeyedValueStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final SubKVTableDescription description = new SubKVTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlink2KeyImpl.of(
                getSafeSerializer(subKeyedValueStateDescriptor.getKeySerializer()),
                getSafeSerializer(subKeyedValueStateDescriptor.getNamespaceSerializer()),
                getSafeSerializer(subKeyedValueStateDescriptor.mo2747getValueSerializer()),
                null,
                null,
                this.db.getConfiguration().isChecksumEnable()));
        final GTableSubKeyedValueImpl table = (GTableSubKeyedValueImpl) getOrCreateTable(description);

        final SubKeyedValueState<K, N, V> created = new GeminiSubKeyedValueStateImpl(subKeyedValueStateDescriptor, table);
        this.subKeyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the sub-keyed list state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * <p>The table's key serializer is a {@link PKey2Serializer} built from the key and
     * namespace serializers.
     *
     * @param subKeyedListStateDescriptor descriptor supplying the state name and the key/namespace/element serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.subkeyed.SubKeyedStateBinder
    public <K, N, E> SubKeyedListState<K, N, E> createSubKeyedListState(SubKeyedListStateDescriptor<K, N, E> subKeyedListStateDescriptor) throws Exception {
        final String stateName = subKeyedListStateDescriptor.getName();
        final SubKeyedListState<K, N, E> cached = (SubKeyedListState) this.subKeyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(subKeyedListStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final SubKListTableDescription description = new SubKListTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlinkListImpl.of(
                (TypeSerializer) new PKey2Serializer(
                    getSafeSerializer(subKeyedListStateDescriptor.getKeySerializer()),
                    getSafeSerializer(subKeyedListStateDescriptor.getNamespaceSerializer())),
                getSafeSerializer(subKeyedListStateDescriptor.getElementSerializer())));
        final GTableSubKeyedListImpl table = (GTableSubKeyedListImpl) getOrCreateTable(description);

        final SubKeyedListState<K, N, E> created = new GeminiSubKeyedListStateImpl(subKeyedListStateDescriptor, table);
        this.subKeyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the sub-keyed map state registered under the descriptor's name, creating it
     * (together with its backing table) on first request.
     *
     * <p>The table's primary key serializer is a {@link PKey2Serializer} built from the key
     * and namespace serializers; the map-key and map-value serializers are taken from the
     * descriptor's {@link MapSerializer}. The checksum setting is read from the GeminiDB
     * configuration, so the DB must already exist (it is created in {@code restore}).
     *
     * @param subKeyedMapStateDescriptor descriptor supplying the state name and serializers
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.subkeyed.SubKeyedStateBinder
    public <K, N, MK, MV> SubKeyedMapState<K, N, MK, MV> createSubKeyedMapState(SubKeyedMapStateDescriptor<K, N, MK, MV> subKeyedMapStateDescriptor) throws Exception {
        final String stateName = subKeyedMapStateDescriptor.getName();
        final SubKeyedMapState<K, N, MK, MV> cached = (SubKeyedMapState) this.subKeyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(subKeyedMapStateDescriptor);
        final TypeSerializer<K> keySerializer = subKeyedMapStateDescriptor.getKeySerializer();
        final TypeSerializer<N> namespaceSerializer = subKeyedMapStateDescriptor.getNamespaceSerializer();
        final MapSerializer<MK, MV> mapSerializer = subKeyedMapStateDescriptor.mo2747getValueSerializer();
        final KeyGroupRange range = getKeyGroupRange();
        final SubKMapTableDescription description = new SubKMapTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlink2KeyImpl.of(
                new PKey2Serializer(getSafeSerializer(keySerializer), getSafeSerializer(namespaceSerializer)),
                getSafeSerializer(mapSerializer.getKeySerializer()),
                getSafeSerializer(mapSerializer.getValueSerializer()),
                null,
                null,
                this.db.getConfiguration().isChecksumEnable()));
        final GTableSubKeyedMapImpl table = (GTableSubKeyedMapImpl) getOrCreateTable(description);

        final SubKeyedMapState<K, N, MK, MV> created = new GeminiSubKeyedMapStateImpl(subKeyedMapStateDescriptor, table);
        this.subKeyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Returns the sub-keyed sorted-map state registered under the descriptor's name, creating
     * it (together with its backing table) on first request.
     *
     * <p>The table's primary key serializer is a {@link PKey2Serializer} built from the key
     * and namespace serializers. The comparator type and checksum setting are read from the
     * GeminiDB configuration, so the DB must already exist (it is created in {@code restore}).
     *
     * @param subKeyedSortedMapStateDescriptor descriptor supplying the state name, serializers and comparator
     * @return the cached state if one exists under that name, otherwise the newly created one
     * @throws Exception if registering the meta info or creating the table fails
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.subkeyed.SubKeyedStateBinder
    public <K, N, MK, MV> SubKeyedSortedMapState<K, N, MK, MV> createSubKeyedSortedMapState(SubKeyedSortedMapStateDescriptor<K, N, MK, MV> subKeyedSortedMapStateDescriptor) throws Exception {
        final String stateName = subKeyedSortedMapStateDescriptor.getName();
        final SubKeyedSortedMapState<K, N, MK, MV> cached = (SubKeyedSortedMapState) this.subKeyedStates.get(stateName);
        if (cached != null) {
            return cached;
        }

        tryRegisterStateMetaInfo(subKeyedSortedMapStateDescriptor);
        final KeyGroupRange range = getKeyGroupRange();
        final SubKSortedMapTableDescription description = new SubKSortedMapTableDescription(
            getTableName(stateName),
            range.getStartKeyGroup(),
            range.getNumberOfKeyGroups(),
            getNumGroups(),
            PageSerdeFlink2KeyImpl.of(
                new PKey2Serializer(
                    getSafeSerializer(subKeyedSortedMapStateDescriptor.getKeySerializer()),
                    getSafeSerializer(subKeyedSortedMapStateDescriptor.getNamespaceSerializer())),
                getSafeSerializer(subKeyedSortedMapStateDescriptor.getMapKeySerializer()),
                getSafeSerializer(subKeyedSortedMapStateDescriptor.getMapValueSerializer()),
                subKeyedSortedMapStateDescriptor.getComparator(),
                this.db.getConfiguration().getComparatorType(),
                this.db.getConfiguration().isChecksumEnable()));
        final GTableSubKeyedSortedMapImpl table = (GTableSubKeyedSortedMapImpl) getOrCreateTable(description);

        final SubKeyedSortedMapState<K, N, MK, MV> created = new GeminiSubKeyedSortedMapStateImpl(subKeyedSortedMapStateDescriptor, table);
        this.subKeyedStates.put(stateName, created);
        this.tables.put(stateName, table);
        return created;
    }

    /**
     * Starts a snapshot of this backend and returns a future that completes it.
     *
     * <p>Synchronous part (runs in the calling thread): snapshots the registered state
     * meta infos, asks GeminiDB to start its snapshot, and chooses how the checkpoint
     * stream will be created. Asynchronous part (runs when the returned future is run):
     * writes the meta information to the checkpoint stream, waits for the GeminiDB
     * snapshot result, and assembles the resulting state handles.
     *
     * @param j snapshot identifier (presumably the checkpoint id; confirm against Snapshotable)
     * @param j2 second identifier passed to BackendSnapshotMeta (presumably the checkpoint timestamp; confirm)
     * @param checkpointStreamFactory factory used to create the checkpoint output stream
     * @param checkpointOptions not read by this method
     * @return a completed future with an empty result if no state is registered, otherwise
     *         the asynchronous snapshot task
     * @throws Exception if starting the snapshot fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(final long j, long j2, final CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
        // Nothing registered: short-circuit with an already-completed, empty result.
        if (this.registeredStateMetaInfos.isEmpty()) {
            LOG.info("Snapshot done with empty states for {}/{}.", Long.valueOf(j), Long.valueOf(j2));
            return DoneFuture.of(SnapshotResult.empty());
        }
        LOG.info("Start to snapshot for {}/{}.", Long.valueOf(j), Long.valueOf(j2));
        long currentTimeMillis = System.currentTimeMillis();
        // arrayList collects meta snapshots of keyed states; arrayList2 collects all others
        // (presumably the sub-keyed states, matching how restore() reads them back).
        final ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        Iterator<Map.Entry<String, RegisteredStateMetaInfo>> it = this.registeredStateMetaInfos.entrySet().iterator();
        while (it.hasNext()) {
            RegisteredStateMetaInfo value = it.next().getValue();
            if (value.getStateType().isKeyedState()) {
                arrayList.add(value.snapshot());
            } else {
                arrayList2.add(value.snapshot());
            }
        }
        // Kick off the DB-level snapshot before the asynchronous task is created.
        this.db.startSnapshot(new BackendSnapshotMeta(j, j2, getLocalSnapshotDirectory(j)));
        // With local recovery enabled the stream duplicates its output to the local state
        // directory; otherwise a plain stream is used. Creation is deferred to acquireResources().
        final SupplierWithException supplierWithException = this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> {
            return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider());
        } : () -> {
            return CheckpointStreamWithResultProvider.createSimpleStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        };
        AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() {
            // Checkpoint stream owned by this task; null before acquisition and after hand-off/close.
            CheckpointStreamWithResultProvider streamAndResultExtractor = null;

            /** Creates the checkpoint stream and registers it with the cancel registry. */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void acquireResources() throws Exception {
                this.streamAndResultExtractor = (CheckpointStreamWithResultProvider) supplierWithException.get();
                GeminiInternalStateBackend.this.cancelStreamRegistry.registerCloseable(this.streamAndResultExtractor);
            }

            /** Closes the stream if it is still registered. */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void releaseResources() {
                unregisterAndCloseStreamAndResultExtractor();
            }

            /** Closes the stream if it is still registered. */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void stopOperation() {
                unregisterAndCloseStreamAndResultExtractor();
            }

            // Only closes the stream when this call is the one that removed it from the registry.
            private void unregisterAndCloseStreamAndResultExtractor() {
                if (GeminiInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                    IOUtils.closeQuietly(this.streamAndResultExtractor);
                    this.streamAndResultExtractor = null;
                }
            }

            /**
             * Writes the state meta information, waits for the DB snapshot result and builds
             * the state handles.
             *
             * @return the job-manager-owned handle, plus a task-local handle when the stream
             *         produced a task-local snapshot
             * @throws IOException if the stream was already unregistered (closed) before it could be finalized
             */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            @Nonnull
            public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                long currentTimeMillis2 = System.currentTimeMillis();
                // Third argument is true when the key-group compression decorator is not the
                // uncompressed one (presumably a "compression used" flag; confirm in the proxy).
                new InternalBackendSerializationProxy(arrayList, arrayList2, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, GeminiInternalStateBackend.this.keyGroupCompressionDecorator)).write(new DataOutputViewStreamWrapper(this.streamAndResultExtractor.getCheckpointOutputStream()));
                // Blocks until GeminiDB has produced the snapshot started in the synchronous part.
                DBSnapshotResult dBSnapshotResult = GeminiInternalStateBackend.this.db.getSnapshotResult(j).get();
                // If the stream is no longer registered, someone else already closed it.
                if (!GeminiInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                    throw new IOException("Stream already closed and cannot return a handle.");
                }
                SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                // The stream is finalized; clear the field so the release path does not touch it again.
                this.streamAndResultExtractor = null;
                GeminiKeyedStateHandle geminiKeyedStateHandle = new GeminiKeyedStateHandle(j, GeminiInternalStateBackend.this.getKeyGroupRange(), closeAndFinalizeCheckpointStreamResult.getJobManagerOwnedSnapshot(), GeminiInternalStateBackend.this.convertDirectoryStateHandleToStreamHandle(dBSnapshotResult.getDfsSnapshotDirectory().completeSnapshotAndGetHandle()), dBSnapshotResult.getDfsSnapshotMeta());
                GeminiInternalStateBackend.LOG.info("Gemini backend snapshot (" + checkpointStreamFactory + ", asynchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis2) + " ms.");
                StreamStateHandle taskLocalSnapshot = closeAndFinalizeCheckpointStreamResult.getTaskLocalSnapshot();
                // A task-local snapshot exists only when the stream also wrote a local copy.
                if (taskLocalSnapshot != null) {
                    return SnapshotResult.withLocalState(geminiKeyedStateHandle, new GeminiLocalKeyedStateHandle(j, GeminiInternalStateBackend.this.getKeyGroupRange(), taskLocalSnapshot, GeminiInternalStateBackend.this.convertDirectoryStateHandleToStreamHandle(dBSnapshotResult.getLocalSnapshotDirectory().completeSnapshotAndGetHandle()), dBSnapshotResult.getLocalSnapshotMeta()));
                }
                return SnapshotResult.of(geminiKeyedStateHandle);
            }
        });
        LOG.info("Gemini backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
        return from;
    }

    /**
     * Creates the GeminiDB instance, restores it from the given state handles (if any),
     * and opens it.
     *
     * <p>For every handle that carries a meta state handle, the serialized state meta
     * information is read back, the corresponding meta infos are re-registered, and a table
     * is created for each restored state. The collected DB snapshot metas and tables are
     * then handed to {@code GeminiDB.restoreFromSnapshot}.
     *
     * @param collection state handles to restore from; {@code null} or empty starts with an empty DB
     * @throws IllegalStateException if a handle is not an {@link AbstractGeminiKeyedStateHandle}
     * @throws Exception if reading the meta information, restoring, or opening the DB fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        this.db = new GeminiDB("GeminiDB_" + this.operatorIdentifier, this.gConfiguration, getKeyGroupRange().getStartKeyGroup(), getKeyGroupRange().getEndKeyGroup(), this.dbMetricGroup);
        if (collection != null && !collection.isEmpty()) {
            LOG.info("Initializing gemini internal state backend from snapshots {}.", collection);
            // Kept as raw collections: the element types expected by restoreFromSnapshot
            // are not visible from this method.
            ArrayList dbSnapshotMetas = new ArrayList();
            HashMap restoredTables = new HashMap();
            for (KeyedStateHandle keyedStateHandle : collection) {
                Preconditions.checkState(
                    keyedStateHandle instanceof AbstractGeminiKeyedStateHandle,
                    "Unexpected state handle type for Gemini restore: %s",
                    keyedStateHandle);
                AbstractGeminiKeyedStateHandle geminiHandle = (AbstractGeminiKeyedStateHandle) keyedStateHandle;
                StreamStateHandle metaStateHandle = geminiHandle.getMetaStateHandle();
                if (metaStateHandle == null) {
                    // Handles without meta information contribute nothing to the restore.
                    continue;
                }
                FSDataInputStream metaStream = metaStateHandle.openInputStream();
                this.cancelStreamRegistry.registerCloseable(metaStream);
                try {
                    DataInputView metaView = new DataInputViewStreamWrapper(metaStream);
                    InternalBackendSerializationProxy proxy = new InternalBackendSerializationProxy(getUserClassLoader(), true);
                    proxy.read(metaView);
                    for (StateMetaInfoSnapshot keyedSnapshot : proxy.getKeyedStateMetaSnapshots()) {
                        String stateName = keyedSnapshot.getName();
                        this.restoredKvStateMetaInfos.put(stateName, keyedSnapshot);
                        RegisteredStateMetaInfo keyedMetaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedSnapshot);
                        this.registeredStateMetaInfos.put(stateName, keyedMetaInfo);
                        GTableDescription tableDescription = createGTableDescription(keyedMetaInfo);
                        restoredTables.put(tableDescription.getTableName(), getOrCreateTable(tableDescription));
                    }
                    for (StateMetaInfoSnapshot subKeyedSnapshot : proxy.getSubKeyedStateMetaSnapshots()) {
                        String stateName = subKeyedSnapshot.getName();
                        RegisteredStateMetaInfo subKeyedMetaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedSnapshot);
                        this.registeredStateMetaInfos.put(stateName, subKeyedMetaInfo);
                        this.restoredKvStateMetaInfos.put(stateName, subKeyedSnapshot);
                        GTableDescription tableDescription = createGTableDescription(subKeyedMetaInfo);
                        restoredTables.put(tableDescription.getTableName(), getOrCreateTable(tableDescription));
                    }
                    dbSnapshotMetas.add(geminiHandle.getDBSnapshotMeta());
                } finally {
                    // Close the stream only if it is still registered; otherwise the registry
                    // has already taken care of it.
                    if (this.cancelStreamRegistry.unregisterCloseable(metaStream)) {
                        IOUtils.closeQuietly(metaStream);
                    }
                }
            }
            this.db.restoreFromSnapshot(dbSnapshotMetas, restoredTables, getKeyGroupRange().getStartKeyGroup(), getKeyGroupRange().getEndKeyGroup());
        }
        try {
            this.db.open();
        } catch (Exception e) {
            // The exception goes last with no placeholder so SLF4J logs its stack trace
            // instead of leaving a literal "{}" in the message.
            LOG.error("Failed to open GeminiDB for operator {}.", this.operatorIdentifier, e);
            throw e;
        }
    }

    /**
     * Forwards a checkpoint-complete notification to GeminiDB's snapshot manager.
     *
     * @param j identifier of the completed checkpoint
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend, org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
        db.getGContext()
            .getSupervisor()
            .getSnapshotManager()
            .notifySnapshotComplete(j);
    }

    /**
     * Forwards a checkpoint-abort notification to GeminiDB's snapshot manager.
     *
     * @param j identifier of the aborted checkpoint
     */
    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointAbort(long j) {
        db.getGContext()
            .getSupervisor()
            .getSnapshotManager()
            .notifySnapshotAbort(j);
    }

    /**
     * Forwards a checkpoint-subsume notification to GeminiDB's snapshot manager.
     *
     * @param j identifier of the subsumed checkpoint
     */
    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointSubsume(long j) {
        db.getGContext()
            .getSupervisor()
            .getSnapshotManager()
            .notifySnapshotSubsume(j);
    }

    /**
     * Returns a state-entry count for this backend.
     *
     * <p>No entries are actually counted here: the method unconditionally returns {@code 0}.
     *
     * @return always {@code 0}
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    public int numStateEntries() {
        return 0;
    }

    /**
     * Derives the table name used for a state from the state's name.
     *
     * <p>The mapping is currently the identity: the argument is returned unchanged.
     *
     * @param str the state name
     * @return {@code str} itself
     */
    private String getTableName(String str) {
        return str;
    }

    /**
     * Builds the {@code GTableDescription} that matches the type of a registered state.
     *
     * <p>Every description receives the table name derived from the state name, the start key
     * group and number of key groups of this backend's key-group range, the value of
     * {@code getNumGroups()}, and a page serde assembled from the state's serializers. Each
     * serializer is wrapped via {@code getSafeSerializer} before it is handed to the serde.
     *
     * <ul>
     *   <li>{@code KEYED_*} states use the key serializer plus the value serializer (or, for
     *       map/list/sorted-map states, the serializers nested in the value serializer).
     *   <li>{@code SUBKEYED_VALUE} passes the key serializer and the namespace serializer to the
     *       serde as two separate keys.
     *   <li>The remaining {@code SUBKEYED_*} states combine key and namespace serializer into a
     *       single {@code PKey2Serializer}.
     * </ul>
     *
     * @param registeredStateMetaInfo meta information (name, state type, serializers) of the state
     * @return the table description for the state
     * @throws RuntimeException if the state type is not one of the handled types
     */
    private GTableDescription createGTableDescription(RegisteredStateMetaInfo registeredStateMetaInfo) {
        InternalStateType stateType = registeredStateMetaInfo.getStateType();
        String name = registeredStateMetaInfo.getName();
        // These four values are shared by every branch below; compute them once and reuse them.
        String tableName = getTableName(name);
        int startKeyGroup = getKeyGroupRange().getStartKeyGroup();
        int numberOfKeyGroups = getKeyGroupRange().getNumberOfKeyGroups();
        int numGroups = getNumGroups();
        switch (stateType) {
            case KEYED_VALUE:
                return new KVTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlinkImpl.of(
                                getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                getSafeSerializer(registeredStateMetaInfo.getValueSerializer())));
            case KEYED_MAP:
                MapSerializer valueSerializer = registeredStateMetaInfo.getValueSerializer();
                // Unsorted map: no comparator and no comparator type are supplied.
                return new KMapTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlink2KeyImpl.of(
                                getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                getSafeSerializer(valueSerializer.getKeySerializer()),
                                getSafeSerializer(valueSerializer.getValueSerializer()),
                                null,
                                null,
                                this.db.getConfiguration().isChecksumEnable()));
            case KEYED_LIST:
                return new KListTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlinkImpl.of(
                                getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                getSafeSerializer(registeredStateMetaInfo.getValueSerializer().getElementSerializer())));
            case KEYED_SORTEDMAP:
                SortedMapSerializer valueSerializer2 = registeredStateMetaInfo.getValueSerializer();
                // Sorted map: ordering comes from the serializer's comparator and the configured comparator type.
                return new KSortedMapTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlink2KeyImpl.of(
                                getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                getSafeSerializer(valueSerializer2.getKeySerializer()),
                                getSafeSerializer(valueSerializer2.getValueSerializer()),
                                valueSerializer2.getComparator(),
                                this.db.getConfiguration().getComparatorType(),
                                this.db.getConfiguration().isChecksumEnable()));
            case SUBKEYED_LIST:
                return new SubKListTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlinkListImpl.of(
                                (TypeSerializer) new PKey2Serializer(
                                        getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                        getSafeSerializer(registeredStateMetaInfo.getNamespaceSerializer())),
                                getSafeSerializer(registeredStateMetaInfo.getValueSerializer().getElementSerializer())));
            case SUBKEYED_VALUE:
                // Key and namespace are passed as two separate serde keys here (no PKey2Serializer).
                return new SubKVTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlink2KeyImpl.of(
                                getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                getSafeSerializer(registeredStateMetaInfo.getNamespaceSerializer()),
                                getSafeSerializer(registeredStateMetaInfo.getValueSerializer()),
                                null,
                                null,
                                this.db.getConfiguration().isChecksumEnable()));
            case SUBKEYED_MAP:
                MapSerializer valueSerializer3 = registeredStateMetaInfo.getValueSerializer();
                return new SubKMapTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlink2KeyImpl.of(
                                new PKey2Serializer(
                                        getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                        getSafeSerializer(registeredStateMetaInfo.getNamespaceSerializer())),
                                getSafeSerializer(valueSerializer3.getKeySerializer()),
                                getSafeSerializer(valueSerializer3.getValueSerializer()),
                                null,
                                null,
                                this.db.getConfiguration().isChecksumEnable()));
            case SUBKEYED_SORTEDMAP:
                SortedMapSerializer valueSerializer4 = registeredStateMetaInfo.getValueSerializer();
                return new SubKSortedMapTableDescription(
                        tableName,
                        startKeyGroup,
                        numberOfKeyGroups,
                        numGroups,
                        PageSerdeFlink2KeyImpl.of(
                                new PKey2Serializer(
                                        getSafeSerializer(registeredStateMetaInfo.getKeySerializer()),
                                        getSafeSerializer(registeredStateMetaInfo.getNamespaceSerializer())),
                                getSafeSerializer(valueSerializer4.getKeySerializer()),
                                getSafeSerializer(valueSerializer4.getValueSerializer()),
                                valueSerializer4.getComparator(),
                                this.db.getConfiguration().getComparatorType(),
                                this.db.getConfiguration().isChecksumEnable()));
            default:
                // Name the offending type and state so the failure can be diagnosed from the log alone.
                throw new RuntimeException("Unknown internal type: " + stateType + " (state '" + name + "')");
        }
    }

    /**
     * Returns the table for the given description, creating it on first use.
     *
     * <p>Tables are cached in {@code tables} under their table name. On a cache miss the table is
     * obtained from the DB via {@code getTableOrCreate} and stored in the cache before it is
     * returned.
     *
     * @param gTableDescription description of the requested table; its table name is the cache key
     * @return the cached table, or the table just obtained from the DB
     */
    private GTable getOrCreateTable(GTableDescription gTableDescription) {
        final String key = gTableDescription.getTableName();
        final GTable cached = this.tables.get(key);
        if (cached != null) {
            // Fast path: the table was already resolved earlier.
            return cached;
        }
        final GTable created = this.db.getTableOrCreate(gTableDescription);
        this.tables.put(key, created);
        return created;
    }

    /**
     * Wraps a serializer in a {@code ThreadLocalTypeSerializer}.
     *
     * <p>NOTE(review): presumably the wrapper gives each thread its own serializer instance so the
     * result can be shared across threads; confirm against {@code ThreadLocalTypeSerializer}.
     *
     * @param typeSerializer the serializer to wrap
     * @param <T> type handled by the serializer
     * @return a new {@code ThreadLocalTypeSerializer} constructed from {@code typeSerializer}
     */
    private static <T> TypeSerializer<T> getSafeSerializer(TypeSerializer<T> typeSerializer) {
        return new ThreadLocalTypeSerializer(typeSerializer);
    }

    /**
     * Prepares a fresh local directory for the snapshot of the given checkpoint.
     *
     * <p>When local recovery is disabled, no directory is prepared and {@code null} is returned.
     * Otherwise a uniquely named directory (a random UUID with the snapshot file separator
     * stripped) is created below the subtask-specific checkpoint directory. Any directory that
     * already exists at that path is deleted first. The returned snapshot directory points at the
     * {@code geminiDB} child of that directory; the child itself is not created here.
     *
     * @param j value passed to {@code subtaskSpecificCheckpointDirectory}; presumably the
     *     checkpoint id (confirm against the caller)
     * @return a permanent snapshot directory, or {@code null} if local recovery is disabled
     * @throws IOException if an existing directory cannot be deleted or the new directory cannot
     *     be created
     */
    @Nullable
    private SnapshotDirectory getLocalSnapshotDirectory(long j) throws IOException {
        if (!this.localRecoveryConfig.isLocalRecoveryEnabled()) {
            return null;
        }
        String uniqueDirName = UUID.randomUUID().toString().replace(SnapshotManagerImpl.SNAPSHOT_FILE_SEPERATOR, "");
        File file = new File(this.localRecoveryConfig.getLocalStateDirectoryProvider().subtaskSpecificCheckpointDirectory(j), uniqueDirName);
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        if (!file.mkdirs()) {
            // Any pre-existing directory was removed above, so a false result here means the
            // directory could not be created, not that it "already exists".
            throw new IOException("Could not create local state base directory for checkpoint " + j + ": " + file);
        }
        return SnapshotDirectory.permanent(new Path(new File(file, "geminiDB").toURI()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a {@code DirectoryStreamStateHandle} for the directory referenced by the given
     * {@code DirectoryStateHandle}.
     *
     * @param directoryStateHandle handle whose directory is reused
     * @return a new {@code DirectoryStreamStateHandle} constructed from
     *     {@code directoryStateHandle.getDirectory()}
     */
    public DirectoryStreamStateHandle convertDirectoryStateHandleToStreamHandle(DirectoryStateHandle directoryStateHandle) {
        return new DirectoryStreamStateHandle(directoryStateHandle.getDirectory());
    }
}
