/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileBasedStateOutputStream;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pairs a checkpoint output stream with an operation that closes it and returns the
 * resulting state handle(s) wrapped in a {@link SnapshotResult}.
 *
 * <p>Two implementations are nested in this interface: {@link PrimaryStreamOnly}, which wraps
 * a single stream, and {@link PrimaryAndSecondaryStream}, which wraps a
 * {@link DuplicatingCheckpointOutputStream} built from a primary and a secondary stream.
 * Instances are typically obtained through the static factory methods declared here.
 */
public interface CheckpointStreamWithResultProvider
extends Closeable {
    /**
     * Logger of this interface; used by {@code createDuplicatingStream} to report a failure
     * to open the secondary stream.
     */
    public static final Logger LOG = LoggerFactory.getLogger(CheckpointStreamWithResultProvider.class);

    /**
     * Closes the underlying stream(s) and returns the state handle(s) produced by doing so.
     *
     * @return a snapshot result wrapping the handle(s) obtained from the closed stream(s);
     *     never {@code null}
     * @throws IOException if closing the stream(s) or obtaining the handle fails
     */
    @Nonnull
    public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    /**
     * Returns the checkpoint output stream held by this provider. This is the stream that the
     * default {@link #close()} implementation closes.
     *
     * @return the wrapped output stream; never {@code null}
     */
    @Nonnull
    public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();

    /**
     * Closes the stream returned by {@link #getCheckpointOutputStream()}.
     *
     * @throws IOException if closing the stream fails
     */
    @Override
    public default void close() throws IOException {
        CheckpointStreamFactory.CheckpointStateOutputStream stream = this.getCheckpointOutputStream();
        stream.close();
    }

    /**
     * Creates a provider that is backed only by a primary stream.
     *
     * @param checkpointId id of the checkpoint the stream is created for; expected to be
     *     non-negative
     * @param checkpointedStateScope scope passed on to the stream factory
     * @param primaryStreamFactory factory used to create the primary output stream
     * @return a {@link PrimaryStreamOnly} provider wrapping the newly created stream
     * @throws IOException if the factory fails to create the stream
     */
    @Nonnull
    public static CheckpointStreamWithResultProvider createSimpleStream(@Nonnegative long checkpointId, @Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
        // checkpointId is a primitive and can never be null, so it is annotated @Nonnegative
        // (as in createDuplicatingStream) rather than @Nonnull.
        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointedStateScope);
        return new PrimaryStreamOnly(primaryOut);
    }

    /**
     * Creates a provider that writes to a primary stream and, if possible, also to a
     * secondary stream backed by a local file.
     *
     * <p>The secondary file is created inside the subtask-specific checkpoint directory
     * obtained from {@code secondaryStreamDirProvider}, using a random UUID as file name. If
     * setting up the secondary stream fails with an {@link IOException}, a warning is logged
     * and a provider backed by the primary stream only is returned instead.
     *
     * @param checkpointId id of the checkpoint the streams are created for
     * @param checkpointedStateScope scope passed on to the primary stream factory
     * @param primaryStreamFactory factory used to create the primary output stream
     * @param secondaryStreamDirProvider provider of the directory for the secondary file
     * @return a {@link PrimaryAndSecondaryStream} provider, or a {@link PrimaryStreamOnly}
     *     provider if the secondary stream could not be set up
     * @throws IOException if the primary stream cannot be created
     */
    @Nonnull
    public static CheckpointStreamWithResultProvider createDuplicatingStream(@Nonnegative long checkpointId, @Nonnull CheckpointedStateScope checkpointedStateScope, @Nonnull CheckpointStreamFactory primaryStreamFactory, @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {
        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointedStateScope);
        // Declared outside the try block so the catch path can release it again.
        CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut = null;
        try {
            File outFile = new File(secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId), String.valueOf(UUID.randomUUID()));
            Path outPath = new Path(outFile.toURI());
            secondaryOut = new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
            return new PrimaryAndSecondaryStream(primaryOut, secondaryOut);
        }
        catch (IOException secondaryEx) {
            if (secondaryOut != null) {
                // The secondary stream was opened, but combining it with the primary stream
                // failed (the PrimaryAndSecondaryStream constructor may throw IOException).
                // Close it so it is not leaked when falling back to the primary stream.
                try {
                    secondaryOut.close();
                }
                catch (IOException closeEx) {
                    secondaryEx.addSuppressed(closeEx);
                }
            }
            LOG.warn("Exception when opening secondary/local checkpoint output stream. Continue only with the primary stream.", (Throwable)secondaryEx);
            return new PrimaryStreamOnly(primaryOut);
        }
    }

    /**
     * Converts a snapshot result of plain stream handles into a snapshot result of keyed
     * state handles by wrapping each present handle in a {@link KeyGroupsStateHandle} that
     * uses the given key-group offsets.
     *
     * @param snapshotResult the result whose handles are wrapped
     * @param keyGroupRangeOffsets offsets attached to every created keyed state handle
     * @return an empty result if there is no job-manager-owned handle; otherwise a result
     *     with the wrapped job-manager-owned handle and, if present, the wrapped task-local
     *     handle
     */
    @Nonnull
    public static SnapshotResult<KeyedStateHandle> toKeyedStateHandleSnapshotResult(@Nonnull SnapshotResult<StreamStateHandle> snapshotResult, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) {
        StreamStateHandle jmHandle = snapshotResult.getJobManagerOwnedSnapshot();
        if (jmHandle == null) {
            // Without a job-manager-owned handle there is nothing to report at all.
            return SnapshotResult.empty();
        }
        KeyGroupsStateHandle jmKeyedHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, jmHandle);
        StreamStateHandle localHandle = snapshotResult.getTaskLocalSnapshot();
        if (localHandle == null) {
            return SnapshotResult.of(jmKeyedHandle);
        }
        KeyGroupsStateHandle localKeyedHandle = new KeyGroupsStateHandle(keyGroupRangeOffsets, localHandle);
        return SnapshotResult.withLocalState(jmKeyedHandle, localKeyedHandle);
    }

    /**
     * Provider backed by a {@link DuplicatingCheckpointOutputStream}, i.e. by a primary and a
     * secondary stream. Failures on the primary side are propagated to the caller; failures
     * on the secondary side while finalizing are only logged.
     */
    public static class PrimaryAndSecondaryStream
    implements CheckpointStreamWithResultProvider {
        private static final Logger LOG = LoggerFactory.getLogger(PrimaryAndSecondaryStream.class);
        /** The duplicating stream that all data is written through. */
        @Nonnull
        private final DuplicatingCheckpointOutputStream outputStream;

        /**
         * Wraps the two given streams into a {@link DuplicatingCheckpointOutputStream}.
         *
         * @param primaryOut the primary stream
         * @param secondaryOut the secondary stream
         * @throws IOException if the duplicating stream cannot be created
         */
        public PrimaryAndSecondaryStream(@Nonnull CheckpointStreamFactory.CheckpointStateOutputStream primaryOut, CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut) throws IOException {
            this(new DuplicatingCheckpointOutputStream(primaryOut, secondaryOut));
        }

        /**
         * Uses an already constructed duplicating stream.
         *
         * @param outputStream the duplicating stream to wrap
         */
        PrimaryAndSecondaryStream(@Nonnull DuplicatingCheckpointOutputStream outputStream) {
            this.outputStream = outputStream;
        }

        /**
         * Closes both sides of the duplicating stream and collects their handles.
         *
         * <p>If obtaining the primary handle fails, the stream is closed and the primary
         * exception is rethrown; an exception raised by that close is merged with it through
         * {@code ExceptionUtils.firstOrSuppressed}. If obtaining the secondary handle fails,
         * the exception is logged and the result carries no local state.
         *
         * @return an empty result if the primary handle is {@code null}; otherwise a result
         *     with the primary handle and, if available, the secondary handle as local state
         * @throws IOException if obtaining the primary handle fails
         */
        @Override
        @Nonnull
        public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException {
            StreamStateHandle primaryHandle;
            try {
                primaryHandle = this.outputStream.closeAndGetPrimaryHandle();
            }
            catch (IOException primaryEx) {
                IOException failure = primaryEx;
                try {
                    this.outputStream.close();
                }
                catch (IOException closeEx) {
                    failure = (IOException)ExceptionUtils.firstOrSuppressed(closeEx, primaryEx);
                }
                throw failure;
            }

            StreamStateHandle secondaryHandle;
            try {
                secondaryHandle = this.outputStream.closeAndGetSecondaryHandle();
            }
            catch (IOException secondaryEx) {
                // A failing secondary stream must not fail the checkpoint; continue without it.
                LOG.warn("Exception from secondary/local checkpoint stream.", secondaryEx);
                secondaryHandle = null;
            }

            if (primaryHandle == null) {
                return SnapshotResult.empty();
            }
            if (secondaryHandle == null) {
                return SnapshotResult.of(primaryHandle);
            }
            return SnapshotResult.withLocalState(primaryHandle, secondaryHandle);
        }

        /**
         * Returns the duplicating stream wrapped by this provider.
         *
         * @return the wrapped stream; never {@code null}
         */
        @Override
        @Nonnull
        public DuplicatingCheckpointOutputStream getCheckpointOutputStream() {
            return this.outputStream;
        }
    }

    /**
     * Provider backed by a single (primary) checkpoint output stream.
     */
    public static class PrimaryStreamOnly
    implements CheckpointStreamWithResultProvider {
        /** The only stream managed by this provider. */
        @Nonnull
        private final CheckpointStreamFactory.CheckpointStateOutputStream outputStream;

        /**
         * Creates a provider around the given stream.
         *
         * @param outputStream the stream to wrap
         */
        public PrimaryStreamOnly(@Nonnull CheckpointStreamFactory.CheckpointStateOutputStream outputStream) {
            this.outputStream = outputStream;
        }

        /**
         * Closes the wrapped stream and wraps the handle it returns.
         *
         * @return a snapshot result created from the handle returned by the stream
         * @throws IOException if closing the stream and obtaining its handle fails
         */
        @Override
        @Nonnull
        public SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException {
            StreamStateHandle handle = this.outputStream.closeAndGetHandle();
            return SnapshotResult.of(handle);
        }

        /**
         * Returns the stream wrapped by this provider.
         *
         * @return the wrapped stream; never {@code null}
         */
        @Override
        @Nonnull
        public CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream() {
            return this.outputStream;
        }
    }
}

