/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBInternalStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateSnapshot;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Restores a {@link RocksDBInternalStateBackend} from incremental keyed-state snapshots.
 *
 * <p>Two strategies are used, chosen in {@link #restore(Collection)}:
 * <ul>
 *   <li><b>Integrated</b>: exactly one snapshot whose key-group range equals the backend's range.
 *       The snapshot files are materialized directly into the backend's instance directory and
 *       the database is opened on top of them.</li>
 *   <li><b>Fragmented</b>: every other case. Each snapshot is materialized into a temporary
 *       directory, opened as a temporary RocksDB instance, and the entries whose key group
 *       belongs to this backend are copied into the backend's database.</li>
 * </ul>
 */
public class RocksDBIncrementalRestoreOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);

    /** Size in bytes of the buffer used when copying a state file to the local file system. */
    private static final int COPY_BUFFER_SIZE = 65536;

    private final RocksDBInternalStateBackend stateBackend;
    private final CloseableRegistry closeableRegistry = new CloseableRegistry();

    RocksDBIncrementalRestoreOperation(RocksDBInternalStateBackend stateBackend) {
        this.stateBackend = stateBackend;
    }

    /**
     * Restores the backend from the given snapshots.
     *
     * @param restoredSnapshots the snapshots to restore from; must contain at least one element
     *     (an empty collection makes {@code iterator().next()} throw
     *     {@link java.util.NoSuchElementException})
     * @throws IllegalStateException if the fragmented path encounters a handle that is not an
     *     {@link IncrementalKeyedStateSnapshot}, or the integrated path encounters an unsupported
     *     handle type
     * @throws Exception if any file transfer, metadata read or database operation fails
     */
    void restore(Collection<KeyedStateHandle> restoredSnapshots) throws Exception {
        boolean hasExtraKeys = restoredSnapshots.size() > 1
                || !Objects.equals(restoredSnapshots.iterator().next().getKeyGroupRange(), this.stateBackend.getKeyGroupRange());
        if (hasExtraKeys) {
            long startMillis = System.currentTimeMillis();
            for (KeyedStateHandle rawStateSnapshot : restoredSnapshots) {
                if (!(rawStateSnapshot instanceof IncrementalKeyedStateSnapshot)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + IncrementalKeyedStateSnapshot.class + ", but found: " + rawStateSnapshot.getClass());
                }
                IncrementalKeyedStateSnapshot stateSnapshot = (IncrementalKeyedStateSnapshot) rawStateSnapshot;
                // The same path expression is evaluated for every snapshot; the fragmented restore
                // wipes the directory before using it.
                Path temporaryRestoreInstancePath = this.stateBackend.getLocalRestorePath(this.stateBackend.getKeyGroupRange());
                this.restoreFragmentedTabletInstance(stateSnapshot, temporaryRestoreInstancePath);
            }
            long endMills = System.currentTimeMillis();
            LOG.info("Restore Fragmented Tablet using {} ms", endMills - startMillis);
        } else {
            this.restoreIntegratedTabletInstance(restoredSnapshots.iterator().next());
        }
    }

    /**
     * Restores the backend from a single snapshot by materializing its files directly in the
     * backend's RocksDB instance directory and opening the database on them.
     *
     * <p>If materializing the files fails, the instance directory is removed on a best-effort
     * basis and the original exception is rethrown.
     *
     * @param rawStateSnapshot an {@link IncrementalKeyedStateSnapshot} or an
     *     {@link IncrementalLocalKeyedStateSnapshot}
     * @throws IllegalStateException if the handle is of neither supported type
     * @throws Exception if the restore fails
     */
    private void restoreIntegratedTabletInstance(KeyedStateHandle rawStateSnapshot) throws Exception {
        // Fail fast with a descriptive error instead of running into a NullPointerException later
        // when the meta state handle of an unsupported snapshot type would be dereferenced.
        if (!(rawStateSnapshot instanceof IncrementalKeyedStateSnapshot)
                && !(rawStateSnapshot instanceof IncrementalLocalKeyedStateSnapshot)) {
            throw new IllegalStateException("Unexpected state handle type, expected: " + IncrementalKeyedStateSnapshot.class
                    + " or " + IncrementalLocalKeyedStateSnapshot.class
                    + ", but found: " + (rawStateSnapshot == null ? null : rawStateSnapshot.getClass()));
        }

        long startMills = System.currentTimeMillis();
        Path localDataPath = new Path(this.stateBackend.getInstanceRocksDBPath().getAbsolutePath());
        FileSystem localFileSystem = localDataPath.getFileSystem();

        Map<StateHandleID, Tuple2<String, StreamStateHandle>> sstFiles;
        StreamStateHandle metaStateHandle;
        long checkpointID;
        try {
            if (rawStateSnapshot instanceof IncrementalKeyedStateSnapshot) {
                LOG.info("Restoring from the remote file system.");
                IncrementalKeyedStateSnapshot remoteSnapshot = (IncrementalKeyedStateSnapshot) rawStateSnapshot;
                metaStateHandle = remoteSnapshot.getMetaStateHandle();
                this.transferAllStateDataToDirectory(remoteSnapshot, localDataPath);
                sstFiles = remoteSnapshot.getSharedState();
                checkpointID = remoteSnapshot.getCheckpointId();
            } else {
                IncrementalLocalKeyedStateSnapshot localSnapshot = (IncrementalLocalKeyedStateSnapshot) rawStateSnapshot;
                Path localRecoverDirectory = localSnapshot.getDirectoryStateHandle().getDirectory();
                LOG.info("Restoring from local recovery path {}.", localRecoverDirectory);
                sstFiles = localSnapshot.getSharedStateHandles();
                metaStateHandle = localSnapshot.getMetaStateHandle();
                FileStatus[] fileStatuses = localFileSystem.listStatus(localRecoverDirectory);
                if (!localFileSystem.mkdirs(localDataPath)) {
                    throw new IOException("Cannot create local base path for RocksDB.");
                }
                if (fileStatuses == null) {
                    throw new IOException("Cannot list file statues. Local recovery directory " + localRecoverDirectory + " does not exist.");
                }
                // Hard-link every file of the local recovery directory into the instance directory.
                for (FileStatus fileStatus : fileStatuses) {
                    String fileName = fileStatus.getPath().getName();
                    File restoreFile = new File(localRecoverDirectory.getPath(), fileName);
                    File targetFile = new File(localDataPath.getPath(), fileName);
                    Files.createLink(targetFile.toPath(), restoreFile.toPath());
                }
                checkpointID = localSnapshot.getCheckpointId();
            }
        } catch (Exception e2) {
            LOG.info("Fail to restore rocksDB instance at {}, and try to remove it if existed.", localDataPath);
            try {
                if (localFileSystem.exists(localDataPath)) {
                    localFileSystem.delete(localDataPath, true);
                }
            } catch (IOException e1) {
                // Keep the cleanup failure visible without masking the original failure.
                LOG.warn("Fail to remove local data path {} after restore operation failure.", localDataPath, e1);
                e2.addSuppressed(e1);
            }
            throw e2;
        }

        this.restoreMetaData(metaStateHandle);

        int cfLength = 1 + this.stateBackend.getRegisteredStateMetaInfos().size();
        ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(cfLength);
        ArrayList<String> descriptorNames = new ArrayList<String>(cfLength);
        this.collectColumnFamilies(columnFamilyDescriptors, descriptorNames);
        this.stateBackend.createDBWithColumnFamily(columnFamilyDescriptors, descriptorNames);
        this.stateBackend.registerAllStates();

        SortedMap<Long, Map<StateHandleID, Tuple2<String, StreamStateHandle>>> materializedSstFiles = this.stateBackend.materializedSstFiles;
        synchronized (materializedSstFiles) {
            this.stateBackend.materializedSstFiles.put(checkpointID, sstFiles);
        }
        this.stateBackend.lastCompletedCheckpointId = checkpointID;

        long endMills = System.currentTimeMillis();
        LOG.info("Restore Integrated Tablet using {} ms.", endMills - startMills);
    }

    /**
     * Appends the default column family followed by one column family per registered state to
     * the two parallel lists, so that index {@code i} of both lists refers to the same family.
     *
     * @param descriptors receives the column family descriptors
     * @param names receives the matching column family / state names
     */
    private void collectColumnFamilies(List<ColumnFamilyDescriptor> descriptors, List<String> names) {
        descriptors.add(this.stateBackend.getDefaultColumnFamilyDescriptor());
        names.add(this.stateBackend.getDefaultColumnFamilyName());
        for (String stateName : this.stateBackend.getRegisteredStateMetaInfos().keySet()) {
            descriptors.add(this.stateBackend.createColumnFamilyDescriptor(stateName));
            names.add(stateName);
        }
    }

    /**
     * Copies the content of a state handle into {@code fileName} below {@code localRestorePath},
     * overwriting an existing file.
     *
     * <p>Both streams are registered with the closeable registry while the copy is running. On
     * success they are closed explicitly so that a failing {@code close()} (e.g. a failed flush)
     * is reported to the caller. On failure they are closed quietly and always unregistered, so
     * that a secondary close error can neither mask the original failure nor leak the other
     * stream.
     *
     * @param localRestorePath target directory
     * @param fileName name of the file to create inside the target directory
     * @param restoreStateHandle handle providing the bytes to copy
     * @throws IOException if opening, copying or closing fails
     */
    private void restoreFile(Path localRestorePath, String fileName, StreamStateHandle restoreStateHandle) throws IOException {
        Path localFilePath = new Path(localRestorePath, fileName);
        FileSystem localFileSystem = localFilePath.getFileSystem();
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            long startMillis = System.currentTimeMillis();
            inputStream = restoreStateHandle.openInputStream();
            this.closeableRegistry.registerCloseable(inputStream);
            outputStream = localFileSystem.create(localFilePath, FileSystem.WriteMode.OVERWRITE);
            this.closeableRegistry.registerCloseable(outputStream);

            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
            long endMillis = System.currentTimeMillis();

            outputStream.close();
            this.closeableRegistry.unregisterCloseable(outputStream);
            outputStream = null;
            inputStream.close();
            this.closeableRegistry.unregisterCloseable(inputStream);
            inputStream = null;

            LOG.debug("Successfully restored file {} from {}, {} bytes, {} ms", localFilePath, restoreStateHandle, restoreStateHandle.getStateSize(), endMillis - startMillis);
        } finally {
            // Only streams that were not closed successfully above are still non-null here.
            this.closeAndUnregisterQuietly(inputStream);
            this.closeAndUnregisterQuietly(outputStream);
        }
    }

    /**
     * Closes the given resource, ignoring any error, and removes it from the closeable registry.
     * Does nothing for {@code null}.
     */
    private void closeAndUnregisterQuietly(java.io.Closeable closeable) {
        if (closeable != null) {
            IOUtils.closeQuietly(closeable);
            this.closeableRegistry.unregisterCloseable(closeable);
        }
    }

    /**
     * Reads the serialized backend meta data from the given handle and records, for every keyed
     * and sub-keyed state found, both the restored snapshot and a newly created
     * {@link RegisteredStateMetaInfo} in the state backend.
     *
     * @param metaStateDatum handle of the serialized meta data; must not be {@code null}
     * @throws Exception if the meta data cannot be opened or deserialized
     */
    private void restoreMetaData(StreamStateHandle metaStateDatum) throws Exception {
        FSDataInputStream inputStream = null;
        try {
            inputStream = metaStateDatum.openInputStream();
            this.closeableRegistry.registerCloseable(inputStream);
            DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
            InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(this.stateBackend.getUserClassLoader(), false);
            serializationProxy.read(inputView);

            List<StateMetaInfoSnapshot> keyedStateMetaInfos = serializationProxy.getKeyedStateMetaSnapshots();
            for (StateMetaInfoSnapshot keyedStateMetaSnapshot : keyedStateMetaInfos) {
                String stateName = keyedStateMetaSnapshot.getName();
                this.stateBackend.getRestoredKvStateMetaInfos().put(stateName, keyedStateMetaSnapshot);
                RegisteredStateMetaInfo keyedStateMetaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedStateMetaSnapshot);
                this.stateBackend.getRegisteredStateMetaInfos().put(stateName, keyedStateMetaInfo);
            }

            List<StateMetaInfoSnapshot> subKeyedStateMetaInfos = serializationProxy.getSubKeyedStateMetaSnapshots();
            for (StateMetaInfoSnapshot subKeyedStateMetaSnapshot : subKeyedStateMetaInfos) {
                String stateName = subKeyedStateMetaSnapshot.getName();
                this.stateBackend.getRestoredKvStateMetaInfos().put(stateName, subKeyedStateMetaSnapshot);
                RegisteredStateMetaInfo subKeyedStateMetaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedStateMetaSnapshot);
                this.stateBackend.getRegisteredStateMetaInfos().put(stateName, subKeyedStateMetaInfo);
            }

            inputStream.close();
            this.closeableRegistry.unregisterCloseable(inputStream);
            inputStream = null;
        } finally {
            // Non-null only if something above failed; never let cleanup mask that failure.
            this.closeAndUnregisterQuietly(inputStream);
        }
    }

    /**
     * Materializes one snapshot into {@code localRestorePath}, opens it as a temporary RocksDB
     * instance and copies every entry whose key group is contained in the backend's key-group
     * range into the backend's database.
     *
     * <p>The directory is wiped before use. If the restore fails, the directory is removed on a
     * best-effort basis and the original exception is rethrown.
     * NOTE(review): on success the temporary directory is left in place by this method; confirm
     * that it is cleaned up elsewhere.
     *
     * @param stateSnapshot the snapshot to restore from
     * @param localRestorePath temporary local directory for the snapshot's files
     * @throws Exception if the restore fails
     */
    private void restoreFragmentedTabletInstance(IncrementalKeyedStateSnapshot stateSnapshot, Path localRestorePath) throws Exception {
        FileSystem localFileSystem = localRestorePath.getFileSystem();
        if (localFileSystem.exists(localRestorePath)) {
            localFileSystem.delete(localRestorePath, true);
        }
        localFileSystem.mkdirs(localRestorePath);
        try {
            this.transferAllStateDataToDirectory(stateSnapshot, localRestorePath);
            this.restoreMetaData(stateSnapshot.getMetaStateHandle());

            int cfSize = 1 + this.stateBackend.getRegisteredStateMetaInfos().size();
            ArrayList<String> cfName = new ArrayList<String>(cfSize);
            ArrayList<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<ColumnFamilyDescriptor>(cfSize);
            this.collectColumnFamilies(columnFamilyDescriptors, cfName);

            ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(cfSize);
            try (RocksDB db = RocksDB.open(localRestorePath.getPath(), columnFamilyDescriptors, columnFamilyHandles);
                 RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.stateBackend.getDbInstance())) {
                try {
                    Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size());

                    // Serialized form of the first key group shared by the backend and the
                    // snapshot; used as the seek position in every column family.
                    ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(2);
                    int startGroup = this.stateBackend.getKeyGroupRange().getIntersection(stateSnapshot.getKeyGroupRange()).getStartKeyGroup();
                    StateSerializerUtil.writeGroup(outputStream, startGroup);
                    byte[] startGroupBytes = outputStream.toByteArray();

                    // Index 0 is the default column family, which is not copied.
                    for (int i = 1; i < columnFamilyDescriptors.size(); ++i) {
                        ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
                        ColumnFamilyHandle targetFamilyHandle = this.stateBackend.getOrCreateColumnFamily(cfName.get(i));
                        try (RocksIterator iterator = db.newIterator(columnFamilyHandle)) {
                            iterator.seek(startGroupBytes);
                            while (iterator.isValid()) {
                                int keyGroup = StateSerializerUtil.getGroupFromSerializedKey(iterator.key());
                                if (!this.stateBackend.getKeyGroupRange().contains(keyGroup)) {
                                    // First key group outside our range: done with this family.
                                    break;
                                }
                                writeBatchWrapper.put(targetFamilyHandle, iterator.key(), iterator.value());
                                iterator.next();
                            }
                        }
                    }
                } finally {
                    // Release the temporary instance's column family handles (default included)
                    // before the try-with-resources closes the temporary database itself.
                    for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
                        IOUtils.closeQuietly(columnFamilyHandle);
                    }
                }
            }
        } catch (Exception e2) {
            try {
                if (localFileSystem.exists(localRestorePath)) {
                    localFileSystem.delete(localRestorePath, true);
                }
            } catch (IOException e1) {
                // Report the cleanup failure itself and keep the original failure as the one thrown.
                LOG.warn("Delete local path failed.", e1);
                e2.addSuppressed(e1);
            }
            throw e2;
        }
    }

    /**
     * Copies all shared and then all private state files of the snapshot into the given local
     * directory, using each handle ID's key string as the file name.
     *
     * @param stateSnapshot the snapshot whose files are transferred
     * @param localRestorePath target directory
     * @throws IOException if any file cannot be transferred
     */
    private void transferAllStateDataToDirectory(IncrementalKeyedStateSnapshot stateSnapshot, Path localRestorePath) throws IOException {
        Map<StateHandleID, Tuple2<String, StreamStateHandle>> sharedState = stateSnapshot.getSharedState();
        for (Map.Entry<StateHandleID, Tuple2<String, StreamStateHandle>> sharedEntry : sharedState.entrySet()) {
            this.restoreFile(localRestorePath, sharedEntry.getKey().getKeyString(), sharedEntry.getValue().f1);
        }
        Map<StateHandleID, StreamStateHandle> privateState = stateSnapshot.getPrivateState();
        for (Map.Entry<StateHandleID, StreamStateHandle> privateEntry : privateState.entrySet()) {
            this.restoreFile(localRestorePath, privateEntry.getKey().getKeyString(), privateEntry.getValue());
        }
    }
}

