package org.apache.flink.runtime.jobmaster.failover;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/failover/FileSystemOperationLogStore.class */
/**
 * An {@link OperationLogStore} that keeps operation logs as length-prefixed records in numbered
 * files ({@code operation.log.1}, {@code operation.log.2}, ...) inside a working directory.
 *
 * <p>{@link #start()} opens the first unused file index for writing and launches a background
 * thread that flushes the file periodically. {@link #readOpLog()} replays the records of all
 * files in index order, starting at index 1.
 *
 * <p>After the first failed write the store is marked corrupted and silently drops every
 * subsequent write.
 */
public class FileSystemOperationLogStore implements OperationLogStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemOperationLogStore.class);

    /** File name prefix; the 1-based file index is appended to it. */
    private static final String LOG_NAME_PREFIX = "operation.log.";

    private final Path workingDir;
    private final FileSystem fileSystem;
    private final int flushIntervalInMs;

    /** File currently written to; {@code null} until {@link #start()} succeeded. */
    private Path filePath;

    /** Written by the caller thread, read by the flusher thread, hence volatile. */
    private volatile DataOutputStream outputStream;

    private FileFlusher fileFlusher;
    private FileSystemOperationLogReader opLogReader;

    /** Set once a write failed; from then on nothing is written or flushed any more. */
    private volatile boolean corrupted;

    /** Background thread flushing the current log file every {@code flushIntervalInMs} ms. */
    private class FileFlusher extends Thread {
        private FileFlusher() {
            super("OperationLogFlusher");
        }

        @Override
        public void run() {
            while (!corrupted) {
                // Snapshot the field: stop()/clear() null it from another thread.
                final DataOutputStream stream = outputStream;
                if (stream == null) {
                    return;
                }
                try {
                    stream.flush();
                } catch (Exception e) {
                    LOG.warn("Fail to flush the log file!", e);
                }
                // Sleep even after a failed flush so a broken stream cannot cause a hot loop.
                try {
                    Thread.sleep(flushIntervalInMs);
                } catch (InterruptedException e) {
                    // Interruption is the shutdown signal sent by stop()/clear().
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Sequential reader over all log files of the working directory, in index order. */
    class FileSystemOperationLogReader {
        private DataInputStream inputStream;

        /** Index of the file to open next; the file being read is {@code index - 1}. */
        private int index = 1;

        /**
         * Opens the first log file.
         *
         * @throws FlinkRuntimeException if the first log file cannot be opened
         */
        FileSystemOperationLogReader() {
            try {
                this.inputStream = new DataInputStream(fileSystem.open(logFile(index)));
                this.index++;
            } catch (IOException e) {
                throw new FlinkRuntimeException("Cannot init filesystem opLog store.", e);
            }
        }

        /**
         * Reads the next operation log, rolling over to the next file when the current one is
         * exhausted.
         *
         * @return the next operation log, or {@code null} when no further log file exists
         * @throws FlinkRuntimeException if a record is truncated or cannot be read/deserialized
         */
        public OperationLog read() {
            try {
                while (true) {
                    int length;
                    try {
                        length = inputStream.readInt();
                    } catch (EOFException endOfFile) {
                        if (!openNextFile()) {
                            return null;
                        }
                        continue;
                    }

                    final byte[] payload = new byte[length];
                    try {
                        // read(byte[]) may legally return fewer bytes than requested;
                        // readFully blocks until the whole record is available.
                        inputStream.readFully(payload);
                    } catch (EOFException truncated) {
                        throw new IOException(
                                String.format(
                                        "Fail to read log from %s%s, expected %s bytes but the record is truncated",
                                        LOG_NAME_PREFIX, index - 1, length),
                                truncated);
                    }

                    // NOTE(review): records are resolved against the system class loader;
                    // confirm that operation logs never reference user-code classes.
                    return InstantiationUtil.deserializeObject(
                            payload, ClassLoader.getSystemClassLoader());
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Cannot read next opLog from opLog store.", e);
            }
        }

        /**
         * Switches to the next log file if it exists. The index only advances on success, so
         * calling {@link #read()} repeatedly at the end of the log never skips a file index.
         *
         * @return {@code true} if a next file was opened, {@code false} if there is none
         */
        private boolean openNextFile() throws IOException {
            final Path next = logFile(index);
            if (!fileSystem.exists(next)) {
                return false;
            }
            final DataInputStream exhausted = inputStream;
            inputStream = new DataInputStream(fileSystem.open(next));
            index++;
            exhausted.close();
            return true;
        }

        /** Closes the currently open log file, if any. */
        public void close() throws IOException {
            if (inputStream != null) {
                inputStream.close();
                inputStream = null;
            }
        }
    }

    /**
     * Creates a store on the given directory with a flush interval of 1000 ms.
     *
     * @param path directory holding the log files
     * @throws FlinkRuntimeException if the file system of {@code path} cannot be obtained
     */
    @VisibleForTesting
    FileSystemOperationLogStore(@Nonnull Path path) {
        this.workingDir = path;
        this.flushIntervalInMs = 1000;
        this.fileSystem = resolveFileSystem(path);
    }

    /**
     * Creates a store below the configured high-availability storage path, in
     * {@code <storage path><cluster id>/<job id>/jobmaster-oplog}.
     *
     * @param jobID job whose operation logs are stored
     * @param configuration source of the storage path, cluster id and flush interval
     * @throws IllegalConfigurationException if no high-availability storage path is configured
     * @throws FlinkRuntimeException if the file system of the working directory cannot be obtained
     */
    public FileSystemOperationLogStore(@Nonnull JobID jobID, @Nonnull Configuration configuration) {
        final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
        if (StringUtils.isBlank(storagePath)) {
            throw new IllegalConfigurationException(String.format("Missing high-availability storage path for storing operation logs. Specify via configuration key '%s'.", HighAvailabilityOptions.HA_STORAGE_PATH));
        }
        // NOTE(review): the cluster id is appended without a path separator; presumably its
        // configured value starts with '/' - confirm against HighAvailabilityOptions.
        final String clusterRoot = storagePath + configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        this.workingDir = new Path(new Path(clusterRoot, jobID.toString()), "jobmaster-oplog");
        this.flushIntervalInMs = configuration.getInteger(JobManagerOptions.OPLOG_FLUSH_INTERVAL);
        this.fileSystem = resolveFileSystem(this.workingDir);
    }

    /** Resolves the file system of {@code dir}, keeping the original failure as the cause. */
    private static FileSystem resolveFileSystem(Path dir) {
        try {
            return dir.getFileSystem();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Fail to get file system.", e);
        }
    }

    /** Returns the path of the log file with the given 1-based index. */
    private Path logFile(int index) {
        return new Path(workingDir, LOG_NAME_PREFIX + index);
    }

    /**
     * Creates the working directory if necessary, opens the first unused log file for writing
     * and starts the periodic flusher.
     *
     * @throws FlinkRuntimeException if the directory or the log file cannot be created
     */
    @Override
    public void start() {
        try {
            if (!fileSystem.exists(workingDir)) {
                fileSystem.mkdirs(workingDir);
            }

            int index = 1;
            Path candidate = logFile(index);
            while (fileSystem.exists(candidate)) {
                index++;
                candidate = logFile(index);
            }

            outputStream = new DataOutputStream(fileSystem.create(candidate, FileSystem.WriteMode.NO_OVERWRITE));
            filePath = candidate;
            // Log files are expected to be numbered without gaps.
            assert !fileSystem.exists(logFile(index + 1));
            LOG.info("Operation log will be written to {}.", filePath);

            fileFlusher = new FileFlusher();
            fileFlusher.start();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Fail to start file system log store.", e);
        }
    }

    /**
     * Stops the flusher, flushes and closes the current log file and closes the reader.
     * Best effort: every step is attempted even if an earlier one fails; failures are logged.
     */
    @Override
    public void stop() {
        stopFlusher();
        closeWriter(true);
        closeReader();
    }

    /**
     * Releases all resources like {@link #stop()} and then deletes the working directory
     * recursively. Best effort: failures are logged, not thrown.
     */
    @Override
    public void clear() {
        stopFlusher();
        closeWriter(false);
        closeReader();
        try {
            fileSystem.delete(workingDir, true);
        } catch (Exception e) {
            LOG.warn("Fail to delete the {}.", workingDir.getName(), e);
        }
    }

    /** Interrupts the flusher thread and waits for it to terminate. */
    private void stopFlusher() {
        final FileFlusher flusher = fileFlusher;
        if (flusher == null) {
            return;
        }
        fileFlusher = null;
        flusher.interrupt();
        try {
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for the flusher of {} to stop.", filePath, e);
        }
    }

    /** Closes the current log file, optionally flushing it explicitly first. */
    private void closeWriter(boolean flushFirst) {
        final DataOutputStream stream = outputStream;
        if (stream == null) {
            return;
        }
        outputStream = null;
        // try-with-resources closes the stream even when the explicit flush fails.
        try (DataOutputStream closing = stream) {
            if (flushFirst) {
                closing.flush();
            }
        } catch (Exception e) {
            // filePath is passed as an object so that a null path cannot fail the logging.
            LOG.warn("Fail to stop {}.", filePath, e);
        }
    }

    /** Closes the reader created by {@link #readOpLog()}, if any. */
    private void closeReader() {
        final FileSystemOperationLogReader reader = opLogReader;
        if (reader == null) {
            return;
        }
        opLogReader = null;
        try {
            reader.close();
        } catch (Exception e) {
            LOG.warn("Fail to close the operation log reader of {}.", workingDir, e);
        }
    }

    /**
     * Appends one operation log as a length-prefixed record. A failure marks the store as
     * corrupted; this and all later writes are then dropped without raising an error.
     *
     * @param operationLog the log to append
     * @throws NullPointerException if the store has not been started
     */
    @Override
    public void writeOpLog(@Nonnull OperationLog operationLog) {
        final DataOutputStream stream = Preconditions.checkNotNull(outputStream);
        LOG.debug("Writing a operation log on file system at {} while {}.", filePath, corrupted);
        if (corrupted) {
            return;
        }
        try {
            final byte[] payload = InstantiationUtil.serializeObject(operationLog);
            stream.writeInt(payload.length);
            stream.write(payload);
        } catch (Exception e) {
            LOG.warn("Write log meet error, will not record log any more.", e);
            corrupted = true;
        }
    }

    /**
     * Returns the next stored operation log, creating the reader on first use.
     *
     * @return the next operation log, or {@code null} when all log files are exhausted
     * @throws FlinkRuntimeException if the first log file cannot be opened or a record cannot
     *     be read
     */
    @Override
    public OperationLog readOpLog() {
        if (opLogReader == null) {
            opLogReader = new FileSystemOperationLogReader();
        }
        return opLogReader.read();
    }
}
