package org.apache.flink.runtime.jobmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/FileSystemSubmittedJobGraphStore.class */
public class FileSystemSubmittedJobGraphStore implements SubmittedJobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemSubmittedJobGraphStore.class);
    private final Object cacheLock = new Object();
    private final String basePath;
    private final FileSystem jobsFileSystem;
    private boolean isRunning;

    public FileSystemSubmittedJobGraphStore(String str) throws Exception {
        Preconditions.checkNotNull(str, "Current jobs path");
        Path path = new Path(str);
        this.jobsFileSystem = path.getFileSystem();
        if (!this.jobsFileSystem.exists(path)) {
            this.jobsFileSystem.mkdirs(path);
        }
        this.basePath = str;
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void start(SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener) throws Exception {
        synchronized (this.cacheLock) {
            if (!this.isRunning) {
                this.isRunning = true;
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void stop() {
        synchronized (this.cacheLock) {
            if (this.isRunning) {
                this.isRunning = false;
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void putJobGraph(SubmittedJobGraph submittedJobGraph) throws Exception {
        Preconditions.checkNotNull(submittedJobGraph, "Job graph");
        String pathForJob = getPathForJob(submittedJobGraph.getJobId());
        LOG.debug("Adding job graph {} to {}{}.", new Object[]{submittedJobGraph.getJobId(), this.basePath, pathForJob});
        synchronized (this.cacheLock) {
            verifyIsRunning();
            try {
                FSDataOutputStream create = this.jobsFileSystem.create(new Path(this.basePath + pathForJob), true);
                create.write(InstantiationUtil.serializeObject(submittedJobGraph));
                create.close();
            } catch (Exception e) {
                throw new RuntimeException("Storing job graph " + submittedJobGraph.getJobId() + " failed", e);
            }
        }
        LOG.info("Added job graph {} to {}{}.", new Object[]{submittedJobGraph, this.basePath, pathForJob});
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void removeJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        String pathForJob = getPathForJob(jobID);
        LOG.debug("Removing job graph {} from {}{}.", new Object[]{jobID, this.basePath, pathForJob});
        synchronized (this.cacheLock) {
            try {
                this.jobsFileSystem.delete(new Path(this.basePath + pathForJob), true);
            } catch (Exception e) {
                LOG.info("Removing job graph " + jobID + " failed.", e);
            }
        }
        LOG.info("Removed job graph {} from {}{}.", new Object[]{jobID, this.basePath, pathForJob});
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public Collection<JobID> getJobIds() throws Exception {
        LOG.debug("Retrieving all stored job ids.");
        FileStatus[] listStatus = this.jobsFileSystem.listStatus(new Path(this.basePath));
        if (listStatus == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(listStatus.length);
        for (FileStatus fileStatus : listStatus) {
            String name = fileStatus.getPath().getName();
            try {
                arrayList.add(jobIdfromPath(name));
            } catch (Exception e) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed path.", name, e);
            }
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public SubmittedJobGraph recoverJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        String pathForJob = getPathForJob(jobID);
        LOG.debug("Recovering job graph {} from {}{}.", new Object[]{jobID, this.basePath, pathForJob});
        synchronized (this.cacheLock) {
            verifyIsRunning();
            Path path = new Path(this.basePath + pathForJob);
            try {
                if (!this.jobsFileSystem.exists(path)) {
                    return null;
                }
                FSDataInputStream open = this.jobsFileSystem.open(path);
                SubmittedJobGraph submittedJobGraph = (SubmittedJobGraph) InstantiationUtil.deserializeObject(open, ClassLoader.getSystemClassLoader());
                open.close();
                LOG.info("Recovered {}.", jobID);
                return submittedJobGraph;
            } catch (Exception e) {
                throw new Exception("Could not retrieve the submitted job graph state handle for " + pathForJob + "from the submitted job graph store.", e);
            }
        }
    }

    private void verifyIsRunning() {
        Preconditions.checkState(this.isRunning, "Not running. Forgot to call start()?");
    }

    public static String getPathForJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "Job ID");
        return String.format("/%s", jobID);
    }

    public static JobID jobIdfromPath(String str) {
        return JobID.fromHexString(str);
    }
}
