package org.apache.flink.runtime.dispatcher;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.LeaderShipLostHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/StandaloneDispatcher.class */
/**
 * {@link Dispatcher} variant that can complete its own termination future once the cluster
 * has been idle (no job in a non-globally-terminal state) for a configured amount of time.
 *
 * <p>The idle timeout is read from {@link CoreOptions#CLUSTER_IDLE_TIMEOUT} and interpreted as
 * milliseconds. A negative value disables the idle-termination logic entirely, in which case
 * this class only delegates to the parent {@link Dispatcher}.
 */
public class StandaloneDispatcher extends Dispatcher {

    /** Sentinel stored in {@link #clusterIdleStartTimestamp} while no idle period is tracked. */
    private static final long IDLE_TIMESTAMP_UNSET = -1L;

    /**
     * Wall-clock time (ms since epoch) at which the cluster was last observed to have no
     * non-globally-terminated jobs, or {@link #IDLE_TIMESTAMP_UNSET}.
     *
     * <p>Written from {@link CompletableFuture} callbacks, which may execute on a different
     * thread than the scheduled idle check that reads it, hence {@code volatile} for visibility.
     * NOTE(review): confirm which executor actually completes these futures.
     */
    private volatile long clusterIdleStartTimestamp;

    /** How long the cluster must stay idle before it is terminated; negative means disabled. */
    private final Time clusterIdleTimeout;

    /**
     * Creates the dispatcher, forwarding all services to the parent {@link Dispatcher} (using the
     * submitted job graph store obtained from {@code highAvailabilityServices}) and reading the
     * idle timeout from {@code configuration}.
     *
     * @throws Exception if the parent constructor fails
     */
    public StandaloneDispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String str2, ArchivedExecutionGraphStore archivedExecutionGraphStore, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String str3, HistoryServerArchivist historyServerArchivist, LeaderShipLostHandler leaderShipLostHandler) throws Exception {
        super(rpcService, str, configuration, highAvailabilityServices, highAvailabilityServices.getSubmittedJobGraphStore(), resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, str2, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, str3, historyServerArchivist, leaderShipLostHandler);
        this.clusterIdleStartTimestamp = IDLE_TIMESTAMP_UNSET;
        this.clusterIdleTimeout = Time.milliseconds(configuration.getLong(CoreOptions.CLUSTER_IDLE_TIMEOUT));
        log.info("Init StandaloneDispatcher, clusterIdleTimeout: {}", clusterIdleTimeout);
    }

    /**
     * Submits the job through the parent dispatcher. When idle termination is enabled, a
     * successful submission additionally clears the idle start timestamp so that a pending
     * idle check will not terminate the cluster.
     *
     * @param jobGraph job to submit
     * @param time timeout forwarded to the parent implementation
     * @return future completed with the parent's acknowledgement
     */
    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time time) {
        final CompletableFuture<Acknowledge> submission = super.submitJob(jobGraph, time);
        if (!isIdleTerminationEnabled()) {
            return submission;
        }
        return submission.thenApply(acknowledge -> {
            clusterIdleStartTimestamp = IDLE_TIMESTAMP_UNSET;
            log.info("Reset the begin timestamp of delaying cluster termination to -1.");
            return acknowledge;
        });
    }

    /**
     * Runs the parent handling and then, if idle termination is enabled and no
     * non-globally-terminated job remains, records the idle start time and schedules
     * {@link #terminateIdleApp()} to run after the idle timeout.
     *
     * <p>Kept {@code public} as found in this file; the decompiler noted that the access
     * modifier was originally {@code protected}.
     *
     * @param archivedExecutionGraph the archived graph of the job that just finished
     */
    @Override
    public void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        super.jobReachedGloballyTerminalState(archivedExecutionGraph);
        if (!isIdleTerminationEnabled()) {
            return;
        }
        getNumNonGloballyTerminatedJobsFuture().thenAccept(remainingJobs -> {
            log.info("Number of non-global-terminated jobs is {}.", remainingJobs);
            if (remainingJobs != 0) {
                return;
            }
            final long idleStart = System.currentTimeMillis();
            // Publish the timestamp before scheduling so the delayed check always sees it.
            clusterIdleStartTimestamp = idleStart;
            scheduleRunAsync(this::terminateIdleApp, clusterIdleTimeout);
            log.info("Delaying cluster termination after {} and set cluster-idle-start-timestamp: {}.", clusterIdleTimeout, idleStart);
        });
    }

    /**
     * Delayed idle check. Does nothing if the idle timestamp was cleared (a job was submitted
     * in the meantime) or was refreshed so that the full timeout has not yet elapsed; otherwise
     * re-counts the running jobs and completes the termination future if there are still none.
     */
    private void terminateIdleApp() {
        // Read once so the log line and the decision below are based on the same value.
        final long idleStart = clusterIdleStartTimestamp;
        log.info("Double check for delay termination, cluster-idle-start-timestamp: {}.", idleStart);
        if (idleStart <= 0) {
            return;
        }
        final long idleMillis = System.currentTimeMillis() - idleStart;
        if (idleMillis < clusterIdleTimeout.toMilliseconds()) {
            return;
        }
        getNumNonGloballyTerminatedJobsFuture().thenAccept(remainingJobs -> {
            log.info("Number of non-global-terminated jobs in double check is {}.", remainingJobs);
            if (remainingJobs == 0) {
                log.info("Start terminating this cluster.");
                getTerminationFuture().complete(null);
            }
        });
    }

    /**
     * Counts the jobs returned by {@code getNonArchivedJobsFuture} whose status is not a
     * globally terminal state.
     *
     * @return future holding the number of such jobs
     */
    private CompletableFuture<Long> getNumNonGloballyTerminatedJobsFuture() {
        return getNonArchivedJobsFuture(RpcUtils.INF_TIMEOUT).thenApply(jobStatuses ->
            jobStatuses.stream()
                .filter(jobStatus -> !jobStatus.isGloballyTerminalState())
                .count());
    }

    /** Returns {@code true} unless the configured idle timeout is negative (feature disabled). */
    private boolean isIdleTerminationEnabled() {
        return clusterIdleTimeout.toMilliseconds() >= 0;
    }
}
