package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.class */
/**
 * Netty inbound handler that keeps track of the {@link NetworkSequenceViewReader}s created for a
 * channel and writes their buffers to that channel.
 *
 * <p>Readers that have something to send are kept in a FIFO queue. Each call to
 * {@code writeAndFlushNextMessageIfPossible} writes at most one buffer; the listener attached to
 * that write starts the next one once the write has completed.
 *
 * <p>{@code availableReaders}, {@code released} and {@code fatalError} are plain, unsynchronized
 * state. NOTE(review): this looks like it relies on all mutations happening on the channel's
 * event loop - confirm for callers of {@link #close()} and {@link #addCredit}.
 */
public class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);

    /** Readers currently queued for sending, in the order in which they will be polled. */
    private final ArrayDeque<NetworkSequenceViewReader> availableReaders = new ArrayDeque<>();

    /** Every reader announced through {@link #notifyReaderCreated}, keyed by its receiver id. */
    private final ConcurrentMap<InputChannelID, NetworkSequenceViewReader> allReaders = new ConcurrentHashMap<>();

    /** Receiver ids that have been marked as released (cancelled, failed or torn down). */
    private final Set<InputChannelID> released = Sets.newHashSet();

    /** Set by {@link #handleException}; once set, credits and buffer writes are ignored. */
    private boolean fatalError;

    /** Context captured by the first {@link #channelRegistered} call; {@code null} before that. */
    private ChannelHandlerContext ctx;

    /**
     * Listener attached to every buffer write. When the write completes it re-enqueues the
     * reader the buffer came from (if it is available again) and then either continues with the
     * next buffer or reports the failure.
     */
    public class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
        private final NetworkSequenceViewReader reader;

        /**
         * @param networkSequenceViewReader the reader whose buffer is being written
         */
        public WriteAndFlushNextMessageIfPossibleListener(NetworkSequenceViewReader networkSequenceViewReader) {
            this.reader = networkSequenceViewReader;
        }

        /**
         * Invoked when the write this listener was attached to has completed.
         *
         * @param channelFuture the completed write
         * @throws Exception if reporting a failure through {@code handleException} fails itself
         */
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            try {
                // enqueueAvailableReader already skips readers that are registered or not
                // available, so no additional guard is needed here.
                PartitionRequestQueue.this.enqueueAvailableReader(this.reader);

                if (channelFuture.isSuccess()) {
                    PartitionRequestQueue.this.writeAndFlushNextMessageIfPossible(channelFuture.channel());
                    return;
                }

                Throwable failure = channelFuture.cause();
                if (failure == null) {
                    // Neither successful nor failed: the write was cancelled.
                    failure = new IllegalStateException("Sending cancelled by user.");
                }
                PartitionRequestQueue.this.handleException(channelFuture.channel(), failure);
            } catch (Throwable th) {
                PartitionRequestQueue.this.handleException(channelFuture.channel(), th);
            }
        }
    }

    /**
     * Remembers the first context this handler is registered with and forwards the event.
     *
     * @param channelHandlerContext the context of the registered channel
     * @throws Exception if the superclass implementation throws
     */
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ctx == null) {
            this.ctx = channelHandlerContext;
        }
        super.channelRegistered(channelHandlerContext);
    }

    /**
     * Announces that a reader has data. The reader is not enqueued directly; instead a task is
     * submitted to the context's executor which fires the reader as a user event through the
     * pipeline, where {@link #userEventTriggered} enqueues it.
     *
     * <p>Requires that {@link #channelRegistered} has been called; otherwise the context is
     * {@code null} and this method throws a {@link NullPointerException}.
     *
     * @param networkSequenceViewReader the reader that became non-empty
     */
    public void notifyReaderNonEmpty(final NetworkSequenceViewReader networkSequenceViewReader) {
        this.ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                PartitionRequestQueue.this.ctx.pipeline().fireUserEventTriggered(networkSequenceViewReader);
            }
        });
    }

    /**
     * Queues the reader for sending unless it is already queued or currently not available. If
     * the queue was empty beforehand, a write is triggered right away; otherwise the pending
     * write's listener will get to it.
     *
     * @param networkSequenceViewReader the reader to enqueue
     * @throws Exception if the triggered write fails
     */
    public void enqueueAvailableReader(NetworkSequenceViewReader networkSequenceViewReader) throws Exception {
        if (networkSequenceViewReader.isRegisteredAsAvailable() || !networkSequenceViewReader.isAvailable()) {
            return;
        }
        // Must be sampled before registering, otherwise the queue is never seen as empty.
        boolean queueWasEmpty = this.availableReaders.isEmpty();
        registerAvailableReader(networkSequenceViewReader);
        if (queueWasEmpty) {
            writeAndFlushNextMessageIfPossible(this.ctx.channel());
        }
    }

    /** Returns the live queue of readers waiting to be sent (not a copy). */
    @VisibleForTesting
    ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
        return this.availableReaders;
    }

    /** Returns a live view of all readers known to this queue. */
    @VisibleForTesting
    Collection<NetworkSequenceViewReader> getAllReaders() {
        return this.allReaders.values();
    }

    /** Returns the captured handler context, or {@code null} if none was registered yet. */
    @VisibleForTesting
    ChannelHandlerContext getChannelHandlerContext() {
        return this.ctx;
    }

    /**
     * Registers a newly created reader under its receiver id, replacing any reader previously
     * stored for the same id.
     *
     * @param networkSequenceViewReader the reader to track
     */
    public void notifyReaderCreated(NetworkSequenceViewReader networkSequenceViewReader) {
        this.allReaders.put(networkSequenceViewReader.getReceiverId(), networkSequenceViewReader);
    }

    /**
     * Requests cancellation of the reader belonging to the given receiver by firing the id as a
     * user event through the pipeline; the actual work happens in {@link #userEventTriggered}.
     *
     * <p>Requires that {@link #channelRegistered} has been called; otherwise the context is
     * {@code null} and this method throws a {@link NullPointerException}.
     *
     * @param inputChannelID the receiver whose reader should be cancelled
     */
    public void cancel(InputChannelID inputChannelID) {
        this.ctx.pipeline().fireUserEventTriggered(inputChannelID);
    }

    /**
     * Closes the channel (if a context has been captured) and releases all readers.
     *
     * <p>NOTE(review): the release runs on the calling thread and touches the unsynchronized
     * queue and released-set - confirm which thread invokes this.
     *
     * @throws IOException if releasing a reader fails
     */
    public void close() throws IOException {
        if (this.ctx != null) {
            this.ctx.channel().close();
        }
        releaseAllResources();
    }

    /**
     * Adds credit to the reader of the given receiver and enqueues the reader if that made it
     * available. Does nothing once a fatal error has been recorded.
     *
     * @param inputChannelID the receiver that announced the credit
     * @param i the amount of credit to add
     * @throws IllegalStateException if no reader is registered for the receiver
     * @throws Exception if the write triggered by enqueueing fails
     */
    public void addCredit(InputChannelID inputChannelID, int i) throws Exception {
        if (this.fatalError) {
            return;
        }
        NetworkSequenceViewReader reader = this.allReaders.get(inputChannelID);
        if (reader == null) {
            throw new IllegalStateException("No reader for receiverId = " + inputChannelID + " exists.");
        }
        reader.addCredit(i);
        enqueueAvailableReader(reader);
    }

    /**
     * Handles the two user events this handler understands and forwards everything else:
     * a {@link NetworkSequenceViewReader} is enqueued for sending, an {@link InputChannelID}
     * (exact class match) cancels the corresponding reader.
     *
     * @param channelHandlerContext the context the event was fired on
     * @param obj the user event
     * @throws Exception if enqueueing or releasing fails
     */
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof NetworkSequenceViewReader) {
            enqueueAvailableReader((NetworkSequenceViewReader) obj);
        } else if (obj.getClass() == InputChannelID.class) {
            cancelReader(channelHandlerContext, (InputChannelID) obj);
        } else {
            channelHandlerContext.fireUserEventTriggered(obj);
        }
    }

    /**
     * Marks the receiver as released, releases its reader (if one is registered) and drops the
     * reader from the send queue. Receivers already marked as released are ignored.
     */
    private void cancelReader(ChannelHandlerContext channelHandlerContext, InputChannelID cancelledId) throws Exception {
        if (this.released.contains(cancelledId)) {
            return;
        }
        markAsReleased(cancelledId);

        NetworkSequenceViewReader cancelled = this.allReaders.remove(cancelledId);
        if (cancelled == null) {
            LOG.warn("The view for this receiver {} has not been created or has been canceled before!", cancelledId);
        } else {
            cancelled.releaseAllResources(new RemoteTransportException("Canceled by remote input channel " + cancelledId, channelHandlerContext.channel().remoteAddress()));
        }

        // Rotate once through the queue: every reader is polled and all but the cancelled one
        // are appended again, which keeps the relative order of the survivors.
        int queued = this.availableReaders.size();
        for (int n = 0; n < queued; n++) {
            NetworkSequenceViewReader candidate = pollAvailableReader();
            if (!candidate.getReceiverId().equals(cancelledId)) {
                registerAvailableReader(candidate);
            }
        }
    }

    /**
     * Tries to continue sending whenever the channel's writability changes.
     *
     * @param channelHandlerContext the context of the affected channel
     * @throws Exception if the write fails
     */
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        writeAndFlushNextMessageIfPossible(channelHandlerContext.channel());
    }

    /**
     * Writes the next available buffer, if any. Readers are polled until one yields a buffer or
     * the queue is empty. A reader whose {@code getNextBuffer()} throws is marked as released,
     * released with that exception, and an error response for its receiver is sent; polling then
     * continues with the next reader.
     *
     * <p>Nothing is written after a fatal error or while the channel is not writable.
     *
     * @param channel the channel to write to
     * @throws IOException wrapping any failure outside of {@code getNextBuffer()}
     */
    public void writeAndFlushNextMessageIfPossible(Channel channel) throws IOException {
        if (this.fatalError || !channel.isWritable()) {
            return;
        }
        // Stays null until a reader hands out a buffer; the method returns right after using it.
        InputChannel.BufferAndAvailability next = null;
        try {
            while (true) {
                NetworkSequenceViewReader reader = pollAvailableReader();
                if (reader == null) {
                    return;
                }
                try {
                    next = reader.getNextBuffer();
                } catch (Exception e) {
                    LOG.warn("Getting next buffer from read view failed", e);
                    markAsReleased(reader.getReceiverId());
                    reader.releaseAllResources(e);
                    this.ctx.writeAndFlush(new NettyMessage.ErrorResponse(e, reader.getReceiverId()));
                }
                if (next != null) {
                    // The reader was removed from the queue by the poll above; put it back
                    // before writing if it has more to send.
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }
                    NettyMessage.BufferResponse response = new NettyMessage.BufferResponse(next.buffer(), reader.getSequenceNumber(), reader.getReceiverId(), next.buffersInBacklog());
                    channel.writeAndFlush(response).addListener(new WriteAndFlushNextMessageIfPossibleListener(reader));
                    return;
                }
            }
        } catch (Throwable th) {
            if (next != null) {
                next.buffer().recycleBuffer();
            }
            throw new IOException(th.getMessage(), th);
        }
    }

    /** Appends the reader to the send queue and flags it as registered. */
    private void registerAvailableReader(NetworkSequenceViewReader networkSequenceViewReader) {
        this.availableReaders.add(networkSequenceViewReader);
        networkSequenceViewReader.setRegisteredAsAvailable(true);
    }

    /**
     * Removes the head of the send queue and clears its registered flag.
     *
     * @return the polled reader, or {@code null} if the queue is empty
     */
    @Nullable
    private NetworkSequenceViewReader pollAvailableReader() {
        NetworkSequenceViewReader head = this.availableReaders.poll();
        if (head != null) {
            head.setRegisteredAsAvailable(false);
        }
        return head;
    }

    /**
     * Releases all readers when the channel becomes inactive. If readers are still registered,
     * they are released with a {@link RemoteTransportException}; otherwise without a cause. The
     * event is forwarded to the next handler even if releasing fails.
     *
     * @param channelHandlerContext the context of the inactive channel
     * @throws Exception if releasing a reader fails
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        try {
            if (this.allReaders.isEmpty()) {
                releaseAllResources();
            } else {
                releaseAllResources(new RemoteTransportException("Connection unexpected closed by consumer", channelHandlerContext.channel().remoteAddress()));
            }
        } finally {
            channelHandlerContext.fireChannelInactive();
        }
    }

    /**
     * Treats every exception reaching this handler as fatal.
     *
     * @param channelHandlerContext the context the exception was raised on
     * @param th the exception
     * @throws Exception if handling the exception fails
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        handleException(channelHandlerContext.channel(), th);
    }

    /**
     * Records a fatal error, releases all readers with the given cause and, if the channel is
     * still active, sends an error response and closes the channel once it has been written.
     * The error response is sent even when releasing the readers fails.
     *
     * @param channel the channel the error occurred on
     * @param th the error
     * @throws IOException if releasing a reader fails
     */
    public void handleException(Channel channel, Throwable th) throws IOException {
        LOG.error("Encountered error while consuming partitions", th);
        this.fatalError = true;
        try {
            releaseAllResources(th);
        } finally {
            if (channel.isActive()) {
                channel.writeAndFlush(new NettyMessage.ErrorResponse(th)).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    /** Releases all readers without a cause. */
    private void releaseAllResources() throws IOException {
        releaseAllResources(null);
    }

    /**
     * Releases every registered reader with the given cause, marks the successfully released
     * ones as released and clears both reader collections.
     *
     * <p>A reader that fails to release does not stop the others from being released. The first
     * failure is rethrown after all readers have been attempted; later failures are attached to
     * it as suppressed exceptions.
     *
     * @param th the cause handed to each reader, may be {@code null}
     * @throws IOException if releasing at least one reader failed
     */
    private void releaseAllResources(@Nullable Throwable th) throws IOException {
        Exception firstFailure = null;
        for (NetworkSequenceViewReader reader : this.allReaders.values()) {
            try {
                reader.releaseAllResources(th);
                markAsReleased(reader.getReceiverId());
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.availableReaders.clear();
        this.allReaders.clear();

        if (firstFailure == null) {
            return;
        }
        if (firstFailure instanceof IOException) {
            throw (IOException) firstFailure;
        }
        if (firstFailure instanceof RuntimeException) {
            throw (RuntimeException) firstFailure;
        }
        throw new IOException(firstFailure.getMessage(), firstFailure);
    }

    /** Adds the receiver id to the set of released receivers. */
    private void markAsReleased(InputChannelID inputChannelID) {
        LOG.debug("Mark {} as released", inputChannelID);
        this.released.add(inputChannelID);
    }

    /**
     * @param inputChannelID the receiver to look up
     * @return {@code true} if the receiver has been marked as released
     */
    public boolean isMarkedReleased(InputChannelID inputChannelID) {
        return this.released.contains(inputChannelID);
    }
}
