package cn.com.duiba.boot.concurrent;

import cn.com.duiba.boot.perftest.InternalPerfTestContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * A buffering queue that hands its elements to a batch consumer instead of exposing
 * {@code take}/{@code poll} operations.
 *
 * <p>Two independent {@link LinkedBlockingQueue}s are kept: one used while
 * {@link InternalPerfTestContext#isCurrentInPerfTestMode()} is {@code false} and one used while
 * it is {@code true}. Every insert goes to the queue matching the calling thread's mode.
 *
 * <p>A batch is delivered to the consumer in two situations:
 * <ul>
 *   <li>after an insert, while the selected queue holds at least {@code batchSize} elements
 *       (delivery happens on the inserting thread);</li>
 *   <li>when {@code maxWaitTimeMillis > 0}, from a background thread that wakes up every
 *       {@code maxWaitTimeMillis} milliseconds and flushes up to {@code batchSize} elements of
 *       each queue whose last size-triggered delivery is at least that old.</li>
 * </ul>
 *
 * <p>The queue must be {@link #start() started} before use; all delegating methods throw
 * {@link IllegalStateException} otherwise. {@link #stop()} flushes whatever is still buffered.
 */
@ThreadSafe
/* loaded from: input_file:BOOT-INF/lib/spring-boot-ext-api-2.0.0-g7.jar:cn/com/duiba/boot/concurrent/BatchConsumeBlockingQueue.class */
public class BatchConsumeBlockingQueue<T> {
    private static final Logger logger = LoggerFactory.getLogger(BatchConsumeBlockingQueue.class);

    /** Buffer used while the calling thread is not in perf-test mode. */
    private final LinkedBlockingQueue<T> queue4Normal;
    /** Buffer used while the calling thread is in perf-test mode. */
    private final LinkedBlockingQueue<T> queue4PerfTest;
    /** Maximum number of elements handed to the consumer in one call; always positive. */
    private final int batchSize;
    /** Period of the timeout flush in milliseconds; values {@code <= 0} disable the timer thread. */
    private final int maxWaitTimeMillis;
    private final Consumer<List<T>> asyncBatchConsumer;
    /** The timer thread of the current start/stop cycle, or {@code null} when none is running. */
    private volatile Thread queueThread;
    private volatile long lastConsumeTimeMillis4Normal = 0;
    private volatile long lastConsumeTimeMillis4PerfTest = 0;
    private volatile boolean isStarted = false;

    /**
     * Creates a stopped queue; call {@link #start()} before inserting.
     *
     * @param capacity          capacity of each internal queue; {@code <= 0} means unbounded
     * @param batchSize         maximum number of elements per consumer call, must be positive
     * @param maxWaitTimeMillis timeout-flush period in milliseconds; {@code <= 0} disables it
     * @param batchConsumer     receives every batch, must not be {@code null}
     * @throws IllegalArgumentException if {@code batchConsumer} is {@code null} or
     *                                  {@code batchSize} is not positive
     */
    public BatchConsumeBlockingQueue(int capacity, int batchSize, int maxWaitTimeMillis, Consumer<List<T>> batchConsumer) {
        Assert.notNull(batchConsumer, "batchConsumer must not be null");
        // drainTo(..., n) removes nothing for n <= 0, which would make every drain loop below spin forever.
        Assert.isTrue(batchSize > 0, "batchSize must be greater than 0");
        if (capacity <= 0) {
            this.queue4Normal = new LinkedBlockingQueue<>();
            this.queue4PerfTest = new LinkedBlockingQueue<>();
        } else {
            this.queue4Normal = new LinkedBlockingQueue<>(capacity);
            this.queue4PerfTest = new LinkedBlockingQueue<>(capacity);
        }
        this.batchSize = batchSize;
        this.maxWaitTimeMillis = maxWaitTimeMillis;
        this.asyncBatchConsumer = batchConsumer;
    }

    /**
     * Builds (but does not start) a timer thread that periodically triggers the timeout flush.
     * The thread terminates once it is interrupted, the queue is stopped, or a newer timer
     * thread has replaced it.
     */
    private Thread newQueueThread() {
        return new Thread(() -> {
            Thread self = Thread.currentThread();
            while (isActiveTimer(self)) {
                try {
                    Thread.sleep(this.maxWaitTimeMillis);
                } catch (InterruptedException e) {
                    self.interrupt();
                    return;
                }
                if (self.isInterrupted() || !isActiveTimer(self)) {
                    return;
                }
                consumeQueueWhenTimeout();
            }
        });
    }

    /** Returns whether {@code candidate} is the timer thread of the currently running cycle. */
    private boolean isActiveTimer(Thread candidate) {
        return this.isStarted && this.queueThread == candidate;
    }

    /**
     * One timer tick: flushes the normal queue, then switches the current thread to perf-test
     * mode, flushes the perf-test queue and switches back. Failures are logged, never propagated,
     * and a failure on the first queue does not prevent the second one from being flushed.
     */
    private void consumeQueueWhenTimeout() {
        try {
            flushCurrentQueueIfTimedOut();
        } catch (Throwable th) {
            logger.error("触发消费时发生异常", th);
        }
        try {
            // NOTE(review): argument meaning of markAsPerfTest(null, false) is not visible here;
            // the call is kept exactly as the original code issued it.
            InternalPerfTestContext.markAsPerfTest(null, false);
            try {
                flushCurrentQueueIfTimedOut();
            } finally {
                InternalPerfTestContext.markAsNormal();
            }
        } catch (Throwable th) {
            logger.error("触发消费时发生异常", th);
        }
    }

    /**
     * Flushes one batch from the queue selected by the current thread's mode when it is
     * non-empty and its last size-triggered delivery is at least {@code maxWaitTimeMillis} old.
     */
    private void flushCurrentQueueIfTimedOut() {
        BlockingQueue<T> queue = determineQueue();
        boolean timedOut = System.currentTimeMillis() - getLastConsumeTimeMillis() >= this.maxWaitTimeMillis;
        if (timedOut && !queue.isEmpty()) {
            tryBatchConsumeDataWhenTimeout(queue);
        }
    }

    /**
     * Marks the queue as usable and, when a positive {@code maxWaitTimeMillis} was configured,
     * launches a fresh timer thread. Calling it on an already started queue is a no-op.
     */
    public synchronized void start() {
        if (this.isStarted) {
            return;
        }
        this.isStarted = true;
        if (this.maxWaitTimeMillis > 0) {
            // A Thread object cannot be started twice, so every start/stop cycle gets its own.
            Thread timer = newQueueThread();
            this.queueThread = timer;
            timer.start();
        }
    }

    /**
     * Marks the queue as unusable, interrupts the timer thread and delivers everything still
     * buffered in both internal queues to the consumer on the calling thread. Calling it on a
     * queue that is not started is a no-op. Exceptions thrown by the consumer propagate.
     */
    public synchronized void stop() {
        if (!this.isStarted) {
            return;
        }
        this.isStarted = false;
        Thread timer = this.queueThread;
        if (timer != null) {
            timer.interrupt();
            this.queueThread = null;
        }
        drainCompletely(this.queue4Normal);
        if (!this.queue4PerfTest.isEmpty()) {
            // Deliver perf-test elements in perf-test mode, as the timer tick does, and leave
            // the caller's own mode untouched if it already was in perf-test mode.
            boolean callerInPerfTest = InternalPerfTestContext.isCurrentInPerfTestMode();
            if (!callerInPerfTest) {
                InternalPerfTestContext.markAsPerfTest(null, false);
            }
            try {
                drainCompletely(this.queue4PerfTest);
            } finally {
                if (!callerInPerfTest) {
                    InternalPerfTestContext.markAsNormal();
                }
            }
        }
    }

    /** Delivers batches from {@code queue} until it is empty. */
    private void drainCompletely(BlockingQueue<T> queue) {
        while (!queue.isEmpty()) {
            tryBatchConsumeDataWhenTimeout(queue);
        }
    }

    /**
     * Selects the internal queue matching the current thread's perf-test mode.
     *
     * @throws IllegalStateException if the queue has not been started
     */
    private BlockingQueue<T> determineQueue() {
        if (this.isStarted) {
            return InternalPerfTestContext.isCurrentInPerfTestMode() ? this.queue4PerfTest : this.queue4Normal;
        }
        throw new IllegalStateException("not started yet!");
    }

    /** Records "now" as the last delivery time for the current thread's mode. */
    private void setLastConsumeTimeMillis() {
        if (InternalPerfTestContext.isCurrentInPerfTestMode()) {
            this.lastConsumeTimeMillis4PerfTest = System.currentTimeMillis();
        } else {
            this.lastConsumeTimeMillis4Normal = System.currentTimeMillis();
        }
    }

    /** Returns the last recorded delivery time for the current thread's mode. */
    private long getLastConsumeTimeMillis() {
        return InternalPerfTestContext.isCurrentInPerfTestMode() ? this.lastConsumeTimeMillis4PerfTest : this.lastConsumeTimeMillis4Normal;
    }

    /**
     * Delivers full batches for as long as {@code queue} holds at least {@code batchSize}
     * elements. Draining happens under this object's monitor; the consumer is invoked outside
     * of it and only when something was actually drained.
     */
    private void tryBatchConsumeDataWhenExceedBatchSize(BlockingQueue<T> queue) {
        while (queue.size() >= this.batchSize) {
            List<T> batch = new ArrayList<>();
            synchronized (this) {
                // Re-check under the lock: a concurrent caller may have drained the queue already.
                if (queue.size() >= this.batchSize) {
                    queue.drainTo(batch, this.batchSize);
                    if (!batch.isEmpty()) {
                        setLastConsumeTimeMillis();
                    }
                }
            }
            if (!batch.isEmpty()) {
                this.asyncBatchConsumer.accept(batch);
            }
        }
    }

    /**
     * Delivers at most one batch of up to {@code batchSize} elements from {@code queue},
     * regardless of how full it is. Nothing is delivered when the queue turns out to be empty.
     */
    private void tryBatchConsumeDataWhenTimeout(BlockingQueue<T> queue) {
        if (queue.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>();
        synchronized (this) {
            queue.drainTo(batch, this.batchSize);
        }
        if (!batch.isEmpty()) {
            this.asyncBatchConsumer.accept(batch);
        }
    }

    /**
     * Inserts via {@link BlockingQueue#add(Object)} and, on success, delivers any full batches.
     *
     * @throws IllegalStateException if the queue is not started or is full
     */
    public boolean add(T t) {
        BlockingQueue<T> queue = determineQueue();
        boolean added = queue.add(t);
        if (added) {
            tryBatchConsumeDataWhenExceedBatchSize(queue);
        }
        return added;
    }

    /**
     * Inserts via {@link BlockingQueue#offer(Object)} and, on success, delivers any full batches.
     *
     * @return {@code false} if the selected queue was full
     * @throws IllegalStateException if the queue is not started
     */
    public boolean offer(T t) {
        BlockingQueue<T> queue = determineQueue();
        boolean offered = queue.offer(t);
        if (offered) {
            tryBatchConsumeDataWhenExceedBatchSize(queue);
        }
        return offered;
    }

    /**
     * Inserts via {@link BlockingQueue#put(Object)}, waiting for space if necessary, then
     * delivers any full batches.
     *
     * @throws InterruptedException  if interrupted while waiting for space
     * @throws IllegalStateException if the queue is not started
     */
    public void put(T t) throws InterruptedException {
        BlockingQueue<T> queue = determineQueue();
        queue.put(t);
        tryBatchConsumeDataWhenExceedBatchSize(queue);
    }

    /**
     * Inserts via {@link BlockingQueue#offer(Object, long, TimeUnit)} and, on success, delivers
     * any full batches.
     *
     * @return {@code false} if the wait time elapsed before space became available
     * @throws InterruptedException  if interrupted while waiting for space
     * @throws IllegalStateException if the queue is not started
     */
    public boolean offer(T t, long j, TimeUnit timeUnit) throws InterruptedException {
        BlockingQueue<T> queue = determineQueue();
        boolean offered = queue.offer(t, j, timeUnit);
        if (offered) {
            tryBatchConsumeDataWhenExceedBatchSize(queue);
        }
        return offered;
    }

    /** Returns the remaining capacity of the queue selected for the current thread's mode. */
    public int remainingCapacity() {
        return determineQueue().remainingCapacity();
    }

    /**
     * Adds every element of {@code collection} through {@link #add(Object)}.
     *
     * @return {@code true} if at least one element was added
     * @throws NullPointerException     if {@code collection} is {@code null}
     * @throws IllegalArgumentException if {@code collection} is this very object
     */
    public boolean addAll(Collection<? extends T> collection) {
        if (collection == null) {
            throw new NullPointerException();
        }
        if (collection == this) {
            throw new IllegalArgumentException();
        }
        boolean modified = false;
        for (T element : collection) {
            if (add(element)) {
                modified = true;
            }
        }
        return modified;
    }

    /** Returns the number of buffered elements in the queue selected for the current thread's mode. */
    public int size() {
        return determineQueue().size();
    }

    /** Returns whether the queue selected for the current thread's mode holds no elements. */
    public boolean isEmpty() {
        return determineQueue().isEmpty();
    }

    /** Returns whether the queue selected for the current thread's mode contains {@code obj}. */
    public boolean contains(Object obj) {
        return determineQueue().contains(obj);
    }

    /** Returns an iterator over the queue selected for the current thread's mode. */
    public Iterator<T> iterator() {
        return determineQueue().iterator();
    }

    /** Returns the buffered elements of the selected queue as an array. */
    public Object[] toArray() {
        return determineQueue().toArray();
    }

    /** Returns the buffered elements of the selected queue, using {@code tArr} if it is large enough. */
    public T[] toArray(T[] tArr) {
        return determineQueue().toArray(tArr);
    }

    /**
     * Returns the string form of the queue selected for the current thread's mode.
     *
     * @throws IllegalStateException if the queue is not started
     */
    @Override
    public String toString() {
        return determineQueue().toString();
    }
}
