package cn.com.duiba.kjy.base.api.queue;

import cn.com.duiba.boot.perftest.InternalPerfTestContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:cn/com/duiba/kjy/base/api/queue/BatchConsumeBlockingQueue.class */
/**
 * A queue facade that buffers elements and hands them to a {@link Consumer} in batches.
 *
 * <p>Two independent {@link LinkedBlockingQueue}s are kept: one for normal traffic and one for
 * performance-test traffic. Which one a producer call touches is decided by
 * {@code InternalPerfTestContext.isCurrentInPerfTestMode()} on the calling thread
 * (NOTE(review): presumably a thread-bound flag — confirm against InternalPerfTestContext).
 * Each queue has its own consumer thread, lock and condition.
 *
 * <p>A consumer thread drains up to {@code batchSize} elements once the queue no longer needs
 * to wait, i.e. when the queue holds at least {@code maxWaitSize} elements (if
 * {@code maxWaitSize > 0}), or when the queue is non-empty and at least
 * {@code maxWaitTimeMillis} have passed since the last drain (if {@code maxWaitTimeMillis > 0}).
 * When {@code maxWaitTimeMillis > 0} an additional scheduler thread periodically wakes both
 * consumer threads so the time-based condition is re-evaluated.
 *
 * <p>Instances are single-use: the worker threads are created once in the constructor, so an
 * instance cannot be started again after {@link #stop()}.
 *
 * @param <T> element type
 */
public class BatchConsumeBlockingQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(BatchConsumeBlockingQueue.class);

    /** Maximum number of elements passed to the consumer in a single call. */
    private final int batchSize;
    /** Queue size that triggers a batch; values {@code <= 0} disable the size trigger. */
    private final int maxWaitSize;
    /** Elapsed time that triggers a batch; values {@code <= 0} disable the time trigger. */
    private final int maxWaitTimeMillis;
    private final Consumer<List<T>> batchConsumer;
    private final LinkedBlockingQueue<T> queue4Normal;
    private final LinkedBlockingQueue<T> queue4PerfTest;
    /** Periodic waker; {@code null} when the time trigger is disabled. */
    private final Thread scheduleThread;
    private final Thread batchConsumeNormalThread;
    private final Thread batchConsumePerfThread;
    private volatile long lastConsumeTimeMillis4Normal = 0;
    private volatile long lastConsumeTimeMillis4PerfTest = 0;
    private volatile boolean isStarted = false;
    private final ReentrantLock consumeNormalLock = new ReentrantLock();
    private final Condition normalCondition = this.consumeNormalLock.newCondition();
    private final ReentrantLock consumePerfLock = new ReentrantLock();
    private final Condition perfCondition = this.consumePerfLock.newCondition();
    /** Scene id captured from the most recent perf-test producer call; replayed on the perf consumer thread. */
    private volatile String lastPerfTestSceneId = null;
    /** Test-cluster flag captured from the most recent perf-test producer call. */
    private volatile boolean isLastTestCluster = false;

    /**
     * Logs a fixed message and exits.
     * NOTE(review): looks like a leftover smoke test; kept only to preserve the public surface.
     */
    public static void main(String[] strArr) {
        log.info("asd");
    }

    /**
     * Creates the queue and its worker threads. Nothing runs until {@link #start()} is called.
     *
     * @param capacity          capacity of each internal queue; {@code <= 0} means unbounded
     * @param batchSize         maximum elements per consumer call; must be {@code > 0}
     * @param maxWaitSize       queue size that triggers consumption; {@code <= 0} disables the size trigger
     * @param maxWaitTimeMillis time since the last drain that triggers consumption; {@code <= 0}
     *                          disables the time trigger
     * @param batchConsumer     callback receiving each drained batch; must not be {@code null}
     * @throws IllegalArgumentException if any precondition above is violated, if
     *                                  {@code batchSize > maxWaitSize} while the size trigger is enabled,
     *                                  or if both triggers are disabled
     */
    public BatchConsumeBlockingQueue(int capacity, int batchSize, int maxWaitSize, int maxWaitTimeMillis,
            Consumer<List<T>> batchConsumer) {
        Assert.notNull(batchConsumer, "batchConsumer must not be null");
        // A non-positive batch size makes drainTo() a no-op, so nothing would ever be consumed
        // and the consumer threads would spin once the wait condition stops holding.
        Assert.isTrue(batchSize > 0, "batchSize must be greater than zero");
        Assert.isTrue(maxWaitSize <= 0 || batchSize <= maxWaitSize, "batchSize must be less than or equal to maxWaitSize");
        Assert.isTrue(maxWaitSize > 0 || maxWaitTimeMillis > 0, "maxWaitSize and maxWaitTimeMillis cannot both be less than or equal to zero");
        if (capacity <= 0) {
            this.queue4Normal = new LinkedBlockingQueue<>();
            this.queue4PerfTest = new LinkedBlockingQueue<>();
        } else {
            this.queue4Normal = new LinkedBlockingQueue<>(capacity);
            this.queue4PerfTest = new LinkedBlockingQueue<>(capacity);
        }
        this.batchSize = batchSize;
        this.maxWaitSize = maxWaitSize;
        this.maxWaitTimeMillis = maxWaitTimeMillis;
        this.batchConsumer = batchConsumer;
        this.scheduleThread = maxWaitTimeMillis > 0 ? new Thread(this::schedule) : null;
        this.batchConsumeNormalThread = new Thread(this::batchConsumeNormalQueue);
        this.batchConsumePerfThread = new Thread(this::batchConsumePerfQueue);
    }

    /**
     * Scheduler loop: every {@code maxWaitTimeMillis} wakes both consumer threads so they
     * re-check the time-based trigger. Exits when interrupted or once the queue is stopped.
     */
    private void schedule() {
        while (true) {
            try {
                Thread.sleep(this.maxWaitTimeMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (Thread.currentThread().isInterrupted() || !this.isStarted) {
                return;
            }
            signalNormal();
            signalPerf();
        }
    }

    /**
     * Consumer loop for the normal queue. After {@link #stop()} it keeps draining until the
     * normal queue is empty, then exits. Consumer failures are logged and do not end the loop.
     */
    private void batchConsumeNormalQueue() {
        InternalPerfTestContext.markAsNormal();
        while (true) {
            try {
                batchConsume(this.queue4Normal, this.consumeNormalLock, this.normalCondition);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                log.error("触发消费时发生异常", th);
            }
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            if (!this.isStarted && this.queue4Normal.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Consumer loop for the perf-test queue. Each iteration marks the thread with the most
     * recently captured scene id / cluster flag, consumes one batch, and always restores the
     * normal marker afterwards. Exits when interrupted or once the queue is stopped; unlike
     * the normal loop it does not drain remaining elements on stop.
     */
    private void batchConsumePerfQueue() {
        do {
            InternalPerfTestContext.markAsPerfTest(this.lastPerfTestSceneId, this.isLastTestCluster);
            try {
                batchConsume(this.queue4PerfTest, this.consumePerfLock, this.perfCondition);
            } catch (InterruptedException e) {
                // Must be caught before Throwable: restore the flag so the check below sees it
                // instead of logging a shutdown interrupt as a consumer failure.
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                log.error("触发消费时发生异常", th);
            } finally {
                InternalPerfTestContext.markAsNormal();
            }
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
        } while (this.isStarted);
    }

    /**
     * Waits (while started) until the given queue is due, drains one batch under the lock, and
     * passes a non-empty batch to the consumer after the lock has been released.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or awaiting the condition
     */
    private void batchConsume(BlockingQueue<T> queue, ReentrantLock lock, Condition condition) throws InterruptedException {
        List<T> batch;
        lock.lockInterruptibly();
        try {
            while (this.isStarted && needWait(queue)) {
                condition.await();
            }
            batch = drainTo(queue);
        } finally {
            // Single release point: the consumer callback below runs without the lock, so an
            // exception from it must not trigger a second unlock().
            lock.unlock();
        }
        if (!batch.isEmpty()) {
            this.batchConsumer.accept(batch);
        }
    }

    /** Wakes the consumer thread matching the calling thread's perf-test mode. */
    private void signal() {
        if (InternalPerfTestContext.isCurrentInPerfTestMode()) {
            signalPerf();
        } else {
            signalNormal();
        }
    }

    /** Signals the normal consumer's condition while holding its lock. */
    private void signalNormal() {
        ReentrantLock lock = this.consumeNormalLock;
        lock.lock();
        try {
            this.normalCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Signals the perf-test consumer's condition while holding its lock. */
    private void signalPerf() {
        ReentrantLock lock = this.consumePerfLock;
        lock.lock();
        try {
            this.perfCondition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves up to {@code batchSize} elements out of the queue and records the drain time.
     *
     * @return the drained elements, possibly empty
     */
    private List<T> drainTo(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, this.batchSize);
        setLastConsumeTimeMillis();
        return batch;
    }

    /**
     * Picks the queue for the calling thread. In perf-test mode it also captures the current
     * scene id and test-cluster flag for later use by the perf consumer thread.
     *
     * @throws IllegalStateException if the queue has not been started (or has been stopped)
     */
    private BlockingQueue<T> determineQueue() {
        if (!this.isStarted) {
            throw new IllegalStateException("not started yet!");
        }
        boolean perfTestMode = InternalPerfTestContext.isCurrentInPerfTestMode();
        if (perfTestMode) {
            this.lastPerfTestSceneId = InternalPerfTestContext.getCurrentSceneId();
            this.isLastTestCluster = InternalPerfTestContext.isTestCluster();
        }
        return perfTestMode ? this.queue4PerfTest : this.queue4Normal;
    }

    /** Starts the worker threads. Calling it again while already started is a no-op. */
    public synchronized void start() {
        if (this.isStarted) {
            return;
        }
        this.isStarted = true;
        if (this.scheduleThread != null) {
            this.scheduleThread.start();
        }
        this.batchConsumeNormalThread.start();
        this.batchConsumePerfThread.start();
    }

    /**
     * Stops the queue. The scheduler and perf-test consumer threads are interrupted; the normal
     * consumer thread is signalled so it can drain what is left in the normal queue and exit.
     * Calling it when not started is a no-op.
     */
    public synchronized void stop() {
        if (!this.isStarted) {
            return;
        }
        this.isStarted = false;
        if (this.scheduleThread != null) {
            this.scheduleThread.interrupt();
        }
        signalNormal();
        this.batchConsumePerfThread.interrupt();
    }

    /** A consumer keeps waiting only while neither the size trigger nor the time trigger has fired. */
    private boolean needWait(BlockingQueue<T> queue) {
        return sizeWait(queue) && timeOutWait(queue);
    }

    /** True while the size trigger is disabled or the queue is still below {@code maxWaitSize}. */
    private boolean sizeWait(BlockingQueue<T> queue) {
        return this.maxWaitSize <= 0 || queue.size() < this.maxWaitSize;
    }

    /** True while the time trigger is disabled, the queue is empty, or the wait time has not elapsed. */
    private boolean timeOutWait(BlockingQueue<T> queue) {
        return this.maxWaitTimeMillis <= 0 || queue.isEmpty() || getWaitTime() < ((long) this.maxWaitTimeMillis);
    }

    /** Milliseconds since the last drain for the calling thread's mode. */
    private long getWaitTime() {
        return System.currentTimeMillis() - getLastConsumeTimeMillis();
    }

    /** Records "now" as the last drain time for the calling thread's mode. */
    private void setLastConsumeTimeMillis() {
        if (InternalPerfTestContext.isCurrentInPerfTestMode()) {
            this.lastConsumeTimeMillis4PerfTest = System.currentTimeMillis();
        } else {
            this.lastConsumeTimeMillis4Normal = System.currentTimeMillis();
        }
    }

    /** Last drain time for the calling thread's mode. */
    private long getLastConsumeTimeMillis() {
        return InternalPerfTestContext.isCurrentInPerfTestMode() ? this.lastConsumeTimeMillis4PerfTest : this.lastConsumeTimeMillis4Normal;
    }

    /**
     * Delegates to {@link BlockingQueue#add(Object)} on the selected queue and wakes its consumer.
     *
     * @throws IllegalStateException if not started, or as thrown by the underlying {@code add}
     */
    public boolean add(T t) {
        boolean added = determineQueue().add(t);
        if (added) {
            signal();
        }
        return added;
    }

    /**
     * Delegates to {@link BlockingQueue#offer(Object)} on the selected queue and wakes its
     * consumer when the element was accepted.
     *
     * @throws IllegalStateException if not started
     */
    public boolean offer(T t) {
        boolean offered = determineQueue().offer(t);
        if (offered) {
            signal();
        }
        return offered;
    }

    /**
     * Delegates to {@link BlockingQueue#offer(Object, long, TimeUnit)} on the selected queue and
     * wakes its consumer when the element was accepted.
     *
     * @throws IllegalStateException if not started
     * @throws InterruptedException  if interrupted while waiting for space
     */
    public boolean offer(T t, long j, TimeUnit timeUnit) throws InterruptedException {
        boolean offered = determineQueue().offer(t, j, timeUnit);
        if (offered) {
            signal();
        }
        return offered;
    }

    /**
     * Delegates to {@link BlockingQueue#put(Object)} on the selected queue and wakes its consumer.
     *
     * @throws IllegalStateException if not started
     * @throws InterruptedException  if interrupted while waiting for space
     */
    public void put(T t) throws InterruptedException {
        determineQueue().put(t);
        signal();
    }

    /** Size of the queue selected for the calling thread; throws {@link IllegalStateException} if not started. */
    public int size() {
        return determineQueue().size();
    }

    /** Whether the selected queue is empty; throws {@link IllegalStateException} if not started. */
    public boolean isEmpty() {
        return determineQueue().isEmpty();
    }

    /** Whether the selected queue contains {@code obj}; throws {@link IllegalStateException} if not started. */
    public boolean contains(Object obj) {
        return determineQueue().contains(obj);
    }

    /** Iterator over the selected queue; throws {@link IllegalStateException} if not started. */
    public Iterator<T> iterator() {
        return determineQueue().iterator();
    }

    /** Array snapshot of the selected queue; throws {@link IllegalStateException} if not started. */
    public Object[] toArray() {
        return determineQueue().toArray();
    }

    /**
     * String form of the selected queue.
     * Note: like the other accessors, this throws {@link IllegalStateException} if not started.
     */
    @Override
    public String toString() {
        return determineQueue().toString();
    }
}
