package org.apache.flink.runtime.state;

import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FileSegmentStateHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/IncrementalSegmentStateSnapshot.class */
public class IncrementalSegmentStateSnapshot extends IncrementalKeyedStateSnapshot {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSegmentStateSnapshot.class);
    private static final long serialVersionUID = 1;
    private transient SharedStateRegistry sharedStateRegistry;

    public IncrementalSegmentStateSnapshot(KeyGroupRange keyGroupRange, long j, Map<StateHandleID, Tuple2<String, StreamStateHandle>> map, Map<StateHandleID, StreamStateHandle> map2, StreamStateHandle streamStateHandle) {
        super(keyGroupRange, j, map, map2, streamStateHandle);
        this.sharedStateRegistry = null;
    }

    @Override // org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot, org.apache.flink.runtime.state.CompositeStateHandle
    public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
        Preconditions.checkState(this.sharedStateRegistry != sharedStateRegistry, "The state handle has already registered its shared states to the given registry.");
        this.sharedStateRegistry = (SharedStateRegistry) Preconditions.checkNotNull(sharedStateRegistry);
        LOG.trace("Registering IncrementalSegmentStateSnapshot for checkpoint {} from backend.", Long.valueOf(getCheckpointId()));
        registerSegmentHandle(sharedStateRegistry, getMetaStateHandle());
        for (Map.Entry<StateHandleID, Tuple2<String, StreamStateHandle>> entry : getSharedState().entrySet()) {
            StreamStateHandle streamStateHandle = (StreamStateHandle) entry.getValue().f1;
            if (streamStateHandle instanceof FileSegmentStateHandle) {
                registerSegmentHandle(sharedStateRegistry, (StreamStateHandle) entry.getValue().f1);
            } else if (streamStateHandle instanceof PlaceholderSegmentStateHandle) {
                PlaceholderSegmentStateHandle placeholderSegmentStateHandle = (PlaceholderSegmentStateHandle) streamStateHandle;
                String path = placeholderSegmentStateHandle.getFilePath().toString();
                sharedStateRegistry.registerReference(createSharedStateRegistryKeyFromUniqueId(path), placeholderSegmentStateHandle);
                entry.setValue(Tuple2.of(path, placeholderSegmentStateHandle.toFileSegmentStateHandle()));
            }
        }
        Iterator<Map.Entry<StateHandleID, StreamStateHandle>> it = getPrivateState().entrySet().iterator();
        while (it.hasNext()) {
            registerSegmentHandle(sharedStateRegistry, it.next().getValue());
        }
    }

    private void registerSegmentHandle(SharedStateRegistry sharedStateRegistry, StreamStateHandle streamStateHandle) {
        Preconditions.checkState(streamStateHandle instanceof FileSegmentStateHandle, "The state handle to register should be a FileSegmentStateHandle.");
        sharedStateRegistry.registerReference(((FileSegmentStateHandle) streamStateHandle).getRegistryKey(), streamStateHandle);
    }

    @Override // org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot, org.apache.flink.runtime.state.StateObject
    public void discardState() {
        SharedStateRegistry sharedStateRegistry = this.sharedStateRegistry;
        boolean z = sharedStateRegistry != null;
        LOG.trace("Discarding IncrementalKeyedStateSnapshot (registered = {}) for checkpoint {} from backend.", Boolean.valueOf(z), Long.valueOf(getCheckpointId()));
        if (z) {
            sharedStateRegistry.unregisterReference(createSharedStateRegistryKeyFromSegment(getMetaStateHandle()));
            Iterator<Map.Entry<StateHandleID, Tuple2<String, StreamStateHandle>>> it = getSharedState().entrySet().iterator();
            while (it.hasNext()) {
                sharedStateRegistry.unregisterReference(createSharedStateRegistryKeyFromUniqueId((String) it.next().getValue().f0));
            }
            Iterator<Map.Entry<StateHandleID, StreamStateHandle>> it2 = getPrivateState().entrySet().iterator();
            while (it2.hasNext()) {
                sharedStateRegistry.unregisterReference(createSharedStateRegistryKeyFromSegment(it2.next().getValue()));
            }
        }
    }

    public String toString() {
        return "IncrementalSegmentStateSnapshot{keyGroupRange=" + getKeyGroupRange() + ", checkpointId=" + getCheckpointId() + ", sharedState=" + getSharedState() + ", privateState=" + getPrivateState() + ", metaStateHandle=" + getMetaStateHandle() + ", registered=" + (this.sharedStateRegistry != null) + '}';
    }

    SharedStateRegistryKey createSharedStateRegistryKeyFromUniqueId(String str) {
        return new SharedStateRegistryKey(str);
    }

    private SharedStateRegistryKey createSharedStateRegistryKeyFromSegment(StreamStateHandle streamStateHandle) {
        Preconditions.checkState(streamStateHandle instanceof FileSegmentStateHandle, "Incremental segment state snapshot only supports FileSegmentStateHandle.");
        return new SharedStateRegistryKey(((FileSegmentStateHandle) streamStateHandle).getFilePath().toString());
    }
}
