package org.apache.flink.runtime.state.gemini.engine.restore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GRegionIDImpl;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileMeta;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndexHashImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.DBSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotMetaFile;
import org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/restore/GeminiRestoreOperation.class */
public class GeminiRestoreOperation {
    private static final Logger LOG = LoggerFactory.getLogger(GeminiRestoreOperation.class);
    private final GContext context;
    private final GConfiguration configuration;
    private final Supervisor supervisor;
    private final CloseableRegistry closeableRegistry;

    /**
     * Creates a restore operation bound to the given engine context.
     *
     * @param gContext engine context; its supervisor is looked up once and cached
     * @param gConfiguration engine configuration (read for the pre-fetch switch and thread count)
     * @param closeableRegistry registry that streams opened while downloading files are registered with
     */
    public GeminiRestoreOperation(GContext gContext, GConfiguration gConfiguration, CloseableRegistry closeableRegistry) {
        this.closeableRegistry = closeableRegistry;
        this.configuration = gConfiguration;
        this.context = gContext;
        this.supervisor = gContext.getSupervisor();
    }

    /**
     * Restores the tables of this GeminiDB instance from a set of snapshot metas.
     *
     * <p>For every snapshot meta the method:
     * <ol>
     *   <li>intersects the meta's region range with {@code [i, i2]} and fails if the intersection is empty;</li>
     *   <li>rebuilds the page index of every region in that intersection and installs it into the
     *       matching {@link GTable};</li>
     *   <li>reads the DFS file mapping (mandatory) and the local file mapping (optional).</li>
     * </ol>
     * Afterwards the DFS file manager, the snapshot manager and (if a local mapping was found) the
     * local file manager are restored. If restore pre-fetch is enabled in the configuration and no
     * snapshot carried a local file mapping, the DFS files are copied into the local base path and
     * every page's local address is pointed at its DFS address.
     *
     * @param collection snapshot metas to restore from
     * @param map tables keyed by the table name stored in the snapshot meta file
     * @param i first region (key-group) handled by this backend, inclusive
     * @param i2 last region (key-group) handled by this backend, inclusive
     * @throws IllegalArgumentException if a meta does not overlap {@code [i, i2]}
     * @throws NullPointerException if the meta file names a table that is missing from {@code map},
     *     or a meta has no DFS file mapping
     * @throws Exception on any I/O or restore failure
     */
    public void restore(Collection<DBSnapshotMeta> collection, Map<String, GTable> map, int i, int i2) throws Exception {
        long restoreStartMs = System.currentTimeMillis();
        MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> fileMappingSerializer = getFileMappingSerializer();
        long checkpointId = 0;
        Set<String> dfsBasePaths = new HashSet<>();
        Set<String> localBasePaths = new HashSet<>();
        Map<Integer, FileMeta.RestoredFileMeta> dfsFileMetas = new HashMap<>();
        Map<Integer, FileMeta.RestoredFileMeta> localFileMetas = new HashMap<>();
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> dfsFileUsage = new HashMap<>();
        Map<Integer, Map<Integer, Tuple2<Integer, Long>>> localFileUsage = new HashMap<>();
        LOG.info("Start to restore from snapshot for GeminiDB, snapshotMetas {}, tables {}, region from {} to {}.", new Object[]{collection, map, i, i2});
        boolean nameSpaceChanged = false;
        boolean preFetchEnabled = this.configuration.isEnableRestorePreFetch();
        for (DBSnapshotMeta snapshotMeta : collection) {
            // The checkpoint id of the last meta iterated is the one handed to the snapshot manager.
            checkpointId = snapshotMeta.getCheckPointId();
            long[] regionOffsets = snapshotMeta.getRegionOffsets();
            int metaStartRegion = snapshotMeta.getStartRegionId();
            int metaEndRegion = snapshotMeta.getEndRegionId();
            int firstRegion = Math.max(i, metaStartRegion);
            int lastRegion = Math.min(i2, metaEndRegion);
            if (!nameSpaceChanged && !this.supervisor.getSnapshotManager().getNameSpace().equals(snapshotMeta.getNameSpace())) {
                nameSpaceChanged = true;
                LOG.info("Current name space {} differs from previous snapshot's name space {}, enforce the first snapshot to flush all pages out.", this.supervisor.getSnapshotManager().getNameSpace(), snapshotMeta.getNameSpace());
            }
            Preconditions.checkArgument(firstRegion <= lastRegion, String.format("Useless meta (key-group range [%s, %s]) should not be restored within this state-backend (key-group range [%s, %s])", metaStartRegion, metaEndRegion, i, i2));
            try (SnapshotMetaFile.Reader reader = SnapshotMetaFile.getReader(new Path(snapshotMeta.getSnapshotMetaPath()))) {
                restoreRegions(reader, map, regionOffsets[firstRegion - metaStartRegion], firstRegion, lastRegion);

                // The DFS mapping is stored at a known offset; the optional local mapping follows it directly.
                reader.seek(snapshotMeta.getDfsFileMappingOffset());
                Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> dfsMapping = restoreFileMapping(reader, dfsFileMetas, fileMappingSerializer);
                Preconditions.checkNotNull(dfsMapping, "dfs meta should always has file mapping");
                if (dfsMapping.f0 != null) {
                    loadFromRestoredFileMapping(dfsMapping.f0, dfsFileUsage);
                }
                dfsBasePaths.add(dfsMapping.f1);

                Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> localMapping = restoreFileMapping(reader, localFileMetas, fileMappingSerializer);
                if (localMapping != null) {
                    // A local mapping in any snapshot disables pre-fetching for the whole restore.
                    preFetchEnabled = false;
                    if (localMapping.f0 != null) {
                        loadFromRestoredFileMapping(localMapping.f0, localFileUsage);
                    }
                    localBasePaths.add(localMapping.f1);
                }
            }
        }
        Preconditions.checkState(!dfsBasePaths.isEmpty(), "restored dfs base path should not be empty.");
        SnapshotManager snapshotManager = this.supervisor.getSnapshotManager();
        snapshotManager.setNeedToBreakLineage(nameSpaceChanged);
        Map<Integer, FileMeta.RestoredFileMeta> dfsMetasUsedByDb = dfsFileUsage.isEmpty() ? new HashMap<>() : getRestoredFileMetaUsedByDB(dfsFileMetas, dfsFileUsage, i, i2, !nameSpaceChanged);
        this.supervisor.getDfsFileManager().restore(mergeDbAndSnapshotFileMeta(dfsMetasUsedByDb, snapshotManager.restore(checkpointId, getMappingForFileIdToPath(dfsMetasUsedByDb), dfsBasePaths.iterator().next()), !nameSpaceChanged));
        FileManager localFileManager = this.supervisor.getLocalFileManager();
        Path localBasePath = localFileManager.getBasePath();
        File localBaseDir = new File(localBasePath.getPath());
        if (!localFileUsage.isEmpty()) {
            createHardLinkForRestoredLocalFile(localBasePaths.iterator().next(), localFileMetas, localBasePath);
            localFileManager.restore(getRestoredFileMetaUsedByDB(localFileMetas, localFileUsage, i, i2, true));
        }
        LOG.info("Restored successfully from {} for {}, region from {} to {}, consumed {} ms.", new Object[]{collection, map, i, i2, System.currentTimeMillis() - restoreStartMs});
        if (preFetchEnabled) {
            preFetchDfsFilesToLocal(dfsMetasUsedByDb, localFileManager, localBasePath, localBaseDir);
        }
    }

    /**
     * Reads the regions {@code firstRegion..lastRegion} from a snapshot meta file and installs them
     * into their tables.
     *
     * <p>The first int read from {@code reader} is used as the number of table sections per
     * key-group. Each section holds a table name, a region count, and per region: the region id,
     * two longs that are passed through to {@link GRegionContext}, and the serialized page index.
     */
    private void restoreRegions(SnapshotMetaFile.Reader reader, Map<String, GTable> tables, long firstRegionOffset, int firstRegion, int lastRegion) throws Exception {
        int tableCount = reader.readInt();
        reader.seek(firstRegionOffset);
        for (int region = firstRegion; region <= lastRegion; region++) {
            Preconditions.checkState(reader.readInt() == region, "Unexpected key-group in restore.");
            for (int t = 0; t < tableCount; t++) {
                String tableName = reader.readUTF();
                int regionCount = reader.readInt();
                for (int r = 0; r < regionCount; r++) {
                    GRegionIDImpl regionId = new GRegionIDImpl(reader.readInt());
                    int id = regionId.getId();
                    long firstContextValue = reader.readLong();
                    long secondContextValue = reader.readLong();
                    GTable table = tables.get(tableName);
                    // Fail with the offending table name instead of an anonymous NullPointerException.
                    Preconditions.checkNotNull(table, "Table %s found in snapshot meta is not registered for restore.", tableName);
                    PageIndex pageIndex = new PageIndexHashImpl.Builder(reader, new GRegionContext(this.context, tableName, regionId, table.getTableDescription().getPageSerde(), firstContextValue, secondContextValue)).build();
                    // Index id 0 denotes a data region; any other id is installed as an index region.
                    if (regionId.getIndexID() == 0) {
                        table.setRegion(id, table.getTableDescription().createRegion(this.context, table, regionId, pageIndex));
                    } else {
                        table.setIndexRegion(id, table.getIndexDescription().createRegion(this.context, table, regionId, pageIndex));
                    }
                }
            }
        }
    }

    /**
     * Copies every DFS file used by the DB into the local base path, registers the copies with the
     * local file manager and points each page's local address at its DFS address.
     *
     * <p>On failure the local base directory is removed on a best-effort basis and the original
     * failure is rethrown; a failure during that clean-up is only logged so that it cannot hide
     * the root cause.
     */
    private void preFetchDfsFilesToLocal(Map<Integer, FileMeta.RestoredFileMeta> dfsMetasUsedByDb, FileManager localFileManager, Path localBasePath, File localBaseDir) throws Exception {
        long fetchStartMs = System.currentTimeMillis();
        int fetchThreads = this.configuration.getFetchFilesThreadNum();
        boolean succeeded = false;
        try {
            Map<String, Path> dfsToLocalPath = new HashMap<>(dfsMetasUsedByDb.size());
            Map<Integer, FileMeta.RestoredFileMeta> localMetas = new HashMap<>(dfsMetasUsedByDb.size());
            for (Map.Entry<Integer, FileMeta.RestoredFileMeta> entry : dfsMetasUsedByDb.entrySet()) {
                FileMeta.RestoredFileMeta dfsMeta = entry.getValue();
                String dfsPath = dfsMeta.filePath;
                Path localPath = new Path(localBasePath, extractFileName(dfsPath));
                dfsToLocalPath.put(dfsPath, localPath);
                localMetas.put(entry.getKey(), FileMeta.RestoredFileMeta.of(dfsMeta.id, localPath.toString(), dfsMeta.fileSize, dfsMeta.dataSize, dfsMeta.dbReference, 0, true));
            }
            downloadDataForAllFiles(dfsToLocalPath, fetchThreads, this.closeableRegistry);
            localFileManager.restore(localMetas);
            Iterator<GTable> tables = this.supervisor.getAllTables().values().iterator();
            while (tables.hasNext()) {
                Iterator<GRegion> regions = tables.next().regionIterator();
                while (regions.hasNext()) {
                    Iterator<PageAddress> pages = regions.next().getPageStore().getPageIndex().pageIterator();
                    while (pages.hasNext()) {
                        PageAddress page = pages.next();
                        page.setLocalAddress(page.getDfsAddress());
                        page.setLocalStatus(true);
                    }
                }
            }
            succeeded = true;
        } finally {
            if (succeeded) {
                LOG.info("Successfully fetch files to local and re-construct the local address, consumed {} ms.", System.currentTimeMillis() - fetchStartMs);
            } else {
                try {
                    if (localBasePath.getFileSystem().listStatus(localBasePath) != null) {
                        LOG.info("Cleaning local downloaded files when not successful to restore.");
                        FileUtils.deleteDirectoryQuietly(localBaseDir);
                    }
                } catch (Exception cleanupFailure) {
                    // Best-effort clean-up: never let it replace the exception that caused the failure.
                    LOG.warn("Fail to clean local downloaded files under {}.", localBasePath, cleanupFailure);
                }
                LOG.info("Failed to fetch files to local or re-construct the local address, consumed {} ms.", System.currentTimeMillis() - fetchStartMs);
            }
        }
    }

    /**
     * Downloads all given files and blocks until every download has finished.
     *
     * @param map source path string mapped to the target path it is copied to
     * @param i number of download threads; a direct (caller-thread) executor is used when not greater than 1
     * @param closeableRegistry registry the opened streams are registered with
     * @throws IOException if a download failed with an {@link IOException}
     * @throws FlinkRuntimeException if a download failed for any other reason
     * @throws Exception for anything else raised while waiting, e.g. interruption
     */
    private void downloadDataForAllFiles(Map<String, Path> map, int i, CloseableRegistry closeableRegistry) throws Exception {
        ExecutorService executor;
        if (i > 1) {
            executor = Executors.newFixedThreadPool(i);
        } else {
            executor = org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService();
        }
        try {
            List<Runnable> downloads = createDownloadRunnables(map, closeableRegistry);
            List<CompletableFuture<Void>> pending = new ArrayList<>(downloads.size());
            for (Runnable download : downloads) {
                pending.add(CompletableFuture.runAsync(download, executor));
            }
            FutureUtils.waitForAll(pending).get();
        } catch (ExecutionException e) {
            // Unwrap the async and unchecked wrappers so that an IOException surfaces as itself.
            Throwable cause = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new FlinkRuntimeException(e);
        } finally {
            // The pool is shut down on both the success and the failure path.
            executor.shutdownNow();
        }
    }

    /**
     * Builds one download task per map entry.
     *
     * @param map source path string mapped to the target path
     * @param closeableRegistry registry handed to every download
     * @return tasks that rethrow checked download failures as unchecked exceptions
     */
    private List<Runnable> createDownloadRunnables(Map<String, Path> map, CloseableRegistry closeableRegistry) {
        List<Runnable> downloads = new ArrayList<>(map.size());
        map.entrySet().forEach(sourceToTarget ->
            downloads.add(ThrowingRunnable.unchecked(
                () -> downloadData(new Path(sourceToTarget.getKey()), sourceToTarget.getValue(), closeableRegistry))));
        return downloads;
    }

    /**
     * Returns the last segment of a file path, i.e. everything after the final path separator.
     * A path without any separator is returned unchanged.
     *
     * @param str file path
     * @return the file name part of {@code str}
     * @throws IOException if the resulting name is empty or whitespace only (e.g. the path ends with a separator)
     */
    private String extractFileName(String str) throws IOException {
        // Use the file-system separator constant rather than the ZooKeeper path utility's.
        String fileName = str.substring(str.lastIndexOf(Path.SEPARATOR) + 1);
        if (StringUtils.isNullOrWhitespaceOnly(fileName)) {
            throw new IOException("Fail to extract file name from given file path " + str);
        }
        return fileName;
    }

    /**
     * Copies the content of {@code path} to {@code path2}, overwriting an existing target.
     *
     * <p>Both streams are registered with {@code closeableRegistry} while the copy is running and
     * are unregistered and closed afterwards, whether or not the copy succeeded. A stream is only
     * closed here when unregistering it succeeds.
     *
     * @param path source file
     * @param path2 target file
     * @param closeableRegistry registry the streams are registered with during the copy
     * @throws IOException if opening, copying or closing fails
     */
    private void downloadData(Path path, Path path2, CloseableRegistry closeableRegistry) throws IOException {
        FileSystem targetFileSystem = path2.getFileSystem();
        InputStream source = null;
        OutputStream target = null;
        try {
            source = path.getFileSystem().open(path);
            closeableRegistry.registerCloseable(source);
            target = targetFileSystem.create(path2, FileSystem.WriteMode.OVERWRITE);
            closeableRegistry.registerCloseable(target);
            byte[] buffer = new byte[65536];
            int read;
            while ((read = source.read(buffer)) != -1) {
                target.write(buffer, 0, read);
            }
        } finally {
            // Nested finally: a failure while closing the source must not leave the target open.
            try {
                if (closeableRegistry.unregisterCloseable(source)) {
                    source.close();
                }
            } finally {
                if (closeableRegistry.unregisterCloseable(target)) {
                    target.close();
                }
            }
        }
    }

    /**
     * Creates the serializer for a persisted file mapping: an int-keyed map whose values are
     * int-keyed maps of {@code (Integer, Long)} tuples.
     *
     * @return serializer for {@code Map<Integer, Map<Integer, Tuple2<Integer, Long>>>}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> getFileMappingSerializer() {
        TypeSerializer[] tupleFieldSerializers = {IntSerializer.INSTANCE, LongSerializer.INSTANCE};
        TupleSerializer tupleSerializer = new TupleSerializer(Tuple2.class, tupleFieldSerializers);
        MapSerializer innerMapSerializer = new MapSerializer(IntSerializer.INSTANCE, tupleSerializer);
        return new MapSerializer<>(IntSerializer.INSTANCE, innerMapSerializer);
    }

    /**
     * Reads one file-mapping section from the snapshot meta file at the reader's current position.
     *
     * <p>As read here, the section is: a boolean presence flag, the number of files, the base
     * path, then per file its path, its int id and one long that is passed to
     * {@code FileMeta.RestoredFileMeta.of}, and finally the serialized mapping itself.
     *
     * @param reader meta file reader positioned at the start of the section
     * @param map receives one {@link FileMeta.RestoredFileMeta} per file, keyed by file id
     * @param mapSerializer serializer used to read the mapping
     * @return {@code null} if the section is absent; otherwise the mapping and the base path, where
     *     the mapping is {@code null} if the section lists no files
     * @throws IOException if reading fails
     */
    private Tuple2<Map<Integer, Map<Integer, Tuple2<Integer, Long>>>, String> restoreFileMapping(SnapshotMetaFile.Reader reader, Map<Integer, FileMeta.RestoredFileMeta> map, MapSerializer<Integer, Map<Integer, Tuple2<Integer, Long>>> mapSerializer) throws IOException {
        if (!reader.readBoolean()) {
            return null;
        }
        int fileCount = reader.readInt();
        String basePath = reader.readUTF();
        if (fileCount <= 0) {
            // No explicit cast on null: the tuple type is inferred from the return type.
            return Tuple2.of(null, basePath);
        }
        for (int n = 0; n < fileCount; n++) {
            String filePath = reader.readUTF();
            int fileId = reader.readInt();
            map.put(fileId, FileMeta.RestoredFileMeta.of(fileId, filePath, reader.readLong()));
        }
        return Tuple2.of(mapSerializer.deserialize(new DataInputViewStreamWrapper(reader)), basePath);
    }

    /**
     * Merges a restored file mapping into an accumulated one.
     *
     * <p>An outer key that is new to {@code map2} adopts the inner map instance from {@code map}
     * as is; for an existing outer key the inner entries are added to, and on a key clash
     * overwrite, the accumulated inner map.
     *
     * @param map mapping read from one snapshot meta
     * @param map2 accumulated mapping, modified in place
     */
    private void loadFromRestoredFileMapping(Map<Integer, Map<Integer, Tuple2<Integer, Long>>> map, Map<Integer, Map<Integer, Tuple2<Integer, Long>>> map2) {
        map.forEach((outerKey, restoredInner) -> {
            Map<Integer, Tuple2<Integer, Long>> accumulatedInner = map2.get(outerKey);
            if (accumulatedInner != null) {
                accumulatedInner.putAll(restoredInner);
            } else {
                map2.put(outerKey, restoredInner);
            }
        });
    }

    /**
     * Selects the restored files that are referenced from the region range {@code [i, i2]}.
     *
     * <p>For each file id in {@code map2}, the tuples whose inner key lies in {@code [i, i2]} are
     * summed: {@code f0} into a reference count and {@code f1} into a used data size. Files with a
     * positive reference count get those sums written into their meta (the meta object from
     * {@code map} is modified in place) and are included in the result.
     *
     * @param map restored file metas keyed by file id
     * @param map2 per file id, the {@code (reference, size)} tuples keyed by region
     * @param i first region, inclusive
     * @param i2 last region, inclusive
     * @param z flag forwarded unchanged to {@code setUsedDataSizeAndReference}
     * @return metas of the files in use, keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> getRestoredFileMetaUsedByDB(Map<Integer, FileMeta.RestoredFileMeta> map, Map<Integer, Map<Integer, Tuple2<Integer, Long>>> map2, int i, int i2, boolean z) {
        Map<Integer, FileMeta.RestoredFileMeta> usedByDb = new HashMap<>(map.size());
        for (Map.Entry<Integer, Map<Integer, Tuple2<Integer, Long>>> fileEntry : map2.entrySet()) {
            int fileId = fileEntry.getKey();
            int references = 0;
            long usedDataSize = 0L;
            for (Map.Entry<Integer, Tuple2<Integer, Long>> regionEntry : fileEntry.getValue().entrySet()) {
                int region = regionEntry.getKey();
                if (region < i || region > i2) {
                    continue;
                }
                Tuple2<Integer, Long> usage = regionEntry.getValue();
                references += usage.f0;
                usedDataSize += usage.f1;
            }
            if (references <= 0) {
                continue;
            }
            FileMeta.RestoredFileMeta fileMeta = map.get(fileId);
            fileMeta.setUsedDataSizeAndReference(usedDataSize, references, 0, z);
            usedByDb.put(fileId, fileMeta);
        }
        return usedByDb;
    }

    /**
     * Combines the files used by the DB with the files referenced by restored snapshots.
     *
     * <p>The result starts as a shallow copy of {@code map}. Every file occurrence in a snapshot's
     * file mapping either increments {@code snapshotReference} on the existing meta (the same
     * object that {@code map} holds) or adds a new meta with zero sizes, zero DB references and
     * one snapshot reference.
     *
     * @param map metas of the files used by the DB, keyed by file id
     * @param map2 restored snapshots; only their file mappings (file id to path) are read
     * @param z flag forwarded unchanged to {@code FileMeta.RestoredFileMeta.of} for newly added metas
     * @return merged metas keyed by file id
     */
    private Map<Integer, FileMeta.RestoredFileMeta> mergeDbAndSnapshotFileMeta(Map<Integer, FileMeta.RestoredFileMeta> map, Map<Long, SnapshotManager.RestoredSnapshot> map2, boolean z) {
        Map<Integer, FileMeta.RestoredFileMeta> merged = new HashMap<>(map);
        for (SnapshotManager.RestoredSnapshot snapshot : map2.values()) {
            for (Map.Entry<Integer, String> snapshotFile : snapshot.getFileMapping().entrySet()) {
                int fileId = snapshotFile.getKey();
                String filePath = snapshotFile.getValue();
                FileMeta.RestoredFileMeta known = merged.get(fileId);
                if (known == null) {
                    merged.put(fileId, FileMeta.RestoredFileMeta.of(fileId, filePath, 0L, 0L, 0, 1, z));
                } else {
                    known.snapshotReference++;
                }
            }
        }
        return merged;
    }

    /**
     * Projects restored file metas onto their paths.
     *
     * @param map restored file metas keyed by file id
     * @return a new map from each file id to the {@code filePath} of its meta
     */
    private Map<Integer, String> getMappingForFileIdToPath(Map<Integer, FileMeta.RestoredFileMeta> map) {
        Map<Integer, String> fileIdToPath = new HashMap<>(map.size());
        // The decompiled lambda body was empty, which made this method always return an empty map.
        map.forEach((fileId, fileMeta) -> fileIdToPath.put(fileId, fileMeta.filePath));
        return fileIdToPath;
    }

    /**
     * Re-creates the local working directory and hard-links every restored local file into it.
     *
     * <p>An existing working directory is deleted first. Each meta's {@code filePath} is resolved
     * against both the snapshot directory (link source) and the working directory (link target);
     * after a successful link the meta's {@code filePath} is replaced by the absolute path of the
     * new link.
     *
     * @param str base path of the restored local snapshot
     * @param map restored local file metas; their {@code filePath} is rewritten in place
     * @param path local working directory to populate
     * @throws IOException if the working directory cannot be created
     * @throws Exception if deleting the old directory or creating a hard link fails
     */
    private void createHardLinkForRestoredLocalFile(String str, Map<Integer, FileMeta.RestoredFileMeta> map, Path path) throws Exception {
        File snapshotDir = new File(new Path(str).toUri().getPath());
        File workingDir = new File(path.toUri().getPath());
        if (workingDir.exists()) {
            FileUtils.deleteDirectory(workingDir);
        }
        if (!workingDir.mkdirs()) {
            // The directory was removed just above, so a false return means creation failed.
            throw new IOException("Fail to create local working directory " + workingDir);
        }
        for (FileMeta.RestoredFileMeta fileMeta : map.values()) {
            String relativePath = fileMeta.filePath;
            File linkSource = new File(snapshotDir, relativePath);
            File linkTarget = new File(workingDir, relativePath);
            try {
                Files.createLink(linkTarget.toPath(), linkSource.toPath());
                fileMeta.filePath = linkTarget.getAbsolutePath();
            } catch (Exception e) {
                LOG.error("Fail to create hard link from {} to {}.", new Object[]{linkSource.getAbsolutePath(), linkTarget.getAbsolutePath(), e});
                throw e;
            }
        }
    }
}
