package com.dianping.cat.message.io;

import com.dianping.cat.ApplicationSettings;
import com.dianping.cat.CatConstants;
import com.dianping.cat.analyzer.LocalAggregator;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.NativeMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.status.StatusExtension;
import com.dianping.cat.status.StatusExtensionRegister;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.springframework.util.backoff.ExponentialBackOff;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

/**
 * Sends {@link MessageTree}s to the CAT server over a TCP channel.
 *
 * <p>Callers hand trees to {@link #send(MessageTree)}, which either aggregates them locally or
 * places them on one of two bounded queues: a queue of "atomic" trees (as classified by
 * {@link ClientConfigManager#isAtomicMessage}) and the normal send queue. A single sender
 * thread (started from {@link #initialize(List)}) merges batches of atomic trees into one
 * synthetic transaction, moves them to the normal queue, and writes the normal queue to the
 * channel obtained from the {@link ChannelManager}.
 */
@Named
public class TcpSocketSender implements Threads.Task, MessageSender, LogEnabled {
    /** Capacity used for both internal queues. */
    public static final int SIZE = ApplicationSettings.getQueueSize();

    /** Atomic-queue length at which a merge is triggered; also bounds the merge loop. */
    private static final int MAX_CHILD_NUMBER = 200;

    /** Age threshold, in milliseconds, for merging atomic trees and for discarding unsent trees. */
    private static final int MAX_DURATION = 30000;

    public static final long HOUR = 3600000;

    /** Idle pause of the sender thread, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 5L;

    @Inject
    private MessageStatistics m_statistics;

    @Inject
    private ClientConfigManager m_configManager;

    @Inject
    private MessageIdFactory m_factory;

    private ChannelManager m_channelManager;
    private Logger m_logger;

    // Written by shutdown() (possibly from the shutdown-hook thread) and read by the sender
    // thread, so it must be volatile for the stop request to become visible.
    private volatile boolean m_active;

    private final MessageCodec m_codec = new NativeMessageCodec();
    private final MessageQueue m_queue = new DefaultMessageQueue(SIZE);
    private final MessageQueue m_atomicQueue = new DefaultMessageQueue(SIZE);
    private final AtomicInteger m_errors = new AtomicInteger();

    @Override
    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    @Override
    public String getName() {
        return "TcpSocketSender";
    }

    /**
     * Creates the channel manager for the given server addresses, starts the sender and channel
     * manager tasks in the "cat" thread group, registers a JVM shutdown hook that calls
     * {@link #shutdown()}, and registers a status extension exposing the queue sizes.
     *
     * @param list server addresses passed to the {@link ChannelManager}
     */
    @Override
    public void initialize(List<InetSocketAddress> list) {
        this.m_channelManager = new ChannelManager(this.m_logger, list, this.m_configManager, this.m_factory);
        Threads.forGroup("cat").start(this);
        Threads.forGroup("cat").start(this.m_channelManager);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                TcpSocketSender.this.m_logger.info("shut down cat client in runtime shut down hook!");
                TcpSocketSender.this.shutdown();
            }
        });
        StatusExtensionRegister.getInstance().register(new StatusExtension() {
            @Override
            public String getDescription() {
                return "client-send-queue";
            }

            @Override
            public String getId() {
                return "client-send-queue";
            }

            @Override
            public Map<String, String> getProperties() {
                Map<String, String> properties = new HashMap<String, String>();
                properties.put("msg-queue", String.valueOf(TcpSocketSender.this.m_queue.size()));
                // Report the atomic queue's own size (previously this repeated the normal queue's size).
                properties.put("atomic-queue", String.valueOf(TcpSocketSender.this.m_atomicQueue.size()));
                return properties;
            }
        });
    }

    /**
     * Records a dropped tree in the statistics (when available) and logs an error for the first
     * drop and every 1000th drop thereafter.
     */
    private void logQueueFullInfo(MessageTree messageTree) {
        if (this.m_statistics != null) {
            this.m_statistics.onOverflowed(messageTree);
        }
        int dropped = this.m_errors.incrementAndGet();
        if (dropped % 1000 == 0 || dropped == 1) {
            this.m_logger.error("Message queue is full in tcp socket sender! Count: " + dropped);
        }
    }

    /**
     * Polls a batch of trees from {@code messageQueue} and wraps their messages as children of a
     * single completed "_CatMergeTree" transaction, which replaces the message of the first
     * polled tree.
     *
     * <p>The queue must not be empty; the only caller checks this via
     * {@link #shouldMerge(MessageQueue)} first.
     *
     * @return the first polled tree, now carrying the merged transaction
     */
    private MessageTree mergeTree(MessageQueue messageQueue) {
        DefaultTransaction merged = new DefaultTransaction(CatConstants.CAT_SYSTEM, "_CatMergeTree", null);
        MessageTree first = messageQueue.poll();
        merged.setStatus("0");
        merged.setCompleted(true);
        merged.setDurationInMicros(0L);
        merged.addChild(first.getMessage());

        // The counter runs from MAX_CHILD_NUMBER down to 0 inclusive, so up to
        // MAX_CHILD_NUMBER + 1 further trees are added after the first one.
        MessageTree next;
        for (int remaining = MAX_CHILD_NUMBER; remaining >= 0 && (next = messageQueue.poll()) != null; remaining--) {
            merged.addChild(next.getMessage());
        }
        ((DefaultMessageTree) first).setMessage(merged);
        return first;
    }

    /** Enqueues the tree on the atomic or normal queue; a rejected tree is counted as dropped. */
    private void offer(MessageTree messageTree) {
        MessageQueue target = this.m_configManager.isAtomicMessage(messageTree) ? this.m_atomicQueue : this.m_queue;
        if (!target.offer(messageTree)) {
            logQueueFullInfo(messageTree);
        }
    }

    /** Merges atomic trees into batches and moves each batch to the normal queue while a merge is due. */
    private void processAtomicMessage() {
        while (shouldMerge(this.m_atomicQueue)) {
            MessageTree merged = mergeTree(this.m_atomicQueue);
            if (!this.m_queue.offer(merged)) {
                logQueueFullInfo(merged);
            }
        }
    }

    /**
     * Sends queued trees while a channel is available and returns once the queue is empty (after
     * a short pause). While no channel is available, trees older than {@link #MAX_DURATION} are
     * discarded and the method keeps waiting, returning only when the sender has been stopped.
     */
    private void processNormalMessage() {
        while (true) {
            ChannelFuture channel = this.m_channelManager.channel();
            if (channel != null) {
                try {
                    MessageTree tree = this.m_queue.poll();
                    if (tree == null) {
                        pause();
                        return;
                    }
                    sendInternal(channel, tree);
                    tree.setMessage(null);
                } catch (Throwable th) {
                    this.m_logger.error("Error when sending message over TCP socket!", th);
                }
            } else {
                discardStaleMessages();
                pause();
                // Without this check a shutdown (or interrupt) requested while disconnected
                // would never be observed and the sender thread would wait here forever.
                if (!this.m_active) {
                    return;
                }
            }
        }
    }

    /** Removes trees older than {@link #MAX_DURATION} from the head of the normal queue and counts them as dropped. */
    private void discardStaleMessages() {
        long oldestAllowed = System.currentTimeMillis() - MAX_DURATION;
        for (int i = 0; i < SIZE; i++) {
            try {
                MessageTree head = this.m_queue.peek();
                if (head == null || head.getMessage().getTimestamp() >= oldestAllowed) {
                    return;
                }
                if (this.m_queue.poll() != null) {
                    logQueueFullInfo(head);
                }
            } catch (Exception e) {
                this.m_logger.error(e.getMessage(), e);
                // The queue head did not change, so retrying now would hit the same failure and
                // log it up to SIZE times; stop this pass and retry on the next one.
                return;
            }
        }
    }

    /**
     * Sleeps briefly between polls. An interrupt is treated as a stop request; this thread's run
     * loop is the final consumer of the interrupt, so the flag is deliberately left cleared and
     * the remaining drain in {@link #run()} proceeds with a clear interrupt status.
     */
    private void pause() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException e) {
            this.m_active = false;
        }
    }

    /**
     * Sender thread body: alternates between merging atomic trees and sending normal trees until
     * stopped, then performs one last merge and drains the normal queue.
     */
    @Override
    public void run() {
        this.m_active = true;
        while (this.m_active) {
            processAtomicMessage();
            processNormalMessage();
        }
        processAtomicMessage();

        MessageTree tree;
        while ((tree = this.m_queue.poll()) != null) {
            ChannelFuture channel = this.m_channelManager.channel();
            if (channel != null) {
                sendInternal(channel, tree);
            } else {
                // No channel: the tree cannot be delivered. Re-offering it to the queue that is
                // being drained would make this loop spin forever, so count it as dropped.
                logQueueFullInfo(tree);
            }
        }
    }

    /**
     * Accepts a tree for delivery. Nothing is done when the client is blocked. A tree that may
     * be discarded, was not hit by sampling, and arrives while the sample ratio is below 1.0 is
     * aggregated locally instead of being queued.
     *
     * @param messageTree the tree to send
     */
    @Override
    public void send(MessageTree messageTree) {
        if (this.m_configManager.isBlock()) {
            return;
        }
        double sampleRatio = this.m_configManager.getSampleRatio();
        boolean sampledOut = messageTree.canDiscard() && sampleRatio < 1.0d && !messageTree.isHitSample();
        if (sampledOut) {
            processTreeInClient(messageTree);
        } else {
            offer(messageTree);
        }
    }

    /** Aggregates a sampled-out tree locally via {@link LocalAggregator}. */
    private void processTreeInClient(MessageTree messageTree) {
        LocalAggregator.aggregate(messageTree);
    }

    /**
     * Assigns a message id if the tree has none, encodes the tree and writes it to the channel,
     * then reports the encoded size to the statistics (when available).
     *
     * @param channelFuture future whose channel receives the encoded tree
     * @param messageTree   the tree to encode and send
     */
    public void sendInternal(ChannelFuture channelFuture, MessageTree messageTree) {
        if (messageTree.getMessageId() == null) {
            messageTree.setMessageId(this.m_factory.getNextId());
        }
        ByteBuf encoded = this.m_codec.encode(messageTree);
        // Read the size before handing the buffer to the channel.
        int size = encoded.readableBytes();
        channelFuture.channel().writeAndFlush(encoded);
        if (this.m_statistics != null) {
            this.m_statistics.onBytes(size);
        }
    }

    /**
     * Returns whether a merge is due: the queue is non-empty and either its head is older than
     * {@link #MAX_DURATION} or it holds at least {@link #MAX_CHILD_NUMBER} trees.
     */
    private boolean shouldMerge(MessageQueue messageQueue) {
        MessageTree head = messageQueue.peek();
        if (head == null) {
            return false;
        }
        long age = System.currentTimeMillis() - head.getMessage().getTimestamp();
        return age > MAX_DURATION || messageQueue.size() >= MAX_CHILD_NUMBER;
    }

    /** Requests the sender thread to stop and shuts down the channel manager, if one was created. */
    @Override
    public void shutdown() {
        this.m_active = false;
        // initialize() may never have been called, in which case there is nothing to shut down.
        if (this.m_channelManager != null) {
            this.m_channelManager.shutdown();
        }
    }
}
