/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SubmittedJobGraphStore} that keeps every submitted job graph as one serialized file
 * inside a single directory of a Flink {@link FileSystem}.
 *
 * <p>The file for a job is located at {@code basePath + getPathForJob(jobId)}, i.e. the file
 * name is the string form of the job id. {@link #putJobGraph}, {@link #removeJobGraph} and
 * {@link #recoverJobGraph} run while holding an internal lock; {@link #getJobIds} does not.
 */
public class FileSystemSubmittedJobGraphStore
implements SubmittedJobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemSubmittedJobGraphStore.class);

    /** Guards {@link #isRunning} and serializes put/remove/recover operations. */
    private final Object cacheLock = new Object();

    /** Directory (as given to the constructor) under which the job graph files live. */
    private final String basePath;

    /** File system resolved from {@link #basePath}. */
    private final FileSystem jobsFileSystem;

    /** Set by {@link #start} and cleared by {@link #stop}; only accessed under {@link #cacheLock}. */
    private boolean isRunning;

    /**
     * Creates a store rooted at the given directory, creating the directory if it does not exist.
     *
     * @param currentJobsPath path of the directory that holds the job graph files; must not be null
     * @throws Exception if the file system cannot be resolved or accessed
     */
    public FileSystemSubmittedJobGraphStore(String currentJobsPath) throws Exception {
        Preconditions.checkNotNull(currentJobsPath, "Current jobs path");
        Path jobsPath = new Path(currentJobsPath);
        this.jobsFileSystem = jobsPath.getFileSystem();
        if (!this.jobsFileSystem.exists(jobsPath)) {
            this.jobsFileSystem.mkdirs(jobsPath);
        }
        this.basePath = currentJobsPath;
    }

    /**
     * Marks the store as running. Calling it on an already running store has no effect.
     *
     * @param jobGraphListener not used by this implementation
     */
    @Override
    public void start(SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener) throws Exception {
        synchronized (this.cacheLock) {
            if (!this.isRunning) {
                this.isRunning = true;
            }
        }
    }

    /** Marks the store as stopped. Calling it on an already stopped store has no effect. */
    @Override
    public void stop() {
        synchronized (this.cacheLock) {
            if (this.isRunning) {
                this.isRunning = false;
            }
        }
    }

    /**
     * Serializes the job graph and writes it to its file, overwriting an existing file.
     *
     * @param jobGraph the job graph to store; must not be null
     * @throws IllegalStateException if the store has not been started
     * @throws RuntimeException if serializing or writing the job graph fails
     */
    @Override
    public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
        Preconditions.checkNotNull(jobGraph, "Job graph");
        String path = FileSystemSubmittedJobGraphStore.getPathForJob(jobGraph.getJobId());
        LOG.debug("Adding job graph {} to {}{}.", new Object[]{jobGraph.getJobId(), this.basePath, path});
        synchronized (this.cacheLock) {
            this.verifyIsRunning();
            Path jobGraphPath = new Path(this.basePath + path);
            // try-with-resources closes the stream even when serialization or the write fails.
            try (FSDataOutputStream outputStream = this.jobsFileSystem.create(jobGraphPath, true)) {
                outputStream.write(InstantiationUtil.serializeObject(jobGraph));
            }
            catch (Exception e) {
                throw new RuntimeException("Storing job graph " + jobGraph.getJobId() + " failed", e);
            }
        }
        LOG.info("Added job graph {} to {}{}.", new Object[]{jobGraph, this.basePath, path});
    }

    /**
     * Deletes the file of the given job. Removal is best-effort: a failure is logged and not
     * propagated to the caller.
     *
     * @param jobId id of the job whose graph should be removed; must not be null
     */
    @Override
    public void removeJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull(jobId, "Job ID");
        String path = FileSystemSubmittedJobGraphStore.getPathForJob(jobId);
        LOG.debug("Removing job graph {} from {}{}.", new Object[]{jobId, this.basePath, path});
        boolean removed;
        synchronized (this.cacheLock) {
            try {
                removed = this.jobsFileSystem.delete(new Path(this.basePath + path), true);
            }
            catch (Exception e) {
                LOG.info("Removing job graph " + jobId + " failed.", e);
                // Do not fall through to the success message below.
                return;
            }
        }
        if (removed) {
            LOG.info("Removed job graph {} from {}{}.", new Object[]{jobId, this.basePath, path});
        } else {
            LOG.info("Job graph {} was not removed from {}{}: the file system reported that nothing was deleted.", new Object[]{jobId, this.basePath, path});
        }
    }

    /**
     * Lists the base directory and converts every entry name into a job id.
     *
     * <p>Entries whose name cannot be parsed as a job id are logged at warn level and skipped.
     *
     * @return the ids of all stored job graphs; an empty collection if the listing is null
     * @throws Exception if listing the base directory fails
     */
    @Override
    public Collection<JobID> getJobIds() throws Exception {
        LOG.debug("Retrieving all stored job ids.");
        FileStatus[] fileStats = this.jobsFileSystem.listStatus(new Path(this.basePath));
        if (fileStats == null) {
            return Collections.emptyList();
        }
        Collection<JobID> jobIds = new ArrayList<JobID>(fileStats.length);
        for (FileStatus fileStat : fileStats) {
            String path = fileStat.getPath().getName();
            try {
                jobIds.add(FileSystemSubmittedJobGraphStore.jobIdfromPath(path));
            }
            catch (Exception exception) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed path.", path, exception);
            }
        }
        return jobIds;
    }

    /**
     * Reads the job graph of the given job back from its file.
     *
     * <p>The object is deserialized with the system class loader.
     *
     * @param jobId id of the job to recover; must not be null
     * @return the recovered job graph, or {@code null} if no file exists for the job
     * @throws IllegalStateException if the store has not been started
     * @throws Exception if the existence check, reading, or deserialization fails
     */
    @Override
    public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
        Preconditions.checkNotNull(jobId, "Job ID");
        String path = FileSystemSubmittedJobGraphStore.getPathForJob(jobId);
        LOG.debug("Recovering job graph {} from {}{}.", new Object[]{jobId, this.basePath, path});
        synchronized (this.cacheLock) {
            this.verifyIsRunning();
            Path jobGraphPath = new Path(this.basePath + path);
            try {
                if (!this.jobsFileSystem.exists(jobGraphPath)) {
                    return null;
                }
            }
            catch (Exception e) {
                throw new Exception("Could not retrieve the submitted job graph state handle for " + path + " from the submitted job graph store.", e);
            }
            SubmittedJobGraph jobGraph;
            // try-with-resources closes the stream even when deserialization fails.
            try (FSDataInputStream inputStream = this.jobsFileSystem.open(jobGraphPath)) {
                jobGraph = (SubmittedJobGraph) InstantiationUtil.deserializeObject(inputStream, ClassLoader.getSystemClassLoader());
            }
            LOG.info("Recovered {}.", jobId);
            return jobGraph;
        }
    }

    /** Fails with an {@link IllegalStateException} unless {@link #start} has been called. */
    private void verifyIsRunning() {
        Preconditions.checkState(this.isRunning, "Not running. Forgot to call start()?");
    }

    /**
     * Returns the path of a job's file relative to the base directory.
     *
     * @param jobId the job id; must not be null
     * @return {@code "/"} followed by the string form of the job id
     */
    public static String getPathForJob(JobID jobId) {
        Preconditions.checkNotNull(jobId, "Job ID");
        return String.format("/%s", jobId);
    }

    /**
     * Parses a job id from a file name using {@link JobID#fromHexString}.
     *
     * @param path the file name (without the leading slash) to parse
     * @return the parsed job id
     */
    public static JobID jobIdfromPath(String path) {
        return JobID.fromHexString(path);
    }
}

