/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.endpoint;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.RequestCancelledException;
import com.couchbase.client.core.ResponseEvent;
import com.couchbase.client.core.ResponseHandler;
import com.couchbase.client.core.endpoint.AbstractEndpoint;
import com.couchbase.client.core.endpoint.DecodingState;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.CoreScheduler;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.DiagnosticRequest;
import com.couchbase.client.core.message.KeepAlive;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.metrics.NetworkLatencyMetricsIdentifier;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.utils.Observables;
import com.couchbase.client.deps.com.lmax.disruptor.EventSink;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.Channel;
import com.couchbase.client.deps.io.netty.channel.ChannelFuture;
import com.couchbase.client.deps.io.netty.channel.ChannelFutureListener;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.channel.ChannelPromise;
import com.couchbase.client.deps.io.netty.handler.codec.DecoderException;
import com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageCodec;
import com.couchbase.client.deps.io.netty.handler.codec.base64.Base64;
import com.couchbase.client.deps.io.netty.handler.codec.http.HttpRequest;
import com.couchbase.client.deps.io.netty.handler.timeout.IdleStateEvent;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.deps.io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLHandshakeException;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subjects.Subject;

public abstract class AbstractGenericHandler<RESPONSE, ENCODED, REQUEST extends CouchbaseRequest>
extends MessageToMessageCodec<RESPONSE, REQUEST> {
    /** Charset used for text conversions in this handler (UTF-8). */
    protected static final Charset CHARSET = CharsetUtil.UTF_8;
    /** Logger shared by all instances of this handler. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(AbstractGenericHandler.class);
    /** Shared zero-length byte array. */
    protected static final byte[] EMPTY_BYTES = new byte[0];
    /** Sink that receives RETRY responses / responses without an observable, and rescheduled requests. */
    private final EventSink<ResponseEvent> responseBuffer;
    /** Endpoint this handler belongs to; also the source of the environment (see {@code env()}). */
    private final AbstractEndpoint endpoint;
    /** Requests that were encoded and are awaiting a response, in send order. */
    private final Queue<REQUEST> sentRequestQueue;
    /** {@code System.nanoTime()} stamps recorded at encode time, one per entry of the sent queue. */
    private final Queue<Long> sentRequestTimings;
    /** When true, the endpoint is disconnected as soon as a response has been fully decoded. */
    private final boolean isTransient;
    /** Snapshot of {@code LOGGER.isTraceEnabled()} taken at construction time. */
    private final boolean traceEnabled;
    /** When true, responses are completed via the environment scheduler instead of inline. */
    private final boolean moveResponseOut;
    /** Cache of request class to simple class name, used when building latency metric identifiers. */
    private final Map<Class<? extends CouchbaseRequest>, String> classNameCache;
    /** Request whose response is currently being decoded; null between responses. */
    private REQUEST currentRequest;
    /** Progress of the response that is currently being decoded. */
    private DecodingState currentDecodingState;
    /** Nanoseconds between encoding the current request and the start of its decode; -1 if unknown. */
    private long currentOpTime = -1L;
    /** Remote host as derived from the channel's remote address in {@code channelActive}. */
    private String remoteHostname;
    /** Promise captured from the outbound {@code connect} call; null until connect passed through. */
    private ChannelPromise connectFuture;
    /** Lazily computed remote address string, see {@code remoteHttpHost(ChannelHandlerContext)}. */
    private String remoteHttpHost;
    /** Maximum size of the sent queue before new writes are rescheduled. */
    private final int sentQueueLimit;
    /** When false, a request is only written if nothing else is in flight (keepalives excepted). */
    private final boolean pipeline;
    /** Number of keepalive errors seen since the last keepalive that completed. */
    private volatile long keepAliveThreshold;
    /** Handle of the periodic keepalive task, set only when continuous keepalive is enabled. */
    private volatile ScheduledFuture<?> continuousKeepAliveFuture;

    /**
     * Creates a handler that tracks sent requests in a new {@link ArrayDeque}.
     *
     * @param endpoint the endpoint this handler belongs to.
     * @param responseBuffer the sink used for responses and rescheduled requests.
     * @param isTransient if true, the endpoint is disconnected once a response finished decoding.
     * @param pipeline if false, only one request may be in flight at a time.
     */
    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, boolean isTransient, boolean pipeline) {
        // Explicit type argument instead of a raw ArrayDeque, so the call is checked by the compiler.
        this(endpoint, responseBuffer, new ArrayDeque<REQUEST>(), isTransient, pipeline);
    }

    protected AbstractGenericHandler(AbstractEndpoint endpoint, EventSink<ResponseEvent> responseBuffer, Queue<REQUEST> queue, boolean isTransient, boolean pipeline) {
        this.pipeline = pipeline;
        this.endpoint = endpoint;
        this.responseBuffer = responseBuffer;
        this.sentRequestQueue = queue;
        this.currentDecodingState = DecodingState.INITIAL;
        this.isTransient = isTransient;
        this.traceEnabled = LOGGER.isTraceEnabled();
        this.sentRequestTimings = new ArrayDeque<Long>();
        this.classNameCache = new IdentityHashMap<Class<? extends CouchbaseRequest>, String>();
        this.moveResponseOut = this.env() == null || !this.env().callbacksOnIoPool();
        this.sentQueueLimit = Integer.parseInt(System.getProperty("com.couchbase.sentRequestQueueLimit", "5120"));
        this.keepAliveThreshold = 0L;
    }

    /**
     * Turns an outgoing request into its encoded form. Called from {@code encode}; if this
     * method throws, the request's observable is failed and the exception is rethrown.
     *
     * @param var1 the channel handler context.
     * @param var2 the request to encode.
     * @return the encoded request, which is added to the outbound message list.
     * @throws Exception if the request could not be encoded.
     */
    protected abstract ENCODED encodeRequest(ChannelHandlerContext var1, REQUEST var2) throws Exception;

    /**
     * Turns an incoming message into a response. Called from {@code decode}; exceptions
     * thrown here are reported on the current request's observable.
     *
     * @param var1 the channel handler context.
     * @param var2 the incoming message.
     * @return the decoded response, or null if there is nothing to publish for this message.
     * @throws Exception if the message could not be decoded.
     */
    protected abstract CouchbaseResponse decodeResponse(ChannelHandlerContext var1, RESPONSE var2) throws Exception;

    /**
     * Returns the service type of this handler; its string form is used in latency metric
     * identifiers.
     *
     * @return the service type.
     */
    protected abstract ServiceType serviceType();

    /**
     * Writes the message downstream unless it has to be rescheduled.
     *
     * <p>A message is handed to {@link RetryHelper#retryOrCancel} instead of being written when
     * pipelining is off and something is still in flight (keepalives are exempt), or when the
     * sent queue has reached its configured limit.</p>
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!this.pipeline
            && !(msg instanceof KeepAlive)
            && !(sentRequestQueue.isEmpty() && currentDecodingState == DecodingState.INITIAL)) {
            if (traceEnabled) {
                LOGGER.trace("Rescheduling {} because pipelining disable and a request is in-flight.", msg);
            }
            RetryHelper.retryOrCancel(env(), (CouchbaseRequest) msg, responseBuffer);
            return;
        }

        if (sentRequestQueue.size() >= sentQueueLimit) {
            LOGGER.debug("Rescheduling {} because sentRequestQueueLimit reached.", msg);
            RetryHelper.retryOrCancel(env(), (CouchbaseRequest) msg, responseBuffer);
            return;
        }

        super.write(ctx, msg, promise);
    }

    /**
     * Encodes the request and records it as sent.
     *
     * <p>If encoding fails, the request's observable is failed with a
     * {@link RequestCancelledException} and the original exception is rethrown. The request is
     * then neither queued nor timed.</p>
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, REQUEST msg, List<Object> out) throws Exception {
        final ENCODED encoded;
        try {
            encoded = encodeRequest(ctx, msg);
        } catch (Exception encodingFailure) {
            msg.observable().onError(new RequestCancelledException("Error while encoding Request, cancelling.", encodingFailure));
            throw encodingFailure;
        }

        // Remember the request so the matching response can be correlated in decode().
        sentRequestQueue.offer(msg);
        out.add(encoded);
        // Timestamp is taken after the message was handed over, in step with the sent queue.
        sentRequestTimings.offer(System.nanoTime());
    }

    /**
     * Decodes an incoming message and publishes the resulting response on the current request.
     *
     * <p>On the first message of a response the next request is taken off the sent queue. When
     * the decoding state reaches {@code FINISHED}, metrics are written, the endpoint is notified
     * and the per-response state is reset. Failures during decoding are reported on the current
     * request's observable. If there is no current request (for example a response arrived while
     * the sent queue was empty), the failure is logged instead of raising a secondary
     * {@link NullPointerException} that would hide the real cause.</p>
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, RESPONSE msg, List<Object> out) throws Exception {
        if (this.currentDecodingState == DecodingState.INITIAL) {
            this.initialDecodeTasks(ctx);
        }
        try {
            CouchbaseResponse response = this.decodeResponse(ctx, msg);
            if (response != null) {
                if (this.currentRequest instanceof DiagnosticRequest) {
                    DiagnosticRequest diagnostic = (DiagnosticRequest) this.currentRequest;
                    diagnostic.localSocket(ctx.channel().localAddress());
                    diagnostic.remoteSocket(ctx.channel().remoteAddress());
                }
                this.publishResponse(response, this.currentRequest.observable());
                if (this.currentDecodingState == DecodingState.FINISHED) {
                    this.writeMetrics(response);
                    if (this.currentRequest instanceof KeepAlive) {
                        this.endpoint.setLastKeepAliveLatency(this.currentOpTime);
                    }
                }
            }
        } catch (CouchbaseException e) {
            this.failCurrentRequest(ctx, e);
        } catch (Exception e) {
            this.failCurrentRequest(ctx, new CouchbaseException(e));
        }
        if (this.currentDecodingState == DecodingState.FINISHED) {
            this.endpoint.notifyResponseDecoded(this.currentRequest instanceof KeepAlive);
            this.resetStatesAfterDecode(ctx);
        }
    }

    /**
     * Fails the observable of the current request, or logs the cause if no request is current.
     */
    private void failCurrentRequest(ChannelHandlerContext ctx, CouchbaseException cause) {
        REQUEST request = this.currentRequest;
        if (request == null) {
            // Nobody to notify. Keep the real cause visible rather than dying on a null dereference.
            LOGGER.warn("{}Decoding failed but there is no current request to notify.", AbstractGenericHandler.logIdent(ctx, this.endpoint), cause);
            return;
        }
        Observables.failSafe(this.env().scheduler(), this.moveResponseOut, request.observable(), cause);
    }

    /**
     * Records the latency of the current operation with the network latency collector.
     *
     * <p>Nothing is recorded when there is no current request, no known operation time, no
     * environment, or when the collector is disabled. Any failure while recording is logged
     * and otherwise ignored.</p>
     *
     * @param response the decoded response whose status becomes part of the metric identifier.
     */
    private void writeMetrics(CouchbaseResponse response) {
        if (currentRequest == null || currentOpTime < 0L) {
            return;
        }
        if (env() == null || !env().networkLatencyMetricsCollector().isEnabled()) {
            return;
        }

        try {
            Class<? extends CouchbaseRequest> type = currentRequest.getClass();
            String requestName = classNameCache.get(type);
            if (requestName == null) {
                // getSimpleName() is computed once per request class and then served from the cache.
                requestName = type.getSimpleName();
                classNameCache.put(type, requestName);
            }
            String service = serviceType().toString();
            String status = response.status().toString();
            NetworkLatencyMetricsIdentifier id = new NetworkLatencyMetricsIdentifier(remoteHostname, service, requestName, status);
            env().networkLatencyMetricsCollector().record(id, currentOpTime);
        } catch (Throwable failure) {
            LOGGER.warn("Could not collect latency metric for request {} ({})", RedactableArgument.user(currentRequest.toString()), currentOpTime, failure);
        }
    }

    /**
     * Clears the current request and returns the decoding state to {@code INITIAL}.
     *
     * @param ctx the channel handler context, used for the trace log line only.
     */
    private void resetStatesAfterDecode(ChannelHandlerContext ctx) {
        if (traceEnabled) {
            // Log before clearing, so the finished request still shows up in the message.
            LOGGER.trace("{}Finished decoding of {}", (Object) logIdent(ctx, endpoint), (Object) currentRequest);
        }
        currentDecodingState = DecodingState.INITIAL;
        currentRequest = null;
    }

    /**
     * Prepares decoding of a new response: takes the next request off the sent queue, moves the
     * decoding state to {@code STARTED} and computes the elapsed time since the request was
     * encoded ({@code -1} when no timestamp is available).
     *
     * @param ctx the channel handler context, used for the trace log line only.
     */
    private void initialDecodeTasks(ChannelHandlerContext ctx) {
        REQUEST next = sentRequestQueue.poll();
        currentRequest = next;
        currentDecodingState = DecodingState.STARTED;

        if (next != null) {
            Long sentAt = sentRequestTimings.poll();
            if (sentAt == null) {
                currentOpTime = -1L;
            } else {
                currentOpTime = System.nanoTime() - sentAt;
            }
        }

        if (traceEnabled) {
            LOGGER.trace("{}Started decoding of {}", (Object) logIdent(ctx, endpoint), (Object) currentRequest);
        }
    }

    /**
     * Delivers a decoded response.
     *
     * <p>Responses with status {@code RETRY}, or without an observable, are published to the
     * response buffer. All others are completed on the observable, either inline or via the
     * environment scheduler, depending on {@code moveResponseOut}.</p>
     *
     * @param response the response to deliver.
     * @param observable the observable of the originating request, may be null.
     */
    protected void publishResponse(CouchbaseResponse response, Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        if (response.status() == ResponseStatus.RETRY || observable == null) {
            responseBuffer.publishEvent(ResponseHandler.RESPONSE_TRANSLATOR, response, observable);
            return;
        }

        if (!moveResponseOut) {
            completeResponse(response, observable);
            return;
        }

        Scheduler target = env().scheduler();
        if (target instanceof CoreScheduler) {
            scheduleDirect((CoreScheduler) target, response, observable);
        } else {
            scheduleWorker(target, response, observable);
        }
    }

    /**
     * Emits the response on the observable and completes it. An exception raised while doing so
     * is logged and forwarded to the observable as an error.
     *
     * @param response the response to emit.
     * @param observable the observable to emit on.
     */
    private static void completeResponse(CouchbaseResponse response, Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        try {
            observable.onNext(response);
            observable.onCompleted();
        } catch (Exception failure) {
            LOGGER.warn("Caught exception while onNext on observable", failure);
            observable.onError(failure);
        }
    }

    /**
     * Completes the response on the given {@link CoreScheduler} via {@code scheduleDirect}.
     *
     * @param scheduler the scheduler to run the completion on.
     * @param response the response to emit.
     * @param observable the observable to emit on.
     */
    private static void scheduleDirect(CoreScheduler scheduler, final CouchbaseResponse response, final Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        Action0 completion = new Action0() {
            public void call() {
                completeResponse(response, observable);
            }
        };
        scheduler.scheduleDirect(completion);
    }

    /**
     * Completes the response on a worker created from the given scheduler. The worker is
     * unsubscribed once the completion has run, whether or not it succeeded.
     *
     * @param scheduler the scheduler to create the worker from.
     * @param response the response to emit.
     * @param observable the observable to emit on.
     */
    private static void scheduleWorker(Scheduler scheduler, final CouchbaseResponse response, final Subject<CouchbaseResponse, CouchbaseResponse> observable) {
        final Scheduler.Worker worker = scheduler.createWorker();
        Action0 completion = new Action0() {
            public void call() {
                try {
                    // Same emit / complete / log-and-forward-error sequence as the inline path.
                    completeResponse(response, observable);
                } finally {
                    worker.unsubscribe();
                }
            }
        };
        worker.schedule(completion);
    }

    /**
     * Marks the current response as fully decoded. For transient handlers the endpoint is
     * disconnected right away.
     */
    protected void finishedDecoding() {
        currentDecodingState = DecodingState.FINISHED;
        if (!isTransient) {
            return;
        }
        endpoint.disconnect();
    }

    /**
     * Logs the event, tells the endpoint that the channel went inactive and passes the event on.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        String message = logIdent(ctx, endpoint) + "Channel Inactive.";
        LOGGER.debug(message);
        endpoint.notifyChannelInactive();
        ctx.fireChannelInactive();
    }

    /**
     * Logs the event, remembers the remote host of the channel, runs the channel-active side
     * effects and passes the event on.
     *
     * <p>For an {@link InetSocketAddress} the host address of the remote is stored, for any
     * other address type its string form.</p>
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String message = logIdent(ctx, endpoint) + "Channel Active.";
        LOGGER.debug(message);

        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote instanceof InetSocketAddress) {
            remoteHostname = ((InetSocketAddress) remote).getAddress().getHostAddress();
        } else {
            remoteHostname = remote.toString();
        }

        channelActiveSideEffects(ctx);
        ctx.fireChannelActive();
    }

    /**
     * Flushes the context when the channel is no longer writable, then passes the event on.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        boolean writable = ctx.channel().isWritable();
        if (!writable) {
            ctx.flush();
        }
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Remembers the connect promise, so an SSL handshake failure can be reported on it later,
     * and forwards the connect call.
     */
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise future) throws Exception {
        connectFuture = future;
        ctx.connect(remoteAddress, localAddress, future);
    }

    /**
     * Starts the periodic keepalive task when continuous keepalive is enabled in the
     * environment. The task runs on the context's executor at the configured keepalive interval
     * and sends a keepalive whenever {@code shouldSendKeepAlive()} allows it.
     *
     * @param ctx the channel handler context the keepalives are written to.
     */
    private void channelActiveSideEffects(final ChannelHandlerContext ctx) {
        long intervalMillis = env().keepAliveInterval();
        if (!env().continuousKeepAliveEnabled()) {
            return;
        }

        Runnable keepAliveTask = new Runnable() {
            @Override
            public void run() {
                if (shouldSendKeepAlive()) {
                    createAndWriteKeepAlive(ctx);
                }
            }
        };
        continuousKeepAliveFuture = ctx.executor().scheduleAtFixedRate(keepAliveTask, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles exceptions raised in the pipeline.
     *
     * <ul>
     *   <li>{@link IOException}: logged as a connection reset, and outstanding operations are
     *       cancelled.</li>
     *   <li>{@link DecoderException} caused by an {@link SSLHandshakeException}: the pending
     *       connect promise is failed with the handshake exception. If there is no pending
     *       promise (already done, or connect never passed through this handler), the
     *       exception is logged instead.</li>
     *   <li>Anything else: logged and passed on to the next handler.</li>
     * </ul>
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof IOException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(AbstractGenericHandler.logIdent(ctx, this.endpoint) + "Connection reset by peer: " + cause.getMessage(), cause);
            } else {
                LOGGER.info("{}Connection reset by peer: {}", (Object)AbstractGenericHandler.logIdent(ctx, this.endpoint), (Object)cause.getMessage());
            }
            this.handleOutstandingOperations(ctx);
        } else if (cause instanceof DecoderException && cause.getCause() instanceof SSLHandshakeException) {
            // connectFuture is only set when connect() ran through this handler. Guard against
            // null so a handshake failure cannot turn into a NullPointerException here.
            ChannelPromise pendingConnect = this.connectFuture;
            if (pendingConnect != null && !pendingConnect.isDone()) {
                pendingConnect.setFailure(cause.getCause());
            } else {
                LOGGER.warn("{}Caught SSL exception after being connected: {}", AbstractGenericHandler.logIdent(ctx, this.endpoint), cause.getMessage(), cause);
            }
        } else {
            LOGGER.warn("{}Caught unknown exception: {}", AbstractGenericHandler.logIdent(ctx, this.endpoint), cause.getMessage(), cause);
            ctx.fireExceptionCaught(cause);
        }
    }

    /**
     * Cancels the periodic keepalive task, if one was started, and cancels all outstanding
     * operations.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ScheduledFuture<?> keepAliveTask = continuousKeepAliveFuture;
        if (keepAliveTask != null) {
            LOGGER.trace("Stopping continuous keepalive execution");
            keepAliveTask.cancel(true);
            continuousKeepAliveFuture = null;
        }
        handleOutstandingOperations(ctx);
    }

    /**
     * Cancels every request that is still waiting in the sent queue.
     *
     * <p>Each request first goes through {@code sideEffectRequestToCancel} and is then failed
     * with a {@link RequestCancelledException}. A failure while cancelling one request is logged
     * and does not stop the remaining ones. Afterwards the recorded send timestamps are
     * dropped. Nothing happens when the queue is already empty.</p>
     *
     * @param ctx the channel handler context, used for log output only.
     */
    private void handleOutstandingOperations(ChannelHandlerContext ctx) {
        if (sentRequestQueue.isEmpty()) {
            LOGGER.trace(logIdent(ctx, endpoint) + "Not cancelling operations - sent queue is empty.");
            return;
        }

        int outstanding = sentRequestQueue.size();
        LOGGER.debug(logIdent(ctx, endpoint) + "Cancelling " + outstanding + " outstanding requests.");

        while (!sentRequestQueue.isEmpty()) {
            REQUEST pending = sentRequestQueue.poll();
            try {
                sideEffectRequestToCancel(pending);
                Observables.failSafe(env().scheduler(), moveResponseOut, pending.observable(), new RequestCancelledException("Request cancelled in-flight."));
            } catch (Exception failure) {
                LOGGER.info("Exception thrown while cancelling outstanding operation: {}", (Object) RedactableArgument.user(pending.toString()), (Object) failure);
            }
        }

        sentRequestTimings.clear();
    }

    /**
     * Hook invoked for every request right before it is cancelled in
     * {@code handleOutstandingOperations}. Does nothing in this class.
     *
     * @param request the request that is about to be cancelled.
     */
    protected void sideEffectRequestToCancel(REQUEST request) {
        // Intentionally empty: subclasses may override.
    }

    /**
     * Sends a keepalive on an {@link IdleStateEvent}, provided {@code shouldSendKeepAlive()}
     * allows it and continuous keepalive is not enabled. All other events are passed to the
     * superclass.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        // With continuous keepalive the periodic task takes care of it; idle events are ignored.
        if (shouldSendKeepAlive() && !env().continuousKeepAliveEnabled()) {
            createAndWriteKeepAlive(ctx);
        }
    }

    /**
     * Creates a keepalive request, subscribes to its response with the configured keepalive
     * timeout and writes it to the pipeline when the channel is active and writable. Does
     * nothing when {@code createKeepAliveRequest()} returns null.
     *
     * @param ctx the channel handler context to write through.
     */
    private void createAndWriteKeepAlive(ChannelHandlerContext ctx) {
        CouchbaseRequest request = createKeepAliveRequest();
        if (request == null) {
            return;
        }

        KeepAliveResponseAction responseAction = new KeepAliveResponseAction(ctx);
        request.subscriber(responseAction);
        request.observable()
            .timeout(env().keepAliveTimeout(), TimeUnit.MILLISECONDS)
            .subscribe(responseAction);
        onKeepAliveFired(ctx, request);

        Channel channel = ctx.channel();
        boolean canWrite = channel.isActive() && channel.isWritable();
        if (canWrite) {
            ctx.pipeline().writeAndFlush(request);
        }
    }

    /**
     * Tells whether the sent queue holds at least one request that reports itself as active.
     *
     * @return true as soon as an active request is found, false otherwise.
     */
    private boolean atLeastOneActiveInRequestQueue() {
        boolean foundActive = false;
        for (REQUEST pending : sentRequestQueue) {
            if (pending.isActive()) {
                foundActive = true;
                break;
            }
        }
        return foundActive;
    }

    /**
     * Decides whether a keepalive may be sent right now.
     *
     * <p>With pipelining this is always the case. Without it, a keepalive is only sent when no
     * active request is waiting in the sent queue and no response is currently being
     * decoded.</p>
     *
     * @return true if a keepalive should be sent.
     */
    public boolean shouldSendKeepAlive() {
        if (pipeline) {
            return true;
        }
        boolean nothingActivePending = sentRequestQueue.isEmpty() || !atLeastOneActiveInRequestQueue();
        return nothingActivePending && currentDecodingState == DecodingState.INITIAL;
    }

    /**
     * Creates the request used as keepalive. This implementation returns null, in which case
     * {@code createAndWriteKeepAlive} sends nothing.
     *
     * @return the keepalive request, or null if none should be sent.
     */
    protected CouchbaseRequest createKeepAliveRequest() {
        // No keepalive by default.
        return null;
    }

    /**
     * Called after a keepalive request was created and subscribed. Logs the event: at trace
     * level when continuous keepalive is enabled and trace logging is on, otherwise at debug
     * level when debug logging is on.
     *
     * @param ctx the channel handler context.
     * @param keepAliveRequest the keepalive request that was fired.
     */
    protected void onKeepAliveFired(ChannelHandlerContext ctx, CouchbaseRequest keepAliveRequest) {
        if (env().continuousKeepAliveEnabled() && LOGGER.isTraceEnabled()) {
            LOGGER.trace(logIdent(ctx, endpoint) + "Continuous KeepAlive fired");
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(logIdent(ctx, endpoint) + "KeepAlive fired");
        }
    }

    /**
     * Called when a keepalive response arrived. Logs its status when trace logging was enabled
     * at construction time.
     *
     * @param ctx the channel handler context.
     * @param keepAliveResponse the response to the keepalive request.
     */
    protected void onKeepAliveResponse(ChannelHandlerContext ctx, CouchbaseResponse keepAliveResponse) {
        if (!traceEnabled) {
            return;
        }
        LOGGER.trace(logIdent(ctx, endpoint) + "keepAlive was answered, status " + keepAliveResponse.status());
    }

    /**
     * Returns the request whose response is currently being decoded.
     *
     * @return the current request, or null when no response is being decoded.
     */
    protected REQUEST currentRequest() {
        return currentRequest;
    }

    /**
     * Returns the remote host recorded when the channel became active.
     *
     * @return the remote hostname, or null if the channel has not been active yet.
     */
    protected String remoteHostname() {
        return remoteHostname;
    }

    /**
     * Returns the environment of the endpoint this handler belongs to.
     *
     * @return the environment as reported by the endpoint.
     */
    protected CoreEnvironment env() {
        return endpoint.environment();
    }

    /**
     * Returns the endpoint this handler belongs to.
     *
     * @return the endpoint passed in at construction time.
     */
    protected AbstractEndpoint endpoint() {
        return endpoint;
    }

    /**
     * Builds the log prefix {@code "[<remote address>][<endpoint class simple name>]: "} as a
     * system-level redactable argument.
     *
     * @param ctx the context whose channel provides the remote address.
     * @param endpoint the endpoint whose class name goes into the prefix.
     * @return the redactable log prefix.
     */
    protected static RedactableArgument logIdent(ChannelHandlerContext ctx, Endpoint endpoint) {
        StringBuilder prefix = new StringBuilder();
        prefix.append("[").append(ctx.channel().remoteAddress());
        prefix.append("][").append(endpoint.getClass().getSimpleName());
        prefix.append("]: ");
        return RedactableArgument.system(prefix.toString());
    }

    /**
     * Adds an HTTP {@code Authorization: Basic ...} header to the request.
     *
     * <p>The credentials are UTF-8 encoded as {@code user:password} and then base64 encoded.
     * A null password is treated as the empty string. Both temporary buffers are released even
     * if encoding or setting the header fails.</p>
     *
     * @param ctx the context whose allocator provides the temporary buffer.
     * @param request the request to add the header to.
     * @param user the user name, must not be null.
     * @param password the password, may be null.
     * @throws NullPointerException if {@code user} is null.
     */
    public static void addHttpBasicAuth(ChannelHandlerContext ctx, HttpRequest request, String user, String password) {
        if (user == null) {
            // Fail as before instead of silently sending the literal user name "null".
            throw new NullPointerException("user");
        }
        String pw = password == null ? "" : password;
        byte[] credentials = (user + ":" + pw).getBytes(CHARSET);

        // Size the buffer by encoded byte count; char count is too small for non-ASCII input.
        ByteBuf raw = ctx.alloc().buffer(credentials.length);
        try {
            raw.writeBytes(credentials);
            ByteBuf encoded = Base64.encode(raw, false);
            try {
                request.headers().add("Authorization", (Object)("Basic " + encoded.toString(CHARSET)));
            } finally {
                encoded.release();
            }
        } finally {
            raw.release();
        }
    }

    /**
     * Returns the remote address of the channel as a string, computing it on first use and
     * caching it afterwards.
     *
     * <p>For an {@link InetSocketAddress} the result is {@code <host address>:<port>}, for any
     * other address type its string form.</p>
     *
     * @param ctx the context whose channel provides the remote address.
     * @return the cached remote address string.
     */
    protected String remoteHttpHost(ChannelHandlerContext ctx) {
        if (remoteHttpHost != null) {
            return remoteHttpHost;
        }

        SocketAddress remote = ctx.channel().remoteAddress();
        if (remote instanceof InetSocketAddress) {
            InetSocketAddress inet = (InetSocketAddress) remote;
            remoteHttpHost = inet.getAddress().getHostAddress() + ":" + inet.getPort();
        } else {
            remoteHttpHost = remote.toString();
        }
        return remoteHttpHost;
    }

    /**
     * Returns the decoding state of the response currently being processed.
     *
     * @return the current decoding state.
     */
    public DecodingState getDecodingState() {
        return currentDecodingState;
    }

    /**
     * Subscriber for keepalive responses.
     *
     * <p>A completed keepalive resets the error counter. Each error on an active channel
     * increments it, and once it reaches the environment's keepalive error threshold the
     * channel is closed.</p>
     */
    private class KeepAliveResponseAction
    extends Subscriber<CouchbaseResponse> {
        private final ChannelHandlerContext ctx;

        KeepAliveResponseAction(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        /** Resets the keepalive error counter. */
        @Override
        public void onCompleted() {
            AbstractGenericHandler.this.keepAliveThreshold = 0L;
        }

        /**
         * Counts the error and closes the channel once the error threshold is reached. Errors
         * on a channel that is missing or no longer active are ignored.
         */
        @Override
        public void onError(Throwable e) {
            if (this.ctx.channel() == null || !this.ctx.channel().isActive()) {
                return;
            }
            if (e instanceof TimeoutException) {
                // NOTE(review): this value is in microseconds, while decode() passes the raw
                // System.nanoTime() difference to the same setter - confirm the expected unit.
                AbstractGenericHandler.this.endpoint.setLastKeepAliveLatency(TimeUnit.MILLISECONDS.toMicros(AbstractGenericHandler.this.env().keepAliveTimeout()));
            }
            LOGGER.warn("{}Got error while consuming KeepAliveResponse.", (Object)AbstractGenericHandler.logIdent(this.ctx, AbstractGenericHandler.this.endpoint), (Object)e);
            // NOTE(review): ++ on a volatile long is not atomic; confirm that the callbacks of
            // this subscriber are never invoked concurrently.
            AbstractGenericHandler.this.keepAliveThreshold++;
            if (AbstractGenericHandler.this.keepAliveThreshold >= AbstractGenericHandler.this.env().keepAliveErrorThreshold()) {
                // logIdent() already returns a redactable argument, so it is passed as is,
                // like at every other call site, instead of being wrapped a second time.
                LOGGER.warn("{}KeepAliveThreshold reached - closing this socket proactively.", (Object)AbstractGenericHandler.logIdent(this.ctx, AbstractGenericHandler.this.endpoint));
                this.ctx.close().addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            LOGGER.warn("Error while proactively closing the socket.", future.cause());
                        }
                    }
                });
            }
        }

        /** Forwards the keepalive response to {@code onKeepAliveResponse}. */
        @Override
        public void onNext(CouchbaseResponse couchbaseResponse) {
            AbstractGenericHandler.this.onKeepAliveResponse(this.ctx, couchbaseResponse);
        }
    }
}

