/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBInternalBackendSerializationProxy;
import org.apache.flink.contrib.streaming.state.RocksDBInternalStateBackend;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.PriorityQueueStateMetaInfoSnapshot;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives one incremental snapshot of a {@link RocksDBInternalStateBackend} for a single checkpoint.
 *
 * <p>The operation is used in phases:
 * <ol>
 *   <li>{@link #takeSnapshot()} captures the registered state meta information, picks the SST files
 *       of the last completed checkpoint as the base, and asks the backend to write a RocksDB
 *       snapshot into the local backup directory.</li>
 *   <li>{@link #runSnapshot()} writes the meta data and the files found in the local backup directory
 *       to checkpoint streams and assembles the resulting state handles.</li>
 *   <li>{@link #stop()} closes the streams opened by this operation; {@link #releaseResources(boolean)}
 *       releases the RocksDB lease, cleans up the local backup directory and, on cancellation,
 *       discards whatever state was already written.</li>
 * </ol>
 */
public class RocksDBIncrementalSnapshotOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalSnapshotOperation.class);

    /** Message used whenever a handle is requested from a stream that the registry already closed. */
    private static final String STREAM_ALREADY_CLOSED = "Stream already closed and cannot return a handle.";

    private final RocksDBInternalStateBackend stateBackend;
    private final CheckpointStreamFactory checkpointStreamFactory;
    private final SnapshotDirectory localBackupDirectory;
    private final long checkpointId;

    /** SST files of the last completed checkpoint; {@code null} when the backend has none recorded. */
    private Map<StateHandleID, Tuple2<String, StreamStateHandle>> baseSstFiles;

    /** Tracks every stream opened by this operation so that {@link #stop()} can close them. */
    private final CloseableRegistry closeableRegistry = new CloseableRegistry();

    /** SST files of this checkpoint: either freshly written handles or placeholders for reused files. */
    private final Map<StateHandleID, Tuple2<String, StreamStateHandle>> sstFiles = new HashMap<>();

    /** All non-SST files of this checkpoint. */
    private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    /** Lease on the RocksDB resource guard, held until {@link #releaseResources(boolean)}. */
    private final ResourceGuard.Lease dbLease;

    private final List<StateMetaInfoSnapshot> keyedStateMetaInfos = new ArrayList<>();
    private final List<StateMetaInfoSnapshot> subKeyedStateMetaInfos = new ArrayList<>();
    private final List<PriorityQueueStateMetaInfoSnapshot> priorityQueueMetaInfos = new ArrayList<>();

    /** Result of writing the meta data; stays {@code null} until {@link #runSnapshot()} has written it. */
    private SnapshotResult<StreamStateHandle> metaStateHandle = null;

    /**
     * Creates the operation and acquires a lease on the backend's RocksDB resource guard.
     *
     * @param stateBackend backend whose state is snapshotted
     * @param checkpointStreamFactory factory for the streams the snapshot is written to
     * @param localBackupDirectory directory that receives the local RocksDB snapshot
     * @param checkpointId id of the checkpoint this snapshot belongs to
     * @throws IOException if the lease cannot be acquired
     */
    RocksDBIncrementalSnapshotOperation(RocksDBInternalStateBackend stateBackend, CheckpointStreamFactory checkpointStreamFactory, SnapshotDirectory localBackupDirectory, long checkpointId) throws IOException {
        // Validate before acquiring the lease so that a bad argument cannot leave a lease behind.
        this.stateBackend = Preconditions.checkNotNull(stateBackend, "stateBackend must not be null.");
        this.checkpointStreamFactory = Preconditions.checkNotNull(checkpointStreamFactory, "checkpointStreamFactory must not be null.");
        this.localBackupDirectory = Preconditions.checkNotNull(localBackupDirectory, "localBackupDirectory must not be null.");
        this.checkpointId = checkpointId;
        this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
    }

    /**
     * Captures the state meta information, selects the base SST files and triggers the RocksDB
     * snapshot into the local backup directory.
     *
     * @throws IllegalStateException if the local backup directory already exists
     * @throws Exception if capturing the meta information or taking the RocksDB snapshot fails
     */
    void takeSnapshot() throws Exception {
        // The value type of the registry is not visible from this class, hence the explicit cast.
        for (Object value : this.stateBackend.getRegisteredStateMetaInfos().values()) {
            RegisteredStateMetaInfo registeredStateMetaInfo = (RegisteredStateMetaInfo) value;
            if (registeredStateMetaInfo.getStateType().isKeyedState()) {
                this.keyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
            } else {
                this.subKeyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
            }
        }

        // NOTE(review): the value type of the priority queue registry is not visible here, so the
        // lookup is kept inline instead of being bound to a typed local variable.
        for (Object stateName : this.stateBackend.getPriorityQueueInformations().keySet()) {
            this.priorityQueueMetaInfos.add(this.stateBackend.getPriorityQueueInformations().get(stateName).snapshot());
        }

        long lastCompletedCheckpoint;
        synchronized (this.stateBackend.materializedSstFiles) {
            lastCompletedCheckpoint = this.stateBackend.lastCompletedCheckpointId;
            this.baseSstFiles = this.stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
        }

        LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} assuming the following (shared) files as base: {}.", this.checkpointId, lastCompletedCheckpoint, this.baseSstFiles);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", this.localBackupDirectory);

        if (this.localBackupDirectory.exists()) {
            throw new IllegalStateException("Unexpected existence of the backup directory.");
        }
        this.stateBackend.takeDbSnapshot(this.localBackupDirectory.getDirectory().getPath());
    }

    /**
     * Writes the meta data and every file of the local backup directory to checkpoint streams and
     * builds the snapshot result.
     *
     * <p>SST files that are already part of the base checkpoint are not written again; they are
     * represented by a {@link PlaceholderStreamStateHandle}. The SST files of this checkpoint are
     * recorded in the backend's {@code materializedSstFiles} under this checkpoint id.
     *
     * @return a result with local state when both the directory handle and the task-local meta data
     *     handle are available, otherwise a result holding only the job-manager-owned snapshot
     * @throws Exception if writing the meta data or any file fails
     */
    @Nonnull
    SnapshotResult<KeyedStateHandle> runSnapshot() throws Exception {
        this.stateBackend.getCancelStreamRegistry().registerCloseable(this.closeableRegistry);

        this.metaStateHandle = this.materializeMetaData();
        Preconditions.checkNotNull(this.metaStateHandle, "Metadata was not properly created.");
        Preconditions.checkState(this.localBackupDirectory.exists());

        FileStatus[] fileStatuses = this.localBackupDirectory.listStatus();
        if (fileStatuses != null) {
            for (FileStatus fileStatus : fileStatuses) {
                Path filePath = fileStatus.getPath();
                String fileName = filePath.getName();
                StateHandleID stateHandleID = new StateHandleID(fileName);
                if (fileName.endsWith(".sst")) {
                    this.sstFiles.put(stateHandleID, this.resolveSstFile(stateHandleID, filePath));
                } else {
                    this.miscFiles.put(stateHandleID, this.materializeStateData(filePath));
                }
            }
        }

        synchronized (this.stateBackend.materializedSstFiles) {
            this.stateBackend.materializedSstFiles.put(this.checkpointId, this.sstFiles);
        }

        IncrementalKeyedStateSnapshot incrementalKeyedStateSnapshot = new IncrementalKeyedStateSnapshot(this.stateBackend.getKeyGroupRange(), this.checkpointId, this.sstFiles, this.miscFiles, this.metaStateHandle.getJobManagerOwnedSnapshot());

        StreamStateHandle taskLocalSnapshotMetaDataStateHandle = this.metaStateHandle.getTaskLocalSnapshot();
        DirectoryStateHandle directoryStateHandle = null;
        try {
            directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
        } catch (IOException ex) {
            // A failing local snapshot is not fatal: drop the task-local meta data (if any) and
            // fall back to a result without local state.
            Exception collector = ex;
            if (taskLocalSnapshotMetaDataStateHandle != null) {
                try {
                    taskLocalSnapshotMetaDataStateHandle.discardState();
                } catch (Exception discardEx) {
                    collector = ExceptionUtils.firstOrSuppressed(discardEx, collector);
                }
            }
            LOG.warn("Problem with local state snapshot.", collector);
        }

        if (directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null) {
            IncrementalLocalKeyedStateSnapshot localStateSnapshot = new IncrementalLocalKeyedStateSnapshot(this.stateBackend.getKeyGroupRange(), this.checkpointId, taskLocalSnapshotMetaDataStateHandle, directoryStateHandle, this.sstFiles);
            return SnapshotResult.withLocalState(incrementalKeyedStateSnapshot, localStateSnapshot);
        }
        return SnapshotResult.of(incrementalKeyedStateSnapshot);
    }

    /**
     * Closes all streams opened by this operation, provided its registry is still registered with
     * the backend's cancel-stream registry.
     */
    void stop() {
        this.unregisterAndCloseStreams("Could not properly close io streams.");
    }

    /**
     * Releases everything held by this operation: the RocksDB lease, any open streams and the local
     * backup directory.
     *
     * @param canceled when {@code true}, the state already written by this operation (meta data,
     *     misc files, SST files and a completed local snapshot) is discarded as well
     */
    void releaseResources(boolean canceled) {
        this.dbLease.close();
        this.unregisterAndCloseStreams("Exception on closing registry.");

        try {
            if (this.localBackupDirectory.exists()) {
                LOG.trace("Running cleanup for local RocksDB backup directory {}.", this.localBackupDirectory);
                boolean cleanupOk = this.localBackupDirectory.cleanup();
                if (!cleanupOk) {
                    LOG.debug("Could not properly cleanup local RocksDB backup directory.");
                }
            }
        } catch (IOException e) {
            LOG.warn("Could not properly cleanup local RocksDB backup directory.", e);
        }

        if (!canceled) {
            return;
        }

        List<StateObject> statesToDiscard = new ArrayList<>(1 + this.miscFiles.size() + this.sstFiles.size());
        // The meta data handle is still null when the operation was canceled before it was written.
        if (this.metaStateHandle != null) {
            statesToDiscard.add(this.metaStateHandle);
        }
        statesToDiscard.addAll(this.miscFiles.values());
        statesToDiscard.addAll(this.sstFiles.values().stream().map(t -> t.f1).collect(Collectors.toSet()));
        try {
            StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
        } catch (Exception e) {
            LOG.warn("Could not properly discard states.", e);
        }

        if (this.localBackupDirectory.isSnapshotCompleted()) {
            try {
                DirectoryStateHandle directoryStateHandle = this.localBackupDirectory.completeSnapshotAndGetHandle();
                if (directoryStateHandle != null) {
                    directoryStateHandle.discardState();
                }
            } catch (Exception e) {
                LOG.warn("Could not properly discard local state.", e);
            }
        }
    }

    /**
     * Unregisters this operation's registry from the backend's cancel-stream registry and closes it
     * if it was still registered; a failure to close is logged with the given message.
     */
    private void unregisterAndCloseStreams(String failureMessage) {
        if (this.stateBackend.getCancelStreamRegistry().unregisterCloseable(this.closeableRegistry)) {
            try {
                this.closeableRegistry.close();
            } catch (IOException e) {
                LOG.warn(failureMessage, e);
            }
        }
    }

    /**
     * Produces the entry for one SST file: a placeholder when the file is already part of the base
     * checkpoint, otherwise a freshly written handle paired with its unique id.
     */
    private Tuple2<String, StreamStateHandle> resolveSstFile(StateHandleID stateHandleID, Path filePath) throws Exception {
        if (this.baseSstFiles != null && this.baseSstFiles.containsKey(stateHandleID)) {
            Tuple2<String, StreamStateHandle> baseEntry = this.baseSstFiles.get(stateHandleID);
            StreamStateHandle placeholder = new PlaceholderStreamStateHandle(baseEntry.f1.getStateSize());
            return Tuple2.of(baseEntry.f0, placeholder);
        }

        StreamStateHandle streamStateHandle = this.materializeStateData(filePath);
        String uniqueId;
        if (streamStateHandle instanceof FileStateHandle) {
            uniqueId = ((FileStateHandle) streamStateHandle).getFilePath().toString();
        } else if (streamStateHandle instanceof ByteStreamStateHandle) {
            uniqueId = ((ByteStreamStateHandle) streamStateHandle).getHandleName();
        } else {
            throw new UnsupportedOperationException("RocksDB incremental snapshot operation cannot support non FileStateHandle/ByteStreamStateHandle");
        }
        return Tuple2.of(uniqueId, streamStateHandle);
    }

    /**
     * Copies one file of the local backup directory into a shared-scope checkpoint stream.
     *
     * @param filePath file inside the local backup directory
     * @return the handle of the written data, never {@code null}
     * @throws IOException if the output stream was already closed through the registry, so that no
     *     handle can be obtained
     * @throws Exception if reading the file or writing the stream fails
     */
    private StreamStateHandle materializeStateData(Path filePath) throws Exception {
        InputStream inputStream = null;
        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
        try {
            byte[] buffer = new byte[8192];
            FileSystem backupFileSystem = this.localBackupDirectory.getFileSystem();
            inputStream = backupFileSystem.open(filePath);
            this.closeableRegistry.registerCloseable(inputStream);
            outputStream = this.checkpointStreamFactory.createCheckpointStateOutputStream(this.checkpointId, CheckpointedStateScope.SHARED);
            this.closeableRegistry.registerCloseable(outputStream);

            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }

            // Same contract as materializeMetaData(): a stream that is no longer registered was
            // closed elsewhere, so fail explicitly instead of handing out a null handle.
            if (!this.closeableRegistry.unregisterCloseable(outputStream)) {
                throw new IOException(STREAM_ALREADY_CLOSED);
            }
            StreamStateHandle result = outputStream.closeAndGetHandle();
            outputStream = null;
            return result;
        } finally {
            // Nested so that a failure while closing the input cannot leave the output stream open.
            try {
                if (this.closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            } finally {
                if (this.closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }
    }

    /**
     * Serializes the captured state meta information into an exclusive-scope checkpoint stream. A
     * duplicating stream is used when local recovery is enabled, a simple stream otherwise.
     *
     * @return the result of finalizing the checkpoint stream
     * @throws IOException if the stream was already closed through the registry
     * @throws Exception if creating the stream or writing the meta data fails
     */
    private SnapshotResult<StreamStateHandle> materializeMetaData() throws Exception {
        LocalRecoveryConfig localRecoveryConfig = this.stateBackend.localRecoveryConfig;
        CheckpointStreamWithResultProvider streamWithResultProvider = localRecoveryConfig.isLocalRecoveryEnabled()
                ? CheckpointStreamWithResultProvider.createDuplicatingStream(this.checkpointId, CheckpointedStateScope.EXCLUSIVE, this.checkpointStreamFactory, localRecoveryConfig.getLocalStateDirectoryProvider())
                : CheckpointStreamWithResultProvider.createSimpleStream(this.checkpointId, CheckpointedStateScope.EXCLUSIVE, this.checkpointStreamFactory);
        try {
            this.closeableRegistry.registerCloseable(streamWithResultProvider);

            CheckpointStreamFactory.CheckpointStateOutputStream outputStream = streamWithResultProvider.getCheckpointOutputStream();
            DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
            boolean compressed = !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.stateBackend.getKeyGroupCompressionDecorator());
            RocksDBInternalBackendSerializationProxy backendSerializationProxy = new RocksDBInternalBackendSerializationProxy(this.keyedStateMetaInfos, this.subKeyedStateMetaInfos, this.priorityQueueMetaInfos, compressed);
            backendSerializationProxy.write(outputView);

            if (!this.closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                throw new IOException(STREAM_ALREADY_CLOSED);
            }
            SnapshotResult<StreamStateHandle> resultStateHandle = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
            streamWithResultProvider = null;
            return resultStateHandle;
        } finally {
            if (streamWithResultProvider != null && this.closeableRegistry.unregisterCloseable(streamWithResultProvider)) {
                IOUtils.closeQuietly(streamWithResultProvider);
            }
        }
    }
}

