/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;

class StateDataTransfer {
    StateDataTransfer() {
    }

    static void transferAllStateDataToDirectory(IncrementalKeyedStateSnapshot restoreStateHandle, Path dest, int restoringThreadNum, CloseableRegistry closeableRegistry) throws Exception {
        HashMap<StateHandleID, StreamStateHandle> sstFiles = new HashMap<StateHandleID, StreamStateHandle>(restoreStateHandle.getSharedState().size());
        for (Map.Entry entry : restoreStateHandle.getSharedState().entrySet()) {
            sstFiles.put((StateHandleID)entry.getKey(), (StreamStateHandle)((Tuple2)entry.getValue()).f1);
        }
        Map miscFiles = restoreStateHandle.getPrivateState();
        StateDataTransfer.downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
        StateDataTransfer.downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
    }

    private static void downloadDataForAllStateHandles(Map<StateHandleID, StreamStateHandle> stateHandleMap, Path restoreInstancePath, int restoringThreadNum, CloseableRegistry closeableRegistry) throws Exception {
        ExecutorService executorService = StateDataTransfer.createExecutorService(restoringThreadNum);
        try {
            List<Runnable> runnables = StateDataTransfer.createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>(runnables.size());
            for (Runnable runnable : runnables) {
                futures.add(CompletableFuture.runAsync(runnable, executorService));
            }
            FutureUtils.waitForAll(futures).get();
        }
        catch (ExecutionException e2) {
            Throwable throwable = ExceptionUtils.stripExecutionException(e2);
            throwable = ExceptionUtils.stripException(throwable, RuntimeException.class);
            if (throwable instanceof IOException) {
                throw (IOException)throwable;
            }
            throw new FlinkRuntimeException("Failed to download data for state handles.", e2);
        }
        finally {
            executorService.shutdownNow();
        }
    }

    private static ExecutorService createExecutorService(int threadNum) {
        if (threadNum > 1) {
            return Executors.newFixedThreadPool(threadNum);
        }
        return org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService();
    }

    private static List<Runnable> createDownloadRunnables(Map<StateHandleID, StreamStateHandle> stateHandleMap, Path restoreInstancePath, CloseableRegistry closeableRegistry) {
        ArrayList<Runnable> runnables = new ArrayList<Runnable>(stateHandleMap.size());
        for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
            StateHandleID stateHandleID = entry.getKey();
            StreamStateHandle remoteFileHandle = entry.getValue();
            Path path = new Path(restoreInstancePath, stateHandleID.toString());
            runnables.add(ThrowingRunnable.unchecked(() -> StateDataTransfer.downloadDataForStateHandle(path, remoteFileHandle, closeableRegistry)));
        }
        return runnables;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void downloadDataForStateHandle(Path restoreFilePath, StreamStateHandle remoteFileHandle, CloseableRegistry closeableRegistry) throws IOException {
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            int numBytes;
            FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
            inputStream = remoteFileHandle.openInputStream();
            closeableRegistry.registerCloseable(inputStream);
            outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
            closeableRegistry.registerCloseable(outputStream);
            byte[] buffer = new byte[65536];
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
        }
        finally {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                inputStream.close();
            }
            if (closeableRegistry.unregisterCloseable(outputStream)) {
                outputStream.close();
            }
        }
    }
}

