/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IORequest;
import org.apache.flink.runtime.io.disk.iomanager.IORequestScheduler;
import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.util.Preconditions;

public abstract class AsynchronousFileIOChannel<T, R extends IORequest>
extends AbstractFileIOChannel {
    private final Object listenerLock = new Object();
    protected final Object closeLock = new Object();
    protected final IORequestScheduler<R> requestScheduler;
    protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
    protected final RequestDoneCallback<T> resultHandler;
    protected volatile IOException exception;
    protected volatile boolean closed;
    private NotificationListener allRequestsProcessedListener;

    protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, IORequestScheduler<R> requestScheduler, RequestDoneCallback<T> callback, boolean writeEnabled) throws IOException {
        super(channelID, writeEnabled);
        this.requestScheduler = (IORequestScheduler)Preconditions.checkNotNull(requestScheduler);
        this.resultHandler = (RequestDoneCallback)Preconditions.checkNotNull(callback);
    }

    @Override
    public boolean isClosed() {
        return this.closed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        Object object = this.closeLock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                while (this.requestsNotReturned.get() > 0) {
                    try {
                        this.closeLock.wait(1000L);
                        this.checkErroneous();
                    }
                    catch (InterruptedException iex) {
                        throw new IOException("Closing of asynchronous file channel was interrupted.");
                    }
                }
                this.checkErroneous();
            }
            finally {
                super.close();
            }
        }
    }

    @Override
    public void closeAndDelete() throws IOException {
        try {
            this.close();
        }
        finally {
            this.deleteChannel();
        }
    }

    public final void checkErroneous() throws IOException {
        if (this.exception != null) {
            throw this.exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void handleProcessedBuffer(T buffer, IOException ex) {
        if (ex != null) {
            LOG.error("Process buffer request failed.", (Throwable)ex);
        }
        if (buffer == null) {
            return;
        }
        try {
            if (ex != null && this.exception == null) {
                this.exception = ex;
                this.resultHandler.requestFailed(buffer, ex);
            } else {
                this.resultHandler.requestSuccessful(buffer);
            }
        }
        finally {
            NotificationListener listener = null;
            Object object = this.closeLock;
            synchronized (object) {
                if (this.requestsNotReturned.decrementAndGet() == 0) {
                    if (this.closed) {
                        this.closeLock.notifyAll();
                    }
                    Object object2 = this.listenerLock;
                    synchronized (object2) {
                        listener = this.allRequestsProcessedListener;
                        this.allRequestsProcessedListener = null;
                    }
                }
            }
            if (listener != null) {
                listener.onNotification();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void addRequest(R request) throws IOException {
        this.checkErroneous();
        this.requestsNotReturned.incrementAndGet();
        if (this.closed || this.requestScheduler.isClosed()) {
            NotificationListener listener;
            this.requestsNotReturned.decrementAndGet();
            Object object = this.listenerLock;
            synchronized (object) {
                listener = this.allRequestsProcessedListener;
                this.allRequestsProcessedListener = null;
            }
            if (listener != null) {
                listener.onNotification();
            }
            throw new IOException("I/O channel already closed (" + this.closed + "," + this.requestScheduler.isClosed() + "). Could not fulfill: " + request);
        }
        this.requestScheduler.addRequest(this.id, request);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException {
        Preconditions.checkNotNull((Object)listener);
        Object object = this.listenerLock;
        synchronized (object) {
            if (this.allRequestsProcessedListener == null) {
                if (this.requestsNotReturned.get() == 0) {
                    return false;
                }
                this.allRequestsProcessedListener = listener;
                return true;
            }
        }
        throw new IllegalStateException("Already subscribed.");
    }
}

