package com.aliyun.hitsdb.client.consumer;

import com.aliyun.hitsdb.client.Config;
import com.aliyun.hitsdb.client.http.HttpClient;
import com.aliyun.hitsdb.client.queue.DataQueue;
import com.aliyun.hitsdb.client.util.guava.RateLimiter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/hitsdb/client/consumer/DefaultBatchPutConsumer.class */
/**
 * {@link Consumer} implementation that runs batch-put workers on up to three fixed-size
 * thread pools: one for {@code BatchPutRunnable}, one for {@code MultiFieldBatchPutRunnable}
 * and one for {@code PointsCollectionPutRunnable}.
 *
 * <p>Pool sizes are taken from {@link Config}; the points pool is sized to the larger of the
 * other two. A pool is only created when its configured size is positive, so any of the pool
 * fields may be {@code null}.
 *
 * <p>All workers share one {@link CountDownLatch}, which {@link #stop(boolean)} awaits during a
 * graceful stop. NOTE(review): this assumes every submitted runnable counts the latch down
 * exactly once when it exits, and that {@link #start()} has been called before a graceful
 * stop; otherwise the latch wait never returns. Confirm against the runnable implementations.
 */
public class DefaultBatchPutConsumer implements Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBatchPutConsumer.class);

    /** How long to wait for a pool to terminate before interrupting its workers again. */
    private static final long TERMINATION_POLL_MILLIS = 100L;

    private DataQueue dataQueue;
    private ExecutorService threadPool;
    private ExecutorService multiFieldThreadPool;
    private ExecutorService pointsThreadPool;
    private final int batchPutConsumerThreadCount;
    private final int multiFieldBatchPutConsumerThreadCount;
    private final int pointsBatchPutConsumerThreadCount;
    private final HttpClient httpclient;
    private final Config config;
    private final RateLimiter rateLimiter;
    private final CountDownLatch countDownLatch;

    /**
     * Creates the consumer and its thread pools; no worker is submitted until {@link #start()}.
     *
     * @param dataQueue   queue handed to every worker
     * @param httpClient  HTTP client handed to every worker
     * @param rateLimiter rate limiter handed to every worker
     * @param config      source of the thread counts; also handed to every worker
     */
    public DefaultBatchPutConsumer(DataQueue dataQueue, HttpClient httpClient, RateLimiter rateLimiter, Config config) {
        this.dataQueue = dataQueue;
        this.httpclient = httpClient;
        this.config = config;
        this.rateLimiter = rateLimiter;
        this.batchPutConsumerThreadCount = config.getBatchPutConsumerThreadCount();
        this.multiFieldBatchPutConsumerThreadCount = config.getMultiFieldBatchPutConsumerThreadCount();
        this.pointsBatchPutConsumerThreadCount =
                Math.max(this.batchPutConsumerThreadCount, this.multiFieldBatchPutConsumerThreadCount);

        if (this.batchPutConsumerThreadCount > 0) {
            this.threadPool = Executors.newFixedThreadPool(
                    this.batchPutConsumerThreadCount, new BatchPutThreadFactory("batch-put-thread"));
        }
        if (this.multiFieldBatchPutConsumerThreadCount > 0) {
            this.multiFieldThreadPool = Executors.newFixedThreadPool(
                    this.multiFieldBatchPutConsumerThreadCount,
                    new BatchPutThreadFactory("multi-field-batch-put-thread"));
        }
        if (this.pointsBatchPutConsumerThreadCount > 0) {
            this.pointsThreadPool = Executors.newFixedThreadPool(
                    this.pointsBatchPutConsumerThreadCount, new BatchPutThreadFactory("points-batch-put-thread"));
        }

        // The latch must match the number of runnables start() actually submits. A non-positive
        // count submits nothing, so it must contribute zero rather than shrink the total (or make
        // the CountDownLatch constructor throw on a negative sum).
        int expectedWorkers = Math.max(0, this.batchPutConsumerThreadCount)
                + Math.max(0, this.multiFieldBatchPutConsumerThreadCount)
                + Math.max(0, this.pointsBatchPutConsumerThreadCount);
        this.countDownLatch = new CountDownLatch(expectedWorkers);
    }

    /**
     * Submits one runnable per configured thread to each pool. A pool whose configured count is
     * not positive was never created and receives nothing.
     */
    @Override
    public void start() {
        for (int i = 0; i < this.batchPutConsumerThreadCount; i++) {
            this.threadPool.submit(new BatchPutRunnable(
                    this.dataQueue, this.httpclient, this.config, this.countDownLatch, this.rateLimiter));
        }
        for (int i = 0; i < this.multiFieldBatchPutConsumerThreadCount; i++) {
            this.multiFieldThreadPool.submit(new MultiFieldBatchPutRunnable(
                    this.dataQueue, this.httpclient, this.config, this.countDownLatch, this.rateLimiter));
        }
        for (int i = 0; i < this.pointsBatchPutConsumerThreadCount; i++) {
            this.pointsThreadPool.submit(new PointsCollectionPutRunnable(
                    this.dataQueue, this.httpclient, this.countDownLatch, this.config, this.rateLimiter));
        }
    }

    /** Stops gracefully; equivalent to {@code stop(false)}. */
    @Override
    public void stop() {
        stop(false);
    }

    /**
     * Stops all worker pools and drops this consumer's reference to the data queue.
     *
     * <p>In both modes every pool is shut down with {@link ExecutorService#shutdownNow()}, which
     * interrupts the workers. When {@code force} is {@code false} the call additionally blocks
     * until every pool has terminated and the shared latch has reached zero. If the calling
     * thread is interrupted while waiting, the wait is abandoned, the error is logged and the
     * thread's interrupt status is restored.
     *
     * @param force {@code true} to return immediately after interrupting the workers,
     *              {@code false} to wait for them to finish
     */
    @Override
    public void stop(boolean force) {
        // Signal every pool first so all workers wind down concurrently, and so that no pool is
        // left running if the wait below is interrupted.
        interruptWorkers(this.threadPool);
        interruptWorkers(this.multiFieldThreadPool);
        interruptWorkers(this.pointsThreadPool);

        if (!force) {
            try {
                awaitTermination(this.threadPool);
                awaitTermination(this.multiFieldThreadPool);
                awaitTermination(this.pointsThreadPool);
                this.countDownLatch.await();
            } catch (InterruptedException e) {
                LOGGER.error("An error occurred waiting for the consumer thread to close", e);
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
        }

        this.dataQueue = null;
    }

    /** Calls {@code shutdownNow()} on the pool if it was created. */
    private static void interruptWorkers(ExecutorService pool) {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    /**
     * Blocks until the pool has terminated, sleeping between checks instead of spinning. If a
     * poll interval elapses without termination the workers are interrupted again, in case an
     * earlier interrupt was swallowed by a worker.
     */
    private static void awaitTermination(ExecutorService pool) throws InterruptedException {
        if (pool == null) {
            return;
        }
        while (!pool.awaitTermination(TERMINATION_POLL_MILLIS, TimeUnit.MILLISECONDS)) {
            pool.shutdownNow();
        }
    }
}
