/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.context.ContextStateHelper;
import org.apache.flink.runtime.state.heap.HeapInternalStateBackend;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.util.Preconditions;

public class KeyedStateBackendWrapper<K>
extends AbstractKeyedStateBackend<K> {
    private final AbstractInternalStateBackend internalStateBackend;
    protected transient ContextStateHelper contextStateHelper;

    public KeyedStateBackendWrapper(ContextStateHelper contextStateHelper) {
        super(contextStateHelper.getInternalStateBackend().getKvStateRegistry(), contextStateHelper.getKeyContext().getKeySerializer(), contextStateHelper.getInternalStateBackend().getUserClassLoader(), contextStateHelper.getInternalStateBackend().getNumGroups(), contextStateHelper.getInternalStateBackend().getKeyGroupRange(), contextStateHelper.getExecutionConfig());
        this.contextStateHelper = contextStateHelper;
        this.internalStateBackend = contextStateHelper.getInternalStateBackend();
    }

    @Override
    protected <N, T> InternalValueState<K, N, T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
        return (InternalValueState)this.contextStateHelper.createValueState(namespaceSerializer, stateDesc);
    }

    @Override
    protected <N, T> InternalListState<K, N, T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        return (InternalListState)this.contextStateHelper.createListState(namespaceSerializer, stateDesc);
    }

    @Override
    protected <N, T> InternalReducingState<K, N, T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        return (InternalReducingState)this.contextStateHelper.createReducingState(namespaceSerializer, stateDesc);
    }

    @Override
    protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(TypeSerializer<N> namespaceSerializer, AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
        return (InternalAggregatingState)this.contextStateHelper.createAggregatingState(namespaceSerializer, stateDesc);
    }

    @Override
    protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        return (InternalFoldingState)this.contextStateHelper.createFoldingState(namespaceSerializer, stateDesc);
    }

    @Override
    protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer, MapStateDescriptor<UK, UV> stateDesc) throws Exception {
        return (InternalMapState)this.contextStateHelper.createMapState(namespaceSerializer, stateDesc);
    }

    @Override
    public int numStateEntries() {
        return this.internalStateBackend.numStateEntries();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.internalStateBackend.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        return this.internalStateBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    @Override
    public void restore(Collection<KeyedStateHandle> state) throws Exception {
        this.internalStateBackend.restore(state);
    }

    @Override
    public void setCurrentKey(K newKey) {
        this.contextStateHelper.getKeyContext().setCurrentKey(newKey);
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        try (Stream<K> keyStream = this.getKeys(stateDescriptor.getName(), namespace);){
            Object state = this.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
            if (this.internalStateBackend instanceof HeapInternalStateBackend) {
                List keys = keyStream.collect(Collectors.toList());
                for (Object key2 : keys) {
                    this.setCurrentKey(key2);
                    function.process(key2, state);
                }
            } else {
                keyStream.forEach(key -> {
                    this.setCurrentKey(key);
                    try {
                        function.process(key, state);
                    }
                    catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }

    @Override
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        return this.contextStateHelper.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
    }

    @Override
    public <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return this.contextStateHelper.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
    }

    @Override
    public void dispose() {
        this.internalStateBackend.dispose();
        this.contextStateHelper.dispose();
    }

    @Override
    public void close() throws IOException {
        this.internalStateBackend.close();
    }

    @Override
    public K getCurrentKey() {
        return this.contextStateHelper.getKeyContext().getCurrentKey();
    }

    @Override
    public int getCurrentKeyGroupIndex() {
        return this.contextStateHelper.getKeyContext().getCurrentKeyGroupIndex();
    }

    @Override
    public int getNumberOfKeyGroups() {
        return this.internalStateBackend.getNumGroups();
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.internalStateBackend.getKeyGroupRange();
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.contextStateHelper.getKeyContext().getKeySerializer();
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        KeyedState keyedState = this.internalStateBackend.getKeyedStates().get(state);
        if (keyedState != null) {
            Preconditions.checkState((boolean)VoidNamespace.get().equals(namespace), (Object)"Expected VoidNamespace when getKeys over keyedState.");
            Iterable iterable = keyedState.keys();
            return StreamSupport.stream(iterable.spliterator(), false);
        }
        SubKeyedState subKeyedState = this.internalStateBackend.getSubKeyedStates().get(state);
        if (subKeyedState != null) {
            return StreamSupport.stream(subKeyedState.keys(namespace).spliterator(), false);
        }
        return Stream.empty();
    }

    @Override
    public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
        return UncompressedStreamCompressionDecorator.INSTANCE;
    }
}

