package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedListStateImpl;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedMapStateImpl;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapState;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.keyed.KeyedStateBinder;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueStateImpl;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateImpl;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateImpl;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateImpl;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateBinder;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateImpl;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;

/* loaded from: input_file:org/apache/flink/runtime/state/AbstractInternalStateBackend.class */
/**
 * Skeleton implementation of {@link InternalStateBackend}.
 *
 * <p>The class keeps name-indexed caches of keyed states, sub-keyed states, registered state
 * meta information, state storages and restored meta-info snapshots. It also implements the
 * {@link KeyedStateBinder} and {@link SubKeyedStateBinder} factory methods: each one returns the
 * cached state for the descriptor's name, or creates, caches and returns a new one backed by a
 * {@link StateStorage} obtained from the subclass.
 *
 * <p>All maps are plain {@link HashMap}s and this class performs no synchronization of its own.
 */
public abstract class AbstractInternalStateBackend implements InternalStateBackend, Closeable, KeyedStateBinder, SubKeyedStateBinder, CheckpointListener {
    /** Number of key groups, as handed to the constructor. */
    private final int numberOfGroups;

    /** Key-group range handed to the constructor; never {@code null}. */
    private final KeyGroupRange keyGroupRange;

    /** User class loader handed to the constructor; never {@code null}. */
    private final ClassLoader userClassLoader;

    /** Registry handed to the constructor; stored without a null check, so it may be {@code null}. */
    protected TaskKvStateRegistry kvStateRegistry;

    /** Compression decorator chosen once in the constructor from the execution config. */
    protected final StreamCompressionDecorator keyGroupCompressionDecorator;

    /** Registry that is closed by both {@link #close()} and {@link #dispose()}. */
    protected CloseableRegistry cancelStreamRegistry = new CloseableRegistry();

    /** Restored state meta-info snapshots, indexed by state name. */
    protected final Map<String, StateMetaInfoSnapshot> restoredKvStateMetaInfos = new HashMap<>();

    /** State storages, indexed by state name; this class only exposes and clears the map. */
    protected final Map<String, StateStorage> stateStorages = new HashMap<>();

    /** Keyed states created so far, indexed by state name. */
    protected final transient Map<String, KeyedState> keyedStates = new HashMap<>();

    /** Sub-keyed states created so far, indexed by state name. */
    protected final transient Map<String, SubKeyedState> subKeyedStates = new HashMap<>();

    /** Meta information of every state registered through this backend, indexed by state name. */
    protected final Map<String, RegisteredStateMetaInfo> registeredStateMetaInfos = new HashMap<>();

    /**
     * Creates the backend.
     *
     * <p>NOTE(review): a decompiler marker on this constructor said its access modifier was
     * changed from {@code protected}; it is left {@code public} so existing callers keep compiling.
     *
     * @param i number of key groups, later returned by {@link #getNumGroups()}
     * @param keyGroupRange key-group range of this backend; must not be {@code null}
     * @param classLoader user class loader; must not be {@code null}
     * @param taskKvStateRegistry registry exposed through {@link #getKvStateRegistry()}; not validated
     * @param executionConfig consulted for snapshot compression; {@code null} selects no compression
     */
    public AbstractInternalStateBackend(int i, KeyGroupRange keyGroupRange, ClassLoader classLoader, TaskKvStateRegistry taskKvStateRegistry, ExecutionConfig executionConfig) {
        this.numberOfGroups = i;
        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        this.userClassLoader = Preconditions.checkNotNull(classLoader);
        this.kvStateRegistry = taskKvStateRegistry;
        this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
    }

    /**
     * Subclass hook invoked first by {@link #dispose()}. The default implementation does nothing.
     */
    protected void closeImpl() {
    }

    /**
     * Supplies the storage that backs a keyed state.
     *
     * @param registeredStateMetaInfo meta information produced while registering the state
     * @return the storage to hand to the keyed state implementation
     */
    protected abstract StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo);

    /**
     * Supplies the storage that backs a sub-keyed state.
     *
     * @param registeredStateMetaInfo meta information produced while registering the state
     * @return the storage to hand to the sub-keyed state implementation
     */
    protected abstract StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo);

    /**
     * Returns the number of state entries; intended for tests.
     *
     * @return the entry count as defined by the subclass
     */
    @VisibleForTesting
    public abstract int numStateEntries();

    /**
     * Picks the stream compression decorator.
     *
     * @param executionConfig execution config, possibly {@code null}
     * @return the Snappy decorator when the config is present and has snapshot compression
     *     enabled, otherwise the uncompressed decorator
     */
    private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
        if (executionConfig != null && executionConfig.isUseSnapshotCompression()) {
            return SnappyStreamCompressionDecorator.INSTANCE;
        }
        return UncompressedStreamCompressionDecorator.INSTANCE;
    }

    /** @return the number of key groups passed to the constructor */
    @Override
    public int getNumGroups() {
        return this.numberOfGroups;
    }

    /** @return the key-group range passed to the constructor */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    /** @return the live (not copied) map of restored state meta-info snapshots */
    public Map<String, StateMetaInfoSnapshot> getRestoredKvStateMetaInfos() {
        return this.restoredKvStateMetaInfos;
    }

    /** @return the user class loader passed to the constructor */
    @Override
    public ClassLoader getUserClassLoader() {
        return this.userClassLoader;
    }

    /** @return the live (not copied) map of keyed states created so far */
    @Override
    public Map<String, KeyedState> getKeyedStates() {
        return this.keyedStates;
    }

    /** @return the live (not copied) map of sub-keyed states created so far */
    @Override
    public Map<String, SubKeyedState> getSubKeyedStates() {
        return this.subKeyedStates;
    }

    /** @return the live (not copied) map of registered state meta information */
    public Map<String, RegisteredStateMetaInfo> getRegisteredStateMetaInfos() {
        return this.registeredStateMetaInfos;
    }

    /** @return the registry passed to the constructor; may be {@code null} */
    public TaskKvStateRegistry getKvStateRegistry() {
        return this.kvStateRegistry;
    }

    /** @return the live (not copied) map of state storages */
    @Override
    public Map<String, StateStorage> getStateStorages() {
        return this.stateStorages;
    }

    /** @return the compression decorator selected at construction time */
    public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
        return this.keyGroupCompressionDecorator;
    }

    /**
     * Checkpoint-completion callback; this base implementation ignores it.
     *
     * @param j value supplied by the caller (presumably the completed checkpoint's id); unused here
     * @throws Exception never thrown by this implementation; declared for overriding subclasses
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
    }

    /**
     * Releases the backend: runs {@link #closeImpl()}, quietly closes the stream registry and
     * empties every bookkeeping map.
     *
     * <p>The registry is closed and the maps are cleared even when {@link #closeImpl()} throws,
     * so a failing subclass hook cannot leave them behind; the exception still propagates.
     */
    @Override
    public void dispose() {
        try {
            closeImpl();
        } finally {
            IOUtils.closeQuietly(this.cancelStreamRegistry);
            this.keyedStates.clear();
            this.subKeyedStates.clear();
            this.registeredStateMetaInfos.clear();
            this.stateStorages.clear();
            this.restoredKvStateMetaInfos.clear();
        }
    }

    /**
     * Closes the stream registry only; state maps are left untouched (see {@link #dispose()}).
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }

    /**
     * Resolves a keyed state by letting the descriptor bind itself against this backend.
     *
     * @param keyedStateDescriptor descriptor of the requested state; must not be {@code null}
     * @return the state produced by the descriptor's {@code bind} call
     * @throws Exception if binding fails
     */
    @Override
    public <K, V, S extends KeyedState<K, V>> S getKeyedState(KeyedStateDescriptor<K, V, S> keyedStateDescriptor) throws Exception {
        Preconditions.checkNotNull(keyedStateDescriptor);
        return keyedStateDescriptor.bind(this);
    }

    /**
     * Resolves a sub-keyed state by letting the descriptor bind itself against this backend.
     *
     * @param subKeyedStateDescriptor descriptor of the requested state; must not be {@code null}
     * @return the state produced by the descriptor's {@code bind} call
     * @throws Exception if binding fails
     */
    @Override
    public <K, N, V, S extends SubKeyedState<K, N, V>> S getSubKeyedState(SubKeyedStateDescriptor<K, N, V, S> subKeyedStateDescriptor) throws Exception {
        Preconditions.checkNotNull(subKeyedStateDescriptor);
        return subKeyedStateDescriptor.bind(this);
    }

    /**
     * Returns the keyed value state cached under the descriptor's name, creating and caching it
     * on first use.
     *
     * @param keyedValueStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, V> KeyedValueState<K, V> createKeyedValueState(KeyedValueStateDescriptor<K, V> keyedValueStateDescriptor) throws Exception {
        String stateName = keyedValueStateDescriptor.getName();
        KeyedValueState<K, V> state = (KeyedValueState) this.keyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        // Register meta info first: the keyed overload may replace the descriptor's serializers.
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(keyedValueStateDescriptor);
        state = new KeyedValueStateImpl(this, keyedValueStateDescriptor, getOrCreateStateStorageForKeyedState(metaInfo));
        this.keyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the keyed list state cached under the descriptor's name, creating and caching it
     * on first use.
     *
     * @param keyedListStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, E> KeyedListState<K, E> createKeyedListState(KeyedListStateDescriptor<K, E> keyedListStateDescriptor) throws Exception {
        String stateName = keyedListStateDescriptor.getName();
        KeyedListState<K, E> state = (KeyedListState) this.keyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(keyedListStateDescriptor);
        state = new KeyedListStateImpl(this, keyedListStateDescriptor, getOrCreateStateStorageForKeyedState(metaInfo));
        this.keyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the keyed map state cached under the descriptor's name, creating and caching it
     * on first use.
     *
     * @param keyedMapStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, MK, MV> KeyedMapState<K, MK, MV> createKeyedMapState(KeyedMapStateDescriptor<K, MK, MV> keyedMapStateDescriptor) throws Exception {
        String stateName = keyedMapStateDescriptor.getName();
        KeyedMapState<K, MK, MV> state = (KeyedMapState) this.keyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(keyedMapStateDescriptor);
        state = new KeyedMapStateImpl(this, keyedMapStateDescriptor, getOrCreateStateStorageForKeyedState(metaInfo));
        this.keyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the keyed sorted-map state cached under the descriptor's name, creating and caching
     * it on first use.
     *
     * @param keyedSortedMapStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, MK, MV> KeyedSortedMapState<K, MK, MV> createKeyedSortedMapState(KeyedSortedMapStateDescriptor<K, MK, MV> keyedSortedMapStateDescriptor) throws Exception {
        String stateName = keyedSortedMapStateDescriptor.getName();
        KeyedSortedMapState<K, MK, MV> state = (KeyedSortedMapState) this.keyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(keyedSortedMapStateDescriptor);
        state = new KeyedSortedMapStateImpl(this, keyedSortedMapStateDescriptor, getOrCreateStateStorageForKeyedState(metaInfo));
        this.keyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the sub-keyed value state cached under the descriptor's name, creating and caching
     * it on first use.
     *
     * @param subKeyedValueStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, N, V> SubKeyedValueState<K, N, V> createSubKeyedValueState(SubKeyedValueStateDescriptor<K, N, V> subKeyedValueStateDescriptor) throws Exception {
        String stateName = subKeyedValueStateDescriptor.getName();
        SubKeyedValueState<K, N, V> state = (SubKeyedValueState) this.subKeyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(subKeyedValueStateDescriptor);
        state = new SubKeyedValueStateImpl(this, subKeyedValueStateDescriptor, getOrCreateStateStorageForSubKeyedState(metaInfo));
        this.subKeyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the sub-keyed list state cached under the descriptor's name, creating and caching
     * it on first use.
     *
     * @param subKeyedListStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, N, E> SubKeyedListState<K, N, E> createSubKeyedListState(SubKeyedListStateDescriptor<K, N, E> subKeyedListStateDescriptor) throws Exception {
        String stateName = subKeyedListStateDescriptor.getName();
        SubKeyedListState<K, N, E> state = (SubKeyedListState) this.subKeyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(subKeyedListStateDescriptor);
        state = new SubKeyedListStateImpl(this, subKeyedListStateDescriptor, getOrCreateStateStorageForSubKeyedState(metaInfo));
        this.subKeyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the sub-keyed map state cached under the descriptor's name, creating and caching
     * it on first use.
     *
     * @param subKeyedMapStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, N, MK, MV> SubKeyedMapState<K, N, MK, MV> createSubKeyedMapState(SubKeyedMapStateDescriptor<K, N, MK, MV> subKeyedMapStateDescriptor) throws Exception {
        String stateName = subKeyedMapStateDescriptor.getName();
        SubKeyedMapState<K, N, MK, MV> state = (SubKeyedMapState) this.subKeyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(subKeyedMapStateDescriptor);
        state = new SubKeyedMapStateImpl(this, subKeyedMapStateDescriptor, getOrCreateStateStorageForSubKeyedState(metaInfo));
        this.subKeyedStates.put(stateName, state);
        return state;
    }

    /**
     * Returns the sub-keyed sorted-map state cached under the descriptor's name, creating and
     * caching it on first use.
     *
     * @param subKeyedSortedMapStateDescriptor descriptor of the state
     * @return the cached or newly created state
     * @throws Exception if registering the state's meta information fails
     */
    @Override
    public <K, N, MK, MV> SubKeyedSortedMapState<K, N, MK, MV> createSubKeyedSortedMapState(SubKeyedSortedMapStateDescriptor<K, N, MK, MV> subKeyedSortedMapStateDescriptor) throws Exception {
        String stateName = subKeyedSortedMapStateDescriptor.getName();
        SubKeyedSortedMapState<K, N, MK, MV> state = (SubKeyedSortedMapState) this.subKeyedStates.get(stateName);
        if (state != null) {
            return state;
        }
        RegisteredStateMetaInfo metaInfo = tryRegisterStateMetaInfo(subKeyedSortedMapStateDescriptor);
        state = new SubKeyedSortedMapStateImpl(this, subKeyedSortedMapStateDescriptor, getOrCreateStateStorageForSubKeyedState(metaInfo));
        this.subKeyedStates.put(stateName, state);
        return state;
    }

    /**
     * Registers (or re-registers) the meta information of a keyed state.
     *
     * <p>When meta information already exists under the descriptor's name, the restored snapshot
     * of the same name must be present and compatibility is resolved against it. Otherwise fresh
     * meta information is built from the descriptor. In both cases the resulting key and value
     * serializers are written back into the descriptor and the result is stored under the name.
     *
     * @param keyedStateDescriptor descriptor of the state; must not be {@code null}
     * @return the meta information now registered for the state
     * @throws StateMigrationException declared for the compatibility-resolution path
     * @throws IllegalStateException if meta information is registered but no restored snapshot exists
     */
    private RegisteredStateMetaInfo tryRegisterStateMetaInfo(KeyedStateDescriptor keyedStateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(keyedStateDescriptor);
        String stateName = keyedStateDescriptor.getName();

        RegisteredStateMetaInfo metaInfo;
        if (this.registeredStateMetaInfos.get(stateName) != null) {
            StateMetaInfoSnapshot restoredSnapshot = this.restoredKvStateMetaInfos.get(stateName);
            Preconditions.checkState(restoredSnapshot != null, "Requested to check compatibility of a restored StateMetaInfoSnapshot, but its corresponding restored snapshot cannot be found.");
            metaInfo = RegisteredStateMetaInfo.resolveStateCompatibility(restoredSnapshot, keyedStateDescriptor);
        } else {
            // NOTE(review): `mo2665getValueSerializer` looks like a decompiler-renamed accessor;
            // confirm the real method name before building against upstream sources.
            metaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedStateDescriptor.getStateType(), stateName, keyedStateDescriptor.getKeySerializer(), keyedStateDescriptor.mo2665getValueSerializer());
        }

        keyedStateDescriptor.setKeySerializer(metaInfo.getKeySerializer());
        keyedStateDescriptor.setValueSerializer(metaInfo.getValueSerializer());
        this.registeredStateMetaInfos.put(stateName, metaInfo);
        return metaInfo;
    }

    /**
     * Registers (or re-registers) the meta information of a sub-keyed state.
     *
     * <p>Follows the same decision as the keyed overload, but additionally passes the namespace
     * serializer when building fresh meta information, and does <em>not</em> write any serializer
     * back into the descriptor.
     *
     * @param subKeyedStateDescriptor descriptor of the state; must not be {@code null}
     * @return the meta information now registered for the state
     * @throws StateMigrationException declared for the compatibility-resolution path
     * @throws IllegalStateException if meta information is registered but no restored snapshot exists
     */
    private RegisteredStateMetaInfo tryRegisterStateMetaInfo(SubKeyedStateDescriptor subKeyedStateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(subKeyedStateDescriptor);
        String stateName = subKeyedStateDescriptor.getName();

        RegisteredStateMetaInfo metaInfo;
        if (this.registeredStateMetaInfos.get(stateName) != null) {
            StateMetaInfoSnapshot restoredSnapshot = this.restoredKvStateMetaInfos.get(stateName);
            Preconditions.checkState(restoredSnapshot != null, "Requested to check compatibility of a restored StateMetaInfoSnapshot, but its corresponding restored snapshot cannot be found.");
            metaInfo = RegisteredStateMetaInfo.resolveStateCompatibility(restoredSnapshot, subKeyedStateDescriptor);
        } else {
            // NOTE(review): `mo2668getValueSerializer` looks like a decompiler-renamed accessor;
            // confirm the real method name before building against upstream sources.
            metaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedStateDescriptor.getStateType(), stateName, subKeyedStateDescriptor.getKeySerializer(), subKeyedStateDescriptor.mo2668getValueSerializer(), subKeyedStateDescriptor.getNamespaceSerializer());
        }

        this.registeredStateMetaInfos.put(stateName, metaInfo);
        return metaInfo;
    }
}
