/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.restore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GRegionIDImpl;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexHashImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Restores a GeminiDB instance from a collection of {@link DBSnapshotMeta}.
 *
 * <p>The entry point is {@link #restore(Collection, Map, int, int)}: it reads the snapshot meta
 * files, rebuilds the regions of the restored tables, restores the snapshot manager and the
 * dfs/local file managers obtained from the {@link Supervisor}, and optionally downloads the
 * used dfs files into the local base path.
 */
public class GeminiRestoreOperation {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiRestoreOperation.class);
    /** Engine context; passed to every region context and region created during restore. */
    private final GContext context;
    /** Source of the restore pre-fetch switch and the number of fetching threads. */
    private final GConfiguration configuration;
    /** Obtained from {@link #context}; gives access to the snapshot manager, file managers and tables. */
    private final Supervisor supervisor;
    /** Registry the download streams are registered with while files are being fetched. */
    private final CloseableRegistry closeableRegistry;

    /**
     * Creates a restore operation.
     *
     * @param context engine context; its supervisor is looked up once and cached here
     * @param configuration engine configuration
     * @param closeableRegistry registry used to track the streams opened while downloading files
     */
    public GeminiRestoreOperation(GContext context, GConfiguration configuration, CloseableRegistry closeableRegistry) {
        this.context = context;
        this.configuration = configuration;
        this.supervisor = this.context.getSupervisor();
        this.closeableRegistry = closeableRegistry;
    }

    /**
     * Restores the tables, the snapshot manager and the dfs/local file managers from the given
     * snapshot metas, limited to the regions in {@code [startRegionId, endRegionId]}.
     *
     * <p>For every meta the snapshot meta file is opened, the page index of each region in the
     * overlapping key-group range is rebuilt and attached to its table, and the dfs (mandatory)
     * and local (optional) file mappings are read. Afterwards the snapshot manager and the file
     * managers are restored from the collected file metas. If restore pre-fetch is enabled in the
     * configuration and no meta carried a local file mapping, the used dfs files are downloaded to
     * the local base path.
     *
     * <p>The meta file reader is managed by try-with-resources, so a failure while reading a meta
     * file is propagated to the caller and the reader is still closed.
     *
     * @param snapshotMetas snapshot metas to restore from; the checkpoint id of the last iterated
     *     meta is the one handed to the snapshot manager
     * @param restoredTables tables keyed by name; every table name found in a meta file is looked
     *     up in this map
     * @param startRegionId first region of this backend (inclusive)
     * @param endRegionId last region of this backend (inclusive)
     * @throws IllegalArgumentException if a meta does not overlap the given region range
     * @throws IllegalStateException if a meta file contains an unexpected key-group or no dfs base
     *     path was restored
     * @throws Exception if reading a meta file, restoring a manager or fetching files fails
     */
    public void restore(Collection<DBSnapshotMeta> snapshotMetas, Map<String, GTable> restoredTables, int startRegionId, int endRegionId) throws Exception {
        long startTime = System.currentTimeMillis();
        MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMappingSerializer = this.getFileMappingSerializer();
        long restoredCheckpointId = 0L;
        HashSet<String> restoredDfsBasePaths = new HashSet<String>();
        HashSet<String> restoredLocalBasePaths = new HashSet<String>();
        Map<Integer, FileMeta.RestoredFileMeta> restoredDfsFileMetas = new HashMap<Integer, FileMeta.RestoredFileMeta>();
        Map<Integer, FileMeta.RestoredFileMeta> restoredLocalFileMetas = new HashMap<Integer, FileMeta.RestoredFileMeta>();
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> dfsFileMapping = new HashMap<Integer, Map<Integer, Tuple2<Integer, Long>>>();
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> localFileMapping = new HashMap<Integer, Map<Integer, Tuple2<Integer, Long>>>();
        LOG.info("Start to restore from snapshot for GeminiDB, snapshotMetas {}, tables {}, region from {} to {}.", snapshotMetas, restoredTables, startRegionId, endRegionId);
        boolean needToBreakLineage = false;
        // Pre-fetching is switched off as soon as any meta provides a local file mapping.
        boolean needToFetchFiles = this.configuration.isEnableRestorePreFetch();
        for (DBSnapshotMeta meta : snapshotMetas) {
            restoredCheckpointId = meta.getCheckPointId();
            long[] regionOffsets = meta.getRegionOffsets();
            int metaStartRegionId = meta.getStartRegionId();
            int metaEndRegionId = meta.getEndRegionId();
            // Only the intersection of the meta's range and this backend's range is restored.
            int startGroup = Math.max(startRegionId, metaStartRegionId);
            int endGroup = Math.min(endRegionId, metaEndRegionId);
            if (!needToBreakLineage && !this.supervisor.getSnapshotManager().getNameSpace().equals(meta.getNameSpace())) {
                needToBreakLineage = true;
                LOG.info("Current name space {} differs from previous snapshot's name space {}, enforce the first snapshot to flush all pages out.", this.supervisor.getSnapshotManager().getNameSpace(), meta.getNameSpace());
            }
            Preconditions.checkArgument(startGroup <= endGroup, String.format("Useless meta (key-group range [%s, %s]) should not be restored within this state-backend (key-group range [%s, %s])", metaStartRegionId, metaEndRegionId, startRegionId, endRegionId));
            try (SnapshotMetaFile.Reader reader = SnapshotMetaFile.getReader(new Path(meta.getSnapshotMetaPath()))) {
                int tableSize = reader.readInt();
                // regionOffsets is indexed relative to the first region stored in the meta.
                reader.seek(regionOffsets[startGroup - metaStartRegionId]);
                for (int group = startGroup; group <= endGroup; ++group) {
                    int writtenKeyGroupIndex = reader.readInt();
                    Preconditions.checkState(writtenKeyGroupIndex == group, "Unexpected key-group in restore.");
                    for (int i = 0; i < tableSize; ++i) {
                        String tableName = reader.readUTF();
                        int expectedRegions = reader.readInt();
                        for (int j = 0; j < expectedRegions; ++j) {
                            int regionCode = reader.readInt();
                            GRegionIDImpl gRegionId = new GRegionIDImpl(regionCode);
                            int regionId = gRegionId.getId();
                            long lastSeqId = reader.readLong();
                            long removeAllSeqId = reader.readLong();
                            GTable table = restoredTables.get(tableName);
                            GRegionContext regionContext = new GRegionContext(this.context, tableName, gRegionId, table.getTableDescription().getPageSerde(), lastSeqId, removeAllSeqId);
                            PageIndex pageIndex = new PageIndexHashImpl.Builder(reader, regionContext).build();
                            // Index id 0 is registered as a data region, any other id as an index region.
                            if (gRegionId.getIndexID() == 0) {
                                table.setRegion(regionId, table.getTableDescription().createRegion(this.context, table, gRegionId, pageIndex));
                            } else {
                                table.setIndexRegion(regionId, table.getIndexDescription().createRegion(this.context, table, gRegionId, pageIndex));
                            }
                        }
                    }
                }
                reader.seek(meta.getDfsFileMappingOffset());
                Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> dfsFileMappingAndPath = this.restoreFileMapping(reader, restoredDfsFileMetas, fileMappingSerializer);
                Preconditions.checkNotNull(dfsFileMappingAndPath, "dfs meta should always has file mapping");
                if (dfsFileMappingAndPath.f0 != null) {
                    this.loadFromRestoredFileMapping(dfsFileMappingAndPath.f0, dfsFileMapping);
                }
                restoredDfsBasePaths.add(dfsFileMappingAndPath.f1);
                // The local mapping directly follows the dfs mapping in the meta file and is optional.
                Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> localFileMappingAndPath = this.restoreFileMapping(reader, restoredLocalFileMetas, fileMappingSerializer);
                if (localFileMappingAndPath == null) {
                    continue;
                }
                needToFetchFiles = false;
                if (localFileMappingAndPath.f0 != null) {
                    this.loadFromRestoredFileMapping(localFileMappingAndPath.f0, localFileMapping);
                }
                restoredLocalBasePaths.add(localFileMappingAndPath.f1);
            }
        }
        Preconditions.checkState(!restoredDfsBasePaths.isEmpty(), "restored dfs base path should not be empty.");
        SnapshotManager snapshotManager = this.supervisor.getSnapshotManager();
        snapshotManager.setNeedToBreakLineage(needToBreakLineage);
        // Restored dfs files may only be deleted by this instance when the lineage is kept.
        Map<Integer, FileMeta.RestoredFileMeta> dbUsedFileMeta = dfsFileMapping.isEmpty()
                ? new HashMap<Integer, FileMeta.RestoredFileMeta>()
                : this.getRestoredFileMetaUsedByDB(restoredDfsFileMetas, dfsFileMapping, startRegionId, endRegionId, !needToBreakLineage);
        FileManager dfsFileManager = this.supervisor.getDfsFileManager();
        Map<Long, SnapshotManager.RestoredSnapshot> restoredSnapshots = snapshotManager.restore(restoredCheckpointId, this.getMappingForFileIdToPath(dbUsedFileMeta), restoredDfsBasePaths.iterator().next());
        dfsFileManager.restore(this.mergeDbAndSnapshotFileMeta(dbUsedFileMeta, restoredSnapshots, !needToBreakLineage));
        FileManager localFileManager = this.supervisor.getLocalFileManager();
        Path localBasePath = localFileManager.getBasePath();
        File localBaseFile = new File(localBasePath.getPath());
        if (!localFileMapping.isEmpty()) {
            this.createHardLinkForRestoredLocalFile(restoredLocalBasePaths.iterator().next(), restoredLocalFileMetas, localBasePath);
            localFileManager.restore(this.getRestoredFileMetaUsedByDB(restoredLocalFileMetas, localFileMapping, startRegionId, endRegionId, true));
        }
        LOG.info("Restored successfully from {} for {}, region from {} to {}, consumed {} ms.", snapshotMetas, restoredTables, startRegionId, endRegionId, System.currentTimeMillis() - startTime);
        if (needToFetchFiles) {
            this.fetchFilesToLocal(dbUsedFileMeta, localFileManager, localBasePath, localBaseFile);
        }
    }

    /**
     * Downloads every dfs file used by the DB into the local base path, restores the local file
     * manager from the downloaded files and marks every page address as locally available, reusing
     * its dfs address as the local address. On failure the local base directory is deleted on a
     * best-effort basis and the original failure is propagated.
     */
    private void fetchFilesToLocal(Map<Integer, FileMeta.RestoredFileMeta> dbUsedFileMeta, FileManager localFileManager, Path localBasePath, File localBaseFile) throws Exception {
        long startToFetchTime = System.currentTimeMillis();
        boolean success = false;
        int threadNum = this.configuration.getFetchFilesThreadNum();
        try {
            Map<String, Path> remoteToLocalFilePaths = new HashMap<String, Path>(dbUsedFileMeta.size());
            Map<Integer, FileMeta.RestoredFileMeta> dbUsedLocalFileMeta = new HashMap<Integer, FileMeta.RestoredFileMeta>(dbUsedFileMeta.size());
            for (Map.Entry<Integer, FileMeta.RestoredFileMeta> entry : dbUsedFileMeta.entrySet()) {
                FileMeta.RestoredFileMeta fileMeta = entry.getValue();
                String remotePath = fileMeta.filePath;
                // The local copy keeps the remote file name and lives directly under the local base path.
                Path localPath = new Path(localBasePath, this.extractFileName(remotePath));
                remoteToLocalFilePaths.put(remotePath, localPath);
                dbUsedLocalFileMeta.put(entry.getKey(), FileMeta.RestoredFileMeta.of(fileMeta.id, localPath.toString(), fileMeta.fileSize, fileMeta.dataSize, fileMeta.dbReference, 0, true));
            }
            this.downloadDataForAllFiles(remoteToLocalFilePaths, threadNum, this.closeableRegistry);
            localFileManager.restore(dbUsedLocalFileMeta);
            for (GTable gTable : this.supervisor.getAllTables().values()) {
                Iterator<GRegion> regionIterator = gTable.regionIterator();
                while (regionIterator.hasNext()) {
                    GRegion region = regionIterator.next();
                    Iterator<PageAddress> pageAddressIterator = region.getPageStore().getPageIndex().pageIterator();
                    while (pageAddressIterator.hasNext()) {
                        PageAddress pageAddress = pageAddressIterator.next();
                        pageAddress.setLocalAddress(pageAddress.getDfsAddress());
                        pageAddress.setLocalStatus(true);
                    }
                }
            }
            success = true;
        } finally {
            if (success) {
                LOG.info("Successfully fetch files to local and re-construct the local address, consumed {} ms.", System.currentTimeMillis() - startToFetchTime);
            } else {
                // Reaching here without success means an exception is in flight; a cleanup
                // failure must not replace it, so it is only logged.
                try {
                    FileStatus[] fileStatuses = localBasePath.getFileSystem().listStatus(localBasePath);
                    if (fileStatuses != null) {
                        LOG.info("Cleaning local downloaded files when not successful to restore.");
                        FileUtils.deleteDirectoryQuietly(localBaseFile);
                    }
                } catch (Exception cleanupFailure) {
                    LOG.warn("Fail to clean local downloaded files under {}.", localBaseFile, cleanupFailure);
                }
                LOG.info("Failed to fetch files to local or re-construct the local address, consumed {} ms.", System.currentTimeMillis() - startToFetchTime);
            }
        }
    }

    /**
     * Downloads all given remote files and blocks until every download has finished.
     *
     * <p>A fixed thread pool is used when more than one thread is requested, otherwise the
     * downloads run on a direct executor service. The executor is always shut down before this
     * method returns.
     *
     * @param remoteFilePaths remote file path mapped to the local target path
     * @param restoringThreadNum number of threads requested for downloading
     * @param closeableRegistry registry the download streams are registered with
     * @throws IOException if a download failed with an {@link IOException}
     * @throws FlinkRuntimeException if a download failed with any other cause
     * @throws Exception if waiting for the downloads fails in another way
     */
    private void downloadDataForAllFiles(Map<String, Path> remoteFilePaths, int restoringThreadNum, CloseableRegistry closeableRegistry) throws Exception {
        final ExecutorService downloadExecutor;
        if (restoringThreadNum > 1) {
            downloadExecutor = Executors.newFixedThreadPool(restoringThreadNum);
        } else {
            downloadExecutor = org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService();
        }
        try {
            List<Runnable> downloadTasks = this.createDownloadRunnables(remoteFilePaths, closeableRegistry);
            List<CompletableFuture<Void>> pendingDownloads = new ArrayList<CompletableFuture<Void>>(downloadTasks.size());
            downloadTasks.forEach(task -> pendingDownloads.add(CompletableFuture.runAsync(task, downloadExecutor)));
            FutureUtils.waitForAll(pendingDownloads).get();
        } catch (ExecutionException e) {
            // Unwrap the execution/runtime wrappers to see whether the real cause is an I/O failure.
            Throwable cause = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
            if (!(cause instanceof IOException)) {
                throw new FlinkRuntimeException(e);
            }
            throw (IOException) cause;
        } finally {
            downloadExecutor.shutdownNow();
        }
    }

    /**
     * Builds one download task per entry of {@code remoteFilePaths}.
     *
     * @param remoteFilePaths remote file path mapped to the local target path
     * @param closeableRegistry registry handed to every download
     * @return the download tasks; checked exceptions of a download are rethrown unchecked
     */
    private List<Runnable> createDownloadRunnables(Map<String, Path> remoteFilePaths, CloseableRegistry closeableRegistry) {
        List<Runnable> downloadTasks = new ArrayList<Runnable>(remoteFilePaths.size());
        remoteFilePaths.forEach((remotePath, localPath) ->
                downloadTasks.add(ThrowingRunnable.unchecked(() -> this.downloadData(new Path(remotePath), localPath, closeableRegistry))));
        return downloadTasks;
    }

    /**
     * Returns the part of {@code filePath} after the last {@code "/"}; the whole path is returned
     * when it contains no separator.
     *
     * @param filePath path to take the file name from
     * @return the file name
     * @throws IOException if the extracted name is empty or consists of whitespace only
     */
    private String extractFileName(String filePath) throws IOException {
        String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
        if (!StringUtils.isNullOrWhitespaceOnly(fileName)) {
            return fileName;
        }
        throw new IOException("Fail to extract file name from given file path " + filePath);
    }

    /**
     * Copies one remote file to {@code restoreFilePath}, overwriting an existing target.
     *
     * <p>Both streams are registered with {@code closeableRegistry} while the copy is running. On
     * exit each stream is closed by this method only if it could still be unregistered from the
     * registry.
     *
     * @param remoteFilePath file to read
     * @param restoreFilePath file to write
     * @param closeableRegistry registry tracking the opened streams
     * @throws IOException if opening, registering, copying or closing fails
     */
    private void downloadData(Path remoteFilePath, Path restoreFilePath, CloseableRegistry closeableRegistry) throws IOException {
        FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = remoteFilePath.getFileSystem().open(remoteFilePath);
            closeableRegistry.registerCloseable(inputStream);
            outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
            closeableRegistry.registerCloseable(outputStream);
            byte[] buffer = new byte[65536];
            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
        } finally {
            // Nested so that a failing close of the input stream cannot leave the output stream
            // registered and open.
            try {
                if (closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            } finally {
                if (closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }
    }

    /**
     * Creates the serializer used to read a file mapping: a map from an int key to a nested map
     * from an int key to an {@code (Integer, Long)} tuple.
     *
     * @return the file mapping serializer
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> getFileMappingSerializer() {
        TypeSerializer[] tupleFieldSerializers = new TypeSerializer[]{IntSerializer.INSTANCE, LongSerializer.INSTANCE};
        TypeSerializer innerValueSerializer = new TupleSerializer(Tuple2.class, tupleFieldSerializers);
        TypeSerializer innerMapSerializer = new MapSerializer((TypeSerializer)IntSerializer.INSTANCE, innerValueSerializer);
        return new MapSerializer((TypeSerializer)IntSerializer.INSTANCE, innerMapSerializer);
    }

    /**
     * Reads one file mapping section from the current position of {@code reader}.
     *
     * <p>The section starts with a boolean flag. If it is set, the number of files and the base
     * path follow; for a positive number of files, one (path, id, size) record per file is read
     * into {@code restoredFileMetas}, followed by the serialized file mapping.
     *
     * @param reader meta file reader positioned at the start of the section
     * @param restoredFileMetas receives one entry per file record, keyed by file id
     * @param fileMappingSerializer serializer used to read the mapping
     * @return {@code null} if the section has no mapping; otherwise the mapping ({@code null} when
     *     the number of files is not positive) together with the restored base path
     * @throws IOException if reading fails
     */
    private Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> restoreFileMapping(SnapshotMetaFile.Reader reader, Map<Integer, FileMeta.RestoredFileMeta> restoredFileMetas, MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMappingSerializer) throws IOException {
        if (!reader.readBoolean()) {
            return null;
        }
        int fileCount = reader.readInt();
        String restoredBasePath = reader.readUTF();
        if (fileCount <= 0) {
            return Tuple2.of(null, restoredBasePath);
        }
        for (int remaining = fileCount; remaining > 0; --remaining) {
            String filePath = reader.readUTF();
            int id = reader.readInt();
            long fileSize = reader.readLong();
            restoredFileMetas.put(id, FileMeta.RestoredFileMeta.of(id, filePath, fileSize));
        }
        // The double cast is kept from the original; the reader's declared type is not visible here.
        InputStream readerStream = (InputStream)((Object)reader);
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMapping = fileMappingSerializer.deserialize(new DataInputViewStreamWrapper(readerStream));
        return Tuple2.of(fileMapping, restoredBasePath);
    }

    /**
     * Merges {@code restoredFileMapping} into {@code fileMapping}.
     *
     * <p>A key that is not yet mapped in {@code fileMapping} takes over the restored inner map
     * as-is (no copy); for a key already mapped, the restored inner entries are added to the
     * existing inner map, overwriting entries with the same inner key.
     *
     * @param restoredFileMapping mapping read from one meta file
     * @param fileMapping accumulated mapping, modified in place
     */
    private void loadFromRestoredFileMapping(Map<Integer, Map<Integer, Tuple2<Integer, Long>>> restoredFileMapping, Map<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMapping) {
        restoredFileMapping.forEach((outerKey, restoredInner) -> {
            Map<Integer, Tuple2<Integer, Long>> knownInner = fileMapping.get(outerKey);
            if (knownInner != null) {
                knownInner.putAll(restoredInner);
            } else {
                fileMapping.put(outerKey, restoredInner);
            }
        });
    }

    /**
     * Selects the restored file metas that are referenced by regions in
     * {@code [startRegionId, endRegionId]}.
     *
     * <p>For every file in {@code fileMapping} the reference counts and data sizes of the inner
     * entries whose key lies in the region range are summed up. Files with a positive reference sum
     * get these values set on their meta and are added to the result.
     *
     * @param restoredFileMetas all restored file metas keyed by file id
     * @param fileMapping file id mapped to (inner key mapped to (reference count, data size))
     * @param startRegionId first region (inclusive)
     * @param endRegionId last region (inclusive)
     * @param canDeleteFile flag passed on to the selected file metas
     * @return the selected file metas keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> getRestoredFileMetaUsedByDB(Map<Integer, FileMeta.RestoredFileMeta> restoredFileMetas, Map<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMapping, int startRegionId, int endRegionId, boolean canDeleteFile) {
        Map<Integer, FileMeta.RestoredFileMeta> usedFileMetas = new HashMap<Integer, FileMeta.RestoredFileMeta>(restoredFileMetas.size());
        for (Map.Entry<Integer, Map<Integer, Tuple2<Integer, Long>>> fileEntry : fileMapping.entrySet()) {
            int fileId = fileEntry.getKey();
            int referenceSum = 0;
            long dataSizeSum = 0L;
            for (Map.Entry<Integer, Tuple2<Integer, Long>> usage : fileEntry.getValue().entrySet()) {
                int region = usage.getKey();
                if (region >= startRegionId && region <= endRegionId) {
                    Tuple2<Integer, Long> referenceAndSize = usage.getValue();
                    referenceSum += referenceAndSize.f0;
                    dataSizeSum += referenceAndSize.f1;
                }
            }
            if (referenceSum > 0) {
                FileMeta.RestoredFileMeta usedMeta = restoredFileMetas.get(fileId);
                usedMeta.setUsedDataSizeAndReference(dataSizeSum, referenceSum, 0, canDeleteFile);
                usedFileMetas.put(fileId, usedMeta);
            }
        }
        return usedFileMetas;
    }

    /**
     * Combines the file metas used by the DB with the files referenced by restored snapshots.
     *
     * <p>The result starts as a shallow copy of {@code dbUsedFileMeta}. For every file of every
     * restored snapshot, the snapshot reference of an already known meta is incremented (the meta
     * object is shared with {@code dbUsedFileMeta}); an unknown file gets a new meta with a snapshot
     * reference of 1.
     *
     * @param dbUsedFileMeta file metas used by the DB, keyed by file id
     * @param restoredSnapshots restored snapshots whose file mappings are merged in
     * @param canDeleteFile flag used for newly created file metas
     * @return the merged file metas keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> mergeDbAndSnapshotFileMeta(Map<Integer, FileMeta.RestoredFileMeta> dbUsedFileMeta, Map<Long, SnapshotManager.RestoredSnapshot> restoredSnapshots, boolean canDeleteFile) {
        Map<Integer, FileMeta.RestoredFileMeta> merged = new HashMap<Integer, FileMeta.RestoredFileMeta>(dbUsedFileMeta);
        for (SnapshotManager.RestoredSnapshot snapshot : restoredSnapshots.values()) {
            for (Map.Entry<Integer, String> snapshotFile : snapshot.getFileMapping().entrySet()) {
                int fileId = snapshotFile.getKey();
                String path = snapshotFile.getValue();
                FileMeta.RestoredFileMeta known = merged.get(fileId);
                if (known == null) {
                    merged.put(fileId, FileMeta.RestoredFileMeta.of(fileId, path, 0L, 0L, 0, 1, canDeleteFile));
                } else {
                    ++known.snapshotReference;
                }
            }
        }
        return merged;
    }

    /**
     * Projects the given file metas onto their file paths.
     *
     * @param restoredFileMetas file metas keyed by file id
     * @return a new map from file id to the {@code filePath} of the corresponding meta
     */
    private Map<Integer, String> getMappingForFileIdToPath(Map<Integer, FileMeta.RestoredFileMeta> restoredFileMetas) {
        Map<Integer, String> idToPath = new HashMap<Integer, String>();
        for (Map.Entry<Integer, FileMeta.RestoredFileMeta> metaEntry : restoredFileMetas.entrySet()) {
            idToPath.put(metaEntry.getKey(), metaEntry.getValue().filePath);
        }
        return idToPath;
    }

    /**
     * Recreates the local working directory and hard-links every restored local file into it.
     *
     * <p>An existing working directory is deleted first. Each meta's {@code filePath} is resolved
     * against both the restored base directory (link source) and the working directory (link
     * target); after linking, the meta's {@code filePath} is replaced by the absolute target path.
     *
     * <p>NOTE(review): if the restored base path and the working base path were the same
     * directory, the sources would be deleted before linking — confirm callers never pass equal
     * paths.
     *
     * @param restoredLocalBasePath base path the local files were restored from
     * @param restoredFileMetas restored local file metas; their {@code filePath} is updated in place
     * @param workingBasePath base path of the local working directory
     * @throws IOException if the working directory cannot be created
     * @throws Exception if deleting the old directory or creating a hard link fails
     */
    private void createHardLinkForRestoredLocalFile(String restoredLocalBasePath, Map<Integer, FileMeta.RestoredFileMeta> restoredFileMetas, Path workingBasePath) throws Exception {
        File restoredLocalBaseDir = new File(new Path(restoredLocalBasePath).toUri().getPath());
        File workingBaseDir = new File(workingBasePath.toUri().getPath());
        if (workingBaseDir.exists()) {
            FileUtils.deleteDirectory(workingBaseDir);
        }
        if (!workingBaseDir.mkdirs()) {
            // mkdirs() returning false only says the directory was not created by this call; the
            // directory was removed just above, so do not claim that it already exists.
            throw new IOException("Fail to create local working directory " + workingBaseDir);
        }
        for (FileMeta.RestoredFileMeta fileMeta : restoredFileMetas.values()) {
            String fileName = fileMeta.filePath;
            File src = new File(restoredLocalBaseDir, fileName);
            File target = new File(workingBaseDir, fileName);
            try {
                Files.createLink(target.toPath(), src.toPath());
            } catch (Exception e) {
                LOG.error("Fail to create hard link from {} to {}.", src.getAbsolutePath(), target.getAbsolutePath(), e);
                throw e;
            }
            fileMeta.filePath = target.getAbsolutePath();
        }
    }
}

