package com.alibaba.ons.open.trace.core.dispatch.impl;

import com.alibaba.ons.open.trace.core.common.OnsTraceConstants;
import com.alibaba.ons.open.trace.core.dispatch.AsyncAppender;
import com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;

/* loaded from: input_file:lib/ons-client-1.2.7.Final.jar:com/alibaba/ons/open/trace/core/dispatch/impl/AsyncTraceDispatcher.class */
public class AsyncTraceDispatcher extends AsyncDispatcher {
    private static final Logger clientlog = ClientLogger.getLog();
    private final Object[] entries;
    private final int queueSize;
    private final int indexMask;
    private final int notifyThreshold;
    private final int maxDelayTime = 20;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private AtomicLong putIndex;
    private AtomicLong discardCount;
    private AtomicLong takeIndex;
    private AsyncAppender appender;
    private String workerName;
    private Thread worker;
    private AtomicBoolean running;

    /* loaded from: input_file:lib/ons-client-1.2.7.Final.jar:com/alibaba/ons/open/trace/core/dispatch/impl/AsyncTraceDispatcher$AsyncRunnable.class */
    class AsyncRunnable implements Runnable {
        AsyncRunnable() {
        }

        /* JADX WARN: Type inference failed for: r0v13, types: [java.util.concurrent.atomic.AtomicLong] */
        @Override // java.lang.Runnable
        public void run() {
            AsyncTraceDispatcher asyncTraceDispatcher = AsyncTraceDispatcher.this;
            int i = asyncTraceDispatcher.indexMask;
            int i2 = asyncTraceDispatcher.queueSize;
            String str = asyncTraceDispatcher.workerName;
            Object[] objArr = asyncTraceDispatcher.entries;
            AtomicLong atomicLong = asyncTraceDispatcher.putIndex;
            ?? r0 = asyncTraceDispatcher.takeIndex;
            AtomicLong atomicLong2 = asyncTraceDispatcher.discardCount;
            AtomicBoolean atomicBoolean = asyncTraceDispatcher.running;
            ReentrantLock reentrantLock = asyncTraceDispatcher.lock;
            Condition condition = asyncTraceDispatcher.notEmpty;
            long millis = TimeUnit.MINUTES.toMillis(1L);
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                try {
                    atomicBoolean.set(true);
                    long j = r0.get();
                    long j2 = atomicLong.get() - j;
                    if (j2 > 0) {
                        do {
                            int i3 = ((int) j) & i;
                            Object obj = objArr[i3];
                            while (obj == null) {
                                Thread.yield();
                                obj = objArr[i3];
                            }
                            objArr[i3] = null;
                            long j3 = j + 1;
                            j = r0;
                            r0.set(j3);
                            j2--;
                            asyncTraceDispatcher.appender.append(obj);
                        } while (j2 > 0);
                        asyncTraceDispatcher.appender.flush();
                        if (atomicLong2.get() > 0) {
                            long currentTimeMillis2 = System.currentTimeMillis();
                            if (currentTimeMillis2 - currentTimeMillis > millis) {
                                atomicLong2.get();
                                atomicLong2.lazySet(0L);
                                currentTimeMillis = currentTimeMillis2;
                            }
                        }
                    } else if (reentrantLock.tryLock()) {
                        try {
                            atomicBoolean.set(false);
                            condition.await(20L, TimeUnit.MILLISECONDS);
                            reentrantLock.unlock();
                        } catch (Throwable th) {
                            reentrantLock.unlock();
                            throw th;
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    AsyncTraceDispatcher.clientlog.info("[WARN] " + str + " async thread is iterrupted");
                    atomicBoolean.set(false);
                    return;
                } catch (Exception e2) {
                    AsyncTraceDispatcher.clientlog.info("[ERROR] Fail to async write log");
                }
            }
        }
    }

    public AsyncTraceDispatcher(Properties properties) {
        int numberOfLeadingZeros = 1 << (32 - Integer.numberOfLeadingZeros(Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")) - 1));
        this.queueSize = numberOfLeadingZeros;
        this.entries = new Object[numberOfLeadingZeros];
        this.indexMask = numberOfLeadingZeros - 1;
        this.notifyThreshold = Integer.parseInt(properties.getProperty(OnsTraceConstants.WakeUpNum, "1"));
        this.putIndex = new AtomicLong(0L);
        this.discardCount = new AtomicLong(0L);
        this.takeIndex = new AtomicLong(0L);
        this.running = new AtomicBoolean(false);
        this.lock = new ReentrantLock(false);
        this.notEmpty = this.lock.newCondition();
    }

    @Override // com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher
    public void start(AsyncAppender asyncAppender, String str) {
        this.appender = asyncAppender;
        this.workerName = str;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncDispatcher-Thread-" + str);
        this.worker.setDaemon(true);
        this.worker.start();
    }

    public int size() {
        return (int) (this.putIndex.get() - this.takeIndex.get());
    }

    @Override // com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher
    public boolean append(Object obj) {
        long j;
        long j2;
        long j3 = this.queueSize;
        do {
            j = this.putIndex.get();
            j2 = j - this.takeIndex.get();
            if (j2 >= j3) {
                clientlog.info("msgtrace buffer is full " + obj);
                return false;
            }
        } while (!this.putIndex.compareAndSet(j, j + 1));
        this.entries[((int) j) & this.indexMask] = obj;
        if (j2 < this.notifyThreshold || this.running.get() || !this.lock.tryLock()) {
            return true;
        }
        try {
            try {
                this.notEmpty.signal();
                this.lock.unlock();
                return true;
            } catch (Exception e) {
                clientlog.info("fail to signal notEmpty,maybe block!");
                this.lock.unlock();
                return true;
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher
    public void flush() throws IOException {
        long currentTimeMillis = System.currentTimeMillis() + 500;
        while (size() > 0 && System.currentTimeMillis() <= currentTimeMillis) {
            if (this.running.get()) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    return;
                }
            } else if (this.lock.tryLock()) {
                try {
                    try {
                        this.notEmpty.signal();
                        this.lock.unlock();
                    } catch (Exception e2) {
                        clientlog.info("fail to signal notEmpty,maybe block!");
                        this.lock.unlock();
                    }
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            }
        }
    }

    @Override // com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher
    public void registerShutDownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: com.alibaba.ons.open.trace.core.dispatch.impl.AsyncTraceDispatcher.1
            private volatile boolean hasShutdown = false;

            @Override // java.lang.Runnable
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        try {
                            AsyncTraceDispatcher.this.flush();
                        } catch (IOException e) {
                            AsyncTraceDispatcher.clientlog.error("system mqtrace hook shutdown failed ,maybe loss some trace data");
                        }
                    }
                }
            }
        }, "ShutdownHookMQTrace"));
    }
}
