package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapInternalStateBackend.class */
public class HeapInternalStateBackend extends AbstractInternalStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(HeapInternalStateBackend.class);
    private final LocalRecoveryConfig localRecoveryConfig;
    private final boolean asynchronousSnapshot;
    private final PriorityQueueSetFactory priorityQueueSetFactory;

    public HeapInternalStateBackend(int i, KeyGroupRange keyGroupRange, ClassLoader classLoader, LocalRecoveryConfig localRecoveryConfig, PriorityQueueSetFactory priorityQueueSetFactory, TaskKvStateRegistry taskKvStateRegistry, boolean z, ExecutionConfig executionConfig) {
        super(i, keyGroupRange, classLoader, taskKvStateRegistry, executionConfig);
        this.localRecoveryConfig = (LocalRecoveryConfig) Preconditions.checkNotNull(localRecoveryConfig);
        this.asynchronousSnapshot = z;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        LOG.info("HeapInternalStateBackend is created with {} mode.", z ? "async" : "sync");
    }

    /**
     * Creates a key-grouped priority queue by delegating to the factory supplied at construction.
     *
     * @param str first argument handed to the delegate factory
     * @param typeSerializer serializer handed to the delegate factory
     * @return whatever the delegate factory returns
     */
    @Override // org.apache.flink.runtime.state.PriorityQueueSetFactory
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        final PriorityQueueSetFactory delegate = this.priorityQueueSetFactory;
        return delegate.create(str, typeSerializer);
    }

    /** Intentionally empty: this backend performs no work of its own in this hook. */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    public void closeImpl() {
    }

    /**
     * Looks up the heap storage registered under the state's name, creating and registering one
     * (with {@link VoidNamespace#INSTANCE} and the "sub-keyed" flag off) when none exists yet.
     * The given meta info is (re)applied to the storage in both cases.
     *
     * @param registeredStateMetaInfo meta info of the keyed state; its name is the lookup key
     * @return the existing or newly created storage
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    protected StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo) {
        final String stateName = registeredStateMetaInfo.getName();
        HeapStateStorage storage = (HeapStateStorage) this.stateStorages.get(stateName);
        if (storage != null) {
            storage.setStateMetaInfo(registeredStateMetaInfo);
            return storage;
        }
        storage = new HeapStateStorage(this, registeredStateMetaInfo, VoidNamespace.INSTANCE, false, this.asynchronousSnapshot);
        this.stateStorages.put(stateName, storage);
        storage.setStateMetaInfo(registeredStateMetaInfo);
        return storage;
    }

    /**
     * Looks up the heap storage registered under the state's name, creating and registering one
     * (with a {@code null} initial namespace and the "sub-keyed" flag on) when none exists yet.
     * The given meta info is (re)applied to the storage in both cases.
     *
     * @param registeredStateMetaInfo meta info of the sub-keyed state; its name is the lookup key
     * @return the existing or newly created storage
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    protected StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo registeredStateMetaInfo) {
        final String stateName = registeredStateMetaInfo.getName();
        HeapStateStorage storage = (HeapStateStorage) this.stateStorages.get(stateName);
        if (storage != null) {
            storage.setStateMetaInfo(registeredStateMetaInfo);
            return storage;
        }
        storage = new HeapStateStorage(this, registeredStateMetaInfo, null, true, this.asynchronousSnapshot);
        this.stateStorages.put(stateName, storage);
        storage.setStateMetaInfo(registeredStateMetaInfo);
        return storage;
    }

    /**
     * Counts the entries held by the state tables of all keyed and sub-keyed states.
     *
     * @return the sum of {@code getStateTable().size()} over every keyed and sub-keyed state
     */
    @Override // org.apache.flink.runtime.state.AbstractInternalStateBackend
    public int numStateEntries() {
        // Collect into an explicitly created ArrayList: the list returned by Collectors.toList()
        // carries no mutability guarantee, so appending to it is not safe to rely on.
        final List<Object> storages = new ArrayList<>();
        getKeyedStates().values().forEach(state -> storages.add(state.getStateStorage()));
        getSubKeyedStates().values().forEach(state -> storages.add(state.getStateStorage()));

        int total = 0;
        for (Object storage : storages) {
            total += ((HeapStateStorage) storage).getStateTable().size();
        }
        return total;
    }

    /**
     * Counts the entries stored under one namespace across all sub-keyed states.
     *
     * @param obj the namespace passed to each state table's {@code sizeOfNamespace}
     * @return the sum of the per-table namespace sizes
     */
    @VisibleForTesting
    public int numStateEntries(Object obj) {
        int total = 0;
        for (SubKeyedState state : getSubKeyedStates().values()) {
            HeapStateStorage storage = (HeapStateStorage) state.getStateStorage();
            total += storage.getStateTable().sizeOfNamespace(obj);
        }
        return total;
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, final CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) {
        if (this.registeredStateMetaInfos.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        long currentTimeMillis = System.currentTimeMillis();
        final ArrayList arrayList = new ArrayList();
        final ArrayList arrayList2 = new ArrayList();
        final HashMap hashMap = new HashMap();
        final HashMap hashMap2 = new HashMap();
        final HashMap hashMap3 = new HashMap(this.keyedStates.size());
        final HashMap hashMap4 = new HashMap(this.subKeyedStates.size());
        for (Map.Entry<String, RegisteredStateMetaInfo> entry : this.registeredStateMetaInfos.entrySet()) {
            String key = entry.getKey();
            org.apache.flink.runtime.state.heap.internal.StateTable stateTable = ((HeapStateStorage) this.stateStorages.get(key)).getStateTable();
            RegisteredStateMetaInfo value = entry.getValue();
            if (value.getStateType().isKeyedState()) {
                arrayList.add(value.snapshot());
                hashMap.put(key, Integer.valueOf(hashMap.size()));
                hashMap3.put(key, stateTable.createSnapshot());
            } else {
                arrayList2.add(value.snapshot());
                hashMap2.put(key, Integer.valueOf(hashMap2.size()));
                hashMap4.put(key, stateTable.createSnapshot());
            }
        }
        final SupplierWithException supplierWithException = this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> {
            return CheckpointStreamWithResultProvider.createDuplicatingStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider());
        } : () -> {
            return CheckpointStreamWithResultProvider.createSimpleStream(j, CheckpointedStateScope.EXCLUSIVE, checkpointStreamFactory);
        };
        AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>() { // from class: org.apache.flink.runtime.state.heap.HeapInternalStateBackend.1
            CheckpointStreamWithResultProvider streamAndResultExtractor = null;

            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void acquireResources() throws Exception {
                this.streamAndResultExtractor = (CheckpointStreamWithResultProvider) supplierWithException.get();
                HeapInternalStateBackend.this.cancelStreamRegistry.registerCloseable(this.streamAndResultExtractor);
            }

            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void releaseResources() {
                unregisterAndCloseStreamAndResultExtractor();
                Iterator it = hashMap3.values().iterator();
                while (it.hasNext()) {
                    ((StateSnapshot) it.next()).release();
                }
                Iterator it2 = hashMap4.values().iterator();
                while (it2.hasNext()) {
                    ((StateSnapshot) it2.next()).release();
                }
            }

            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            protected void stopOperation() {
                unregisterAndCloseStreamAndResultExtractor();
            }

            private void unregisterAndCloseStreamAndResultExtractor() {
                if (HeapInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                    IOUtils.closeQuietly(this.streamAndResultExtractor);
                    this.streamAndResultExtractor = null;
                }
            }

            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Type inference failed for: r0v3, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources
            @Nonnull
            public SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                long currentTimeMillis2 = System.currentTimeMillis();
                ?? checkpointOutputStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream) checkpointOutputStream);
                new InternalBackendSerializationProxy(arrayList, arrayList2, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapInternalStateBackend.this.keyGroupCompressionDecorator)).write(dataOutputViewStreamWrapper);
                HashMap hashMap5 = new HashMap();
                KeyGroupRange keyGroupRange = HeapInternalStateBackend.this.getKeyGroupRange();
                Iterator<Integer> it = keyGroupRange.iterator();
                while (it.hasNext()) {
                    int intValue = it.next().intValue();
                    long pos = checkpointOutputStream.getPos();
                    int i = 0;
                    dataOutputViewStreamWrapper.writeInt(intValue);
                    for (Map.Entry entry2 : hashMap3.entrySet()) {
                        i += HeapInternalStateBackend.this.writeGroupStates(checkpointOutputStream, (StateSnapshot) entry2.getValue(), ((Integer) hashMap.get(entry2.getKey())).intValue(), intValue);
                    }
                    for (Map.Entry entry3 : hashMap4.entrySet()) {
                        i += HeapInternalStateBackend.this.writeGroupStates(checkpointOutputStream, (StateSnapshot) entry3.getValue(), ((Integer) hashMap2.get(entry3.getKey())).intValue(), intValue);
                    }
                    if (i != 0) {
                        hashMap5.put(Integer.valueOf(intValue), new Tuple2(Long.valueOf(pos), Integer.valueOf(i)));
                    }
                }
                if (!HeapInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable(this.streamAndResultExtractor)) {
                    throw new IOException("Stream already closed and cannot return a handle.");
                }
                SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                this.streamAndResultExtractor = null;
                KeyGroupsStateSnapshot keyGroupsStateSnapshot = new KeyGroupsStateSnapshot(keyGroupRange, hashMap5, closeAndFinalizeCheckpointStreamResult.getJobManagerOwnedSnapshot());
                HeapInternalStateBackend.LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", asynchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis2) + " ms.");
                StreamStateHandle taskLocalSnapshot = closeAndFinalizeCheckpointStreamResult.getTaskLocalSnapshot();
                return taskLocalSnapshot != null ? SnapshotResult.withLocalState(keyGroupsStateSnapshot, new KeyGroupsStateSnapshot(keyGroupRange, hashMap5, taskLocalSnapshot)) : SnapshotResult.of(keyGroupsStateSnapshot);
            }
        });
        if (!this.asynchronousSnapshot) {
            from.run();
        }
        LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
        return from;
    }

    /**
     * Writes one state's section for one key group: the state id followed by the snapshot's
     * mappings for that key group, both through this backend's compression decorator.
     *
     * <p>Kept {@code public} as in the decompiled class; the snapshot task defined in this class
     * calls it through {@code HeapInternalStateBackend.this}.
     *
     * @param checkpointStateOutputStream checkpoint stream to write to
     * @param stateSnapshot snapshot of the state to write
     * @param i id of the state, written ahead of its mappings
     * @param i2 key group whose mappings are written
     * @return the value reported by {@code writeMappingsInKeyGroup}, which the caller sums up as
     *     the number of entries written for the key group
     * @throws IOException if writing to or closing the decorated stream fails
     */
    public int writeGroupStates(CheckpointStreamFactory.CheckpointStateOutputStream checkpointStateOutputStream, StateSnapshot stateSnapshot, int i, int i2) throws IOException {
        // try-with-resources closes the decorated stream exactly once and attaches a failure of
        // close() as a suppressed exception instead of letting it replace the original one.
        try (OutputStream compressedStream = this.keyGroupCompressionDecorator.decorateWithCompression(checkpointStateOutputStream)) {
            final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(compressedStream);
            outputView.writeInt(i);
            return stateSnapshot.partitionByKeyGroup().writeMappingsInKeyGroup(outputView, i2);
        }
    }

    /**
     * Restores this backend from the given state handles.
     *
     * <p>For every non-null handle that carries a snapshot stream, the serialization proxy is
     * read first; each state it describes is registered and gets a storage. Then, for every key
     * group of this backend's range that the handle has an offset for, the stream is positioned
     * at that offset, the key-group id is verified, and the sections of all keyed and sub-keyed
     * states are read back into their storages. The number of entries read is checked against
     * the count recorded in the handle.
     *
     * @param collection handles to restore from; {@code null}, empty, and {@code null} elements
     *     are tolerated and skipped
     * @throws Exception if reading fails, a handle is not a {@link KeyGroupsStateSnapshot}, or a
     *     consistency check on the stream content fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap internal state backend from snapshot.");
        for (KeyedStateHandle keyedStateHandle : collection) {
            if (keyedStateHandle == null) {
                continue;
            }
            Preconditions.checkState(keyedStateHandle instanceof KeyGroupsStateSnapshot);
            final KeyGroupsStateSnapshot keyGroupsSnapshot = (KeyGroupsStateSnapshot) keyedStateHandle;
            final StreamStateHandle snapshotHandle = keyGroupsSnapshot.getSnapshotHandle();
            if (snapshotHandle == null) {
                continue;
            }

            // Needs the seekable Flink stream type: key groups are located via seek(offset).
            final org.apache.flink.core.fs.FSDataInputStream inputStream = snapshotHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(inputStream);
            try {
                final DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
                final InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(getUserClassLoader(), true);
                serializationProxy.read(inputView);

                // State id (index in the meta snapshot list) -> descriptor of the keyed state.
                final Map<Integer, KeyedStateDescriptor> keyedDescriptorsById = new HashMap<>();
                final List<StateMetaInfoSnapshot> keyedMetaSnapshots = serializationProxy.getKeyedStateMetaSnapshots();
                for (int stateId = 0; stateId < keyedMetaSnapshots.size(); stateId++) {
                    final StateMetaInfoSnapshot metaSnapshot = keyedMetaSnapshots.get(stateId);
                    final String stateName = metaSnapshot.getName();
                    this.restoredKvStateMetaInfos.put(stateName, metaSnapshot);
                    final RegisteredStateMetaInfo metaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(metaSnapshot);
                    this.registeredStateMetaInfos.put(stateName, metaInfo);
                    final KeyedStateDescriptor descriptor = metaSnapshot.createKeyedStateDescriptor();
                    this.stateStorages.put(stateName, getOrCreateStateStorageForKeyedState(metaInfo));
                    keyedDescriptorsById.put(stateId, descriptor);
                }

                // Same mapping for the sub-keyed states.
                final Map<Integer, SubKeyedStateDescriptor> subKeyedDescriptorsById = new HashMap<>();
                final List<StateMetaInfoSnapshot> subKeyedMetaSnapshots = serializationProxy.getSubKeyedStateMetaSnapshots();
                for (int stateId = 0; stateId < subKeyedMetaSnapshots.size(); stateId++) {
                    final StateMetaInfoSnapshot metaSnapshot = subKeyedMetaSnapshots.get(stateId);
                    final String stateName = metaSnapshot.getName();
                    final RegisteredStateMetaInfo metaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(metaSnapshot);
                    this.registeredStateMetaInfos.put(stateName, metaInfo);
                    this.restoredKvStateMetaInfos.put(stateName, metaSnapshot);
                    final SubKeyedStateDescriptor descriptor = metaSnapshot.createSubKeyedStateDescriptor();
                    this.stateStorages.put(stateName, getOrCreateStateStorageForSubKeyedState(metaInfo));
                    subKeyedDescriptorsById.put(stateId, descriptor);
                }

                final Map<Integer, Tuple2<Long, Integer>> keyGroupMetaInfos = keyGroupsSnapshot.getMetaInfos();
                final StreamCompressionDecorator compressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;

                for (Integer keyGroup : getKeyGroupRange()) {
                    final int keyGroupId = keyGroup.intValue();
                    final Tuple2<Long, Integer> offsetAndCount = keyGroupMetaInfos.get(keyGroupId);
                    if (offsetAndCount == null) {
                        // The handle recorded no offset for this key group; nothing to read.
                        continue;
                    }
                    final long offset = offsetAndCount.f0.longValue();
                    final int expectedEntries = offsetAndCount.f1.intValue();

                    inputStream.seek(offset);
                    // The key-group id is read from the undecorated stream.
                    Preconditions.checkState(inputView.readInt() == keyGroupId, "Unexpected key-group in restore.");

                    int restoredEntries = 0;
                    // try-with-resources guarantees the decorated stream is closed on failure too.
                    try (InputStream decompressedStream = compressionDecorator.decorateWithCompression(inputStream)) {
                        final DataInputViewStreamWrapper groupView = new DataInputViewStreamWrapper(decompressedStream);
                        for (int n = 0; n < keyedMetaSnapshots.size(); n++) {
                            final KeyedStateDescriptor descriptor = keyedDescriptorsById.get(groupView.readInt());
                            restoredEntries += readMappingsInKeyGroupForKeyedState(groupView, descriptor, (HeapStateStorage) this.stateStorages.get(descriptor.getName()));
                        }
                        for (int n = 0; n < subKeyedMetaSnapshots.size(); n++) {
                            final SubKeyedStateDescriptor descriptor = subKeyedDescriptorsById.get(groupView.readInt());
                            restoredEntries += readMappingsInKeyGroupForSubKeyedState(groupView, descriptor, (HeapStateStorage) this.stateStorages.get(descriptor.getName()));
                        }
                        Preconditions.checkState(expectedEntries == restoredEntries, "Unexpected number of entries");
                    }
                }
            } finally {
                // Close only if the stream was still registered, on success and failure alike.
                if (this.cancelStreamRegistry.unregisterCloseable(inputStream)) {
                    IOUtils.closeQuietly(inputStream);
                }
            }
        }
    }

    /**
     * Reads one keyed state's section of a key group: an entry count followed by that many
     * key/value pairs, each of which is put into the given storage.
     *
     * @param dataInputView source positioned at the entry count
     * @param keyedStateDescriptor supplies the key and value serializers
     * @param heapStateStorage storage receiving the deserialized pairs
     * @return the entry count read from the stream
     * @throws Exception if reading or deserializing fails
     */
    private int readMappingsInKeyGroupForKeyedState(DataInputView dataInputView, KeyedStateDescriptor keyedStateDescriptor, HeapStateStorage heapStateStorage) throws Exception {
        final TypeSerializer keyReader = keyedStateDescriptor.getKeySerializer();
        final TypeSerializer valueReader = keyedStateDescriptor.mo2781getValueSerializer();
        final int entryCount = dataInputView.readInt();
        int remaining = entryCount;
        while (remaining > 0) {
            // Key precedes value in the stream.
            Object key = keyReader.deserialize(dataInputView);
            Object value = valueReader.deserialize(dataInputView);
            heapStateStorage.put(key, value);
            remaining--;
        }
        return entryCount;
    }

    /**
     * Reads one sub-keyed state's section of a key group: an entry count followed by that many
     * key/namespace/value triples. For each triple the storage's current namespace is set
     * before the key/value pair is put.
     *
     * @param dataInputView source positioned at the entry count
     * @param subKeyedStateDescriptor supplies the key, namespace and value serializers
     * @param heapStateStorage storage receiving the deserialized entries
     * @return the entry count read from the stream
     * @throws Exception if reading or deserializing fails
     */
    private int readMappingsInKeyGroupForSubKeyedState(DataInputView dataInputView, SubKeyedStateDescriptor subKeyedStateDescriptor, HeapStateStorage heapStateStorage) throws Exception {
        final TypeSerializer keyReader = subKeyedStateDescriptor.getKeySerializer();
        final TypeSerializer namespaceReader = subKeyedStateDescriptor.getNamespaceSerializer();
        final TypeSerializer valueReader = subKeyedStateDescriptor.mo2784getValueSerializer();
        final int entryCount = dataInputView.readInt();
        int remaining = entryCount;
        while (remaining > 0) {
            // Stream order per entry: key, namespace, value.
            Object key = keyReader.deserialize(dataInputView);
            Object namespace = namespaceReader.deserialize(dataInputView);
            Object value = valueReader.deserialize(dataInputView);
            heapStateStorage.setCurrentNamespace(namespace);
            heapStateStorage.put(key, value);
            remaining--;
        }
        return entryCount;
    }
}
