/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.failover;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmaster.failover.OperationLog;
import org.apache.flink.runtime.jobmaster.failover.OperationLogStore;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemOperationLogStore
implements OperationLogStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemOperationLogStore.class);
    private static final String logNamePrefix = "operation.log.";
    private final Path workingDir;
    private FileSystem fileSystem;
    private Path filePath;
    private DataOutputStream outputStream;
    private FileFlusher fileFlusher;
    private FileSystemOperationLogReader opLogReader;
    private volatile boolean currupted = false;
    private final int flushIntervalInMs;

    @VisibleForTesting
    FileSystemOperationLogStore(@Nonnull Path workingDir) {
        this.workingDir = workingDir;
        this.flushIntervalInMs = 1000;
        try {
            this.fileSystem = workingDir.getFileSystem();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Fail to get file system.");
        }
    }

    public FileSystemOperationLogStore(@Nonnull JobID jobID, @Nonnull Configuration configuration) {
        String rootPath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
        if (rootPath == null || StringUtils.isBlank((CharSequence)rootPath)) {
            throw new IllegalConfigurationException(String.format("Missing high-availability storage path for storing operation logs. Specify via configuration key '%s'.", HighAvailabilityOptions.HA_STORAGE_PATH));
        }
        rootPath = rootPath + configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        this.workingDir = new Path(new Path(rootPath, jobID.toString()), "jobmaster-oplog");
        this.flushIntervalInMs = configuration.getInteger(JobManagerOptions.OPLOG_FLUSH_INTERVAL);
        try {
            this.fileSystem = this.workingDir.getFileSystem();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Fail to get file system.");
        }
    }

    @Override
    public void start() {
        try {
            if (!this.fileSystem.exists(this.workingDir)) {
                this.fileSystem.mkdirs(this.workingDir);
            }
            int i = 1;
            while (true) {
                Path file;
                if (!this.fileSystem.exists(file = new Path(this.workingDir, logNamePrefix + i))) {
                    this.outputStream = new DataOutputStream((OutputStream)this.fileSystem.create(file, FileSystem.WriteMode.NO_OVERWRITE));
                    this.filePath = file;
                    assert (!this.fileSystem.exists(new Path(this.workingDir, logNamePrefix + (i + 1))));
                    break;
                }
                ++i;
            }
            LOG.info("Operation log will be written to {}.", (Object)this.filePath);
            this.fileFlusher = new FileFlusher();
            this.fileFlusher.start();
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Fail to start file system log store.", (Throwable)e);
        }
    }

    @Override
    public void stop() {
        try {
            if (this.outputStream != null) {
                this.outputStream.flush();
                this.outputStream.close();
                this.outputStream = null;
            }
            if (this.fileFlusher != null) {
                this.fileFlusher.interrupt();
                this.fileFlusher.join();
                this.fileFlusher = null;
            }
            if (this.opLogReader != null) {
                this.opLogReader.close();
                this.opLogReader = null;
            }
        }
        catch (Exception e) {
            LOG.warn("Fail to stop {}.", (Object)this.filePath.getName(), (Object)e);
        }
    }

    @Override
    public void clear() {
        try {
            if (this.outputStream != null) {
                this.outputStream.close();
                this.outputStream = null;
            }
            if (this.fileFlusher != null) {
                this.fileFlusher.interrupt();
                this.fileFlusher.join();
            }
            if (this.opLogReader != null) {
                this.opLogReader.close();
                this.opLogReader = null;
            }
            this.fileSystem.delete(this.workingDir, true);
        }
        catch (Exception e) {
            LOG.warn("Fail to delete the {}.", (Object)this.workingDir.getName(), (Object)e);
        }
    }

    @Override
    public void writeOpLog(@Nonnull OperationLog opLog) {
        Preconditions.checkNotNull((Object)this.outputStream);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing a operation log on file system at {} while {}.", (Object)this.filePath, (Object)this.currupted);
        }
        try {
            if (!this.currupted) {
                byte[] bytes = InstantiationUtil.serializeObject((Object)opLog);
                this.outputStream.writeInt(bytes.length);
                this.outputStream.write(bytes);
            }
        }
        catch (Exception e) {
            LOG.warn("Write log meet error, will not record log any more.", (Throwable)e);
            this.currupted = true;
        }
    }

    @Override
    public OperationLog readOpLog() {
        if (this.opLogReader == null) {
            this.opLogReader = new FileSystemOperationLogReader();
        }
        return this.opLogReader.read();
    }

    private class FileFlusher
    extends Thread {
        private FileFlusher() {
        }

        @Override
        public void run() {
            while (FileSystemOperationLogStore.this.outputStream != null && !FileSystemOperationLogStore.this.currupted) {
                try {
                    FileSystemOperationLogStore.this.outputStream.flush();
                    Thread.sleep(FileSystemOperationLogStore.this.flushIntervalInMs);
                }
                catch (Exception e) {
                    LOG.warn("Fail to flush the log file!", (Throwable)e);
                }
            }
        }
    }

    class FileSystemOperationLogReader {
        private DataInputStream inputStream;
        private int index = 1;

        FileSystemOperationLogReader() {
            try {
                Path file = new Path(FileSystemOperationLogStore.this.workingDir, FileSystemOperationLogStore.logNamePrefix + this.index++);
                this.inputStream = new DataInputStream((InputStream)FileSystemOperationLogStore.this.fileSystem.open(file));
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Cannot init filesystem opLog store.", (Throwable)e);
            }
        }

        public OperationLog read() {
            try {
                OperationLog operationLog = null;
                while (operationLog == null) {
                    try {
                        int logLength = this.inputStream.readInt();
                        byte[] logByte = new byte[logLength];
                        int logReadLen = this.inputStream.read(logByte);
                        if (logReadLen == logLength) {
                            operationLog = (OperationLog)InstantiationUtil.deserializeObject((byte[])logByte, (ClassLoader)ClassLoader.getSystemClassLoader());
                            continue;
                        }
                        String message = String.format("Fail to read log from %s%s, expected %s, only read %s", FileSystemOperationLogStore.logNamePrefix, this.index, logLength, logReadLen);
                        throw new IOException(message);
                    }
                    catch (EOFException eof) {
                        Path file = new Path(FileSystemOperationLogStore.this.workingDir, FileSystemOperationLogStore.logNamePrefix + this.index++);
                        if (!FileSystemOperationLogStore.this.fileSystem.exists(file)) break;
                        this.inputStream.close();
                        this.inputStream = new DataInputStream((InputStream)FileSystemOperationLogStore.this.fileSystem.open(file));
                    }
                }
                return operationLog == null ? null : operationLog;
            }
            catch (Exception e) {
                throw new FlinkRuntimeException("Cannot read next opLog from opLog store.", (Throwable)e);
            }
        }

        public void close() throws IOException {
            if (this.inputStream != null) {
                this.inputStream.close();
                this.inputStream = null;
            }
        }
    }
}

