/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.deployment.ResultPartitionLocationTrackerProxy;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;

public class ExecutionGraphBuilder {
    public static final String PARALLELISM_AUTO_MAX_ERROR_MESSAGE = "PARALLELISM_AUTO_MAX is no longer supported. Please specify a concrete value for the parallelism.";

    public static ExecutionGraph buildGraph(@Nullable ExecutionGraph prior, JobGraph jobGraph, Configuration jobManagerConfig, ScheduledExecutorService futureExecutor, Executor ioExecutor, SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, Time rpcTimeout, RestartStrategy restartStrategy, MetricGroup metrics, BlobWriter blobWriter, ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy, Time allocationTimeout, Logger log) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(prior, jobGraph, jobManagerConfig, futureExecutor, ioExecutor, slotProvider, classLoader, recoveryFactory, rpcTimeout, restartStrategy, metrics, -1, blobWriter, resultPartitionLocationTrackerProxy, allocationTimeout, log);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Deprecated
    public static ExecutionGraph buildGraph(@Nullable ExecutionGraph prior, JobGraph jobGraph, Configuration jobManagerConfig, ScheduledExecutorService futureExecutor, Executor ioExecutor, SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, Time rpcTimeout, RestartStrategy restartStrategy, MetricGroup metrics, int parallelismForAutoMax, BlobWriter blobWriter, ResultPartitionLocationTrackerProxy resultPartitionLocationTrackerProxy, Time allocationTimeout, Logger log) throws JobExecutionException, JobException {
        JobCheckpointingSettings snapshotSettings;
        ExecutionConfig executionConfig;
        ExecutionGraph executionGraph;
        Preconditions.checkNotNull((Object)jobGraph, (String)"job graph cannot be null");
        String jobName = jobGraph.getName();
        JobID jobId = jobGraph.getJobID();
        FailoverStrategy.Factory failoverStrategy = FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
        JobInformation jobInformation = new JobInformation(jobId, jobName, jobGraph.getJobVersion(), jobGraph.getSerializedExecutionConfig(), jobGraph.getJobConfiguration(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
        try {
            executionGraph = prior != null ? prior : new ExecutionGraph(jobInformation, futureExecutor, ioExecutor, rpcTimeout, restartStrategy, failoverStrategy, slotProvider, classLoader, blobWriter, resultPartitionLocationTrackerProxy, allocationTimeout, metrics, jobManagerConfig);
        }
        catch (IOException e) {
            throw new JobException("Could not create the ExecutionGraph.", e);
        }
        try {
            executionConfig = (ExecutionConfig)jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader);
        }
        catch (IOException | ClassNotFoundException e) {
            throw new JobException("Failed to deserialize the execution config.", e);
        }
        executionGraph.setPerTaskInputSplitsLimitAsAverageMultiplier(executionConfig.getPerTaskInputSplitsLimitAsAverageMultiplier());
        executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
        try {
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        }
        catch (Throwable t) {
            log.warn("Cannot create JSON plan for job", t);
            executionGraph.setJsonPlan("{}");
        }
        long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", (Object)jobName, (Object)jobId);
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId, "The vertex " + (Object)((Object)vertex.getID()) + " (" + vertex.getName() + ") has no invokable class.");
            }
            if (vertex.getParallelism() == Integer.MAX_VALUE) {
                if (parallelismForAutoMax < 0) {
                    throw new JobSubmissionException(jobId, PARALLELISM_AUTO_MAX_ERROR_MESSAGE);
                }
                vertex.setParallelism(parallelismForAutoMax);
            }
            if (vertex.getParallelism() <= 0) {
                vertex.setParallelism(jobGraph.getJobConfiguration().getInteger(CoreOptions.DEFAULT_PARALLELISM));
            }
            try {
                vertex.initializeOnMaster(classLoader);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobId, "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }
        log.info("Successfully ran initialization on master in {} ms.", (Object)((System.nanoTime() - initMasterStart) / 1000000L));
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", new Object[]{sortedTopology.size(), jobName, jobId});
        }
        executionGraph.attachJobGraph(sortedTopology);
        if (log.isDebugEnabled()) {
            log.debug("Successfully created execution graph from job graph {} ({}).", (Object)jobName, (Object)jobId);
        }
        if ((snapshotSettings = jobGraph.getCheckpointingSettings()) != null) {
            List<MasterTriggerRestoreHook<?>> hooks;
            StateBackend rootBackend;
            StateBackend applicationConfiguredBackend;
            CheckpointIDCounter checkpointIdCounter;
            CompletedCheckpointStore completedCheckpoints;
            List<ExecutionJobVertex> triggerVertices = ExecutionGraphBuilder.idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
            List<ExecutionJobVertex> ackVertices = ExecutionGraphBuilder.idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
            List<ExecutionJobVertex> confirmVertices = ExecutionGraphBuilder.idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
            try {
                int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
                if (maxNumberOfCheckpointsToRetain <= 0) {
                    log.warn("The setting for '{} : {}' is invalid. Using default value of {}", new Object[]{CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), maxNumberOfCheckpointsToRetain, CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()});
                    maxNumberOfCheckpointsToRetain = (Integer)CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
                }
                completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
                checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
            }
            catch (Exception e) {
                throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
            }
            int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
            CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(historySize, ackVertices, snapshotSettings.getCheckpointCoordinatorConfiguration(), metrics);
            String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
            SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend = (StateBackend)serializedAppConfigured.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);
                }
            }
            try {
                rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend, jobManagerConfig, classLoader, log);
            }
            catch (IOException | IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
            }
            SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = (MasterTriggerRestoreHook.Factory[])serializedHooks.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }
                Thread thread = Thread.currentThread();
                ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);
                try {
                    hooks = new ArrayList(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                }
                finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }
            CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
            executionGraph.enableCheckpointing(chkConfig.getCheckpointInterval(), chkConfig.getCheckpointTimeout(), chkConfig.getMinPauseBetweenCheckpoints(), chkConfig.getMaxConcurrentCheckpoints(), chkConfig.getCheckpointRetentionPolicy(), triggerVertices, ackVertices, confirmVertices, hooks, checkpointIdCounter, completedCheckpoints, rootBackend, checkpointStatsTracker);
        }
        metrics.gauge("restartingTime", (Gauge)new RestartTimeGauge(executionGraph));
        metrics.gauge("downtime", (Gauge)new DownTimeGauge(executionGraph));
        metrics.gauge("uptime", (Gauge)new UpTimeGauge(executionGraph));
        metrics.gauge("fullRestarts", (Gauge)new NumberOfFullRestartsGauge(executionGraph));
        executionGraph.getFailoverStrategy().registerMetrics(metrics);
        executionGraph.setUpdatePartitionInfoSendInterval(jobManagerConfig.getLong(JobManagerOptions.UPDATE_PARTITION_INFO_SEND_INTERVAL));
        return executionGraph;
    }

    private static List<ExecutionJobVertex> idToVertex(List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException {
        ArrayList<ExecutionJobVertex> result = new ArrayList<ExecutionJobVertex>(jobVertices.size());
        for (JobVertexID id : jobVertices) {
            ExecutionJobVertex vertex = executionGraph.getJobVertex(id);
            if (vertex != null) {
                result.add(vertex);
                continue;
            }
            throw new IllegalArgumentException("The snapshot checkpointing settings refer to non-existent vertex " + (Object)((Object)id));
        }
        return result;
    }

    private ExecutionGraphBuilder() {
    }
}

