/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.LeaderShipLostHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;

public class StandaloneDispatcher
extends Dispatcher {
    private long clusterIdleStartTimestamp = -1L;
    private Time clusterIdleTimeout;

    public StandaloneDispatcher(RpcService rpcService, String endpointId, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String metricQueryServicePath, ArchivedExecutionGraphStore archivedExecutionGraphStore, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nullable String restAddress, HistoryServerArchivist historyServerArchivist, LeaderShipLostHandler leaderShipLostHandler) throws Exception {
        super(rpcService, endpointId, configuration, highAvailabilityServices, highAvailabilityServices.getSubmittedJobGraphStore(), resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, metricQueryServicePath, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, restAddress, historyServerArchivist, leaderShipLostHandler);
        this.clusterIdleTimeout = Time.milliseconds((long)configuration.getLong(CoreOptions.CLUSTER_IDLE_TIMEOUT));
        this.log.info("Init StandaloneDispatcher, clusterIdleTimeout: {}", (Object)this.clusterIdleTimeout);
    }

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        if (this.clusterIdleTimeout.toMilliseconds() < 0L) {
            return super.submitJob(jobGraph, timeout);
        }
        CompletableFuture<Acknowledge> submitJobFuture = super.submitJob(jobGraph, timeout);
        return submitJobFuture.thenApply(acknowledge -> {
            this.clusterIdleStartTimestamp = -1L;
            this.log.info("Reset the begin timestamp of delaying cluster termination to -1.");
            return acknowledge;
        });
    }

    @Override
    protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
        super.jobReachedGloballyTerminalState(archivedExecutionGraph);
        if (this.clusterIdleTimeout.toMilliseconds() < 0L) {
            return;
        }
        CompletableFuture<Long> numNonGloballyTerminatedJobsFuture = this.getNumNonGloballyTerminatedJobsFuture();
        numNonGloballyTerminatedJobsFuture.thenAccept(num -> {
            this.log.info("Number of non-global-terminated jobs is {}.", num);
            if (num == 0L) {
                this.clusterIdleStartTimestamp = System.currentTimeMillis();
                this.scheduleRunAsync(this::terminateIdleApp, this.clusterIdleTimeout);
                this.log.info("Delaying cluster termination after {} and set cluster-idle-start-timestamp: {}.", (Object)this.clusterIdleTimeout, (Object)this.clusterIdleStartTimestamp);
            }
        });
    }

    private void terminateIdleApp() {
        this.log.info("Double check for delay termination, cluster-idle-start-timestamp: {}.", (Object)this.clusterIdleStartTimestamp);
        if (this.clusterIdleStartTimestamp > 0L && System.currentTimeMillis() - this.clusterIdleStartTimestamp >= this.clusterIdleTimeout.toMilliseconds()) {
            CompletableFuture<Long> numNonGloballyTerminatedJobsFuture = this.getNumNonGloballyTerminatedJobsFuture();
            numNonGloballyTerminatedJobsFuture.thenAccept(num -> {
                this.log.info("Number of non-global-terminated jobs in double check is {}.", num);
                if (num == 0L) {
                    this.log.info("Start terminating this cluster.", num);
                    this.getTerminationFuture().complete(null);
                }
            });
        }
    }

    private CompletableFuture<Long> getNumNonGloballyTerminatedJobsFuture() {
        CompletableFuture<Collection<JobStatus>> nonArchivedJobsFuture = this.getNonArchivedJobsFuture(RpcUtils.INF_TIMEOUT);
        return nonArchivedJobsFuture.thenApply(nonArchivedJobs -> nonArchivedJobs.stream().filter(jobStatus -> !jobStatus.isGloballyTerminalState()).count());
    }
}

