/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.net.ProtocolException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NetworkBufferAllocator;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZeroCopyNettyMessageDecoder
extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ZeroCopyNettyMessageDecoder.class);
    private static final int INITIAL_MESSAGE_HEADER_BUFFER_LENGTH = 128;
    private final NetworkBufferAllocator networkBufferAllocator;
    private final ByteBuf frameHeaderBuffer;
    private final ByteBuf messageHeaderBuffer;
    private int remainingMessageHeaderToCopy = -1;
    private byte msgId = (byte)-1;
    private NettyMessage currentNettyMessage;
    private int remainingBufferSize = -1;

    ZeroCopyNettyMessageDecoder(NetworkBufferAllocator networkBufferAllocator) {
        this.networkBufferAllocator = networkBufferAllocator;
        this.frameHeaderBuffer = Unpooled.directBuffer((int)9, (int)9);
        this.messageHeaderBuffer = Unpooled.directBuffer((int)128);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof ByteBuf)) {
            ctx.fireChannelRead(msg);
            return;
        }
        ByteBuf data = (ByteBuf)msg;
        try {
            while (data.readableBytes() > 0) {
                boolean dataBufferAllReceived;
                if (this.frameHeaderBuffer.isWritable()) {
                    this.copyToTargetBuffer(data, this.frameHeaderBuffer, this.frameHeaderBuffer.writableBytes());
                    if (this.frameHeaderBuffer.isWritable()) break;
                    this.decodeFrameHeader();
                }
                if (this.remainingMessageHeaderToCopy > 0 || this.remainingMessageHeaderToCopy == 0 && this.messageHeaderBuffer.writerIndex() == 0) {
                    int messageHeaderReceivedSize = this.copyToTargetBuffer(data, this.messageHeaderBuffer, this.remainingMessageHeaderToCopy);
                    this.remainingMessageHeaderToCopy -= messageHeaderReceivedSize;
                    if (this.remainingMessageHeaderToCopy != 0) break;
                    this.currentNettyMessage = this.decodeNettyMessage();
                    if (this.msgId != 0) {
                        ctx.fireChannelRead((Object)this.currentNettyMessage);
                        this.clearState();
                        continue;
                    }
                }
                if (!(dataBufferAllReceived = this.readOrDiscardBufferResponse(data))) continue;
                ctx.fireChannelRead((Object)this.currentNettyMessage);
                this.clearState();
            }
            Preconditions.checkState((!data.isReadable() ? 1 : 0) != 0, (Object)"Not all data of the received buffer consumed.");
        }
        finally {
            data.release();
        }
    }

    private void decodeFrameHeader() {
        int messageLength = this.frameHeaderBuffer.readInt();
        Preconditions.checkState((messageLength >= 0 ? 1 : 0) != 0, (Object)"The length field of current message must be non-negative");
        int magicNumber = this.frameHeaderBuffer.readInt();
        Preconditions.checkState((magicNumber == -1159983106 ? 1 : 0) != 0, (Object)"Network stream corrupted: received incorrect magic number.");
        this.msgId = this.frameHeaderBuffer.readByte();
        this.remainingMessageHeaderToCopy = this.msgId != 0 ? messageLength - 9 : 29;
        if (this.messageHeaderBuffer.capacity() < this.remainingMessageHeaderToCopy) {
            this.messageHeaderBuffer.capacity(this.remainingMessageHeaderToCopy);
        }
    }

    private NettyMessage decodeNettyMessage() throws Exception {
        NettyMessage decodedMsg;
        switch (this.msgId) {
            case 0: {
                Preconditions.checkState((this.networkBufferAllocator != null ? 1 : 0) != 0, (Object)"buffer allocator is required to decode BufferResponse");
                decodedMsg = NettyMessage.BufferResponse.readFrom(this.messageHeaderBuffer, this.networkBufferAllocator);
                break;
            }
            case 2: {
                decodedMsg = NettyMessage.PartitionRequest.readFrom(this.messageHeaderBuffer);
                break;
            }
            case 3: {
                decodedMsg = NettyMessage.TaskEventRequest.readFrom(this.messageHeaderBuffer, ((Object)((Object)this)).getClass().getClassLoader());
                break;
            }
            case 1: {
                decodedMsg = NettyMessage.ErrorResponse.readFrom(this.messageHeaderBuffer);
                break;
            }
            case 4: {
                decodedMsg = NettyMessage.CancelPartitionRequest.readFrom(this.messageHeaderBuffer);
                break;
            }
            case 5: {
                decodedMsg = NettyMessage.CloseRequest.readFrom(this.messageHeaderBuffer);
                break;
            }
            case 6: {
                decodedMsg = NettyMessage.AddCredit.readFrom(this.messageHeaderBuffer);
                break;
            }
            default: {
                throw new ProtocolException("Received unknown message from producer: " + this.messageHeaderBuffer);
            }
        }
        return decodedMsg;
    }

    private boolean readOrDiscardBufferResponse(ByteBuf data) {
        Preconditions.checkState((this.currentNettyMessage != null && this.currentNettyMessage instanceof NettyMessage.BufferResponse ? 1 : 0) != 0);
        NettyMessage.BufferResponse bufferResponse = (NettyMessage.BufferResponse)this.currentNettyMessage;
        if (bufferResponse.dataBufferSize == 0) {
            return true;
        }
        ByteBuf dataBuffer = bufferResponse.getBuffer();
        if (this.remainingBufferSize < 0) {
            this.remainingBufferSize = bufferResponse.dataBufferSize;
        }
        if (dataBuffer != null) {
            this.remainingBufferSize -= this.copyToTargetBuffer(data, dataBuffer, this.remainingBufferSize);
        } else {
            int actualBytesToDiscard = Math.min(data.readableBytes(), this.remainingBufferSize);
            data.readerIndex(data.readerIndex() + actualBytesToDiscard);
            this.remainingBufferSize -= actualBytesToDiscard;
        }
        return this.remainingBufferSize == 0;
    }

    private void clearState() {
        this.frameHeaderBuffer.clear();
        this.messageHeaderBuffer.clear();
        this.remainingMessageHeaderToCopy = -1;
        this.msgId = (byte)-1;
        this.currentNettyMessage = null;
        this.remainingBufferSize = -1;
    }

    private int copyToTargetBuffer(ByteBuf src, ByteBuf dest, int maxCopySize) {
        int copyLength = Math.min(src.readableBytes(), maxCopySize);
        Preconditions.checkState((dest.writableBytes() >= copyLength ? 1 : 0) != 0, (Object)("There is not enough space to copy " + copyLength + " bytes, writable = " + dest.writableBytes()));
        dest.writeBytes(src, copyLength);
        return copyLength;
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (this.currentNettyMessage != null && this.currentNettyMessage instanceof NettyMessage.BufferResponse && ((NettyMessage.BufferResponse)this.currentNettyMessage).getBuffer() != null) {
            LOG.info("Channel get inactive and the buffer of currentNettyMessage {} is released", (Object)this.currentNettyMessage);
            ((NettyMessage.BufferResponse)this.currentNettyMessage).getBuffer().release();
        }
        this.currentNettyMessage = null;
        this.frameHeaderBuffer.release();
        this.messageHeaderBuffer.release();
    }

    @VisibleForTesting
    public NettyMessage getCurrentNettyMessage() {
        return this.currentNettyMessage;
    }
}

