package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityQueueStateMetaInfoSnapshot;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBIncrementalSnapshotOperation.class */
/**
 * Encapsulates the work of a single incremental RocksDB checkpoint.
 *
 * <p>Life cycle as implemented below:
 * <ol>
 *   <li>The constructor acquires a lease on the backend's RocksDB resource guard.</li>
 *   <li>{@link #takeSnapshot()} snapshots the registered state meta information and asks the
 *       backend to write a RocksDB snapshot into the local backup directory.</li>
 *   <li>{@link #runSnapshot()} writes the meta data and copies the files of the local backup
 *       directory to checkpoint streams, re-using {@code .sst} files that are already part of the
 *       last completed checkpoint.</li>
 *   <li>{@link #stop()} closes all streams opened by this operation.</li>
 *   <li>{@link #releaseResources(boolean)} releases the lease, cleans up the local backup
 *       directory and, when requested, discards everything that was written.</li>
 * </ol>
 */
public class RocksDBIncrementalSnapshotOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalSnapshotOperation.class);

    /** File name suffix identifying RocksDB sst files, which are shared between checkpoints. */
    private static final String SST_FILE_SUFFIX = ".sst";

    /** Size of the buffer used to copy local files into checkpoint streams. */
    private static final int COPY_BUFFER_SIZE = 8192;

    /** The backend whose state is being snapshotted. */
    private final RocksDBInternalStateBackend stateBackend;

    /** Factory for the streams the checkpoint data is written to. */
    private final CheckpointStreamFactory checkpointStreamFactory;

    /** Local directory that receives the RocksDB snapshot before it is copied. */
    private final SnapshotDirectory localBackupDirectory;

    /** Id of the checkpoint this operation belongs to. */
    private final long checkpointId;

    /** Lease on the backend's RocksDB resource guard, held until {@link #releaseResources(boolean)}. */
    private final ResourceGuard.Lease dbLease;

    /** Registry of all streams opened by this operation, so that they can be closed together. */
    private final CloseableRegistry closeableRegistry = new CloseableRegistry();

    /** Sst files of this checkpoint: handle id to (handle name, state handle or placeholder). */
    private final Map<StateHandleID, Tuple2<String, StreamStateHandle>> sstFiles = new HashMap<>();

    /** All files of this checkpoint that are not sst files. */
    private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    private final List<StateMetaInfoSnapshot> keyedStateMetaInfos = new ArrayList<>();
    private final List<StateMetaInfoSnapshot> subKeyedStateMetaInfos = new ArrayList<>();
    private final List<PriorityQueueStateMetaInfoSnapshot> priorityQueueMetaInfos = new ArrayList<>();

    /** Sst files of the last completed checkpoint; {@code null} if the backend has none recorded. */
    private Map<StateHandleID, Tuple2<String, StreamStateHandle>> baseSstFiles;

    /** Handle(s) of the written meta data; {@code null} until {@link #runSnapshot()} created them. */
    private SnapshotResult<StreamStateHandle> metaStateHandle;

    /**
     * Creates the operation and acquires a lease on the backend's RocksDB resource guard.
     *
     * @param stateBackend the backend to snapshot
     * @param checkpointStreamFactory factory for the checkpoint output streams
     * @param localBackupDirectory directory that will receive the local RocksDB snapshot
     * @param checkpointId id of the checkpoint
     * @throws IOException if the resource lease cannot be acquired
     */
    public RocksDBIncrementalSnapshotOperation(
            RocksDBInternalStateBackend stateBackend,
            CheckpointStreamFactory checkpointStreamFactory,
            SnapshotDirectory localBackupDirectory,
            long checkpointId) throws IOException {
        this.stateBackend = stateBackend;
        this.checkpointStreamFactory = checkpointStreamFactory;
        this.checkpointId = checkpointId;
        this.dbLease = stateBackend.rocksDBResourceGuard.acquireResource();
        this.localBackupDirectory = localBackupDirectory;
    }

    /**
     * Captures the meta information of all registered states, determines the sst files of the last
     * completed checkpoint as base, and lets the backend write a RocksDB snapshot into the local
     * backup directory.
     *
     * @throws IllegalStateException if the local backup directory already exists
     * @throws Exception if taking the RocksDB snapshot fails
     */
    public void takeSnapshot() throws Exception {
        // Iterate the values directly; looking every key up again in the same map is redundant.
        for (Object registered : stateBackend.getRegisteredStateMetaInfos().values()) {
            RegisteredStateMetaInfo stateMetaInfo = (RegisteredStateMetaInfo) registered;
            if (stateMetaInfo.getStateType().isKeyedState()) {
                keyedStateMetaInfos.add(stateMetaInfo.snapshot());
            } else {
                subKeyedStateMetaInfos.add(stateMetaInfo.snapshot());
            }
        }
        for (RegisteredPriorityQueueStateBackendMetaInfo queueMetaInfo
                : stateBackend.getPriorityQueueInformations().values()) {
            priorityQueueMetaInfos.add(queueMetaInfo.snapshot());
        }

        final long lastCompletedCheckpointId;
        synchronized (stateBackend.materializedSstFiles) {
            // Read the id and the matching file set under the same lock so that they belong together.
            lastCompletedCheckpointId = stateBackend.lastCompletedCheckpointId;
            baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpointId);
        }

        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpointId, baseSstFiles);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", localBackupDirectory);

        if (localBackupDirectory.exists()) {
            throw new IllegalStateException("Unexpected existence of the backup directory.");
        }
        stateBackend.takeDbSnapshot(localBackupDirectory.getDirectory().getPath());
    }

    /**
     * Writes the meta data and the files of the local backup directory to checkpoint streams and
     * registers the resulting sst file set with the backend under this checkpoint's id.
     *
     * @return a result holding the incremental snapshot; it additionally holds a local snapshot
     *     when both a directory handle for the local backup directory and a task-local meta data
     *     handle are available
     * @throws Exception if writing the meta data or copying any file fails
     */
    @Nonnull
    public SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
        stateBackend.getCancelStreamRegistry().registerCloseable(closeableRegistry);

        metaStateHandle = materializeMetaData();
        Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
        Preconditions.checkState(localBackupDirectory.exists());

        final FileStatus[] fileStatuses = localBackupDirectory.listStatus();
        if (fileStatuses != null) {
            for (FileStatus fileStatus : fileStatuses) {
                materializeOrReuseFile(fileStatus.getPath());
            }
        }

        synchronized (stateBackend.materializedSstFiles) {
            stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
        }

        final IncrementalKeyedStateSnapshot jobManagerSnapshot = new IncrementalKeyedStateSnapshot(
                stateBackend.getKeyGroupRange(),
                checkpointId,
                sstFiles,
                miscFiles,
                metaStateHandle.getJobManagerOwnedSnapshot());

        final StreamStateHandle localMetaHandle = metaStateHandle.getTaskLocalSnapshot();
        DirectoryStateHandle directoryStateHandle = null;
        try {
            directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
        } catch (IOException e) {
            // A failing local snapshot is not fatal: drop the local meta data and report the
            // job-manager-owned snapshot only.
            Exception collector = e;
            if (localMetaHandle != null) {
                try {
                    localMetaHandle.discardState();
                } catch (Exception discardException) {
                    collector = ExceptionUtils.firstOrSuppressed(discardException, collector);
                }
            }
            LOG.warn("Problem with local state snapshot.", collector);
        }

        if (directoryStateHandle != null && localMetaHandle != null) {
            return SnapshotResult.withLocalState(
                    jobManagerSnapshot,
                    new IncrementalLocalKeyedStateSnapshot(
                            stateBackend.getKeyGroupRange(),
                            checkpointId,
                            localMetaHandle,
                            directoryStateHandle,
                            sstFiles));
        }
        return SnapshotResult.of(jobManagerSnapshot);
    }

    /** Closes all streams of this operation, unless the stream registry was already unregistered. */
    public void stop() {
        closeRegistryIfStillRegistered("Could not properly close io streams.");
    }

    /**
     * Releases the RocksDB lease, closes remaining streams and cleans up the local backup
     * directory. All failures are logged, none is propagated.
     *
     * @param canceled if {@code true}, the state written by this operation is discarded as well
     */
    public void releaseResources(boolean canceled) {
        dbLease.close();
        closeRegistryIfStillRegistered("Exception on closing registry.");
        cleanupLocalBackupDirectory();
        if (canceled) {
            discardMaterializedState();
            discardCompletedLocalSnapshot();
        }
    }

    /**
     * Handles one file of the local backup directory: non-sst files are always copied, sst files
     * are copied only if the base checkpoint does not already contain them.
     */
    private void materializeOrReuseFile(Path filePath) throws Exception {
        final String fileName = filePath.getName();
        final StateHandleID stateHandleID = new StateHandleID(fileName);

        if (!fileName.endsWith(SST_FILE_SUFFIX)) {
            miscFiles.put(stateHandleID, materializeStateData(filePath));
            return;
        }

        final Tuple2<String, StreamStateHandle> baseEntry =
                baseSstFiles != null ? baseSstFiles.get(stateHandleID) : null;
        if (baseEntry != null) {
            // Already part of the base checkpoint: reference it through a placeholder instead of
            // copying the file again.
            sstFiles.put(stateHandleID, Tuple2.of(baseEntry.f0, new PlaceholderStreamStateHandle(baseEntry.f1.getStateSize())));
            return;
        }

        final StreamStateHandle uploaded = materializeStateData(filePath);
        sstFiles.put(stateHandleID, Tuple2.of(handleNameOf(uploaded), uploaded));
    }

    /**
     * Returns the name under which a freshly written sst handle is recorded.
     *
     * @throws UnsupportedOperationException for handle types other than {@link FileStateHandle}
     *     and {@link ByteStreamStateHandle}
     */
    private static String handleNameOf(StreamStateHandle handle) {
        if (handle instanceof FileStateHandle) {
            return ((FileStateHandle) handle).getFilePath().toString();
        }
        if (handle instanceof ByteStreamStateHandle) {
            return ((ByteStreamStateHandle) handle).getHandleName();
        }
        throw new UnsupportedOperationException("RocksDB incremental snapshot operation cannot support non FileStateHandle/ByteStreamStateHandle");
    }

    /** Unregisters this operation's stream registry from the backend and, if it was registered, closes it. */
    private void closeRegistryIfStillRegistered(String failureMessage) {
        if (stateBackend.getCancelStreamRegistry().unregisterCloseable(closeableRegistry)) {
            try {
                closeableRegistry.close();
            } catch (IOException e) {
                LOG.warn(failureMessage, e);
            }
        }
    }

    /** Runs the cleanup of the local backup directory if it exists; failures are only logged. */
    private void cleanupLocalBackupDirectory() {
        try {
            if (localBackupDirectory.exists()) {
                LOG.trace("Running cleanup for local RocksDB backup directory {}.", localBackupDirectory);
                if (!localBackupDirectory.cleanup()) {
                    LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                }
            }
        } catch (IOException e) {
            LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
        }
    }

    /**
     * Discards the meta data handle and all misc/sst handles collected so far.
     *
     * <p>The list is deliberately raw: it mixes the meta data {@link SnapshotResult} with
     * {@link StreamStateHandle}s, and their common supertype is not imported by this file.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void discardMaterializedState() {
        final List statesToDiscard = new ArrayList(1 + miscFiles.size() + sstFiles.size());
        if (metaStateHandle != null) {
            // Still null when the operation failed before the meta data was written.
            statesToDiscard.add(metaStateHandle);
        }
        statesToDiscard.addAll(miscFiles.values());
        statesToDiscard.addAll(
                sstFiles.values().stream().map(entry -> entry.f1).collect(Collectors.toSet()));
        try {
            StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
        } catch (Exception e) {
            LOG.warn("Could not properly discard states.", e);
        }
    }

    /** Discards the handle of the local backup directory if its snapshot was already completed. */
    private void discardCompletedLocalSnapshot() {
        if (!localBackupDirectory.isSnapshotCompleted()) {
            return;
        }
        try {
            final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
            if (directoryStateHandle != null) {
                directoryStateHandle.discardState();
            }
        } catch (Exception e) {
            LOG.warn("Could not properly discard local state.", e);
        }
    }

    /**
     * Copies one file of the local backup directory into a new shared-scope checkpoint stream.
     *
     * @param path the local file to copy
     * @return the handle of the written checkpoint data
     * @throws IOException if the output stream was closed through the registry before a handle
     *     could be obtained
     * @throws Exception if reading, writing or closing a stream fails
     */
    private StreamStateHandle materializeStateData(Path path) throws Exception {
        FSDataInputStream inputStream = null;
        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
        try {
            final byte[] buffer = new byte[COPY_BUFFER_SIZE];

            inputStream = localBackupDirectory.getFileSystem().open(path);
            closeableRegistry.registerCloseable(inputStream);

            outputStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, CheckpointedStateScope.SHARED);
            closeableRegistry.registerCloseable(outputStream);

            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }

            if (!closeableRegistry.unregisterCloseable(outputStream)) {
                // The registry no longer owns the stream, so no valid handle can be produced.
                // Fail loudly instead of handing a null handle to the caller.
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            final StreamStateHandle result = outputStream.closeAndGetHandle();
            outputStream = null;
            return result;
        } finally {
            // Nested finally: the output stream is closed even if closing the input stream throws.
            try {
                if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            } finally {
                if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }
    }

    /**
     * Serializes the captured state meta information into an exclusive-scope checkpoint stream.
     * A duplicating stream is used when local recovery is enabled, a simple stream otherwise.
     *
     * @return the result of finalizing the meta data stream
     * @throws IOException if the stream was closed through the registry before it could be finalized
     * @throws Exception if creating, writing or finalizing the stream fails
     */
    private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
        final LocalRecoveryConfig localRecoveryConfig = stateBackend.localRecoveryConfig;
        final CheckpointStreamWithResultProvider streamWithResultProvider =
                localRecoveryConfig.isLocalRecoveryEnabled()
                        ? CheckpointStreamWithResultProvider.createDuplicatingStream(
                                checkpointId,
                                CheckpointedStateScope.EXCLUSIVE,
                                checkpointStreamFactory,
                                localRecoveryConfig.getLocalStateDirectoryProvider())
                        : CheckpointStreamWithResultProvider.createSimpleStream(
                                checkpointId,
                                CheckpointedStateScope.EXCLUSIVE,
                                checkpointStreamFactory);
        try {
            closeableRegistry.registerCloseable(streamWithResultProvider);

            final boolean compressionEnabled = !Objects.equals(
                    UncompressedStreamCompressionDecorator.INSTANCE,
                    stateBackend.getKeyGroupCompressionDecorator());
            final RocksDBInternalBackendSerializationProxy serializationProxy =
                    new RocksDBInternalBackendSerializationProxy(
                            keyedStateMetaInfos,
                            subKeyedStateMetaInfos,
                            priorityQueueMetaInfos,
                            compressionEnabled);
            serializationProxy.write(
                    new DataOutputViewStreamWrapper(streamWithResultProvider.getCheckpointOutputStream()));

            if (!closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                throw new IOException("Stream already closed and cannot return a handle.");
            }
            return streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
        } catch (Throwable t) {
            // Only close the stream here if the registry still owned it.
            if (closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                IOUtils.closeQuietly(streamWithResultProvider);
            }
            throw t;
        }
    }
}
