/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.resultpartitionmaster;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.resultpartitionmaster.RegisteredJobStatus;
import org.apache.flink.runtime.taskexecutor.ResultPartitionReport;
import org.apache.flink.runtime.taskexecutor.ResultPartitionStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResultPartitionMaster
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionMaster.class);
    private final HashMap<InstanceID, TaskExecutorConnection> taskManagerRegistrations = new HashMap(4);
    private final Map<InstanceID, Map<JobID, Set<ResultPartitionStatus>>> resultPartitions = new HashMap<InstanceID, Map<JobID, Set<ResultPartitionStatus>>>();
    private final ScheduledExecutor scheduledExecutor;
    private final Map<JobID, RegisteredJobStatus> registeredJobs = new HashMap<JobID, RegisteredJobStatus>();
    private Executor mainThreadExecutor;
    private Time resultPartitionDisconnectJmTimeout;
    private ScheduledFuture<?> resultPartitionDisconnectJmTimeoutChecker;
    private boolean started;

    public ResultPartitionMaster(ScheduledExecutor scheduledExecutor, Time resultPartitionDisconnectJmTimeout) {
        this.scheduledExecutor = scheduledExecutor;
        this.resultPartitionDisconnectJmTimeout = resultPartitionDisconnectJmTimeout;
    }

    public void start(Executor newMainThreadExecutor) {
        this.started = true;
        this.mainThreadExecutor = (Executor)Preconditions.checkNotNull((Object)newMainThreadExecutor);
        this.resultPartitionDisconnectJmTimeoutChecker = this.scheduledExecutor.scheduleWithFixedDelay(() -> this.mainThreadExecutor.execute(this::checkResultPartitionDisconnectJmTimeout), this.resultPartitionDisconnectJmTimeout.toMilliseconds(), this.resultPartitionDisconnectJmTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    public void suspend() {
        LOG.info("Suspending the ResultPartitionMaster");
        if (this.resultPartitionDisconnectJmTimeoutChecker != null) {
            this.resultPartitionDisconnectJmTimeoutChecker.cancel(false);
            this.resultPartitionDisconnectJmTimeoutChecker = null;
        }
        ArrayList<InstanceID> registeredTaskManagers = new ArrayList<InstanceID>(this.taskManagerRegistrations.keySet());
        for (InstanceID registeredTaskManager : registeredTaskManagers) {
            this.unregisterTaskManager(registeredTaskManager);
        }
        this.started = false;
    }

    @Override
    public void close() throws Exception {
        LOG.info("Close the result partition master");
        this.suspend();
    }

    public void registerTaskManager(TaskExecutorConnection taskExecutorConnection, ResultPartitionReport initialResultPartitionReport) {
        this.checkInit();
        LOG.info("Registering TaskManager {} under {} at the ResultPartitionMaster.", (Object)taskExecutorConnection.getResourceID(), (Object)taskExecutorConnection.getInstanceID());
        if (!this.taskManagerRegistrations.containsKey((Object)taskExecutorConnection.getInstanceID())) {
            this.taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskExecutorConnection);
        }
        this.reportResultPartitionStatus(taskExecutorConnection.getInstanceID(), initialResultPartitionReport);
    }

    public boolean unregisterTaskManager(InstanceID instanceId) {
        TaskExecutorConnection taskManagerRegistration = this.taskManagerRegistrations.remove((Object)instanceId);
        if (null != taskManagerRegistration) {
            this.resultPartitions.remove((Object)instanceId);
            return true;
        }
        LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", (Object)instanceId);
        return false;
    }

    public void reportResultPartitionStatus(InstanceID instanceId, ResultPartitionReport resultPartitionReport) {
        this.checkInit();
        LOG.debug("Received result partition report from instance {}.", (Object)instanceId);
        Map<JobID, Set<ResultPartitionStatus>> resultPartitionsOnTm = this.resultPartitions.get((Object)instanceId);
        if (resultPartitionsOnTm != null) {
            resultPartitionsOnTm.clear();
        } else {
            resultPartitionsOnTm = new HashMap<JobID, Set<ResultPartitionStatus>>();
            this.resultPartitions.put(instanceId, resultPartitionsOnTm);
        }
        for (ResultPartitionStatus status : resultPartitionReport) {
            resultPartitionsOnTm.computeIfAbsent(status.getJobId(), k -> new HashSet()).add(status);
        }
    }

    public void addFinishedResultPartition(InstanceID instanceId, ResultPartitionStatus resultPartitionStatus) {
        this.checkInit();
        Map<JobID, Set<ResultPartitionStatus>> resultPartitionsOnTm = this.resultPartitions.get((Object)instanceId);
        if (resultPartitionsOnTm != null) {
            resultPartitionsOnTm.computeIfAbsent(resultPartitionStatus.getJobId(), k -> new HashSet()).add(resultPartitionStatus);
        }
    }

    public void releaseResultPartitions(JobID jobId) {
        this.checkInit();
        this.taskManagerRegistrations.forEach((instanceId, connection) -> {
            Map<JobID, Set<ResultPartitionStatus>> resultPartitionsOnTm = this.resultPartitions.get(instanceId);
            if (resultPartitionsOnTm != null && resultPartitionsOnTm.containsKey(jobId)) {
                resultPartitionsOnTm.remove(jobId);
                connection.getTaskExecutorGateway().releaseResultPartitions(jobId);
            }
        });
    }

    public void registerJob(JobID jobId) {
        this.checkInit();
        RegisteredJobStatus jobStatus = this.registeredJobs.compute(jobId, (k, v) -> v == null ? new RegisteredJobStatus(jobId) : v);
        jobStatus.markOnline();
    }

    public void unregisterJob(JobID jobId) {
        this.checkInit();
        RegisteredJobStatus jobStatus = this.registeredJobs.get(jobId);
        if (jobStatus != null) {
            jobStatus.markOffline();
        } else {
            LOG.warn("Unregistered a job that has not been registered before, job id is " + jobId);
        }
    }

    private void checkResultPartitionDisconnectJmTimeout() {
        this.checkInit();
        HashSet timeoutJobs = new HashSet();
        this.registeredJobs.forEach((jobId, jobStatus) -> {
            if (jobStatus.isTimeout(this.resultPartitionDisconnectJmTimeout.toMilliseconds())) {
                timeoutJobs.add(jobId);
            }
        });
        for (JobID jobId2 : timeoutJobs) {
            this.registeredJobs.remove(jobId2);
        }
        this.resultPartitions.forEach((instanceID, resultPartitionsOnTm) -> {
            TaskExecutorConnection connection = this.taskManagerRegistrations.get(instanceID);
            if (connection != null) {
                HashSet<JobID> jobsToRemove = new HashSet<JobID>();
                Iterator jobsIterator = resultPartitionsOnTm.keySet().iterator();
                while (jobsIterator.hasNext()) {
                    JobID jobId = (JobID)jobsIterator.next();
                    if (this.registeredJobs.containsKey(jobId)) continue;
                    jobsToRemove.add(jobId);
                    jobsIterator.remove();
                }
                for (JobID jobId : jobsToRemove) {
                    connection.getTaskExecutorGateway().releaseResultPartitions(jobId);
                }
            } else {
                LOG.warn("No connection found with task manager " + (Object)instanceID + ", but there are remaining result partitions recorded");
            }
        });
    }

    @VisibleForTesting
    HashMap<InstanceID, TaskExecutorConnection> getTaskManagerRegistrations() {
        return this.taskManagerRegistrations;
    }

    @VisibleForTesting
    Map<InstanceID, Map<JobID, Set<ResultPartitionStatus>>> getResultPartitions() {
        return this.resultPartitions;
    }

    private void checkInit() {
        Preconditions.checkState((boolean)this.started, (Object)"The slot manager has not been started.");
    }
}

