/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.preaggregatedaccumulators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.preaggregatedaccumulators.AccumulatorAggregationManager;
import org.apache.flink.runtime.preaggregatedaccumulators.CommitAccumulator;
import org.apache.flink.runtime.taskexecutor.JobManagerConnection;
import org.apache.flink.runtime.taskexecutor.JobManagerTable;
import org.apache.flink.util.Preconditions;

public class RPCBasedAccumulatorAggregationManager
implements AccumulatorAggregationManager {
    private final JobManagerTable jobManagerTable;
    private final Map<JobID, Map<String, AggregatedAccumulator>> perJobAggregatedAccumulators = new HashMap<JobID, Map<String, AggregatedAccumulator>>();
    private final Object queryLock = new Object();
    @GuardedBy(value="queryLock")
    private final Map<JobID, Map<String, List<CompletableFuture<Accumulator>>>> perJobUnfulfilledUserQueryFutures = new HashMap<JobID, Map<String, List<CompletableFuture<Accumulator>>>>();
    @GuardedBy(value="queryLock")
    private final Map<JobID, Map<String, Object>> perJobCachedQueryResults = new HashMap<JobID, Map<String, Object>>();

    public RPCBasedAccumulatorAggregationManager(JobManagerTable jobManagerTable) {
        this.jobManagerTable = jobManagerTable;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerPreAggregatedAccumulator(JobID jobId, JobVertexID jobVertexId, int subtaskIndex, String name) {
        Map<JobID, Map<String, AggregatedAccumulator>> map = this.perJobAggregatedAccumulators;
        synchronized (map) {
            AggregatedAccumulator aggregatedAccumulator = this.perJobAggregatedAccumulators.computeIfAbsent(jobId, k -> new HashMap()).computeIfAbsent(name, k -> new AggregatedAccumulator(jobVertexId));
            aggregatedAccumulator.registerForTask(jobVertexId, subtaskIndex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitPreAggregatedAccumulator(JobID jobId, JobVertexID jobVertexId, int subtaskIndex, String name, Accumulator value) {
        Map<JobID, Map<String, AggregatedAccumulator>> map = this.perJobAggregatedAccumulators;
        synchronized (map) {
            Map<String, AggregatedAccumulator> currentJobAccumulators = this.perJobAggregatedAccumulators.get(jobId);
            AggregatedAccumulator aggregatedAccumulator = currentJobAccumulators != null ? currentJobAccumulators.get(name) : null;
            Preconditions.checkState((aggregatedAccumulator != null ? 1 : 0) != 0, (Object)"The committed accumulator does not exist.");
            aggregatedAccumulator.commitForTask(jobVertexId, subtaskIndex, value);
            if (aggregatedAccumulator.isAllCommitted()) {
                this.commitAggregatedAccumulators(jobId, Collections.singletonList(new CommitAccumulator(aggregatedAccumulator.getJobVertexId(), name, aggregatedAccumulator.getAggregatedValue(), aggregatedAccumulator.getCommittedTasks())));
                currentJobAccumulators.remove(name);
            }
            if (currentJobAccumulators.isEmpty()) {
                this.perJobAggregatedAccumulators.remove(jobId);
            }
        }
    }

    @Override
    public <V, A extends Serializable> CompletableFuture<Accumulator<V, A>> queryPreAggregatedAccumulator(JobID jobId, String name) {
        Object object = this.queryLock;
        synchronized (object) {
            Object cachedQueryResult;
            CompletableFuture<Accumulator<V, A>> localQueryFuture = new CompletableFuture<Accumulator<V, A>>();
            Map<String, Object> currentJobCachedQueryResults = this.perJobCachedQueryResults.get(jobId);
            Object object2 = cachedQueryResult = currentJobCachedQueryResults == null ? null : currentJobCachedQueryResults.get(name);
            if (cachedQueryResult == null) {
                boolean hasQueriedJobMaster;
                boolean bl = hasQueriedJobMaster = this.perJobUnfulfilledUserQueryFutures.containsKey(jobId) && this.perJobUnfulfilledUserQueryFutures.get(jobId).containsKey(name);
                if (!hasQueriedJobMaster) {
                    JobManagerConnection connection = this.jobManagerTable.get(jobId);
                    if (connection == null) {
                        localQueryFuture.complete(null);
                        return localQueryFuture;
                    }
                    this.perJobUnfulfilledUserQueryFutures.computeIfAbsent(jobId, k -> new HashMap()).computeIfAbsent(name, k -> new ArrayList()).add(localQueryFuture);
                    CompletableFuture queryToJobMaster = connection.getJobManagerGateway().queryPreAggregatedAccumulator(name);
                    queryToJobMaster.whenComplete((accumulator, throwable) -> this.onAccumulatorQueryFinished(jobId, name, (Accumulator)accumulator, (Throwable)throwable));
                } else {
                    this.perJobUnfulfilledUserQueryFutures.get(jobId).get(name).add(localQueryFuture);
                }
                return localQueryFuture;
            }
            if (cachedQueryResult instanceof Accumulator) {
                localQueryFuture.complete((Accumulator)cachedQueryResult);
                return localQueryFuture;
            }
            if (cachedQueryResult instanceof Throwable) {
                localQueryFuture.completeExceptionally((Throwable)cachedQueryResult);
                return localQueryFuture;
            }
            throw new IllegalStateException("The cached result should be either accumulator or throwable.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void clearRegistrationForTask(JobID jobId, JobVertexID jobVertexId, int subtaskIndex) {
        Map<JobID, Map<String, AggregatedAccumulator>> map = this.perJobAggregatedAccumulators;
        synchronized (map) {
            Map<String, AggregatedAccumulator> currentJobAccumulators = this.perJobAggregatedAccumulators.get(jobId);
            if (currentJobAccumulators != null) {
                ArrayList<CommitAccumulator> commitAccumulators = new ArrayList<CommitAccumulator>();
                ArrayList<String> shouldRemove = new ArrayList<String>();
                for (Map.Entry<String, AggregatedAccumulator> entry : currentJobAccumulators.entrySet()) {
                    AggregatedAccumulator aggregatedAccumulator = entry.getValue();
                    if (!aggregatedAccumulator.getJobVertexId().equals((Object)jobVertexId)) continue;
                    aggregatedAccumulator.clearRegistrationForTask(subtaskIndex);
                    if (aggregatedAccumulator.isAllCommitted()) {
                        commitAccumulators.add(new CommitAccumulator(entry.getValue().getJobVertexId(), entry.getKey(), entry.getValue().getAggregatedValue(), entry.getValue().getCommittedTasks()));
                    }
                    if (!aggregatedAccumulator.isAllCommitted() && !aggregatedAccumulator.isEmpty()) continue;
                    shouldRemove.add(entry.getKey());
                }
                if (commitAccumulators.size() > 0) {
                    this.commitAggregatedAccumulators(jobId, commitAccumulators);
                }
                for (String name : shouldRemove) {
                    currentJobAccumulators.remove(name);
                }
                if (currentJobAccumulators.isEmpty()) {
                    this.perJobAggregatedAccumulators.remove(jobId);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void clearAccumulatorsForJob(JobID jobId) {
        Object object = this.perJobAggregatedAccumulators;
        synchronized (object) {
            this.perJobAggregatedAccumulators.computeIfPresent(jobId, (k, currentJobAccumulators) -> {
                currentJobAccumulators.clear();
                return null;
            });
        }
        object = this.queryLock;
        synchronized (object) {
            this.perJobUnfulfilledUserQueryFutures.computeIfPresent(jobId, (k, currentJobQueryFutures) -> {
                currentJobQueryFutures.clear();
                return null;
            });
            this.perJobCachedQueryResults.computeIfPresent(jobId, (k, currentJobCachedResults) -> {
                currentJobCachedResults.clear();
                return null;
            });
        }
    }

    @VisibleForTesting
    Map<JobID, Map<String, AggregatedAccumulator>> getPerJobAggregatedAccumulators() {
        return this.perJobAggregatedAccumulators;
    }

    @VisibleForTesting
    public Map<JobID, Map<String, List<CompletableFuture<Accumulator>>>> getPerJobUnfulfilledUserQueryFutures() {
        return this.perJobUnfulfilledUserQueryFutures;
    }

    @VisibleForTesting
    public Map<JobID, Map<String, Object>> getPerJobCachedQueryResults() {
        return this.perJobCachedQueryResults;
    }

    private void commitAggregatedAccumulators(JobID jobId, List<CommitAccumulator> accumulators) {
        assert (Thread.holdsLock(this.perJobAggregatedAccumulators));
        JobManagerConnection connection = this.jobManagerTable.get(jobId);
        if (connection != null) {
            connection.getJobManagerGateway().commitPreAggregatedAccumulator(accumulators);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <V, A extends Serializable> void onAccumulatorQueryFinished(JobID jobId, String name, Accumulator<V, A> accumulator, Throwable throwable) {
        Object object = this.queryLock;
        synchronized (object) {
            Preconditions.checkState((!this.perJobCachedQueryResults.containsKey(jobId) || !this.perJobCachedQueryResults.get(jobId).containsKey(name) ? 1 : 0) != 0, (Object)("The target accumulator " + name + " of job " + jobId + " should not be in the cached result list."));
            Preconditions.checkState((this.perJobUnfulfilledUserQueryFutures.containsKey(jobId) && this.perJobUnfulfilledUserQueryFutures.get(jobId).containsKey(name) ? 1 : 0) != 0, (Object)"The target accumulator should reside in the unfulfilled query map.");
            Map currentJobCacheQueryResults = this.perJobCachedQueryResults.computeIfAbsent(jobId, k -> new HashMap());
            List<CompletableFuture<Accumulator>> userQueries = this.perJobUnfulfilledUserQueryFutures.get(jobId).get(name);
            if (accumulator != null) {
                userQueries.forEach(userQuery -> userQuery.complete(accumulator));
                currentJobCacheQueryResults.put(name, accumulator);
            } else {
                userQueries.forEach(userQuery -> userQuery.completeExceptionally(throwable));
                currentJobCacheQueryResults.put(name, throwable);
            }
            this.perJobUnfulfilledUserQueryFutures.compute(jobId, (k, currentJobUserQueryFutures) -> {
                currentJobUserQueryFutures.remove(name);
                return currentJobUserQueryFutures.size() == 0 ? null : currentJobUserQueryFutures;
            });
        }
    }

    static final class AggregatedAccumulator {
        private final Set<Integer> registeredTasks = new HashSet<Integer>();
        private final Set<Integer> committedTasks = new HashSet<Integer>();
        private final JobVertexID jobVertexId;
        private Accumulator aggregatedValue;

        AggregatedAccumulator(JobVertexID jobVertexId) {
            this.jobVertexId = jobVertexId;
        }

        void registerForTask(JobVertexID jobVertexId, int subtaskIndex) {
            Preconditions.checkArgument((boolean)this.jobVertexId.equals((Object)jobVertexId), (Object)"The registered task belongs to different JobVertex with previous registered ones");
            Preconditions.checkState((!this.registeredTasks.contains(subtaskIndex) ? 1 : 0) != 0, (Object)"This task has already registered.");
            this.registeredTasks.add(subtaskIndex);
        }

        void commitForTask(JobVertexID jobVertexId, int subtaskIndex, Accumulator value) {
            Preconditions.checkArgument((boolean)this.jobVertexId.equals((Object)jobVertexId), (Object)"The registered task belongs to different JobVertex with previous registered ones");
            Preconditions.checkState((boolean)this.registeredTasks.contains(subtaskIndex), (Object)"Can not commit for an accumulator that has not been registered before");
            if (this.aggregatedValue == null) {
                this.aggregatedValue = value.clone();
            } else {
                this.aggregatedValue.merge(value);
            }
            this.committedTasks.add(subtaskIndex);
        }

        void clearRegistrationForTask(int subtaskIndex) {
            if (this.registeredTasks.contains(subtaskIndex) && !this.committedTasks.contains(subtaskIndex)) {
                this.registeredTasks.remove(subtaskIndex);
            }
        }

        boolean isEmpty() {
            return this.registeredTasks.size() == 0;
        }

        boolean isAllCommitted() {
            return this.registeredTasks.size() > 0 && this.registeredTasks.size() == this.committedTasks.size();
        }

        Accumulator getAggregatedValue() {
            return this.aggregatedValue;
        }

        JobVertexID getJobVertexId() {
            return this.jobVertexId;
        }

        Set<Integer> getCommittedTasks() {
            return this.committedTasks;
        }

        @VisibleForTesting
        Set<Integer> getRegisteredTasks() {
            return this.registeredTasks;
        }
    }
}

