/*
 * Decompiled with CFR 0.152.
 */
package cn.com.duibaboot.ext.autoconfigure.logger.logback.appender;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import ch.qos.logback.core.spi.DeferredProcessingAware;
import cn.com.duibaboot.ext.autoconfigure.logger.logback.appender.KafkaAppenderProperties;
import cn.com.duibaboot.ext.autoconfigure.perftest.PerfTestContext;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaAppender<E>
extends UnsynchronizedAppenderBase<E> {
    private KafkaTemplate kafkaTemplate;
    private List<KafkaAppenderProperties.Pattern> patterns;
    private boolean needCheckJson = false;
    private Logger logger = LoggerFactory.getLogger(KafkaAppender.class);
    private BlockingQueue<RoutingData> queue;
    private int queueSize = 5120;
    private Thread flushQueueThread = new Thread(() -> {
        RoutingData routingData;
        do {
            routingData = null;
            try {
                routingData = this.queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.sendEvent(routingData);
        } while (!Thread.currentThread().isInterrupted());
        while ((routingData = (RoutingData)this.queue.poll()) != null) {
            this.sendEvent(routingData);
            this.kafkaTemplate.flush();
        }
    });
    private boolean blockWhenFull = false;

    public KafkaAppender(KafkaTemplate kafkaTemplate, Collection<KafkaAppenderProperties.Pattern> patterns) {
        this.kafkaTemplate = kafkaTemplate;
        this.patterns = new ArrayList<KafkaAppenderProperties.Pattern>(patterns);
        for (KafkaAppenderProperties.Pattern p : patterns) {
            if (p.getMatchJson() == null || p.getMatchJson().isEmpty()) continue;
            this.needCheckJson = true;
            break;
        }
        this.setQueueSize(this.patterns.get(0).getQueueSize());
        this.queue = new ArrayBlockingQueue<RoutingData>(this.queueSize);
        this.setBlockWhenFull(this.patterns.get(0).getBlockWhenFull());
    }

    protected void sendEvent(RoutingData routingData) {
        if (routingData == null) {
            return;
        }
        for (String topic : routingData.topics) {
            this.kafkaTemplate.send(topic, (Object)routingData.data);
        }
    }

    protected void append(E eventObject) {
        if (eventObject instanceof DeferredProcessingAware) {
            ((DeferredProcessingAware)eventObject).prepareForDeferredProcessing();
        }
        if (PerfTestContext.isCurrentInPerfTestMode()) {
            return;
        }
        this.convertAndQueue(eventObject);
    }

    private void convertAndQueue(E eventObject) {
        RoutingData routingData = this.convertToData(eventObject);
        if (routingData == null) {
            return;
        }
        if (!this.blockWhenFull) {
            boolean added = this.queue.offer(routingData);
            if (!added) {
                this.logger.warn("try to sendMessage to kafka failed, because queue(with size:{}) is full, drop this message", (Object)this.queue.size());
            }
        } else {
            try {
                this.queue.put(routingData);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private RoutingData convertToData(E eventObject) {
        if (!(eventObject instanceof ILoggingEvent)) {
            this.logger.error("[NOTIFYME]will never be here");
            return null;
        }
        String eventData = ((ILoggingEvent)eventObject).getMessage().toString();
        JSONObject json = null;
        if (this.needCheckJson) {
            json = JSON.parseObject((String)eventData);
        }
        RoutingData routingData = new RoutingData();
        routingData.data = eventData;
        for (KafkaAppenderProperties.Pattern pattern : this.patterns) {
            if (!this.matches(pattern, json)) continue;
            routingData.topics.add(pattern.getTopic());
        }
        if (routingData.topics.isEmpty()) {
            return null;
        }
        return routingData;
    }

    private boolean matches(KafkaAppenderProperties.Pattern pattern, JSONObject json) {
        if (!this.needCheckJson) {
            return true;
        }
        if (pattern.getMatchJson() == null || pattern.getMatchJson().isEmpty()) {
            return true;
        }
        for (Map.Entry<String, String> entry : pattern.getMatchJson().entrySet()) {
            if (StringUtils.equals((CharSequence)JSONPath.eval((Object)json, (String)entry.getKey()).toString(), (CharSequence)entry.getValue())) continue;
            return false;
        }
        return true;
    }

    public void start() {
        super.start();
        this.flushQueueThread.setName("KafkaAppenderFlusher");
        this.flushQueueThread.start();
    }

    public void stop() {
        super.stop();
        this.flushQueueThread.interrupt();
    }

    public boolean isBlockWhenFull() {
        return this.blockWhenFull;
    }

    public void setBlockWhenFull(boolean blockWhenFull) {
        this.blockWhenFull = blockWhenFull;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    class RoutingData {
        Set<String> topics = new HashSet<String>();
        String data;

        RoutingData() {
        }
    }
}

