package cn.com.duiba.tuia.ssp.center.api.util.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/com/duiba/tuia/ssp/center/api/util/concurrent/EnhanceCompletionService.class */
/**
 * Wraps an {@link ExecutorCompletionService} so that tasks submitted by a task provider are
 * executed on the supplied executor while a dedicated consumer thread feeds every completed
 * result to an {@link AbstractResultConsumer}.
 *
 * <p>Typical flow: {@link #execute(AbstractTaskProvider)} starts the consumer thread, asks the
 * provider to offer its tasks (the provider calls {@link #submit(Callable)}), waits until the
 * consumer thread has finished, and returns {@code resultConsumer.getResult()}.
 *
 * @param <V> result type produced by each submitted task
 * @param <S> aggregated result type returned by the result consumer
 */
public class EnhanceCompletionService<V, S> {
    private static final Logger logger = LoggerFactory.getLogger(EnhanceCompletionService.class);

    private final ExecutorCompletionService<V> ecs;
    /** Set once the task provider has finished (or failed) offering tasks for the current run. */
    private final AtomicBoolean isAllTaskSubmitted = new AtomicBoolean(false);
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled whenever a task is submitted or submission is finished. */
    private final Condition consumerCondition = this.lock.newCondition();
    /** Number of submitted tasks whose results have not yet been handled by the consumer thread. */
    private final AtomicInteger dealingTaskCount = new AtomicInteger(0);
    private final AbstractResultConsumer<V, S> resultConsumer;

    /**
     * Thread that takes completed futures from the completion service and passes their results
     * to the result handler until submission is finished and no task is outstanding.
     */
    class CompletionQueueConsumerTask extends Thread {
        private final AbstractResultConsumer<V, S> resultHandler;

        CompletionQueueConsumerTask(AbstractResultConsumer<V, S> abstractResultConsumer) {
            this.resultHandler = abstractResultConsumer;
        }

        @Override
        public void run() {
            // Stop only when the provider is done AND every counted task has been handled.
            while (!(isAllTaskSubmitted.get() && dealingTaskCount.get() <= 0)) {
                awaitWork();
                // Snapshot the counter: tasks submitted during this pass are picked up next pass.
                int pending = dealingTaskCount.get();
                for (int handled = 0; handled < pending; handled++) {
                    try {
                        this.resultHandler.consume(ecs.take().get());
                    } catch (Throwable th) {
                        // A failing task or consumer must not stop the remaining results
                        // from being processed.
                        logger.error(th.getMessage(), th);
                    } finally {
                        dealingTaskCount.decrementAndGet();
                    }
                }
            }
        }

        /**
         * Blocks while there is nothing to consume and submission is still in progress.
         *
         * <p>If the wait is interrupted the interruption is logged and the method returns; the
         * interrupt flag is intentionally NOT restored, because a set flag would make every
         * subsequent {@code await()} / {@code take()} on this thread fail immediately.
         */
        private void awaitWork() {
            lock.lock();
            try {
                // Loop guards against spurious wakeups, as recommended for Condition.await().
                while (!isAllTaskSubmitted.get() && dealingTaskCount.get() <= 0) {
                    consumerCondition.await();
                }
            } catch (InterruptedException e) {
                logger.warn("CompletionQueueConsumerTask增强服务结果消费线程在消费等待时发生InterruptedException", e);
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Creates the service on top of the given executor.
     *
     * <p>If the executor is a {@link ThreadPoolExecutor}, its rejected-execution handler is
     * replaced with one that blocks the submitting thread until the work queue has room. The
     * handler throws {@link RejectedExecutionException} when the pool is shut down or when the
     * submitting thread is interrupted while waiting, so a task is never silently dropped while
     * still being counted as outstanding.
     *
     * @param executor               executor that runs the submitted tasks
     * @param abstractResultConsumer consumer that receives each task result and builds the
     *                               aggregated result
     */
    public EnhanceCompletionService(Executor executor, AbstractResultConsumer<V, S> abstractResultConsumer) {
        if (executor instanceof ThreadPoolExecutor) {
            ((ThreadPoolExecutor) executor).setRejectedExecutionHandler((runnable, threadPoolExecutor) -> {
                if (threadPoolExecutor.isShutdown()) {
                    // Queueing onto a shut-down pool would leave the task waiting forever.
                    throw new RejectedExecutionException("Executor is shut down, task rejected: " + runnable);
                }
                try {
                    threadPoolExecutor.getQueue().put(runnable);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
                }
            });
        }
        this.ecs = new ExecutorCompletionService<V>(executor, new LinkedBlockingDeque<>());
        this.resultConsumer = abstractResultConsumer;
    }

    /**
     * Submits one task and wakes the consumer thread.
     *
     * <p>The outstanding-task counter is incremented only after the completion service has
     * accepted the task, so a rejected submission leaves the counter untouched.
     *
     * @param callable task to run
     */
    public void submit(Callable<V> callable) {
        this.ecs.submit(callable);
        this.dealingTaskCount.incrementAndGet();
        notifyFutureConsumerThread();
    }

    /**
     * Runs all tasks offered by the provider and returns the aggregated result.
     *
     * <p>Any {@link Throwable} raised while the provider offers tasks is logged and not
     * rethrown; results of the tasks submitted so far are still consumed. If the calling thread
     * is interrupted while waiting for the consumer thread, the interruption is logged, the
     * interrupt flag is restored, and the result available at that moment is returned.
     *
     * @param abstractTaskProvider provider that submits tasks through {@link #submit(Callable)}
     * @return the value of {@code resultConsumer.getResult()} after the consumer thread finished
     */
    public synchronized S execute(AbstractTaskProvider abstractTaskProvider) {
        // A previous run leaves the flag set; clear it so the new consumer thread does not
        // exit before any task of this run has been offered.
        this.isAllTaskSubmitted.set(false);
        CompletionQueueConsumerTask consumerThread = new CompletionQueueConsumerTask(this.resultConsumer);
        consumerThread.start();
        try {
            abstractTaskProvider.offerTasks(this);
        } catch (Throwable th) {
            logger.error("-EnhanceCompletionService执行任务异常", th);
        } finally {
            finishSubmissionAndAwait(consumerThread);
        }
        return this.resultConsumer.getResult();
    }

    /** Marks submission as finished, wakes the consumer thread and waits for it to terminate. */
    private void finishSubmissionAndAwait(Thread consumerThread) {
        this.isAllTaskSubmitted.set(true);
        notifyFutureConsumerThread();
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            logger.warn("-EnhanceCompletionService执行任务异常", e);
            // Preserve the caller's interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /** Signals the consumer thread that the counter or the submitted flag may have changed. */
    private void notifyFutureConsumerThread() {
        this.lock.lock();
        try {
            this.consumerCondition.signal();
        } finally {
            this.lock.unlock();
        }
    }
}
