/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.kmonitor.sink.flume;

import com.taobao.kmonitor.common.TagsUtils;
import com.taobao.kmonitor.core.MetricsRecord;
import com.taobao.kmonitor.core.MetricsSink;
import com.taobao.kmonitor.core.MetricsValue;
import com.taobao.kmonitor.impl.KMonitorConfig;
import com.taobao.kmonitor.net.AgentClient;
import com.taobao.kmonitor.net.AgentIp;
import com.taobao.kmonitor.net.thrift.Event.Event;
import com.taobao.kmonitor.net.thrift.Event.EventBuilder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FlumeSink
implements MetricsSink {
    private static final Log LOG = LogFactory.getLog(FlumeSink.class);
    public static final String FLUME_ADDRESS = "address";
    public static final String DEFAULT_FLUME_ADDRESS = "localhost:4141";
    public static final String FLUME_TOPIC = "topic";
    public static final String DEFAULT_FLUME_TOPIC = "metrics-topic";
    private static final String UTF8_ENCODING = "UTF-8";
    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
    private static final String FLUME_ASSIGNED = "assigned";
    private static final String FLUME_WORKER_SIZE = "worker";
    private static final String FLUME_QUEUE_CAPACITY = "queue.capacity";
    private static final int DEFAULT_FLUME_WORKER_SIZE = 2;
    private static final int DEFAULT_FLUME_QUEUE_CAPACITY = 99999;
    private static final String DEFAULT_FLUME_ASSIGNED = "false";
    private static final String DEFAULT_HOST_PORT_SEPERATOR = ":";
    private static final long MAX_DELAY = 1800000L;
    private AgentClient client;
    private String topic;

    private String nerdAddress(SubsetConfiguration conf) {
        String address = conf.getString(FLUME_ADDRESS, DEFAULT_FLUME_ADDRESS);
        boolean assigned = Boolean.valueOf(conf.getString(FLUME_ASSIGNED, DEFAULT_FLUME_ASSIGNED));
        if (!assigned) {
            String[] hostPort = address.split(DEFAULT_HOST_PORT_SEPERATOR);
            String agentIp = AgentIp.getIp();
            String host = agentIp.isEmpty() ? hostPort[0] : agentIp;
            String port = hostPort[1];
            address = host + DEFAULT_HOST_PORT_SEPERATOR + port;
        }
        LOG.info((Object)("sink to agent address:" + address));
        return address;
    }

    private static <T> void showMetric(String name, String tag, long time, T value) {
        LOG.debug((Object)"send message detail:");
        StringBuilder sb = new StringBuilder();
        sb.append("name:").append(name).append("\n");
        sb.append("tags:").append(tag).append("\n");
        sb.append("timestamp:").append(time).append("\n");
        sb.append("value:").append(value).append("\n");
        LOG.debug((Object)sb.toString());
    }

    @Override
    public void init(SubsetConfiguration conf) {
        this.topic = conf != null ? conf.getString(FLUME_TOPIC, DEFAULT_FLUME_TOPIC) : DEFAULT_FLUME_TOPIC;
        int worker = conf != null ? conf.getInt(FLUME_WORKER_SIZE, 2) : 2;
        int queueCapacity = conf != null ? conf.getInt(FLUME_QUEUE_CAPACITY, 99999) : 99999;
        String address = conf == null ? KMonitorConfig.getSinkAddress() : this.nerdAddress(conf);
        this.client = new AgentClient(address, worker, queueCapacity);
        this.client.start();
        LOG.info((Object)("flume topic is : " + this.topic));
    }

    @Override
    public void putMetrics(MetricsRecord record) {
        ArrayList<Event> flumeEvents = new ArrayList<Event>();
        String tagsString = TagsUtils.convertMetricsTagToString(record.tags());
        this.checkDelay(record.timestamp());
        for (MetricsValue metric : record.metrics()) {
            StringBuilder metricBuilder = new StringBuilder();
            metricBuilder.append(metric.name()).append(" ");
            metricBuilder.append(record.timestamp()).append(" ");
            metricBuilder.append(metric.value()).append(" ");
            metricBuilder.append(tagsString);
            Event flumeEvent = EventBuilder.withBody(metricBuilder.toString().getBytes(UTF8_CHARSET));
            flumeEvent.getHeaders().put("service", record.getService());
            if (KMonitorConfig.getKMonitorTenantName() != null) {
                flumeEvent.getHeaders().put("tenant", KMonitorConfig.getKMonitorTenantName());
            }
            flumeEvents.add(flumeEvent);
            if (!LOG.isDebugEnabled()) continue;
            FlumeSink.showMetric(metric.name(), tagsString, record.timestamp(), metric.value());
        }
        if (!flumeEvents.isEmpty()) {
            this.client.appendBatch(flumeEvents);
            LOG.debug((Object)"send message to agent success");
        }
    }

    private void checkDelay(long timestamp) {
        if (System.currentTimeMillis() - timestamp > 1800000L) {
            LOG.error((Object)"client send metric delay at least 30 min!");
        }
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
        this.client.close();
    }
}

