package org.apache.flink.table.plan.resource.schedule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.event.OperatorEvent;
import org.apache.flink.runtime.event.ResultPartitionConsumableEvent;
import org.apache.flink.runtime.event.TaskFailoverEvent;
import org.apache.flink.runtime.event.TaskStateChangedEvent;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.JobScheduler;
import org.apache.flink.runtime.scheduler.LogicalTask;
import org.apache.flink.runtime.scheduler.SchedulerEventHandler;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.plan.resource.BatchExecRelStage;
import org.apache.flink.table.plan.resource.RelRunningUnit;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/plan/resource/schedule/RunningUnitSchedulerEventHandler.class */
public class RunningUnitSchedulerEventHandler implements SchedulerEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    public static final String RUNNING_UNIT_CONF_KEY = "runningUnit.key";
    private LogicalJobVertexRunningUnit leftJobVertexRunningUnit;
    private Map<JobVertexID, String> jobVerticesNameMap;
    private JobScheduler scheduler;
    private Map<JobVertexID, LogicalJobVertex> jobVertices = new LinkedHashMap();
    private List<LogicalJobVertexRunningUnit> allJobVertexRunningUnitList = new LinkedList();
    private AtomicInteger scheduledUnitNum = new AtomicInteger();
    private LinkedList<LogicalJobVertexRunningUnit> jobVertexRunningUnitQueue = new LinkedList<>();
    private Set<LogicalJobVertexRunningUnit> scheduledJobVertexRunningUnitSet = Collections.newSetFromMap(new IdentityHashMap());
    private LogicalJobVertexRunningUnit currentScheduledUnit = null;
    private Map<RelStageID, List<LogicalJobVertexRunningUnit>> eventRunningUnitMap = new LinkedHashMap();
    private Map<BatchExecRelStage, List<RelRunningUnit>> stageUnitMap = new IdentityHashMap();
    private Map<RelRunningUnit, Set<BatchExecRelStage>> unitStageMap = new LinkedHashMap();

    public void open(JobScheduler jobScheduler, Configuration configuration, ClassLoader classLoader) {
        try {
            open((Map) InstantiationUtil.readObjectFromConfig(configuration, "jobVertexToNameMap", classLoader), jobScheduler, (Map) InstantiationUtil.readObjectFromConfig(configuration, "jobVertexToStreamNodeMap", classLoader), (List) InstantiationUtil.readObjectFromConfig(configuration, RUNNING_UNIT_CONF_KEY, classLoader));
        } catch (IOException e) {
            LOG.warn("catch IOException.", e);
        } catch (ClassNotFoundException e2) {
            LOG.warn("catch ClassNotFoundException.", e2);
        }
    }

    @VisibleForTesting
    public void open(Map<JobVertexID, String> map, JobScheduler jobScheduler, Map<JobVertexID, ArrayList<Integer>> map2, List<RelRunningUnit> list) {
        this.jobVerticesNameMap = map;
        this.scheduler = jobScheduler;
        Map<Integer, JobVertexID> reverseMap = reverseMap(map2);
        buildJobVertices();
        buildJobRunningUnits(list, reverseMap);
    }

    private void buildJobRunningUnits(List<RelRunningUnit> list, Map<Integer, JobVertexID> map) {
        HashMap hashMap = new HashMap();
        for (RelRunningUnit relRunningUnit : list) {
            List<BatchExecRelStage> allRelStages = relRunningUnit.getAllRelStages();
            Iterator<BatchExecRelStage> it = allRelStages.iterator();
            while (it.hasNext()) {
                this.stageUnitMap.computeIfAbsent(it.next(), batchExecRelStage -> {
                    return new LinkedList();
                }).add(relRunningUnit);
            }
            Set<BatchExecRelStage> newSetFromMap = Collections.newSetFromMap(new IdentityHashMap());
            newSetFromMap.addAll(allRelStages);
            this.unitStageMap.put(relRunningUnit, newSetFromMap);
        }
        avoidDeadLockDepend(list);
        Iterator<RelRunningUnit> it2 = list.iterator();
        while (it2.hasNext()) {
            List<BatchExecRelStage> allRelStages2 = it2.next().getAllRelStages();
            LinkedHashSet linkedHashSet = new LinkedHashSet();
            Iterator<BatchExecRelStage> it3 = allRelStages2.iterator();
            while (it3.hasNext()) {
                Iterator<Integer> it4 = it3.next().getTransformationIDList().iterator();
                while (it4.hasNext()) {
                    linkedHashSet.add(this.jobVertices.get(map.get(it4.next())));
                }
            }
            LogicalJobVertexRunningUnit logicalJobVertexRunningUnit = new LogicalJobVertexRunningUnit(linkedHashSet);
            Iterator<BatchExecRelStage> it5 = allRelStages2.iterator();
            while (it5.hasNext()) {
                for (BatchExecRelStage batchExecRelStage2 : it5.next().getDependStageList()) {
                    RelStageID relStageID = new RelStageID(batchExecRelStage2.getRelID().intValue(), batchExecRelStage2.getStageID());
                    this.eventRunningUnitMap.computeIfAbsent(relStageID, relStageID2 -> {
                        return new LinkedList();
                    }).add(logicalJobVertexRunningUnit);
                    logicalJobVertexRunningUnit.addDependRelStage(relStageID);
                }
            }
            Iterator<LogicalJobVertex> it6 = logicalJobVertexRunningUnit.getJobVertexSet().iterator();
            while (it6.hasNext()) {
                hashMap.put(it6.next().getJobVertexID(), logicalJobVertexRunningUnit);
            }
            this.allJobVertexRunningUnitList.add(logicalJobVertexRunningUnit);
        }
        LinkedHashSet linkedHashSet2 = new LinkedHashSet();
        for (JobVertexID jobVertexID : this.jobVertices.keySet()) {
            if (!hashMap.containsKey(jobVertexID)) {
                linkedHashSet2.add(this.jobVertices.get(jobVertexID));
            }
        }
        this.leftJobVertexRunningUnit = new LogicalJobVertexRunningUnit(linkedHashSet2);
    }

    private void avoidDeadLockDepend(List<RelRunningUnit> list) {
        for (RelRunningUnit relRunningUnit : list) {
            for (BatchExecRelStage batchExecRelStage : this.unitStageMap.get(relRunningUnit)) {
                LinkedList linkedList = new LinkedList();
                for (BatchExecRelStage batchExecRelStage2 : batchExecRelStage.getDependStageList()) {
                    Iterator<RelRunningUnit> it = this.stageUnitMap.get(batchExecRelStage2).iterator();
                    while (it.hasNext()) {
                        if (loopDepend(it.next(), relRunningUnit, new HashSet())) {
                            linkedList.add(batchExecRelStage2);
                        }
                    }
                }
                Iterator it2 = linkedList.iterator();
                while (it2.hasNext()) {
                    batchExecRelStage.removeDependStage((BatchExecRelStage) it2.next());
                }
            }
        }
    }

    private boolean loopDepend(RelRunningUnit relRunningUnit, RelRunningUnit relRunningUnit2, Set<RelRunningUnit> set) {
        if (relRunningUnit == relRunningUnit2) {
            return true;
        }
        if (set.contains(relRunningUnit)) {
            return false;
        }
        set.add(relRunningUnit);
        Iterator<BatchExecRelStage> it = this.unitStageMap.get(relRunningUnit).iterator();
        while (it.hasNext()) {
            Iterator<BatchExecRelStage> it2 = it.next().getDependStageList().iterator();
            while (it2.hasNext()) {
                Iterator<RelRunningUnit> it3 = this.stageUnitMap.get(it2.next()).iterator();
                while (it3.hasNext()) {
                    if (loopDepend(it3.next(), relRunningUnit2, set)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

    private void buildJobVertices() {
        Collection allTasks = this.scheduler.getAllTasks();
        LinkedHashSet<JobVertexID> linkedHashSet = new LinkedHashSet();
        Iterator it = allTasks.iterator();
        while (it.hasNext()) {
            linkedHashSet.add(((LogicalTask) it.next()).getVertexId());
        }
        for (JobVertexID jobVertexID : linkedHashSet) {
            this.jobVertices.put(jobVertexID, new LogicalJobVertex(jobVertexID, this.scheduler.getTasks(jobVertexID), this.jobVerticesNameMap.get(jobVertexID)));
        }
    }

    private Map<Integer, JobVertexID> reverseMap(Map<JobVertexID, ArrayList<Integer>> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<JobVertexID, ArrayList<Integer>> entry : map.entrySet()) {
            Iterator<Integer> it = entry.getValue().iterator();
            while (it.hasNext()) {
                linkedHashMap.put(it.next(), entry.getKey());
            }
        }
        return linkedHashMap;
    }

    public void close() {
    }

    public void onSchedulerStarted() {
        for (LogicalJobVertexRunningUnit logicalJobVertexRunningUnit : this.allJobVertexRunningUnitList) {
            if (logicalJobVertexRunningUnit.dependEventsReady()) {
                runningUnitEventsReady(logicalJobVertexRunningUnit);
            }
        }
    }

    public void onResultPartitionConsumable(ResultPartitionConsumableEvent resultPartitionConsumableEvent) {
    }

    public void onTaskStateChanged(TaskStateChangedEvent taskStateChangedEvent) {
        if (taskStateChangedEvent.getTaskNewExecutionState() == ExecutionState.DEPLOYING) {
            LogicalJobVertexRunningUnit logicalJobVertexRunningUnit = this.currentScheduledUnit;
            this.jobVertices.get(taskStateChangedEvent.getTask().getVertexId()).taskScheduled();
            if (logicalJobVertexRunningUnit.allTasksDeploying()) {
                finishCurrentScheduling(logicalJobVertexRunningUnit);
            }
        }
    }

    public void onOperatorEvent(OperatorEvent operatorEvent) {
        if (operatorEvent instanceof RelStageDoneEvent) {
            RelStageDoneEvent relStageDoneEvent = (RelStageDoneEvent) operatorEvent;
            LOG.info("receive event from : " + this.jobVerticesNameMap.get(operatorEvent.getVertexID()) + ", event id: " + relStageDoneEvent.getRelStageID());
            RelStageID relStageID = relStageDoneEvent.getRelStageID();
            List<LogicalJobVertexRunningUnit> list = this.eventRunningUnitMap.get(relStageID);
            if (list == null) {
                return;
            }
            for (LogicalJobVertexRunningUnit logicalJobVertexRunningUnit : list) {
                logicalJobVertexRunningUnit.receiveEventID(relStageID);
                if (logicalJobVertexRunningUnit.dependEventsReady()) {
                    runningUnitEventsReady(logicalJobVertexRunningUnit);
                }
            }
        }
    }

    private synchronized void finishCurrentScheduling(LogicalJobVertexRunningUnit logicalJobVertexRunningUnit) {
        if (logicalJobVertexRunningUnit != this.currentScheduledUnit || unitDeployedAndCheckAllDeployed()) {
            return;
        }
        do {
            this.currentScheduledUnit = this.jobVertexRunningUnitQueue.pollFirst();
            if (this.currentScheduledUnit == null || !this.currentScheduledUnit.allTasksDeploying() || !unitDeployedAndCheckAllDeployed()) {
                if (this.currentScheduledUnit == null) {
                    break;
                }
            } else {
                return;
            }
        } while (this.currentScheduledUnit.allTasksDeploying());
        if (this.currentScheduledUnit != null) {
            scheduleRunningUnit(this.currentScheduledUnit);
        }
    }

    private synchronized void runningUnitEventsReady(LogicalJobVertexRunningUnit logicalJobVertexRunningUnit) {
        if (this.scheduledJobVertexRunningUnitSet.add(logicalJobVertexRunningUnit)) {
            if (logicalJobVertexRunningUnit.allTasksDeploying()) {
                unitDeployedAndCheckAllDeployed();
            } else if (this.currentScheduledUnit == null) {
                scheduleRunningUnit(logicalJobVertexRunningUnit);
            } else {
                this.jobVertexRunningUnitQueue.add(logicalJobVertexRunningUnit);
            }
        }
    }

    private synchronized boolean unitDeployedAndCheckAllDeployed() {
        if (this.scheduledUnitNum.incrementAndGet() == this.allJobVertexRunningUnitList.size() && !this.leftJobVertexRunningUnit.allTasksDeploying()) {
            scheduleRunningUnit(this.leftJobVertexRunningUnit);
        }
        return this.scheduledUnitNum.get() >= this.allJobVertexRunningUnitList.size();
    }

    private synchronized void scheduleRunningUnit(LogicalJobVertexRunningUnit logicalJobVertexRunningUnit) {
        this.currentScheduledUnit = logicalJobVertexRunningUnit;
        LOG.info("begin to schedule runningUnit: ");
        Iterator<LogicalJobVertex> it = this.currentScheduledUnit.getJobVertexSet().iterator();
        while (it.hasNext()) {
            LOG.info(it.next().toString());
        }
        this.scheduler.scheduleTasks(logicalJobVertexRunningUnit.getToScheduleTasks());
    }

    public void onTaskFailover(TaskFailoverEvent taskFailoverEvent) {
    }

    public boolean scheduleInBatch() {
        return false;
    }
}
