/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.BackendWritableBroadcastState;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.HeapBroadcastState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorBackendSerializationProxy;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.RegisteredBroadcastBackendStateMetaInfo;
import org.apache.flink.runtime.state.RegisteredOperatorBackendStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DefaultOperatorStateBackend
implements OperatorStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();
    private final JavaSerializer<Serializable> javaSerializer;
    private final ClassLoader userClassloader;
    private final ExecutionConfig executionConfig;
    private final boolean asynchronousSnapshots;
    private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorStateMetaInfos;
    private final Map<String, RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastStateMetaInfos;
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;
    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig, boolean asynchronousSnapshots) {
        this.userClassloader = (ClassLoader)Preconditions.checkNotNull((Object)userClassLoader);
        this.executionConfig = executionConfig;
        this.javaSerializer = new JavaSerializer();
        this.registeredOperatorStates = new HashMap();
        this.registeredBroadcastStates = new HashMap();
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.accessedStatesByName = new HashMap();
        this.accessedBroadcastStatesByName = new HashMap();
        this.restoredOperatorStateMetaInfos = new HashMap();
        this.restoredBroadcastStateMetaInfos = new HashMap();
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public Set<String> getRegisteredStateNames() {
        return this.registeredOperatorStates.keySet();
    }

    public Set<String> getRegisteredBroadcastStateNames() {
        return this.registeredBroadcastStates.keySet();
    }

    @Override
    public void close() throws IOException {
        this.closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly((Closeable)this.closeStreamOnCancelRegistry);
        this.registeredOperatorStates.clear();
        this.registeredBroadcastStates.clear();
    }

    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = (String)Preconditions.checkNotNull((Object)stateDescriptor.getName());
        BackendWritableBroadcastState<?, ?> previous = this.accessedBroadcastStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            return previous;
        }
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer broadcastStateKeySerializer = (TypeSerializer)Preconditions.checkNotNull((Object)stateDescriptor.getKeySerializer());
        TypeSerializer broadcastStateValueSerializer = (TypeSerializer)Preconditions.checkNotNull((Object)stateDescriptor.getValueSerializer());
        BackendWritableBroadcastState<?, ?> broadcastState = this.registeredBroadcastStates.get(name);
        if (broadcastState == null) {
            broadcastState = new HeapBroadcastState(new RegisteredBroadcastBackendStateMetaInfo(name, OperatorStateHandle.Mode.BROADCAST, broadcastStateKeySerializer, broadcastStateValueSerializer));
            this.registeredBroadcastStates.put(name, broadcastState);
        } else {
            DefaultOperatorStateBackend.checkStateNameAndMode(broadcastState.getStateMetaInfo().getName(), name, broadcastState.getStateMetaInfo().getAssignmentMode(), OperatorStateHandle.Mode.BROADCAST);
            RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo = this.restoredBroadcastStateMetaInfos.get(name);
            CompatibilityResult keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getKeySerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)restoredMetaInfo.getKeySerializerConfigSnapshot(), (TypeSerializer)broadcastStateKeySerializer);
            CompatibilityResult valueCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getValueSerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)restoredMetaInfo.getValueSerializerConfigSnapshot(), (TypeSerializer)broadcastStateValueSerializer);
            if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
                broadcastState.setStateMetaInfo(new RegisteredBroadcastBackendStateMetaInfo(name, OperatorStateHandle.Mode.BROADCAST, broadcastStateKeySerializer, broadcastStateValueSerializer));
            } else {
                throw new StateMigrationException("State migration isn't supported, yet.");
            }
        }
        this.accessedBroadcastStatesByName.put(name, broadcastState);
        return broadcastState;
    }

    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Deprecated
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getListState(stateDescriptor);
    }

    @Deprecated
    public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
        return this.getListState(new ListStateDescriptor(stateName, this.javaSerializer));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(final long checkpointId, long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        long syncStartTime = System.currentTimeMillis();
        if (this.registeredOperatorStates.isEmpty() && this.registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        final HashMap registeredOperatorStatesDeepCopies = new HashMap(this.registeredOperatorStates.size());
        final HashMap registeredBroadcastStatesDeepCopies = new HashMap(this.registeredBroadcastStates.size());
        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.userClassloader);
        try {
            if (!this.registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }
            if (!this.registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, Object> entry : this.registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState broadcastState = (BackendWritableBroadcastState)entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        }
        finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>> ioCallable = new AbstractAsyncCallableWithResources<SnapshotResult<OperatorStateHandle>>(){
            CheckpointStreamFactory.CheckpointStateOutputStream out = null;

            @Override
            protected void acquireResources() throws Exception {
                this.openOutStream();
            }

            @Override
            protected void releaseResources() {
                this.closeOutStream();
            }

            @Override
            protected void stopOperation() {
                this.closeOutStream();
            }

            private void openOutStream() throws Exception {
                this.out = streamFactory.createCheckpointStateOutputStream(checkpointId, CheckpointedStateScope.EXCLUSIVE);
                DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.registerCloseable((Closeable)((Object)this.out));
            }

            private void closeOutStream() {
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable((Closeable)((Object)this.out))) {
                    IOUtils.closeQuietly((OutputStream)((Object)this.out));
                }
            }

            @Override
            @Nonnull
            public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {
                StreamStateHandle stateHandle;
                OperatorStateHandle.Mode mode;
                long[] partitionOffsets;
                Object value;
                long asyncStartTime = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
                ArrayList operatorMetaInfoSnapshots = new ArrayList(registeredOperatorStatesDeepCopies.size());
                for (Map.Entry entry : registeredOperatorStatesDeepCopies.entrySet()) {
                    operatorMetaInfoSnapshots.add(((PartitionableListState)entry.getValue()).getStateMetaInfo().snapshot());
                }
                ArrayList broadcastMetaInfoSnapshots = new ArrayList(registeredBroadcastStatesDeepCopies.size());
                for (Map.Entry entry : registeredBroadcastStatesDeepCopies.entrySet()) {
                    broadcastMetaInfoSnapshots.add(((BackendWritableBroadcastState)entry.getValue()).getStateMetaInfo().snapshot());
                }
                DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream)((Object)localOut));
                OperatorBackendSerializationProxy operatorBackendSerializationProxy = new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
                operatorBackendSerializationProxy.write((DataOutputView)dataOutputViewStreamWrapper);
                HashMap<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<String, OperatorStateHandle.StateMetaInfo>(registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size());
                for (Map.Entry entry : registeredOperatorStatesDeepCopies.entrySet()) {
                    value = (PartitionableListState)entry.getValue();
                    partitionOffsets = ((PartitionableListState)value).write(localOut);
                    mode = ((PartitionableListState)value).getStateMetaInfo().getAssignmentMode();
                    writtenStatesMetaData.put((String)entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                }
                for (Map.Entry entry : registeredBroadcastStatesDeepCopies.entrySet()) {
                    value = (BackendWritableBroadcastState)entry.getValue();
                    partitionOffsets = new long[]{value.write(localOut)};
                    mode = value.getStateMetaInfo().getAssignmentMode();
                    writtenStatesMetaData.put((String)entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                }
                OperatorStreamStateHandle retValue = null;
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable((Closeable)((Object)this.out)) && (stateHandle = this.out.closeAndGetHandle()) != null) {
                    retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                }
                if (DefaultOperatorStateBackend.this.asynchronousSnapshots) {
                    LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - asyncStartTime});
                }
                return SnapshotResult.of(retValue);
            }
        };
        AsyncStoppableTaskWithCallback<SnapshotResult<OperatorStateHandle>> asyncStoppableTaskWithCallback = AsyncStoppableTaskWithCallback.from(ioCallable);
        if (!this.asynchronousSnapshots) {
            asyncStoppableTaskWithCallback.run();
        }
        LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - syncStartTime});
        return asyncStoppableTaskWithCallback;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception {
        if (null == restoreSnapshots || restoreSnapshots.isEmpty()) {
            return;
        }
        for (OperatorStateHandle stateHandle : restoreSnapshots) {
            if (stateHandle == null) continue;
            FSDataInputStream in = stateHandle.openInputStream();
            this.closeStreamOnCancelRegistry.registerCloseable((Closeable)in);
            ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(this.userClassloader);
                OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(this.userClassloader);
                backendSerializationProxy.read((DataInputView)new DataInputViewStreamWrapper((InputStream)in));
                List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredOperatorMetaInfoSnapshots = backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
                for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredOperatorMetaInfoSnapshots) {
                    if (restoredMetaInfo.getPartitionStateSerializer() == null || restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    this.restoredOperatorStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    PartitionableListState<?> partitionableListState = this.registeredOperatorStates.get(restoredMetaInfo.getName());
                    if (null != partitionableListState) continue;
                    PartitionableListState partitionableListState2 = new PartitionableListState(new RegisteredOperatorBackendStateMetaInfo(restoredMetaInfo.getName(), restoredMetaInfo.getPartitionStateSerializer(), restoredMetaInfo.getAssignmentMode()));
                    this.registeredOperatorStates.put(partitionableListState2.getStateMetaInfo().getName(), partitionableListState2);
                }
                List<RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?>> restoredBroadcastMetaInfoSnapshots = backendSerializationProxy.getBroadcastStateMetaInfoSnapshots();
                for (RegisteredBroadcastBackendStateMetaInfo.Snapshot<?, ?> snapshot : restoredBroadcastMetaInfoSnapshots) {
                    if (snapshot.getKeySerializer() == null || snapshot.getValueSerializer() == null || snapshot.getKeySerializer() instanceof UnloadableDummyTypeSerializer || snapshot.getValueSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore broadcast state [" + snapshot.getName() + "]. The previous key and value serializers of the state must be present; the serializers could have been removed from the classpath, or their implementations have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    this.restoredBroadcastStateMetaInfos.put(snapshot.getName(), snapshot);
                    BackendWritableBroadcastState<?, ?> broadcastState = this.registeredBroadcastStates.get(snapshot.getName());
                    if (broadcastState != null) continue;
                    broadcastState = new HeapBroadcastState(new RegisteredBroadcastBackendStateMetaInfo(snapshot.getName(), snapshot.getAssignmentMode(), snapshot.getKeySerializer(), snapshot.getValueSerializer()));
                    this.registeredBroadcastStates.put(broadcastState.getStateMetaInfo().getName(), broadcastState);
                }
                for (Map.Entry entry : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    String stateName = (String)entry.getKey();
                    PartitionableListState<?> listStateForName = this.registeredOperatorStates.get(stateName);
                    if (listStateForName == null) {
                        BackendWritableBroadcastState<?, ?> broadcastStateForName = this.registeredBroadcastStates.get(stateName);
                        Preconditions.checkState((broadcastStateForName != null ? 1 : 0) != 0, (Object)("Found state without corresponding meta info: " + stateName));
                        DefaultOperatorStateBackend.deserializeBroadcastStateValues(broadcastStateForName, in, (OperatorStateHandle.StateMetaInfo)entry.getValue());
                        continue;
                    }
                    DefaultOperatorStateBackend.deserializeOperatorStateValues(listStateForName, in, (OperatorStateHandle.StateMetaInfo)entry.getValue());
                }
            }
            finally {
                Thread.currentThread().setContextClassLoader(restoreClassLoader);
                if (!this.closeStreamOnCancelRegistry.unregisterCloseable((Closeable)in)) continue;
                IOUtils.closeQuietly((InputStream)in);
            }
        }
    }

    private <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = (String)Preconditions.checkNotNull((Object)stateDescriptor.getName());
        PartitionableListState<?> previous = this.accessedStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo().getName(), name, previous.getStateMetaInfo().getAssignmentMode(), mode);
            return previous;
        }
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer partitionStateSerializer = (TypeSerializer)Preconditions.checkNotNull((Object)stateDescriptor.getElementSerializer());
        PartitionableListState<Object> partitionableListState = this.registeredOperatorStates.get(name);
        if (null == partitionableListState) {
            partitionableListState = new PartitionableListState(new RegisteredOperatorBackendStateMetaInfo(name, partitionStateSerializer, mode));
            this.registeredOperatorStates.put(name, partitionableListState);
        } else {
            DefaultOperatorStateBackend.checkStateNameAndMode(partitionableListState.getStateMetaInfo().getName(), name, partitionableListState.getStateMetaInfo().getAssignmentMode(), mode);
            RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo = this.restoredOperatorStateMetaInfos.get(name);
            TypeSerializer newPartitionStateSerializer = partitionStateSerializer.duplicate();
            CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getPartitionStateSerializer(), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(), (TypeSerializer)newPartitionStateSerializer);
            if (!stateCompatibility.isRequiresMigration()) {
                partitionableListState.setStateMetaInfo(new RegisteredOperatorBackendStateMetaInfo(name, newPartitionStateSerializer, mode));
            } else {
                throw new StateMigrationException("State migration isn't supported, yet.");
            }
        }
        this.accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }

    private static <S> void deserializeOperatorStateValues(PartitionableListState<S> stateListForName, FSDataInputStream in, OperatorStateHandle.StateMetaInfo metaInfo) throws IOException {
        long[] offsets;
        if (null != metaInfo && null != (offsets = metaInfo.getOffsets())) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper((InputStream)in);
            TypeSerializer<S> serializer = stateListForName.getStateMetaInfo().getPartitionStateSerializer();
            for (long offset : offsets) {
                in.seek(offset);
                stateListForName.add(serializer.deserialize((DataInputView)div));
            }
        }
    }

    private static <K, V> void deserializeBroadcastStateValues(BackendWritableBroadcastState<K, V> broadcastStateForName, FSDataInputStream in, OperatorStateHandle.StateMetaInfo metaInfo) throws Exception {
        long[] offsets;
        if (metaInfo != null && (offsets = metaInfo.getOffsets()) != null) {
            TypeSerializer<K> keySerializer = broadcastStateForName.getStateMetaInfo().getKeySerializer();
            TypeSerializer<V> valueSerializer = broadcastStateForName.getStateMetaInfo().getValueSerializer();
            in.seek(offsets[0]);
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper((InputStream)in);
            int size = div.readInt();
            for (int i = 0; i < size; ++i) {
                broadcastStateForName.put(keySerializer.deserialize((DataInputView)div), valueSerializer.deserialize((DataInputView)div));
            }
        }
    }

    private static void checkStateNameAndMode(String actualName, String expectedName, OperatorStateHandle.Mode actualMode, OperatorStateHandle.Mode expectedMode) {
        Preconditions.checkState((boolean)actualName.equals(expectedName), (Object)("Incompatible state names. Was [" + actualName + "], registered with [" + expectedName + "]."));
        Preconditions.checkState((boolean)actualMode.equals((Object)expectedMode), (Object)("Incompatible state assignment modes. Was [" + (Object)((Object)actualMode) + "], registered with [" + (Object)((Object)expectedMode) + "]."));
    }

    static final class PartitionableListState<S>
    implements ListState<S> {
        private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
        private final ArrayList<S> internalList;
        private final ArrayListSerializer<S> internalListCopySerializer;

        PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList());
        }

        private PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo, ArrayList<S> internalList) {
            this.stateMetaInfo = (RegisteredOperatorBackendStateMetaInfo)Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = (ArrayList)Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<S>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {
            this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
            return this.stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<S>(this);
        }

        public void clear() {
            this.internalList.clear();
        }

        public Iterable<S> get() {
            return this.internalList;
        }

        public void add(S value) {
            Preconditions.checkNotNull(value, (String)"You cannot add null to a ListState.");
            this.internalList.add(value);
        }

        public String toString() {
            return "PartitionableListState{stateMetaInfo=" + this.stateMetaInfo + ", internalList=" + this.internalList + '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {
            long[] partitionOffsets = new long[this.internalList.size()];
            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)out);
            for (int i = 0; i < this.internalList.size(); ++i) {
                S element = this.internalList.get(i);
                partitionOffsets[i] = out.getPos();
                this.getStateMetaInfo().getPartitionStateSerializer().serialize(element, (DataOutputView)dov);
            }
            return partitionOffsets;
        }

        public void update(List<S> values) throws Exception {
            this.internalList.clear();
            this.addAll(values);
        }

        public void addAll(List<S> values) throws Exception {
            if (values != null && !values.isEmpty()) {
                this.internalList.addAll(values);
            }
        }
    }
}

