package com.taobao.kmonitor.sink.flume;

import com.taobao.kmonitor.common.TagsUtils;
import com.taobao.kmonitor.core.MetricsRecord;
import com.taobao.kmonitor.core.MetricsSink;
import com.taobao.kmonitor.core.MetricsValue;
import com.taobao.kmonitor.impl.KMonitorConfig;
import com.taobao.kmonitor.net.AgentClient;
import com.taobao.kmonitor.net.AgentIp;
import com.taobao.kmonitor.net.thrift.Event.Event;
import com.taobao.kmonitor.net.thrift.Event.EventBuilder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/taobao/kmonitor/sink/flume/FlumeSink.class */
public class FlumeSink implements MetricsSink {
    public static final String FLUME_ADDRESS = "address";
    public static final String DEFAULT_FLUME_ADDRESS = "localhost:4141";
    public static final String FLUME_TOPIC = "topic";
    public static final String DEFAULT_FLUME_TOPIC = "metrics-topic";
    private static final String FLUME_ASSIGNED = "assigned";
    private static final String FLUME_WORKER_SIZE = "worker";
    private static final String FLUME_QUEUE_CAPACITY = "queue.capacity";
    private static final int DEFAULT_FLUME_WORKER_SIZE = 2;
    private static final int DEFAULT_FLUME_QUEUE_CAPACITY = 99999;
    private static final String DEFAULT_FLUME_ASSIGNED = "false";
    private static final String DEFAULT_HOST_PORT_SEPERATOR = ":";
    private static final long MAX_DELAY = 1800000;
    private AgentClient client;
    private String topic;
    private static final Log LOG = LogFactory.getLog(FlumeSink.class);
    private static final String UTF8_ENCODING = "UTF-8";
    private static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING);

    private String nerdAddress(SubsetConfiguration subsetConfiguration) {
        String string = subsetConfiguration.getString(FLUME_ADDRESS, DEFAULT_FLUME_ADDRESS);
        if (!Boolean.valueOf(subsetConfiguration.getString(FLUME_ASSIGNED, DEFAULT_FLUME_ASSIGNED)).booleanValue()) {
            String[] split = string.split(DEFAULT_HOST_PORT_SEPERATOR);
            String ip = AgentIp.getIp();
            string = (ip.isEmpty() ? split[0] : ip) + DEFAULT_HOST_PORT_SEPERATOR + split[1];
        }
        LOG.info("sink to agent address:" + string);
        return string;
    }

    private static <T> void showMetric(String str, String str2, long j, T t) {
        LOG.debug("send message detail:");
        StringBuilder sb = new StringBuilder();
        sb.append("name:").append(str).append("\n");
        sb.append("tags:").append(str2).append("\n");
        sb.append("timestamp:").append(j).append("\n");
        sb.append("value:").append(t).append("\n");
        LOG.debug(sb.toString());
    }

    @Override // com.taobao.kmonitor.core.MetricsSink
    public void init(SubsetConfiguration subsetConfiguration) {
        this.topic = subsetConfiguration != null ? subsetConfiguration.getString(FLUME_TOPIC, DEFAULT_FLUME_TOPIC) : DEFAULT_FLUME_TOPIC;
        this.client = new AgentClient(subsetConfiguration == null ? KMonitorConfig.getSinkAddress() : nerdAddress(subsetConfiguration), subsetConfiguration != null ? subsetConfiguration.getInt(FLUME_WORKER_SIZE, 2) : 2, subsetConfiguration != null ? subsetConfiguration.getInt(FLUME_QUEUE_CAPACITY, DEFAULT_FLUME_QUEUE_CAPACITY) : DEFAULT_FLUME_QUEUE_CAPACITY);
        this.client.start();
        LOG.info("flume topic is : " + this.topic);
    }

    @Override // com.taobao.kmonitor.core.MetricsSink
    public void putMetrics(MetricsRecord metricsRecord) {
        ArrayList arrayList = new ArrayList();
        String convertMetricsTagToString = TagsUtils.convertMetricsTagToString(metricsRecord.tags());
        checkDelay(metricsRecord.timestamp());
        for (MetricsValue metricsValue : metricsRecord.metrics()) {
            StringBuilder sb = new StringBuilder();
            sb.append(metricsValue.name()).append(" ");
            sb.append(metricsRecord.timestamp()).append(" ");
            sb.append(metricsValue.value()).append(" ");
            sb.append(convertMetricsTagToString);
            Event withBody = EventBuilder.withBody(sb.toString().getBytes(UTF8_CHARSET));
            withBody.getHeaders().put("service", metricsRecord.getService());
            if (KMonitorConfig.getKMonitorTenantName() != null) {
                withBody.getHeaders().put("tenant", KMonitorConfig.getKMonitorTenantName());
            }
            arrayList.add(withBody);
            if (LOG.isDebugEnabled()) {
                showMetric(metricsValue.name(), convertMetricsTagToString, metricsRecord.timestamp(), metricsValue.value());
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        this.client.appendBatch(arrayList);
        LOG.debug("send message to agent success");
    }

    private void checkDelay(long j) {
        if (System.currentTimeMillis() - j > MAX_DELAY) {
            LOG.error("client send metric delay at least 30 min!");
        }
    }

    @Override // com.taobao.kmonitor.core.MetricsSink
    public void flush() {
    }

    @Override // com.taobao.kmonitor.core.MetricsSink
    public void close() {
        this.client.close();
    }
}
