/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.kmonitor.net;

import com.taobao.kmonitor.common.ToolUtils;
import com.taobao.kmonitor.net.thrift.Event.Event;
import com.taobao.kmonitor.net.thrift.Status;
import com.taobao.kmonitor.net.thrift.ThriftFlumeEvent;
import com.taobao.kmonitor.net.thrift.ThriftSourceProtocol;
import com.taobao.kmonitor.tool.ToolUtil;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous client that forwards events to an agent over Thrift.
 *
 * <p>{@link #append(Event)} and {@link #appendBatch(List)} convert events to
 * {@link ThriftFlumeEvent}s and hand them to a background thread pool. Every
 * send runs while holding a single lock, so at most one request is on the wire
 * at a time and the underlying connection is never used concurrently. A send
 * that finds the connection down tries to re-establish it first.
 *
 * <p>Delivery is best-effort: events are dropped (with a log message) when the
 * client is not running, when the task queue is full, or when a send fails.
 */
public class AgentClient {
    private static final Logger log = LoggerFactory.getLogger(AgentClient.class);
    /** Core size of the sender pool, and the lower bound for its maximum size. */
    private static final int DEFAULT_POOL_SIZE = 2;
    /** Default capacity of the queue holding pending send tasks. */
    private static final int DEFAULT_TASK_QUEUE_SIZE = 99999;
    /** Timeout handed to {@link TSocket}; milliseconds per the Thrift TSocket API. */
    private static final int DEFAULT_TIMEOUT = 3000;

    private final HostInfo hostInfo;
    /** Guards {@link #client} and every use of the connection behind it. */
    private final Lock lock;
    private final ExecutorService pool;
    /** Current connection wrapper; replaced on reconnect. Access only while holding {@link #lock}. */
    private ClientWrapper client;
    /** Written by start()/close(), read by producer and worker threads, hence volatile. */
    private volatile boolean running;

    /**
     * Creates a client with the default pool size and task queue capacity.
     *
     * @param address agent address, see {@link #AgentClient(String, int, int)}
     */
    public AgentClient(String address) {
        this(address, DEFAULT_POOL_SIZE, DEFAULT_TASK_QUEUE_SIZE);
    }

    /**
     * Creates a client; no connection is opened until {@link #start()} is called.
     *
     * @param address   whitespace-separated list of {@code host:port} entries; only the
     *                  first entry is used
     * @param worker    maximum number of sender threads; values below 2 are raised to 2.
     *                  Because the task queue is bounded, threads beyond the core size are
     *                  only created once the queue is full (standard ThreadPoolExecutor rule)
     * @param queueSize capacity of the pending-task queue; values below 1 are raised to 1
     * @throws IllegalArgumentException if {@code address} is not of the form {@code host:port}
     * @throws NullPointerException     if {@code address} is null
     */
    public AgentClient(String address, int worker, int queueSize) {
        this.hostInfo = new HostInfo(address);
        this.lock = new ReentrantLock();
        this.client = new ClientWrapper(this.hostInfo);
        this.running = false;
        this.pool = new ThreadPoolExecutor(
                DEFAULT_POOL_SIZE,
                Math.max(DEFAULT_POOL_SIZE, worker),
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(Math.max(1, queueSize)),
                ToolUtil.buildNameThreadFactory("agent-client"));
    }

    /** Reconnects if the current wrapper is not started. Caller must hold {@link #lock}. */
    private void checkSocket() {
        if (!this.client.isStart()) {
            this.reConnect();
        }
    }

    /** Replaces the current wrapper with a freshly started one. Caller must hold {@link #lock}. */
    private void reConnect() {
        // Release whatever the old wrapper may still hold before dropping the reference.
        this.client.close();
        this.client = new ClientWrapper(this.hostInfo);
        this.client.start();
    }

    /**
     * Queues a single event for asynchronous delivery.
     *
     * <p>The call is a no-op when {@code event} is null or the client is not running.
     * If the task queue is full or the pool has been shut down, the event is dropped
     * and a warning is logged instead of throwing into the caller.
     *
     * @param event the event to send; may be null
     */
    public void append(Event event) {
        if (event == null || !this.running) {
            return;
        }
        final ThriftFlumeEvent payload = toThrift(event);
        this.enqueue(new SendTask() {
            @Override
            protected Status send(ClientWrapper target) throws TException {
                return target.append(payload);
            }
        });
    }

    /**
     * Queues a list of events for asynchronous delivery as one batch request.
     *
     * <p>The call is a no-op when {@code events} is null, contains no non-null element,
     * or the client is not running. Null elements are skipped. If the task queue is full
     * or the pool has been shut down, the batch is dropped and a warning is logged.
     *
     * @param events the events to send; may be null
     */
    public void appendBatch(List<Event> events) {
        if (events == null || !this.running) {
            return;
        }
        final List<ThriftFlumeEvent> batch = new ArrayList<ThriftFlumeEvent>(events.size());
        for (Event event : events) {
            if (event != null) {
                batch.add(toThrift(event));
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        this.enqueue(new SendTask() {
            @Override
            protected Status send(ClientWrapper target) throws TException {
                return target.appendBatch(batch);
            }
        });
    }

    /** Builds the Thrift representation of an event from its headers and body. */
    private static ThriftFlumeEvent toThrift(Event event) {
        return new ThriftFlumeEvent(event.getHeaders(), ByteBuffer.wrap(event.getBody()));
    }

    /** Hands a task to the pool, dropping it with a warning if the pool refuses it. */
    private void enqueue(Runnable task) {
        try {
            this.pool.submit(task);
        } catch (RejectedExecutionException e) {
            // Queue full or pool already shut down: delivery is best-effort, so drop
            // the event rather than propagate the failure into application code.
            log.warn("client task rejected, event dropped: {}", e.getMessage());
        }
    }

    /**
     * @return true between a call to {@link #start()} and a call to {@link #close()}
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Opens the connection and starts accepting events. A failed connection attempt is
     * logged by the wrapper and does not prevent the client from running; the next send
     * will try to reconnect.
     */
    public void start() {
        this.lock.lock();
        try {
            this.client.start();
        } finally {
            this.lock.unlock();
        }
        this.running = true;
    }

    /**
     * Stops accepting events, shuts the pool down and closes the connection.
     *
     * <p>Tasks still queued when this is called are skipped, so no new connection is
     * opened after the client has been closed. The call waits for a send that is
     * currently in flight before closing the connection.
     */
    public void close() {
        // Flip the flag first so producers and queued tasks stop as early as possible.
        this.running = false;
        this.pool.shutdown();
        this.lock.lock();
        try {
            this.client.close();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Common body of a send task: takes the lock, makes sure a connection exists,
     * performs the request supplied by the subclass and handles failures.
     */
    private abstract class SendTask implements Runnable {
        /**
         * Performs the actual Thrift call.
         *
         * @param target the wrapper to send through
         * @return the status reported by the agent
         * @throws TException on any transport or protocol failure
         */
        protected abstract Status send(ClientWrapper target) throws TException;

        @Override
        public final void run() {
            AgentClient.this.lock.lock();
            try {
                if (!AgentClient.this.running) {
                    // Client was closed while this task was queued; do not reconnect.
                    return;
                }
                AgentClient.this.checkSocket();
                Status status = this.send(AgentClient.this.client);
                if (status != Status.OK) {
                    log.warn("client send event failed");
                }
            } catch (TException e) {
                // Drop the connection so that the next task reconnects.
                AgentClient.this.client.close();
                log.warn("client send event error: {}", e.getMessage(), e);
            } catch (RuntimeException e) {
                // The Future returned by submit() is never inspected, so an unchecked
                // failure would otherwise disappear without a trace.
                AgentClient.this.client.close();
                log.error("client send event unexpected error", e);
            } finally {
                AgentClient.this.lock.unlock();
            }
        }
    }

    /**
     * Thin wrapper around one Thrift connection to the agent: a {@link TSocket} wrapped
     * in a {@link TFastFramedTransport} speaking {@link TCompactProtocol}.
     *
     * <p>Only the {@code started} flag is safe to read from any thread; callers must
     * serialize all other access themselves.
     */
    public static class ClientWrapper {
        private final TSocket socket;
        private ThriftSourceProtocol.Client client;
        private TTransport transport;
        private volatile boolean started;

        /**
         * @param hostInfo host and port of the agent
         */
        public ClientWrapper(HostInfo hostInfo) {
            this(hostInfo.host, hostInfo.port);
        }

        /**
         * Creates an unconnected wrapper; call {@link #start()} to connect.
         *
         * @param host agent host name or address
         * @param port agent port
         */
        public ClientWrapper(String host, int port) {
            this.socket = new TSocket(host, port, DEFAULT_TIMEOUT);
            this.started = false;
        }

        /**
         * Sends one event.
         *
         * @param event the event to send
         * @return the status reported by the agent
         * @throws TException if the wrapper is not connected or the call fails
         */
        public Status append(ThriftFlumeEvent event) throws TException {
            return this.requireClient().append(event);
        }

        /**
         * Sends a batch of events in one request.
         *
         * @param events the events to send
         * @return the status reported by the agent
         * @throws TException if the wrapper is not connected or the call fails
         */
        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            return this.requireClient().appendBatch(events);
        }

        /**
         * Returns the Thrift client, or fails with a transport exception when the wrapper
         * has not been started successfully (in which case the client may still be null).
         */
        private ThriftSourceProtocol.Client requireClient() throws TTransportException {
            ThriftSourceProtocol.Client current = this.client;
            if (!this.started || current == null) {
                throw new TTransportException(TTransportException.NOT_OPEN, "agent connection is not open");
            }
            return current;
        }

        /**
         * Opens the transport and creates the Thrift client. Does nothing if the wrapper is
         * already started. A connection failure is logged and leaves the wrapper closed;
         * it is not propagated to the caller.
         */
        public void start() {
            if (this.started) {
                // Keep the transport the live client is bound to.
                return;
            }
            this.transport = new TFastFramedTransport(this.socket);
            if (!this.transport.isOpen()) {
                try {
                    this.transport.open();
                    this.client = new ThriftSourceProtocol.Client(new TCompactProtocol(this.transport));
                    this.started = true;
                } catch (TTransportException e) {
                    log.error("connect to agent failed!", e);
                    this.close();
                }
            }
        }

        /**
         * @return true if {@link #start()} succeeded and {@link #close()} has not been called since
         */
        public boolean isStart() {
            return this.started;
        }

        /** Closes the transport, if one was created, and marks the wrapper as not started. */
        public void close() {
            if (this.transport != null) {
                this.transport.close();
            }
            this.started = false;
        }
    }

    /** Host and port parsed from an address string. */
    private static class HostInfo {
        public final String host;
        public final int port;

        /**
         * Parses the first whitespace-separated entry of {@code address} as {@code host:port}.
         *
         * @param address one or more whitespace-separated {@code host:port} entries
         * @throws IllegalArgumentException if the first entry has no port or the port is not
         *                                  a number ({@link NumberFormatException})
         * @throws NullPointerException     if {@code address} is null
         */
        public HostInfo(String address) {
            if (address == null) {
                throw new NullPointerException("address");
            }
            // trim() so that leading whitespace does not yield an empty first entry.
            String firstEntry = address.trim().split("\\s+")[0];
            String[] hostPort = firstEntry.split(":");
            if (hostPort.length < 2) {
                throw new IllegalArgumentException("invalid agent address, expected host:port but got: " + address);
            }
            this.host = hostPort[0];
            this.port = Integer.parseInt(hostPort[1]);
        }
    }
}

