package com.dianping.cat.message.io;

import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;

/* loaded from: input_file:WEB-INF/lib/cat-client-1.4.0.db.jar:com/dianping/cat/message/io/TcpSocketSender.class */
public class TcpSocketSender implements Threads.Task, MessageSender, LogEnabled {
    public static final String ID = "tcp-socket-sender";
    public static final int SIZE = 5000;

    @Inject
    private MessageCodec m_codec;

    @Inject
    private MessageStatistics m_statistics;

    @Inject
    private ClientConfigManager m_configManager;

    @Inject
    private MessageIdFactory m_factory;
    private List<InetSocketAddress> m_serverAddresses;
    private ChannelManager m_manager;
    private Logger m_logger;
    private transient boolean m_active;
    private static final int MAX_CHILD_NUMBER = 200;
    private MessageQueue m_queue = new DefaultMessageQueue(5000);
    private MessageQueue m_atomicTrees = new DefaultMessageQueue(5000);
    private AtomicInteger m_errors = new AtomicInteger();
    private AtomicInteger m_attempts = new AtomicInteger();

    /* loaded from: input_file:WEB-INF/lib/cat-client-1.4.0.db.jar:com/dianping/cat/message/io/TcpSocketSender$MergeAtomicTask.class */
    public class MergeAtomicTask implements Threads.Task {
        public MergeAtomicTask() {
        }

        @Override // org.unidal.helper.Threads.Task
        public String getName() {
            return "merge-atomic-task";
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                if (TcpSocketSender.this.shouldMerge(TcpSocketSender.this.m_atomicTrees)) {
                    MessageTree mergeTree = TcpSocketSender.this.mergeTree(TcpSocketSender.this.m_atomicTrees);
                    if (!TcpSocketSender.this.m_queue.offer(mergeTree)) {
                        TcpSocketSender.this.logQueueFullInfo(mergeTree);
                    }
                } else {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        }

        @Override // org.unidal.helper.Threads.Task
        public void shutdown() {
        }
    }

    private boolean checkWritable(ChannelFuture channelFuture) {
        boolean z = false;
        Channel channel = channelFuture.channel();
        if (channelFuture != null && channel.isOpen()) {
            if (channel.isActive() && channel.isWritable()) {
                z = true;
            } else {
                int incrementAndGet = this.m_attempts.incrementAndGet();
                if (incrementAndGet % 1000 == 0 || incrementAndGet == 1) {
                    this.m_logger.error("Netty write buffer is full! Attempts: " + incrementAndGet);
                }
            }
        }
        return z;
    }

    @Override // org.codehaus.plexus.logging.LogEnabled
    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    @Override // org.unidal.helper.Threads.Task
    public String getName() {
        return "TcpSocketSender";
    }

    @Override // com.dianping.cat.message.io.MessageSender
    public void initialize() {
        this.m_manager = new ChannelManager(this.m_logger, this.m_serverAddresses, this.m_queue, this.m_configManager, this.m_factory);
        Threads.forGroup("cat").start(this);
        Threads.forGroup("cat").start(this.m_manager);
        Threads.forGroup("cat").start(new MergeAtomicTask());
    }

    private boolean isAtomicMessage(MessageTree messageTree) {
        Message message = messageTree.getMessage();
        if (!(message instanceof Transaction)) {
            return true;
        }
        String type = message.getType();
        return type.startsWith("Cache.") || "SQL".equals(type);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void logQueueFullInfo(MessageTree messageTree) {
        if (this.m_statistics != null) {
            this.m_statistics.onOverflowed(messageTree);
        }
        int incrementAndGet = this.m_errors.incrementAndGet();
        if (incrementAndGet % 1000 == 0 || incrementAndGet == 1) {
            this.m_logger.error("Message queue is full in tcp socket sender! Count: " + incrementAndGet);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageTree mergeTree(MessageQueue messageQueue) {
        int i = 200;
        DefaultTransaction defaultTransaction = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);
        MessageTree poll = messageQueue.poll();
        defaultTransaction.setStatus("0");
        defaultTransaction.setCompleted(true);
        defaultTransaction.addChild(poll.getMessage());
        defaultTransaction.setTimestamp(poll.getMessage().getTimestamp());
        long j = 0;
        long j2 = 0;
        while (true) {
            if (i < 0) {
                break;
            }
            MessageTree poll2 = messageQueue.poll();
            if (poll2 == null) {
                defaultTransaction.setDurationInMillis((j - defaultTransaction.getTimestamp()) + j2);
                break;
            }
            j = poll2.getMessage().getTimestamp();
            j2 = poll2.getMessage() instanceof DefaultTransaction ? ((DefaultTransaction) poll2.getMessage()).getDurationInMillis() : 0L;
            defaultTransaction.addChild(poll2.getMessage());
            this.m_factory.reuse(poll2.getMessageId());
            i--;
        }
        ((DefaultMessageTree) poll).setMessage(defaultTransaction);
        return poll;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.m_active = true;
        while (this.m_active) {
            ChannelFuture channel = this.m_manager.channel();
            if (channel == null || !checkWritable(channel)) {
                try {
                    Thread.sleep(5L);
                } catch (Exception e) {
                    this.m_active = false;
                }
            } else {
                try {
                    MessageTree poll = this.m_queue.poll();
                    if (poll != null) {
                        sendInternal(poll);
                        poll.setMessage(null);
                    }
                } catch (Throwable th) {
                    this.m_logger.error("Error when sending message over TCP socket!", th);
                }
            }
        }
    }

    @Override // com.dianping.cat.message.io.MessageSender
    public void send(MessageTree messageTree) {
        if (isAtomicMessage(messageTree)) {
            if (this.m_atomicTrees.offer(messageTree, this.m_manager.getSample())) {
                return;
            }
            logQueueFullInfo(messageTree);
        } else {
            if (this.m_queue.offer(messageTree, this.m_manager.getSample())) {
                return;
            }
            logQueueFullInfo(messageTree);
        }
    }

    private void sendInternal(MessageTree messageTree) {
        ChannelFuture channel = this.m_manager.channel();
        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(10240);
        this.m_codec.encode(messageTree, buffer);
        int readableBytes = buffer.readableBytes();
        channel.channel().writeAndFlush(buffer);
        if (this.m_statistics != null) {
            this.m_statistics.onBytes(readableBytes);
        }
    }

    public void setServerAddresses(List<InetSocketAddress> list) {
        this.m_serverAddresses = list;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean shouldMerge(MessageQueue messageQueue) {
        MessageTree peek = messageQueue.peek();
        if (peek != null) {
            return System.currentTimeMillis() - peek.getMessage().getTimestamp() > ((long) 30000) || messageQueue.size() >= 200;
        }
        return false;
    }

    @Override // org.unidal.helper.Threads.Task
    public void shutdown() {
        this.m_active = false;
        this.m_manager.shutdown();
    }
}
