package org.apache.flink.runtime.resourcemanager.resultpartitionmaster;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.ResultPartitionReport;
import org.apache.flink.runtime.taskexecutor.ResultPartitionStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/resultpartitionmaster/ResultPartitionMaster.class */
public class ResultPartitionMaster implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionMaster.class);
    private final ScheduledExecutor scheduledExecutor;
    private Executor mainThreadExecutor;
    private Time resultPartitionDisconnectJmTimeout;
    private ScheduledFuture<?> resultPartitionDisconnectJmTimeoutChecker;
    private boolean started;
    private final HashMap<InstanceID, TaskExecutorConnection> taskManagerRegistrations = new HashMap<>(4);
    private final Map<InstanceID, Map<JobID, Set<ResultPartitionStatus>>> resultPartitions = new HashMap();
    private final Map<JobID, RegisteredJobStatus> registeredJobs = new HashMap();

    public ResultPartitionMaster(ScheduledExecutor scheduledExecutor, Time time) {
        this.scheduledExecutor = scheduledExecutor;
        this.resultPartitionDisconnectJmTimeout = time;
    }

    public void start(Executor executor) {
        this.started = true;
        this.mainThreadExecutor = (Executor) Preconditions.checkNotNull(executor);
        this.resultPartitionDisconnectJmTimeoutChecker = this.scheduledExecutor.scheduleWithFixedDelay(() -> {
            this.mainThreadExecutor.execute(this::checkResultPartitionDisconnectJmTimeout);
        }, this.resultPartitionDisconnectJmTimeout.toMilliseconds(), this.resultPartitionDisconnectJmTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    public void suspend() {
        LOG.info("Suspending the ResultPartitionMaster");
        if (this.resultPartitionDisconnectJmTimeoutChecker != null) {
            this.resultPartitionDisconnectJmTimeoutChecker.cancel(false);
            this.resultPartitionDisconnectJmTimeoutChecker = null;
        }
        Iterator it = new ArrayList(this.taskManagerRegistrations.keySet()).iterator();
        while (it.hasNext()) {
            unregisterTaskManager((InstanceID) it.next());
        }
        this.started = false;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.info("Close the result partition master");
        suspend();
    }

    public void registerTaskManager(TaskExecutorConnection taskExecutorConnection, ResultPartitionReport resultPartitionReport) {
        checkInit();
        LOG.info("Registering TaskManager {} under {} at the ResultPartitionMaster.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
        if (!this.taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
            this.taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskExecutorConnection);
        }
        reportResultPartitionStatus(taskExecutorConnection.getInstanceID(), resultPartitionReport);
    }

    public boolean unregisterTaskManager(InstanceID instanceID) {
        if (null != this.taskManagerRegistrations.remove(instanceID)) {
            this.resultPartitions.remove(instanceID);
            return true;
        }
        LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceID);
        return false;
    }

    public void reportResultPartitionStatus(InstanceID instanceID, ResultPartitionReport resultPartitionReport) {
        checkInit();
        LOG.debug("Received result partition report from instance {}.", instanceID);
        Map<JobID, Set<ResultPartitionStatus>> map = this.resultPartitions.get(instanceID);
        if (map != null) {
            map.clear();
        } else {
            map = new HashMap();
            this.resultPartitions.put(instanceID, map);
        }
        Iterator<ResultPartitionStatus> it = resultPartitionReport.iterator();
        while (it.hasNext()) {
            ResultPartitionStatus next = it.next();
            map.computeIfAbsent(next.getJobId(), jobID -> {
                return new HashSet();
            }).add(next);
        }
    }

    public void addFinishedResultPartition(InstanceID instanceID, ResultPartitionStatus resultPartitionStatus) {
        checkInit();
        Map<JobID, Set<ResultPartitionStatus>> map = this.resultPartitions.get(instanceID);
        if (map != null) {
            map.computeIfAbsent(resultPartitionStatus.getJobId(), jobID -> {
                return new HashSet();
            }).add(resultPartitionStatus);
        }
    }

    public void releaseResultPartitions(JobID jobID) {
        checkInit();
        this.taskManagerRegistrations.forEach((instanceID, taskExecutorConnection) -> {
            Map<JobID, Set<ResultPartitionStatus>> map = this.resultPartitions.get(instanceID);
            if (map == null || !map.containsKey(jobID)) {
                return;
            }
            map.remove(jobID);
            taskExecutorConnection.getTaskExecutorGateway().releaseResultPartitions(jobID);
        });
    }

    public void registerJob(JobID jobID) {
        checkInit();
        this.registeredJobs.compute(jobID, (jobID2, registeredJobStatus) -> {
            return registeredJobStatus == null ? new RegisteredJobStatus(jobID) : registeredJobStatus;
        }).markOnline();
    }

    public void unregisterJob(JobID jobID) {
        checkInit();
        RegisteredJobStatus registeredJobStatus = this.registeredJobs.get(jobID);
        if (registeredJobStatus != null) {
            registeredJobStatus.markOffline();
        } else {
            LOG.warn("Unregistered a job that has not been registered before, job id is " + jobID);
        }
    }

    private void checkResultPartitionDisconnectJmTimeout() {
        checkInit();
        HashSet hashSet = new HashSet();
        this.registeredJobs.forEach((jobID, registeredJobStatus) -> {
            if (registeredJobStatus.isTimeout(this.resultPartitionDisconnectJmTimeout.toMilliseconds())) {
                hashSet.add(jobID);
            }
        });
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            this.registeredJobs.remove((JobID) it.next());
        }
        this.resultPartitions.forEach((instanceID, map) -> {
            TaskExecutorConnection taskExecutorConnection = this.taskManagerRegistrations.get(instanceID);
            if (taskExecutorConnection == null) {
                LOG.warn("No connection found with task manager " + instanceID + ", but there are remaining result partitions recorded");
                return;
            }
            HashSet hashSet2 = new HashSet();
            Iterator it2 = map.keySet().iterator();
            while (it2.hasNext()) {
                JobID jobID2 = (JobID) it2.next();
                if (!this.registeredJobs.containsKey(jobID2)) {
                    hashSet2.add(jobID2);
                    it2.remove();
                }
            }
            Iterator it3 = hashSet2.iterator();
            while (it3.hasNext()) {
                taskExecutorConnection.getTaskExecutorGateway().releaseResultPartitions((JobID) it3.next());
            }
        });
    }

    @VisibleForTesting
    HashMap<InstanceID, TaskExecutorConnection> getTaskManagerRegistrations() {
        return this.taskManagerRegistrations;
    }

    @VisibleForTesting
    Map<InstanceID, Map<JobID, Set<ResultPartitionStatus>>> getResultPartitions() {
        return this.resultPartitions;
    }

    private void checkInit() {
        Preconditions.checkState(this.started, "The slot manager has not been started.");
    }
}
