package cn.com.duibaboot.ext.autoconfigure.logger.logback.appender;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import cn.com.duiba.boot.utils.DeflateUtils;
import cn.com.duiba.cat.Cat;
import cn.com.duiba.cat.message.internal.DefaultTransaction;
import cn.com.duibaboot.ext.autoconfigure.cloud.zipkin.ZipkinConstants;
import cn.com.duibaboot.ext.autoconfigure.core.utils.CatUtils;
import cn.com.duibaboot.ext.autoconfigure.logger.logback.appender.KafkaAppenderProperties;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;

/* loaded from: input_file:cn/com/duibaboot/ext/autoconfigure/logger/logback/appender/KafkaAppender.class */
public class KafkaAppender<E> extends UnsynchronizedAppenderBase<E> {
    private KafkaTemplate kafkaTemplate;
    private List<KafkaAppenderProperties.Pattern> patterns;
    private boolean needCheckJson;
    private BlockingQueue<KafkaAppender<E>.RoutingData> queue;
    private Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
    private int queueSize = 5120;
    private Thread flushQueueThread = new Thread(() -> {
        do {
            KafkaAppender<E>.RoutingData routingData = null;
            try {
                routingData = this.queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            sendEvent(routingData);
        } while (!Thread.currentThread().isInterrupted());
        while (true) {
            KafkaAppender<E>.RoutingData poll = this.queue.poll();
            if (poll == null) {
                this.kafkaTemplate.flush();
                return;
            }
            sendEvent(poll);
        }
    });
    private boolean blockWhenFull = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cn/com/duibaboot/ext/autoconfigure/logger/logback/appender/KafkaAppender$RoutingData.class */
    public class RoutingData {
        Set<String> topics = new HashSet();
        byte[] data;

        RoutingData() {
        }
    }

    public KafkaAppender(KafkaTemplate kafkaTemplate, Collection<KafkaAppenderProperties.Pattern> collection) {
        this.needCheckJson = false;
        this.kafkaTemplate = kafkaTemplate;
        this.patterns = new ArrayList(collection);
        Iterator<KafkaAppenderProperties.Pattern> it = collection.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            KafkaAppenderProperties.Pattern next = it.next();
            if (next.getMatchJson() != null && !next.getMatchJson().isEmpty()) {
                this.needCheckJson = true;
                break;
            }
        }
        setQueueSize(this.patterns.get(0).getQueueSize().intValue());
        this.queue = new ArrayBlockingQueue(this.queueSize);
        setBlockWhenFull(this.patterns.get(0).getBlockWhenFull().booleanValue());
    }

    protected void sendEvent(KafkaAppender<E>.RoutingData routingData) {
        if (routingData == null) {
            return;
        }
        try {
            for (String str : routingData.topics) {
                long nanoTime = System.nanoTime();
                ListenableFuture send = this.kafkaTemplate.send(str, routingData.data);
                if (CatUtils.isCatEnabled()) {
                    send.addCallback(sendResult -> {
                        DefaultTransaction newTransaction = Cat.newTransaction("KafkaLogAppender", "KafkaLogAppender");
                        newTransaction.setDurationStart(nanoTime);
                        newTransaction.setStatus(ZipkinConstants.SPAN_NOT_SAMPLED);
                        newTransaction.complete();
                    }, th -> {
                        DefaultTransaction newTransaction = Cat.newTransaction("KafkaLogAppender", "KafkaLogAppender");
                        newTransaction.setDurationStart(nanoTime);
                        newTransaction.setStatus(th);
                        newTransaction.complete();
                    });
                }
            }
        } catch (Throwable th2) {
            this.logger.error("logback kafka发送失败", th2);
        }
    }

    protected void append(E e) {
        if (e instanceof DeferredProcessingAware) {
            ((DeferredProcessingAware) e).prepareForDeferredProcessing();
        }
        convertAndQueue(e);
    }

    private void convertAndQueue(E e) {
        KafkaAppender<E>.RoutingData convertToData = convertToData(e);
        if (convertToData == null) {
            return;
        }
        if (this.blockWhenFull) {
            try {
                this.queue.put(convertToData);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        } else {
            if (this.queue.offer(convertToData)) {
                return;
            }
            this.logger.warn("try to sendMessage to kafka failed, because queue(with size:{}) is full, drop this message", Integer.valueOf(this.queue.size()));
        }
    }

    private KafkaAppender<E>.RoutingData convertToData(E e) {
        if (!(e instanceof ILoggingEvent)) {
            this.logger.error("[NOTIFYME]will never be here");
            return null;
        }
        String message = ((ILoggingEvent) e).getMessage();
        JSONObject parseObject = this.needCheckJson ? JSON.parseObject(message) : null;
        KafkaAppender<E>.RoutingData routingData = new RoutingData();
        for (KafkaAppenderProperties.Pattern pattern : this.patterns) {
            if (matches(pattern, parseObject)) {
                routingData.topics.add(pattern.getTopic());
            }
        }
        if (routingData.topics.isEmpty()) {
            return null;
        }
        routingData.data = DeflateUtils.compress(message);
        return routingData;
    }

    private boolean matches(KafkaAppenderProperties.Pattern pattern, JSONObject jSONObject) {
        if (!this.needCheckJson || pattern.getMatchJson() == null || pattern.getMatchJson().isEmpty()) {
            return true;
        }
        for (Map.Entry<String, String> entry : pattern.getMatchJson().entrySet()) {
            if (!StringUtils.equals(JSONPath.eval(jSONObject, entry.getKey()).toString(), entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    public void start() {
        super.start();
        this.flushQueueThread.setName("KafkaAppenderFlusher");
        this.flushQueueThread.start();
    }

    public void stop() {
        super.stop();
        this.flushQueueThread.interrupt();
    }

    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setBlockWhenFull(boolean z) {
        this.blockWhenFull = z;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setQueueSize(int i) {
        this.queueSize = i;
    }
}
