/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IORequest;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Groups pending I/O requests into one {@link RequestQueue} per {@link FileIOChannel.ID} and
 * hands out the queue that currently holds the most requests.
 *
 * <p>Every mutation of the channel-to-queue map (adding, removing, clearing) is performed while
 * holding this object's monitor. {@link #nextRequestQueue()} scans the map without the monitor
 * and therefore only sees a weakly consistent view of it.
 *
 * @param <E> the type of request held in the queues
 */
public class IORequestScheduler<E extends IORequest> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(IORequestScheduler.class);

    /** Set by {@link #close()}; volatile so {@link #isClosed()} can read it without locking. */
    private volatile boolean closed = false;

    /** Pending requests, grouped by the channel they belong to. */
    private final ConcurrentHashMap<FileIOChannel.ID, RequestQueue<E>> requestQueues =
            new ConcurrentHashMap<>();

    /**
     * Selects the registered queue that currently contains the most requests.
     *
     * <p>If no queue is registered at all, the calling thread waits on this object's monitor
     * until it is notified (see {@link #addRequest}). The wait is performed at most once and is
     * not re-checked in a loop, so any wake-up proceeds to the scan below.
     *
     * @return the fullest queue, or {@code null} if no registered queue holds at least one
     *     request or the selected queue was removed before it could be looked up again
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public RequestQueue<E> nextRequestQueue() throws InterruptedException {
        // Unsynchronized pre-check avoids taking the monitor when there is work available.
        if (this.requestQueues.isEmpty()) {
            synchronized (this) {
                // Re-check under the monitor so a concurrent addRequest cannot be missed.
                if (this.requestQueues.isEmpty()) {
                    // NOTE(review): close() and shutDown() do not notify, so a waiter here is
                    // presumably released by interruption on shutdown - confirm with callers.
                    this.wait();
                }
            }
        }

        long maxNumRequests = -1L;
        FileIOChannel.ID channelID = null;
        for (Map.Entry<FileIOChannel.ID, RequestQueue<E>> entry : this.requestQueues.entrySet()) {
            // Read the size once so the compared and the recorded value cannot diverge.
            final long numRequests = entry.getValue().size();
            if (numRequests > maxNumRequests) {
                maxNumRequests = numRequests;
                channelID = entry.getKey();
            }
        }

        // An empty queue (size 0) is never handed out.
        if (maxNumRequests > 0L) {
            // Looked up again on purpose: yields null if the queue was removed in the meantime.
            return this.requestQueues.get(channelID);
        }
        return null;
    }

    /**
     * Signals that the caller has finished working on the given queue. If the queue is empty it
     * is unregistered from this scheduler and closed.
     *
     * @param requestQueue the queue that was processed
     */
    public void requestQueueProcessed(RequestQueue<E> requestQueue) {
        // Unsynchronized pre-check; the decision is confirmed under the monitor so that it
        // cannot interleave with addRequest, which also holds the monitor.
        if (requestQueue.isEmpty()) {
            synchronized (this) {
                if (requestQueue.isEmpty()) {
                    this.requestQueues.remove(requestQueue.getFileChannelID());
                    requestQueue.close();
                }
            }
        }
    }

    /**
     * Looks up the queue registered for the given channel.
     *
     * @param channelID the channel whose queue is requested
     * @return the registered queue, or {@code null} if none is registered for the channel
     */
    @VisibleForTesting
    public RequestQueue<E> getRequestQueue(FileIOChannel.ID channelID) {
        synchronized (this) {
            return this.requestQueues.get(channelID);
        }
    }

    /**
     * Appends a request to the queue of the given channel, creating and registering the queue
     * first if the channel has none. Threads waiting in {@link #nextRequestQueue()} are notified
     * when this call registers the first queue of a previously empty scheduler.
     *
     * @param channelID the channel the request belongs to
     * @param request the request to enqueue
     */
    public void addRequest(FileIOChannel.ID channelID, E request) {
        synchronized (this) {
            // Captured before the insert: waiters only block while the map is empty.
            final boolean wasEmpty = this.requestQueues.isEmpty();
            final RequestQueue<E> requestQueue =
                    this.requestQueues.computeIfAbsent(channelID, key -> new RequestQueue<>(key));
            requestQueue.add(request);
            if (wasEmpty) {
                this.notifyAll();
            }
        }
    }

    /**
     * Closes every registered queue and marks this scheduler as closed. The queues stay
     * registered and their contents are not touched; use {@link #shutDown(IOException)} to fail
     * and discard pending requests.
     */
    @Override
    public void close() {
        synchronized (this) {
            for (RequestQueue<E> requestQueue : this.requestQueues.values()) {
                requestQueue.close();
            }
            this.closed = true;
        }
    }

    /**
     * Reports whether {@link #close()} has been called.
     *
     * @return {@code true} once this scheduler has been closed
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Drains every registered queue, completing each pending request with the given exception,
     * and then unregisters all queues. A failure thrown by one request's completion callback is
     * logged and does not prevent the remaining requests from being completed.
     *
     * @param ioex the exception passed to {@link IORequest#requestDone(IOException)} of every
     *     pending request
     */
    public void shutDown(IOException ioex) {
        synchronized (this) {
            for (RequestQueue<E> requestQueue : this.requestQueues.values()) {
                while (!requestQueue.isEmpty()) {
                    final E request = requestQueue.poll();
                    if (request == null) {
                        // Emptied between isEmpty() and poll(); the loop condition re-checks.
                        continue;
                    }
                    try {
                        request.requestDone(ioex);
                    } catch (Throwable t) {
                        // Deliberately broad: one faulty callback must not abort the shutdown.
                        LOG.error("The handler of the request complete callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                    }
                }
            }
            this.requestQueues.clear();
        }
    }
}

