package cn.com.duiba.cat.message.io;

import cn.com.duiba.cat.CatConstants;
import cn.com.duiba.cat.analyzer.LocalAggregator;
import cn.com.duiba.cat.configuration.ClientConfigService;
import cn.com.duiba.cat.configuration.DefaultClientConfigService;
import cn.com.duiba.cat.configuration.client.entity.Server;
import cn.com.duiba.cat.log.CatLogger;
import cn.com.duiba.cat.message.Message;
import cn.com.duiba.cat.message.internal.DefaultTransaction;
import cn.com.duiba.cat.message.internal.MessageIdFactory;
import cn.com.duiba.cat.message.queue.DefaultMessageQueue;
import cn.com.duiba.cat.message.queue.PriorityMessageQueue;
import cn.com.duiba.cat.message.spi.MessageCodec;
import cn.com.duiba.cat.message.spi.MessageQueue;
import cn.com.duiba.cat.message.spi.MessageStatistics;
import cn.com.duiba.cat.message.spi.MessageTree;
import cn.com.duiba.cat.message.spi.codec.NativeMessageCodec;
import cn.com.duiba.cat.message.spi.codec.PlainTextMessageCodec;
import cn.com.duiba.cat.message.spi.internal.DefaultMessageStatistics;
import cn.com.duiba.cat.status.AbstractCollector;
import cn.com.duiba.cat.status.StatusExtensionRegister;
import cn.com.duiba.cat.util.Threads;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/* loaded from: input_file:cn/com/duiba/cat/message/io/TcpSocketSender.class */
/**
 * Singleton {@link MessageSender} that queues message trees and writes them, encoded with
 * {@link NativeMessageCodec}, to the channel currently supplied by {@link ChannelManager}.
 *
 * <p>Incoming trees are routed by {@link ClientConfigService#parseMessageType}: normal messages
 * go to the main queue, small transactions go to an {@link AtomicMessageManager} that merges
 * them before sending, and stand-alone events are aggregated locally. The {@link #run()} loop
 * drains the queues until {@link #shutdown()} is called or the running thread is interrupted.
 */
public class TcpSocketSender implements Threads.Task, MessageSender {
    /** Size handed to both message queues. */
    private static final int SIZE = 5000;
    /** Age, in milliseconds, after which queued messages are dropped while no channel is available. */
    private static final long TEN_SECONDS = 10000;
    /** Pause, in milliseconds, taken when there is nothing to send. */
    private static final long IDLE_SLEEP_MILLIS = 5L;
    private static final CatLogger LOGGER = CatLogger.getInstance();
    // Must stay after LOGGER: static initialisers run in textual order and the constructor
    // registers callbacks that reference LOGGER.
    private static final TcpSocketSender INSTANCE = new TcpSocketSender();

    private final MessageCodec nativeCodec = new NativeMessageCodec();
    private final MessageStatistics statistics = new DefaultMessageStatistics();
    private final ClientConfigService configService = DefaultClientConfigService.getInstance();
    private final MessageQueue messageQueue = new PriorityMessageQueue(SIZE);
    private final MessageIdFactory factory = MessageIdFactory.getInstance();
    private final AtomicMessageManager atomicQueueManager = new AtomicMessageManager(SIZE);
    private final ChannelManager channelManager = ChannelManager.getInstance();
    /**
     * Loop flag of {@link #run()}. Volatile because {@link #shutdown()} is invoked from another
     * thread (the JVM shutdown hook registered in {@link #initialize}).
     */
    private volatile boolean active;

    /**
     * Buffers small message trees and merges them into a single aggregate transaction before
     * handing them back to the enclosing sender.
     */
    public class AtomicMessageManager {
        private static final long HOUR = 3600000;
        /** Queue size at which buffered trees are merged regardless of their age. */
        private static final int MAX_CHILD_NUMBER = 200;
        /** Age, in milliseconds, of the oldest buffered tree at which a merge is triggered. */
        private static final int MAX_DURATION = 30000;

        private final MessageQueue smallMessages;

        /**
         * @param queueSize size passed to the backing {@link DefaultMessageQueue}
         */
        public AtomicMessageManager(int queueSize) {
            this.smallMessages = new DefaultMessageQueue(queueSize);
        }

        /**
         * @return the number of trees currently buffered
         */
        public int getQueueSize() {
            return this.smallMessages.size();
        }

        /** Tells whether two millisecond timestamps fall into the same hour-sized bucket. */
        private boolean isSameHour(long first, long second) {
            // Compare as long; narrowing the hour index to int is unnecessary.
            return first / HOUR == second / HOUR;
        }

        /**
         * Polls the head of {@code queue} and wraps its message, together with the messages of
         * the following trees from the same hour, into one "AtomicAggregator" transaction.
         *
         * <p>At most {@code MAX_CHILD_NUMBER + 1} further trees are taken after the head. The
         * caller must ensure the queue is not empty (see {@link #shouldMerge}).
         *
         * @param queue queue to take trees from
         * @return the head tree, now carrying the aggregate transaction as its message
         */
        private MessageTree mergeTree(MessageQueue queue) {
            DefaultTransaction aggregate = new DefaultTransaction(CatConstants.CAT_SYSTEM, "AtomicAggregator");
            MessageTree head = queue.poll();
            Message headMessage = head.getMessage();
            long headTimestamp = headMessage.getTimestamp();

            aggregate.setStatus(Message.SUCCESS);
            aggregate.setCompleted(true);
            aggregate.setDurationStart(headTimestamp);
            aggregate.setTimestamp(headTimestamp);
            aggregate.setDurationInMicros(0L);
            aggregate.addChild(headMessage);

            for (int remaining = MAX_CHILD_NUMBER; remaining >= 0; remaining--) {
                MessageTree candidate = queue.peek();
                if (candidate == null || !isSameHour(headTimestamp, candidate.getMessage().getTimestamp())) {
                    break;
                }
                MessageTree next = queue.poll();
                if (next == null) {
                    break;
                }
                aggregate.addChild(next.getMessage());
            }

            head.setMessage(aggregate);
            return head;
        }

        /**
         * Buffers a tree for later merging.
         *
         * @param messageTree tree to buffer
         * @return the result of the underlying queue's {@code offer}
         */
        public boolean offerToQueue(MessageTree messageTree) {
            return this.smallMessages.offer(messageTree);
        }

        /** Merges and forwards buffered trees; delegates to {@link #processNormalAtomicMessage()}. */
        public void processAtomicMessage() {
            processNormalAtomicMessage();
        }

        /** Repeatedly merges buffered trees and offers the result to the sender while a merge is due. */
        void processNormalAtomicMessage() {
            while (shouldMerge(this.smallMessages)) {
                TcpSocketSender.this.offer(mergeTree(this.smallMessages));
            }
        }

        /**
         * A merge is due when the head tree is older than {@code MAX_DURATION} milliseconds or
         * the queue holds at least {@code MAX_CHILD_NUMBER} trees. An empty queue never merges.
         */
        private boolean shouldMerge(MessageQueue queue) {
            MessageTree head = queue.peek();
            if (head == null) {
                return false;
            }
            long age = System.currentTimeMillis() - head.getMessage().getTimestamp();
            return age > MAX_DURATION || queue.size() >= MAX_CHILD_NUMBER;
        }
    }

    /**
     * @return the process-wide sender instance
     */
    public static TcpSocketSender getInstance() {
        return INSTANCE;
    }

    /** Collects the addresses of all enabled servers and runs {@link #initialize}. */
    private TcpSocketSender() {
        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
        for (Server server : this.configService.getServers()) {
            if (server.isEnabled()) {
                addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
            }
        }
        initialize(addresses);
    }

    @Override // cn.com.duiba.cat.util.Threads.Task
    public String getName() {
        return "netty-tcp-data-sender";
    }

    /**
     * Starts the channel manager in the "cat" thread group, registers a JVM shutdown hook that
     * calls {@link #shutdown()}, and registers a status collector exposing queue sizes and
     * sender statistics.
     *
     * @param list server addresses; NOTE(review): currently not used by this method
     */
    private void initialize(List<InetSocketAddress> list) {
        Threads.forGroup("cat").start(this.channelManager);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                TcpSocketSender.LOGGER.info("shut down cat client in runtime shut down hook!");
                TcpSocketSender.this.shutdown();
            }
        });
        StatusExtensionRegister.getInstance().register(new AbstractCollector() {
            @Override // cn.com.duiba.cat.status.StatusExtension
            public String getId() {
                return "cat.status";
            }

            @Override // cn.com.duiba.cat.status.StatusExtension
            public Map<String, String> getProperties() {
                Map<String, String> properties = new LinkedHashMap<String, String>();
                properties.put("cat.status.send.sample.ratio", String.valueOf(TcpSocketSender.this.configService.getSamplingRate() * 100.0d));
                properties.put("cat.status.send.queue.size", String.valueOf(TcpSocketSender.this.messageQueue.size()));
                properties.put("cat.status.send.atomic.queue.size", String.valueOf(TcpSocketSender.this.atomicQueueManager.getQueueSize()));
                for (Map.Entry<String, Long> entry : TcpSocketSender.this.statistics.getStatistics().entrySet()) {
                    properties.put(entry.getKey(), String.valueOf(entry.getValue()));
                }
                return properties;
            }
        });
    }

    /** Records a tree that could not be sent in the overflow statistics. */
    private void logMessageDiscard(MessageTree messageTree) {
        this.statistics.onOverflowed(messageTree);
    }

    /**
     * Routes a tree according to its message type: normal messages to the main queue, small
     * transactions to the atomic queue, stand-alone events to local aggregation. A tree that a
     * queue refuses is handled by {@link #handleRejected}.
     *
     * <p>Also called by {@link AtomicMessageManager} to queue merged trees.
     *
     * @param messageTree tree to route
     */
    public void offer(MessageTree messageTree) {
        boolean accepted = true;
        switch (this.configService.parseMessageType(messageTree)) {
            case NORMAL_MESSAGE:
                accepted = this.messageQueue.offer(messageTree);
                break;
            case SMALL_TRANSACTION:
                accepted = this.atomicQueueManager.offerToQueue(messageTree);
                break;
            case STAND_ALONE_EVENT:
                processTreeInClient(messageTree);
                break;
            default:
                break;
        }
        if (!accepted) {
            handleRejected(messageTree);
        }
    }

    /**
     * Fallback for a tree that cannot be queued or sent: aggregate it locally and, unless the
     * tree reports it can be discarded, count it as overflowed.
     */
    private void handleRejected(MessageTree messageTree) {
        processTreeInClient(messageTree);
        if (!messageTree.canDiscard()) {
            logMessageDiscard(messageTree);
        }
    }

    /**
     * One iteration of the send loop. With a channel available, sends the next queued tree (or
     * pauses briefly when the queue is empty). Without a channel, drops queued trees older than
     * {@code TEN_SECONDS} and pauses.
     */
    private void processMessage() {
        ChannelFuture channel = this.channelManager.channel();
        if (channel == null) {
            discardExpiredMessages();
            pause();
            return;
        }

        MessageTree tree = null;
        try {
            tree = this.messageQueue.poll();
            if (tree == null) {
                pause();
                return;
            }
            sendInternal(channel, tree);
            tree.setMessage(null);
        } catch (Throwable t) {
            // The tree is still null when poll() itself failed; encoding null here could raise
            // a second exception that would escape this handler and end the sender thread.
            if (tree != null) {
                LOGGER.error(PlainTextMessageCodec.encodeTree(tree));
            }
            LOGGER.error("Error when sending message over TCP socket!", t);
        }
    }

    /**
     * Removes trees whose message timestamp is older than {@code TEN_SECONDS} from the head of
     * the main queue and records each as overflowed. Stops at the first exception: retrying
     * would hit the same head element again and loop without making progress.
     */
    private void discardExpiredMessages() {
        long cutoff = System.currentTimeMillis() - TEN_SECONDS;
        try {
            MessageTree head = this.messageQueue.peek();
            while (head != null && head.getMessage().getTimestamp() < cutoff) {
                MessageTree expired = this.messageQueue.poll();
                if (expired != null) {
                    this.statistics.onOverflowed(expired);
                }
                head = this.messageQueue.peek();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Sleeps for {@code IDLE_SLEEP_MILLIS}. An interrupt stops the send loop and is re-asserted
     * on the current thread so the interruption is not lost.
     */
    private void pause() {
        try {
            Thread.sleep(IDLE_SLEEP_MILLIS);
        } catch (InterruptedException e) {
            this.active = false;
            Thread.currentThread().interrupt();
        }
    }

    /** Aggregates a tree locally via {@link LocalAggregator} instead of sending it. */
    private void processTreeInClient(MessageTree messageTree) {
        LocalAggregator.aggregate(messageTree);
    }

    /**
     * Send loop: alternates between sending queued trees and merging small transactions until
     * the sender is deactivated, then flushes what is left.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.active = true;
        while (this.active) {
            processMessage();
            this.atomicQueueManager.processAtomicMessage();
        }
        this.atomicQueueManager.processAtomicMessage();
        drainRemainingMessages();
    }

    /**
     * Empties the main queue after the send loop has stopped. Trees are sent when a channel is
     * available; otherwise they take the rejected-message path. They are deliberately not
     * re-offered: a normal message would land back on the queue being drained, and the loop
     * would spin for as long as no channel is available.
     */
    private void drainRemainingMessages() {
        MessageTree tree;
        while ((tree = this.messageQueue.poll()) != null) {
            ChannelFuture channel = this.channelManager.channel();
            if (channel != null) {
                sendInternal(channel, tree);
            } else {
                handleRejected(tree);
            }
        }
    }

    /**
     * Accepts a tree for sending. Nothing happens while the configuration reports messages as
     * blocked. A tree is offered to the queues when it may not be discarded, when the sampling
     * rate is at least 1.0, or when it hit the sample; any other tree is aggregated locally.
     *
     * @param messageTree tree to send
     */
    @Override // cn.com.duiba.cat.message.io.MessageSender
    public void send(MessageTree messageTree) {
        if (this.configService.isMessageBlock()) {
            return;
        }
        boolean sampledOut = messageTree.canDiscard()
                && this.configService.getSamplingRate() < 1.0d
                && !messageTree.isHitSample();
        if (sampledOut) {
            processTreeInClient(messageTree);
        } else {
            offer(messageTree);
        }
    }

    /**
     * Assigns a message id if the tree has none, encodes the tree, writes it to the channel and
     * records the encoded size in the statistics.
     */
    private void sendInternal(ChannelFuture channelFuture, MessageTree messageTree) {
        if (messageTree.getMessageId() == null) {
            messageTree.setMessageId(this.factory.getNextId());
        }
        ByteBuf encoded = this.nativeCodec.encode(messageTree);
        // Read the size before writing; the buffer is handed over to the channel afterwards.
        int size = encoded.readableBytes();
        channelFuture.channel().writeAndFlush(encoded);
        this.statistics.onBytes(size);
    }

    /** Stops the send loop and shuts down the channel manager. */
    @Override // cn.com.duiba.cat.util.Threads.Task
    public void shutdown() {
        this.active = false;
        this.channelManager.shutdown();
    }
}
