/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateStorage;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.heap.HeapStateStorage;
import org.apache.flink.runtime.state.heap.internal.StateTable;
import org.apache.flink.runtime.state.heap.internal.StateTableSnapshot;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapInternalStateBackend
extends AbstractInternalStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(HeapInternalStateBackend.class);
    private final LocalRecoveryConfig localRecoveryConfig;
    private final boolean asynchronousSnapshot;

    public HeapInternalStateBackend(int numberOfGroups, KeyGroupRange keyGroupRange, ClassLoader userClassLoader, LocalRecoveryConfig localRecoveryConfig, TaskKvStateRegistry kvStateRegistry, boolean asynchronousSnapshot, ExecutionConfig executionConfig) {
        super(numberOfGroups, keyGroupRange, userClassLoader, kvStateRegistry, executionConfig);
        this.localRecoveryConfig = (LocalRecoveryConfig)Preconditions.checkNotNull((Object)localRecoveryConfig);
        this.asynchronousSnapshot = asynchronousSnapshot;
        LOG.info("HeapInternalStateBackend is created with {} mode.", (Object)(asynchronousSnapshot ? "async" : "sync"));
    }

    @Override
    public void closeImpl() {
    }

    @Override
    protected StateStorage getOrCreateStateStorageForKeyedState(RegisteredStateMetaInfo stateMetaInfo) {
        HeapStateStorage stateStorage = (HeapStateStorage)this.stateStorages.get(stateMetaInfo.getName());
        if (stateStorage == null) {
            stateStorage = new HeapStateStorage(this, stateMetaInfo, VoidNamespace.INSTANCE, false, this.asynchronousSnapshot);
            this.stateStorages.put(stateMetaInfo.getName(), stateStorage);
        }
        stateStorage.setStateMetaInfo(stateMetaInfo);
        return stateStorage;
    }

    @Override
    protected StateStorage getOrCreateStateStorageForSubKeyedState(RegisteredStateMetaInfo stateMetaInfo) {
        HeapStateStorage stateStorage = (HeapStateStorage)this.stateStorages.get(stateMetaInfo.getName());
        if (stateStorage == null) {
            stateStorage = new HeapStateStorage(this, stateMetaInfo, null, true, this.asynchronousSnapshot);
            this.stateStorages.put(stateMetaInfo.getName(), stateStorage);
        }
        stateStorage.setStateMetaInfo(stateMetaInfo);
        return stateStorage;
    }

    @Override
    public int numStateEntries() {
        int count = 0;
        List stateStorages = this.getKeyedStates().values().stream().map(KeyedState::getStateStorage).collect(Collectors.toList());
        stateStorages.addAll(this.getSubKeyedStates().values().stream().map(SubKeyedState::getStateStorage).collect(Collectors.toList()));
        for (Object stateStorage : stateStorages) {
            count += ((HeapStateStorage)stateStorage).getStateTable().size();
        }
        return count;
    }

    @VisibleForTesting
    public int numStateEntries(Object namespace) {
        int count = 0;
        for (SubKeyedState subKeyedState : this.getSubKeyedStates().values()) {
            count += ((HeapStateStorage)subKeyedState.getStateStorage()).getStateTable().sizeOfNamespace(namespace);
        }
        return count;
    }

    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, final CheckpointStreamFactory primaryStreamFactory, CheckpointOptions checkpointOptions) {
        if (this.registeredStateMetaInfos.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        long syncStartTime = System.currentTimeMillis();
        final ArrayList<StateMetaInfoSnapshot> keyedStateMetaSnapshots = new ArrayList<StateMetaInfoSnapshot>();
        final ArrayList<StateMetaInfoSnapshot> subKeyedStateMetaSnapshots = new ArrayList<StateMetaInfoSnapshot>();
        final HashMap<String, Integer> keyedStateToId = new HashMap<String, Integer>();
        final HashMap<String, Integer> subKeyedStateToId = new HashMap<String, Integer>();
        final HashMap<String, StateTableSnapshot> keyedStateStableSnapshots = new HashMap<String, StateTableSnapshot>(this.keyedStates.size());
        final HashMap<String, StateTableSnapshot> subKeyedStateStableSnapshots = new HashMap<String, StateTableSnapshot>(this.subKeyedStates.size());
        for (Map.Entry registeredStateMetaInfoEntry : this.registeredStateMetaInfos.entrySet()) {
            String stateName = (String)registeredStateMetaInfoEntry.getKey();
            StateTable stateTable = ((HeapStateStorage)this.stateStorages.get(stateName)).getStateTable();
            RegisteredStateMetaInfo stateMetaInfo = (RegisteredStateMetaInfo)registeredStateMetaInfoEntry.getValue();
            if (stateMetaInfo.getStateType().isKeyedState()) {
                keyedStateMetaSnapshots.add(stateMetaInfo.snapshot());
                keyedStateToId.put(stateName, keyedStateToId.size());
                keyedStateStableSnapshots.put(stateName, stateTable.createSnapshot());
                continue;
            }
            subKeyedStateMetaSnapshots.add(stateMetaInfo.snapshot());
            subKeyedStateToId.put(stateName, subKeyedStateToId.size());
            subKeyedStateStableSnapshots.put(stateName, stateTable.createSnapshot());
        }
        final SupplierWithException checkpointStreamSupplier = this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory, this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
        AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>(){
            CheckpointStreamWithResultProvider streamAndResultExtractor = null;

            @Override
            protected void acquireResources() throws Exception {
                this.streamAndResultExtractor = (CheckpointStreamWithResultProvider)checkpointStreamSupplier.get();
                HeapInternalStateBackend.this.cancelStreamRegistry.registerCloseable((Closeable)this.streamAndResultExtractor);
            }

            @Override
            protected void releaseResources() {
                this.unregisterAndCloseStreamAndResultExtractor();
                for (StateTableSnapshot tableSnapshot : keyedStateStableSnapshots.values()) {
                    tableSnapshot.release();
                }
                for (StateTableSnapshot tableSnapshot : subKeyedStateStableSnapshots.values()) {
                    tableSnapshot.release();
                }
            }

            @Override
            protected void stopOperation() {
                this.unregisterAndCloseStreamAndResultExtractor();
            }

            private void unregisterAndCloseStreamAndResultExtractor() {
                if (HeapInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                    IOUtils.closeQuietly((AutoCloseable)this.streamAndResultExtractor);
                    this.streamAndResultExtractor = null;
                }
            }

            @Override
            @Nonnull
            protected SnapshotResult<KeyedStateHandle> performOperation() throws Exception {
                long asyncStartTime = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream localStream = this.streamAndResultExtractor.getCheckpointOutputStream();
                DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)localStream));
                InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(keyedStateMetaSnapshots, subKeyedStateMetaSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapInternalStateBackend.this.keyGroupCompressionDecorator));
                serializationProxy.write((DataOutputView)outView);
                HashMap<Integer, Tuple2<Long, Integer>> metaInfos = new HashMap<Integer, Tuple2<Long, Integer>>();
                KeyGroupRange groups = HeapInternalStateBackend.this.getKeyGroupRange();
                for (int group : groups) {
                    long offset = localStream.getPos();
                    int numEntries = 0;
                    outView.writeInt(group);
                    for (Map.Entry entry : keyedStateStableSnapshots.entrySet()) {
                        numEntries += HeapInternalStateBackend.this.writeGroupStates(localStream, (StateTableSnapshot)entry.getValue(), (Integer)keyedStateToId.get(entry.getKey()), group);
                    }
                    for (Map.Entry entry : subKeyedStateStableSnapshots.entrySet()) {
                        numEntries += HeapInternalStateBackend.this.writeGroupStates(localStream, (StateTableSnapshot)entry.getValue(), (Integer)subKeyedStateToId.get(entry.getKey()), group);
                    }
                    if (numEntries == 0) continue;
                    metaInfos.put(group, (Tuple2<Long, Integer>)new Tuple2((Object)offset, (Object)numEntries));
                }
                if (HeapInternalStateBackend.this.cancelStreamRegistry.unregisterCloseable((Closeable)this.streamAndResultExtractor)) {
                    SnapshotResult<StreamStateHandle> streamSnapshotResult = this.streamAndResultExtractor.closeAndFinalizeCheckpointStreamResult();
                    this.streamAndResultExtractor = null;
                    StreamStateHandle streamStateHandle = streamSnapshotResult.getJobManagerOwnedSnapshot();
                    KeyGroupsStateSnapshot snapshot = new KeyGroupsStateSnapshot(groups, metaInfos, streamStateHandle);
                    LOG.info("Heap backend snapshot (" + primaryStreamFactory + ", asynchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - asyncStartTime) + " ms.");
                    StreamStateHandle localStreamStateHandle = streamSnapshotResult.getTaskLocalSnapshot();
                    if (localStreamStateHandle != null) {
                        KeyGroupsStateSnapshot localSnapshot = new KeyGroupsStateSnapshot(groups, metaInfos, localStreamStateHandle);
                        return SnapshotResult.withLocalState(snapshot, localSnapshot);
                    }
                    return SnapshotResult.of(snapshot);
                }
                throw new IOException("Stream already closed and cannot return a handle.");
            }
        };
        AsyncStoppableTaskWithCallback<SnapshotResult<KeyedStateHandle>> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        if (!this.asynchronousSnapshot) {
            task.run();
        }
        LOG.info("Heap backend snapshot (" + primaryStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
        return task;
    }

    private int writeGroupStates(CheckpointStreamFactory.CheckpointStateOutputStream localStream, StateTableSnapshot stateTableSnapshot, int stateId, int group) throws IOException {
        DataOutputViewStreamWrapper kgCompressionView;
        int numEntries = 0;
        try (OutputStream kgCompressionOut = this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream)((Object)localStream));){
            kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
            kgCompressionView.writeInt(stateId);
        }
        return numEntries += stateTableSnapshot.writeMappingsInKeyGroup((DataOutputView)kgCompressionView, group);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore(Collection<KeyedStateHandle> restoredSnapshots) throws Exception {
        if (restoredSnapshots == null || restoredSnapshots.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap internal state backend from snapshot.");
        for (KeyedStateHandle rawSnapshot : restoredSnapshots) {
            if (rawSnapshot == null) continue;
            Preconditions.checkState((boolean)(rawSnapshot instanceof KeyGroupsStateSnapshot));
            KeyGroupsStateSnapshot snapshot = (KeyGroupsStateSnapshot)rawSnapshot;
            StreamStateHandle snapshotHandle = snapshot.getSnapshotHandle();
            if (snapshotHandle == null) continue;
            FSDataInputStream inputStream = snapshotHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)inputStream);
            try {
                DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper((InputStream)inputStream);
                InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(this.getUserClassLoader(), true);
                serializationProxy.read((DataInputView)inputView);
                HashMap<Integer, KeyedStateDescriptor> keyedStatesById = new HashMap<Integer, KeyedStateDescriptor>();
                List<StateMetaInfoSnapshot> keyedStateMetaInfos = serializationProxy.getKeyedStateMetaSnapshots();
                for (int i = 0; i < keyedStateMetaInfos.size(); ++i) {
                    StateMetaInfoSnapshot keyedStateMetaSnapshot = keyedStateMetaInfos.get(i);
                    String stateName = keyedStateMetaSnapshot.getName();
                    this.restoredKvStateMetaInfos.put(stateName, keyedStateMetaSnapshot);
                    RegisteredStateMetaInfo keyedStateMetaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedStateMetaSnapshot);
                    this.registeredStateMetaInfos.put(stateName, keyedStateMetaInfo);
                    KeyedStateDescriptor keyedStateDescriptor = keyedStateMetaSnapshot.createKeyedStateDescriptor();
                    StateStorage stateStorage = this.getOrCreateStateStorageForKeyedState(keyedStateMetaInfo);
                    this.stateStorages.put(stateName, stateStorage);
                    keyedStatesById.put(i, keyedStateDescriptor);
                }
                HashMap<Integer, SubKeyedStateDescriptor> subKeyedStatesById = new HashMap<Integer, SubKeyedStateDescriptor>();
                List<StateMetaInfoSnapshot> subKeyedStateMetaSnapshots = serializationProxy.getSubKeyedStateMetaSnapshots();
                for (int i = 0; i < subKeyedStateMetaSnapshots.size(); ++i) {
                    StateMetaInfoSnapshot subKeyedStateMetaSnapshot = subKeyedStateMetaSnapshots.get(i);
                    String stateName = subKeyedStateMetaSnapshot.getName();
                    RegisteredStateMetaInfo subKeyedStateMetaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedStateMetaSnapshot);
                    this.registeredStateMetaInfos.put(stateName, subKeyedStateMetaInfo);
                    this.restoredKvStateMetaInfos.put(stateName, subKeyedStateMetaSnapshot);
                    SubKeyedStateDescriptor subKeyedStateDescriptor = subKeyedStateMetaSnapshot.createSubKeyedStateDescriptor();
                    StateStorage stateStorage = this.getOrCreateStateStorageForSubKeyedState(subKeyedStateMetaInfo);
                    this.stateStorages.put(stateName, stateStorage);
                    subKeyedStatesById.put(i, subKeyedStateDescriptor);
                }
                Map<Integer, Tuple2<Long, Integer>> metaInfos = snapshot.getMetaInfos();
                StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
                for (int group : this.getKeyGroupRange()) {
                    Tuple2<Long, Integer> tuple = metaInfos.get(group);
                    if (tuple == null) continue;
                    long offset = (Long)tuple.f0;
                    int totalEntries = (Integer)tuple.f1;
                    inputStream.seek(offset);
                    int writtenKeyGroupIndex = inputView.readInt();
                    Preconditions.checkState((writtenKeyGroupIndex == group ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
                    int numEntries = 0;
                    InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression((InputStream)inputStream);
                    Throwable throwable = null;
                    try {
                        HeapStateStorage stateStorage;
                        Serializable descriptor;
                        int stateId;
                        int i;
                        DataInputViewStreamWrapper kgCompressionInView = new DataInputViewStreamWrapper(kgCompressionInStream);
                        for (i = 0; i < keyedStateMetaInfos.size(); ++i) {
                            stateId = kgCompressionInView.readInt();
                            descriptor = (KeyedStateDescriptor)keyedStatesById.get(stateId);
                            stateStorage = (HeapStateStorage)this.stateStorages.get(((KeyedStateDescriptor)descriptor).getName());
                            numEntries += this.readMappingsInKeyGroupForKeyedState((DataInputView)kgCompressionInView, (KeyedStateDescriptor)descriptor, stateStorage);
                        }
                        for (i = 0; i < subKeyedStateMetaSnapshots.size(); ++i) {
                            stateId = kgCompressionInView.readInt();
                            descriptor = (SubKeyedStateDescriptor)subKeyedStatesById.get(stateId);
                            stateStorage = (HeapStateStorage)this.stateStorages.get(((SubKeyedStateDescriptor)descriptor).getName());
                            numEntries += this.readMappingsInKeyGroupForSubKeyedState((DataInputView)kgCompressionInView, (SubKeyedStateDescriptor)descriptor, stateStorage);
                        }
                        Preconditions.checkState((totalEntries == numEntries ? 1 : 0) != 0, (Object)"Unexpected number of entries");
                    }
                    catch (Throwable throwable2) {
                        throwable = throwable2;
                        throw throwable2;
                    }
                    finally {
                        if (kgCompressionInStream == null) continue;
                        if (throwable != null) {
                            try {
                                kgCompressionInStream.close();
                            }
                            catch (Throwable throwable3) {
                                throwable.addSuppressed(throwable3);
                            }
                            continue;
                        }
                        kgCompressionInStream.close();
                    }
                }
            }
            finally {
                if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)inputStream)) continue;
                IOUtils.closeQuietly((AutoCloseable)inputStream);
            }
        }
    }

    private int readMappingsInKeyGroupForKeyedState(DataInputView inView, KeyedStateDescriptor descriptor, HeapStateStorage stateStorage) throws Exception {
        TypeSerializer keySerializer = descriptor.getKeySerializer();
        TypeSerializer stateSerializer = descriptor.getValueSerializer();
        int numKeys = inView.readInt();
        for (int i = 0; i < numKeys; ++i) {
            Object key = keySerializer.deserialize(inView);
            Object state = stateSerializer.deserialize(inView);
            stateStorage.put(key, state);
        }
        return numKeys;
    }

    private int readMappingsInKeyGroupForSubKeyedState(DataInputView inView, SubKeyedStateDescriptor descriptor, HeapStateStorage stateStorage) throws Exception {
        TypeSerializer keySerializer = descriptor.getKeySerializer();
        TypeSerializer namespaceSerializer = descriptor.getNamespaceSerializer();
        TypeSerializer stateSerializer = descriptor.getValueSerializer();
        int numKeys = inView.readInt();
        for (int i = 0; i < numKeys; ++i) {
            Object key = keySerializer.deserialize(inView);
            Object namespace = namespaceSerializer.deserialize(inView);
            Object state = stateSerializer.deserialize(inView);
            stateStorage.setCurrentNamespace(namespace);
            stateStorage.put(key, state);
        }
        return numKeys;
    }
}

