/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.lindorm.client.core.ipc;

import com.alibaba.hbase.net.jpountz.lz4.LZ4Factory;
import com.alibaba.lindorm.client.AsyncCallback;
import com.alibaba.lindorm.client.core.auth.AuthenticationPassport;
import com.alibaba.lindorm.client.core.ipc.ConnectionHeader;
import com.alibaba.lindorm.client.core.ipc.EventLoopGroupShutdownThread;
import com.alibaba.lindorm.client.core.ipc.Invocation;
import com.alibaba.lindorm.client.core.ipc.LindormClientProtocol;
import com.alibaba.lindorm.client.core.ipc.OperationContext;
import com.alibaba.lindorm.client.core.ipc.Request;
import com.alibaba.lindorm.client.core.ipc.ResponseFlag;
import com.alibaba.lindorm.client.core.ipc.RpcClient;
import com.alibaba.lindorm.client.core.ipc.RpcOptionalParams;
import com.alibaba.lindorm.client.core.ipc.Status;
import com.alibaba.lindorm.client.core.utils.StringUtils;
import com.alibaba.lindorm.client.core.utils.Version;
import com.alibaba.lindorm.client.core.utils.WritableUtils;
import com.alibaba.lindorm.client.exception.ConnectionResetException;
import com.alibaba.lindorm.client.exception.DoNotRetryIOException;
import com.alibaba.lindorm.client.exception.LDRemoteException;
import com.alibaba.lindorm.thirdparty.netty.buffer.ByteBuf;
import com.alibaba.lindorm.thirdparty.netty.buffer.ByteBufInputStream;
import com.alibaba.lindorm.thirdparty.netty.buffer.ByteBufOutputStream;
import com.alibaba.lindorm.thirdparty.netty.buffer.Unpooled;
import com.alibaba.lindorm.thirdparty.netty.channel.Channel;
import com.alibaba.lindorm.thirdparty.netty.channel.ChannelFuture;
import com.alibaba.lindorm.thirdparty.netty.channel.ChannelHandlerContext;
import com.alibaba.lindorm.thirdparty.netty.channel.EventLoopGroup;
import com.alibaba.lindorm.thirdparty.netty.handler.codec.ReplayingDecoder;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Connection
extends ReplayingDecoder<Void> {
    private static final Log LOG = LogFactory.getLog((String)Connection.class.getName());
    public static final byte[] HEADER = new byte[]{104, 114, 112, 99, 6};
    public static int RESPHEADERLENGTH = 13;
    private volatile Channel chan;
    final AtomicInteger requestID = new AtomicInteger(-1);
    private RpcClient.ConnectionId myConnectionId;
    private Throwable lastException = null;
    private volatile boolean dead = false;
    private final ConcurrentHashMap<Integer, Request> requestInFlight = new ConcurrentHashMap();
    private volatile int inFlightWriteRequest = 0;
    private volatile int inFlightReadRequest = 0;
    private final AtomicLong lastCallBackTime = new AtomicLong(0L);
    private ArrayList<Request> requestPending = new ArrayList();
    private RpcClient rpcClient;
    private volatile boolean pendingLimitReached = false;
    private volatile ChannelFuture connectFuture;
    private static final LZ4Factory factory = LZ4Factory.fastestJavaInstance();
    private EventLoopGroup eventLoopGroup = null;
    private long connectionStartTs;
    private AtomicBoolean isCleaned = new AtomicBoolean(false);

    public void setConnectFuture(ChannelFuture future) {
        this.connectFuture = future;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void waitOnThrottle(long timeout) throws InterruptedException {
        if (timeout > 0L) {
            Connection connection = this;
            synchronized (connection) {
                this.wait(timeout);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void notifyWaiting() {
        Connection connection = this;
        synchronized (connection) {
            this.notifyAll();
        }
    }

    public String getHostAndPort() {
        return this.myConnectionId.getAddress().getHostAndPort();
    }

    public RpcClient.ConnectionId getMyConnectionId() {
        return this.myConnectionId;
    }

    public Connection(RpcClient rpcClient, RpcClient.ConnectionId myConnectionId, EventLoopGroup group) {
        this.rpcClient = rpcClient;
        this.myConnectionId = myConnectionId;
        this.eventLoopGroup = group;
        this.connectionStartTs = System.currentTimeMillis();
    }

    public RpcClient getRpcClient() {
        return this.rpcClient;
    }

    public boolean isWritable() {
        if (this.rpcClient.requestInFlightLimit > 0 && this.requestInFlight.size() > this.rpcClient.requestInFlightLimit) {
            return false;
        }
        if (this.pendingLimitReached) {
            return false;
        }
        return this.chan == null || this.chan.isWritable();
    }

    public boolean isBlocked(OperationContext context) {
        if (context == null) {
            return false;
        }
        if (System.currentTimeMillis() - this.lastCallBackTime.get() <= (long)this.rpcClient.connectionBlockTime) {
            return false;
        }
        OperationContext.OperationType type = context.getOperationType();
        if (OperationContext.isUserReadOperation(type)) {
            return this.rpcClient.readRequestInFlightLimit > 0 && this.inFlightReadRequest > this.rpcClient.readRequestInFlightLimit;
        }
        if (OperationContext.isUserWriteOperation(type)) {
            return this.rpcClient.writeRequestInFlightLimit > 0 && this.inFlightWriteRequest > this.rpcClient.writeRequestInFlightLimit;
        }
        return false;
    }

    public boolean isWriteDisabled() {
        return this.chan != null && this.chan.unsafe().outboundBuffer() == null;
    }

    private static void ensureReadable(ByteBuf buf, int nBytes) {
        buf.markReaderIndex();
        buf.skipBytes(nBytes);
        buf.resetReaderIndex();
    }

    IOException deserializeException(DataInput buf, Request request) throws IOException {
        String type = WritableUtils.readString(buf);
        String msg = WritableUtils.readString(buf);
        LDRemoteException exception = new LDRemoteException(type, msg, this.getHostAndPort());
        if (this.myConnectionId.getProtocol().isAssignableFrom(LindormClientProtocol.class)) {
            return exception;
        }
        return exception.unwrapRemoteException();
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ByteBuf buf = in;
        int id = buf.readInt();
        Request request = this.requestInFlight.get(id);
        Object decoded = this.deserialize(in, request);
        if (request == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Skipped timed out RPC ID " + id + " with an response on client " + this));
            }
        } else {
            assert (request.getId() == id);
            long lastTs = this.lastCallBackTime.get();
            long now = System.currentTimeMillis();
            if (now > lastTs) {
                this.lastCallBackTime.compareAndSet(lastTs, now);
            }
            this.removeRpc(request);
            request.callback(decoded);
        }
    }

    private void readOptionalParam(Request request, DataInput in) throws IOException {
        RpcOptionalParams options = new RpcOptionalParams();
        options.readFrom(in);
        if (request != null) {
            Integer executionTime;
            Long receiveTs = options.getServerReceiveTimestamp();
            if (receiveTs != null && receiveTs > 0L) {
                request.setServerReceiveTimestamp(receiveTs);
            }
            if ((executionTime = options.getServerExceutionTime()) != null && executionTime >= 0) {
                request.setServerExecutionTime(executionTime);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.lastException = cause;
        if (cause instanceof RejectedExecutionException) {
            LOG.warn((Object)("RPC rejected by the executor, ignore this if we're shutting down" + this), cause);
        } else {
            LOG.error((Object)("Receive exception from downstream on " + this), cause);
        }
        this.close("exception caught " + StringUtils.stringifyException(cause));
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.cleanup();
        super.channelUnregistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel chan = ctx.channel();
        if (System.currentTimeMillis() - this.connectionStartTs > 1000L && LOG.isInfoEnabled()) {
            LOG.info((Object)("Spend too long " + (System.currentTimeMillis() - this.connectionStartTs) + " ms for the connection to become active " + this.myConnectionId.getAddress()));
        }
        this.sendConnectionHeader(chan);
        super.channelActive(ctx);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            this.notifyWaiting();
        }
        super.channelWritabilityChanged(ctx);
    }

    private Object deserialize(ByteBuf buf, Request request) throws IOException {
        Object ret;
        byte flag = buf.readByte();
        boolean isError = ResponseFlag.isError(flag);
        int length = 0;
        if (!ResponseFlag.isLength(flag)) {
            throw new IOException("No length info found when processing" + (request == null ? "null " : request) + "flag" + flag);
        }
        length = buf.readInt() - RESPHEADERLENGTH;
        int state = buf.readInt();
        if (!Status.isValidState(state)) {
            throw new IOException("Got invalid state " + state + ", expected " + Arrays.asList(Status.values()) + ". Close connection as inputstream becomes incomplete");
        }
        Connection.ensureReadable(buf, length);
        ByteBufInputStream in = new ByteBufInputStream(buf, length);
        int position = buf.readerIndex();
        if (isError) {
            IOException exception = this.deserializeException(in, request);
            this.readOptionalParam(request, in);
            ret = exception;
        } else {
            if (ResponseFlag.isCompress(flag)) {
                int uncompressedSize = in.readInt();
                int compressedSize = in.readInt();
                byte[] compressedBuffer = new byte[compressedSize];
                in.read(compressedBuffer, 0, compressedSize);
                byte[] uncompressedBuffer = factory.fastDecompressor().decompress(compressedBuffer, uncompressedSize);
                ByteArrayInputStream bais = new ByteArrayInputStream(uncompressedBuffer);
                DataInputStream dis = new DataInputStream(bais);
                ret = this.rpcClient.getSerializer().deserialize(dis);
            } else {
                ret = this.rpcClient.getSerializer().deserialize(in);
            }
            this.readOptionalParam(request, in);
        }
        int readLength = buf.readerIndex() - position;
        if (readLength != length) {
            throw new IOException("deserialize size:" + readLength + " small than response length:" + length);
        }
        return ret;
    }

    private ByteBuf serialize(Request request) {
        ByteBuf data = null;
        try {
            request.setId(this.requestID.incrementAndGet());
            Invocation invocation = request.getInvocation();
            ByteBufOutputStream d = new ByteBufOutputStream(Unpooled.compositeBuffer(4096));
            d.writeInt(-559038737);
            d.writeInt(request.getId());
            invocation.writeTo(d);
            request.setClientSendTime(System.currentTimeMillis());
            request.getOptionalParams().writeTo(d);
            data = d.buffer();
            int dataLength = data.readableBytes() - 4;
            data.setInt(0, dataLength);
            Request oldRequest = this.requestInFlight.put(request.getId(), request);
            if (request.isRead()) {
                ++this.inFlightReadRequest;
            } else if (request.isWrite()) {
                ++this.inFlightWriteRequest;
            }
            if (oldRequest != null) {
                String error = "There was already an request in flight with requestID=" + this.requestID + ": " + oldRequest + ".  This happened when sending out: " + request;
                LOG.error((Object)(error + "connection: " + this));
                oldRequest.callback(new DoNotRetryIOException(error));
                if (oldRequest.isRead()) {
                    --this.inFlightReadRequest;
                } else if (oldRequest.isWrite()) {
                    --this.inFlightWriteRequest;
                }
            }
            return data;
        }
        catch (Throwable ie) {
            LOG.error((Object)("Uncaught exception while serializing : " + request + " connection=" + this), ie);
            request.callback(ie);
            return null;
        }
    }

    Request removeRpc(Request toRemoveRequest) {
        int id = toRemoveRequest.getId();
        Request alreadyRemovedRequest = this.requestInFlight.remove(id);
        if (alreadyRemovedRequest != null) {
            this.notifyWaiting();
            if (alreadyRemovedRequest.isRead()) {
                --this.inFlightReadRequest;
            } else if (alreadyRemovedRequest.isWrite()) {
                --this.inFlightWriteRequest;
            }
        }
        if (alreadyRemovedRequest != toRemoveRequest && alreadyRemovedRequest != null) {
            LOG.warn((Object)("Removed the wrong RPC " + alreadyRemovedRequest + " when we meant to remove " + toRemoveRequest + this));
            alreadyRemovedRequest.callback(new DoNotRetryIOException("Removed the wrong RPC from connection " + this));
        }
        return alreadyRemovedRequest;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendRequest(Request request) {
        if (this.chan != null) {
            ByteBuf serialized = this.serialize(request);
            if (serialized == null) {
                this.removeRpc(request);
                request.callback(new IOException("serialized error for " + request));
                return;
            }
            if (this.chan != null) {
                this.chan.writeAndFlush(serialized);
                return;
            }
            Request removedRequest = this.removeRpc(request);
            if (removedRequest != null) {
                request.callback(new ConnectionResetException(this.myConnectionId.getAddress(), this + "is closed"));
            }
            return;
        }
        boolean tryagain = false;
        Connection connection = this;
        synchronized (connection) {
            if (this.chan != null) {
                tryagain = true;
            } else if (!this.dead) {
                this.requestPending.add(request);
                if (this.rpcClient.requestPendingLimit > 0 && this.requestPending.size() > this.rpcClient.requestPendingLimit) {
                    this.pendingLimitReached = true;
                }
                return;
            }
        }
        if (this.dead) {
            request.callback(new ConnectionResetException(this.myConnectionId.getAddress(), this + "is closed"));
            return;
        }
        if (tryagain) {
            this.sendRequest(request);
            return;
        }
        LOG.error((Object)("Impossible state for " + this));
    }

    ByteBuf getConnectionHeader() throws IOException {
        ByteBuf header = Unpooled.buffer();
        header.writeBytes(HEADER);
        ByteBufOutputStream out = new ByteBufOutputStream(Unpooled.buffer());
        ConnectionHeader connectionHeader = new ConnectionHeader(this.myConnectionId.getProtocol().getName());
        connectionHeader.writeTo(out);
        ByteBuf connectionHeaderBuf = out.buffer();
        int bufLen = connectionHeaderBuf.readableBytes();
        header.writeInt(bufLen);
        header.writeBytes(connectionHeaderBuf);
        return header;
    }

    public void sendConnectionHeader(Channel channel) throws Exception {
        LOG.debug((Object)("Send connection header for " + this));
        ByteBuf header = this.getConnectionHeader();
        channel.writeAndFlush(header);
        Method getProtocolVersion = LindormClientProtocol.class.getDeclaredMethod("getProtocolVersion", String.class, Long.TYPE);
        Field version = this.myConnectionId.getProtocol().getDeclaredField("VERSION");
        Invocation invocation = new Invocation(LindormClientProtocol.class, getProtocolVersion, new Object[]{this.myConnectionId.getProtocol().getName(), version.get(null)}, this.rpcClient.getSerializer(), Integer.MAX_VALUE);
        Request request = new Request(this.getRpcClient(), invocation, new ProtocolVersionCallBack(channel), this.rpcClient.getConf().getInt("lindorm.rpc.connect.timeout", 3000));
        if (this.myConnectionId.getUserName() != null && this.myConnectionId.getPassword() != null) {
            AuthenticationPassport authPassport = AuthenticationPassport.create(this.myConnectionId.getUserName(), this.myConnectionId.getPassword());
            request.getOptionalParams().setAuthPassport(authPassport);
        }
        request.setWaitTime();
        request.getOptionalParams().setClientVersion(Version.getVersion());
        request.enqueueTimeout(this, this.rpcClient.getRpcTimeoutTimer());
        channel.writeAndFlush(this.serialize(request));
    }

    void becomeReady(Channel chan, long startTs) {
        if (LOG.isInfoEnabled()) {
            LOG.info((Object)("Connection ready for" + this + " after " + (System.currentTimeMillis() - startTs) + " ms"));
        }
        this.chan = chan;
        this.sendPendingRequests();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendPendingRequests() {
        ArrayList<Request> requests;
        Connection connection = this;
        synchronized (connection) {
            requests = this.requestPending;
            this.requestPending = new ArrayList();
        }
        if (requests != null) {
            for (Request request : requests) {
                this.sendRequest(request);
            }
        }
        this.pendingLimitReached = false;
        this.notifyWaiting();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanup() {
        ArrayList<Request> requests;
        if (!this.isCleaned.compareAndSet(false, true)) {
            LOG.warn((Object)("Clean up is already executed!" + this));
            return;
        }
        LOG.debug((Object)("Start to cleanup " + this));
        this.rpcClient.removeConnectionFromCache(this);
        ConnectionResetException exception = new ConnectionResetException(this.myConnectionId.getAddress(), this.toString());
        if (this.connectFuture != null) {
            this.connectFuture.awaitUninterruptibly();
            if (!this.connectFuture.isSuccess()) {
                exception.initCause(this.connectFuture.cause());
            }
            if (this.connectFuture.channel() != null) {
                this.connectFuture.channel().close();
            }
        }
        if (exception.getCause() == null && this.lastException != null) {
            exception.initCause(this.lastException);
        }
        Connection connection = this;
        synchronized (connection) {
            this.dead = true;
            this.chan = null;
            requests = this.requestPending;
            this.requestPending = new ArrayList();
        }
        this.failRequests(this.requestInFlight.values(), exception);
        if (requests != null) {
            this.failRequests(requests, exception);
        }
        if (this.eventLoopGroup != null) {
            try {
                EventLoopGroupShutdownThread closeGroupThread = new EventLoopGroupShutdownThread(this.eventLoopGroup);
                closeGroupThread.start();
                closeGroupThread.join();
                this.eventLoopGroup = null;
            }
            catch (Throwable t) {
                LOG.error((Object)("Error happened when closing " + this), t);
            }
        }
    }

    private void failRequests(Collection<Request> requests, ConnectionResetException exception) {
        for (Request request : requests) {
            int id = request.getId();
            if (id == -1) {
                request.callback(exception);
                continue;
            }
            Request rpcRequest = this.requestInFlight.remove(id);
            if (rpcRequest != null) {
                if (request.isRead()) {
                    --this.inFlightReadRequest;
                } else if (request.isWrite()) {
                    --this.inFlightWriteRequest;
                }
                request.callback(exception);
                continue;
            }
            LOG.error((Object)("connection=" + this + " is already removed for request=" + request));
        }
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append(" [Connection: " + this.hashCode());
        builder.append(", host=" + this.myConnectionId.getAddress().toString());
        builder.append(", chan=" + (this.chan == null ? "" : this.chan));
        builder.append(", isDead=" + this.dead);
        builder.append(", currentRpcId=" + this.requestID.get());
        builder.append(", requestPending=" + (this.requestPending == null ? 0 : this.requestPending.size()));
        builder.append(", rpcInFight=" + this.requestInFlight.size());
        builder.append("]");
        return builder.toString();
    }

    public String toSimpleString() {
        return "[Connection: " + this.getHostAndPort() + " ]";
    }

    public void close(String why) {
        LOG.info((Object)("close requested, " + this + ", because of " + why));
        Channel chancopy = this.chan;
        if (chancopy == null) {
            this.cleanup();
        } else {
            chancopy.close();
        }
    }

    private final class ProtocolVersionCallBack
    extends AsyncCallback<Long> {
        private Channel channel;
        private long startTs;

        public ProtocolVersionCallBack(Channel channel) {
            this.channel = channel;
            this.startTs = System.currentTimeMillis();
        }

        @Override
        public void onComplete(Long result) {
            Connection.this.becomeReady(this.channel, this.startTs);
        }

        @Override
        public void onError(Throwable exception) {
            Connection.this.lastException = exception;
            Connection.this.close("protocol version callback caught " + StringUtils.stringifyException(exception));
        }

        @Override
        public boolean isRetrying() {
            return false;
        }

        @Override
        public boolean shouldProcessResultInPool() {
            return false;
        }
    }

    public static class RPCProfiling {
        private int id;
        private volatile long waitSendTime;
        private volatile long transferRequestTime;
        private volatile long serverExecutionTime;
        private volatile long transferResponseTime;

        public RPCProfiling(int id, long waitSendTime, long transferRequestTime, long serverExecutionTime, long transferResponseTime) {
            this.id = id;
            this.waitSendTime = waitSendTime;
            this.transferRequestTime = transferRequestTime;
            this.serverExecutionTime = serverExecutionTime;
            this.transferResponseTime = transferResponseTime;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder(128);
            sb.append("{id=" + this.id);
            sb.append(",wait_send_ms=" + this.waitSendTime);
            if (this.transferRequestTime >= 0L) {
                sb.append(",transfer_request_ms=" + this.transferRequestTime);
            }
            if (this.serverExecutionTime >= 0L) {
                sb.append(",server_execution_ms=" + this.serverExecutionTime);
            }
            if (this.transferResponseTime >= 0L) {
                sb.append(",transfer_response_ms=" + this.transferResponseTime);
            }
            sb.append("}");
            return sb.toString();
        }
    }
}

