package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/StateDataTransfer.class */
/**
 * Static helpers that copy the files referenced by the state handles of an
 * {@link IncrementalKeyedStateSnapshot} into a target directory.
 *
 * <p>Each handle is written to a file named after its {@link StateHandleID} inside the target
 * directory. Downloads for one group of handles may run concurrently on a thread pool.
 */
class StateDataTransfer {
    /** Size in bytes of the buffer used when copying one state file (64 KiB). */
    private static final int COPY_BUFFER_SIZE = 64 * 1024;

    StateDataTransfer() {
    }

    /**
     * Downloads all shared state and then all private state of the given snapshot into
     * {@code path}.
     *
     * <p>The two groups are processed one after the other; each group gets its own executor.
     * The method is declared {@code public}, but the enclosing class is package-private.
     *
     * @param incrementalKeyedStateSnapshot snapshot whose shared and private handles are downloaded
     * @param path target directory; one file per handle is created below it
     * @param i number of download threads; values of 1 or less use Flink's direct executor service
     * @param closeableRegistry registry with which every opened stream is registered while in use
     * @throws Exception if a download fails or waiting for the downloads is interrupted
     */
    public static void transferAllStateDataToDirectory(IncrementalKeyedStateSnapshot incrementalKeyedStateSnapshot, Path path, int i, CloseableRegistry closeableRegistry) throws Exception {
        // The shared-state values are tuples whose second field (f1) is the stream handle.
        // NOTE(review): the exact type arguments of getSharedState() are not visible here, so the
        // entries are read through wildcards and cast explicitly.
        Map<?, ?> sharedState = incrementalKeyedStateSnapshot.getSharedState();
        Map<StateHandleID, StreamStateHandle> sharedHandles = new HashMap<>(sharedState.size());
        for (Map.Entry<?, ?> entry : sharedState.entrySet()) {
            Tuple2<?, ?> value = (Tuple2<?, ?>) entry.getValue();
            sharedHandles.put((StateHandleID) entry.getKey(), (StreamStateHandle) value.f1);
        }

        // NOTE(review): assumes getPrivateState() is declared as Map<StateHandleID, StreamStateHandle>,
        // matching the parameter type it has always been passed to — confirm.
        Map<StateHandleID, StreamStateHandle> privateHandles = incrementalKeyedStateSnapshot.getPrivateState();

        downloadDataForAllStateHandles(sharedHandles, path, i, closeableRegistry);
        downloadDataForAllStateHandles(privateHandles, path, i, closeableRegistry);
    }

    /**
     * Downloads every handle in {@code map} into {@code path} and blocks until all downloads
     * have finished.
     *
     * @param map handles to download, keyed by the id that becomes the file name
     * @param path target directory
     * @param numThreads size of the thread pool (see {@link #createExecutorService(int)})
     * @param closeableRegistry registry passed on to each individual download
     * @throws IOException if the root cause of a failed download is an {@code IOException}
     * @throws FlinkRuntimeException if a download failed for any other reason
     * @throws Exception if waiting for the downloads is interrupted
     */
    private static void downloadDataForAllStateHandles(Map<StateHandleID, StreamStateHandle> map, Path path, int numThreads, CloseableRegistry closeableRegistry) throws Exception {
        ExecutorService executor = createExecutorService(numThreads);
        try {
            List<Runnable> downloads = createDownloadRunnables(map, path, closeableRegistry);
            List<CompletableFuture<Void>> futures = new ArrayList<>(downloads.size());
            for (Runnable download : downloads) {
                futures.add(CompletableFuture.runAsync(download, executor));
            }
            FutureUtils.waitForAll(futures).get();
        } catch (ExecutionException e) {
            // Unwrap the execution wrapper and any RuntimeException wrappers (the runnables are
            // created via ThrowingRunnable.unchecked) to surface an underlying IOException.
            Throwable cause = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new FlinkRuntimeException("Failed to download data for state handles.", e);
        } finally {
            // Always stop the executor, whether the downloads succeeded, failed or were interrupted.
            executor.shutdownNow();
        }
    }

    /**
     * Creates the executor used for downloads: a fixed thread pool when more than one thread is
     * requested, otherwise Flink's direct executor service.
     *
     * @param numThreads requested number of download threads
     * @return the executor to run download tasks on
     */
    private static ExecutorService createExecutorService(int numThreads) {
        if (numThreads > 1) {
            return Executors.newFixedThreadPool(numThreads);
        }
        return org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService();
    }

    /**
     * Builds one download task per entry of {@code map}.
     *
     * @param map handles to download, keyed by id
     * @param path target directory; each file is named {@code id.toString()} below it
     * @param closeableRegistry registry passed on to each download
     * @return the download tasks, in the iteration order of {@code map}
     */
    private static List<Runnable> createDownloadRunnables(Map<StateHandleID, StreamStateHandle> map, Path path, CloseableRegistry closeableRegistry) {
        List<Runnable> downloads = new ArrayList<>(map.size());
        for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
            StreamStateHandle handle = entry.getValue();
            Path targetFile = new Path(path, entry.getKey().toString());
            downloads.add(ThrowingRunnable.unchecked(() -> downloadDataForStateHandle(targetFile, handle, closeableRegistry)));
        }
        return downloads;
    }

    /**
     * Copies the content of one state handle into the file {@code path}, overwriting any
     * existing file.
     *
     * <p>Both streams are registered with {@code closeableRegistry} while the copy runs. On
     * exit each stream is unregistered and, if it was still registered, closed by this method.
     * An exception thrown while closing a stream replaces an earlier exception from the copy.
     *
     * @param path file to write
     * @param streamStateHandle handle whose content is read
     * @param closeableRegistry registry the streams are registered with during the copy
     * @throws IOException if opening, reading, writing or closing a stream fails
     */
    public static void downloadDataForStateHandle(Path path, StreamStateHandle streamStateHandle, CloseableRegistry closeableRegistry) throws IOException {
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            FileSystem fileSystem = path.getFileSystem();
            inputStream = streamStateHandle.openInputStream();
            closeableRegistry.registerCloseable(inputStream);
            outputStream = fileSystem.create(path, FileSystem.WriteMode.OVERWRITE);
            closeableRegistry.registerCloseable(outputStream);

            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int numBytes;
            while ((numBytes = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
        } finally {
            // Either stream may still be null here. As before, this relies on
            // unregisterCloseable returning false for a stream that was never registered.
            try {
                if (closeableRegistry.unregisterCloseable(inputStream)) {
                    inputStream.close();
                }
            } finally {
                // Nested finally: the output stream is released even if closing the input fails.
                if (closeableRegistry.unregisterCloseable(outputStream)) {
                    outputStream.close();
                }
            }
        }
    }
}
