package org.unidal.cat.message.storage.internals;

import com.dianping.cat.Cat;
import com.dianping.cat.configuration.NetworkInterfaceManager;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.statistic.ServerStatisticManager;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.configuration.DataConfiguration;
import org.unidal.cat.message.storage.Block;
import org.unidal.cat.message.storage.BlockWriter;
import org.unidal.cat.message.storage.Bucket;
import org.unidal.cat.message.storage.BucketManager;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named(type = BlockWriter.class, instantiationStrategy = Named.PER_LOOKUP)
/* loaded from: input_file:WEB-INF/lib/cat-hadoop-3.0.2.jar:org/unidal/cat/message/storage/internals/DefaultBlockWriter.class */
public class DefaultBlockWriter implements BlockWriter {

    @Inject({"local"})
    private BucketManager m_bucketManager;

    @Inject
    private ServerStatisticManager m_statisticManager;
    private int m_index;
    private BlockingQueue<Block> m_queue;
    private long m_hour;
    private int m_count;
    private AtomicBoolean m_enabled;
    private CountDownLatch m_latch;

    @Override // org.unidal.helper.Threads.Task
    public String getName() {
        return getClass().getSimpleName() + " " + new SimpleDateFormat(DataConfiguration.DEFAULT_DATE_FORMAT).format(new Date(TimeUnit.HOURS.toMillis(this.m_hour))) + HelpFormatter.DEFAULT_OPT_PREFIX + this.m_index;
    }

    @Override // org.unidal.cat.message.storage.BlockWriter
    public void initialize(int i, int i2, BlockingQueue<Block> blockingQueue) {
        this.m_hour = i;
        this.m_index = i2;
        this.m_queue = blockingQueue;
        this.m_enabled = new AtomicBoolean(true);
        this.m_latch = new CountDownLatch(1);
    }

    private void processBlock(String str, Block block) {
        try {
            try {
                try {
                    Bucket bucket = this.m_bucketManager.getBucket(block.getDomain(), str, block.getHour(), true);
                    int i = this.m_count + 1;
                    this.m_count = i;
                    if (i % 1000 == 0) {
                        Transaction newTransaction = Cat.newTransaction("Block", block.getDomain());
                        try {
                            bucket.puts(block.getData(), block.getOffsets());
                        } catch (Exception e) {
                            Cat.logError(str, e);
                            newTransaction.setStatus("0");
                        }
                        newTransaction.setStatus("0");
                        newTransaction.complete();
                    } else {
                        try {
                            bucket.puts(block.getData(), block.getOffsets());
                        } catch (Exception e2) {
                            Cat.logError(str, e2);
                        }
                    }
                    block.clear();
                } catch (Error e3) {
                    Cat.logError(str, e3);
                    block.clear();
                }
            } catch (Exception e4) {
                Cat.logError(str, e4);
                block.clear();
            }
        } catch (Throwable th) {
            block.clear();
            throw th;
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        String localHostAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
        while (true) {
            try {
                if (!this.m_enabled.get() && this.m_queue.isEmpty()) {
                    break;
                }
                Block poll = this.m_queue.poll(5L, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    long currentTimeMillis = System.currentTimeMillis();
                    processBlock(localHostAddress, poll);
                    this.m_statisticManager.addBlockTime(System.currentTimeMillis() - currentTimeMillis);
                }
            } catch (InterruptedException e) {
            }
        }
        this.m_latch.countDown();
    }

    @Override // org.unidal.helper.Threads.Task
    public void shutdown() {
        this.m_enabled.set(false);
        try {
            this.m_latch.await();
        } catch (InterruptedException e) {
        }
        while (true) {
            Block poll = this.m_queue.poll();
            if (poll == null) {
                return;
            } else {
                processBlock(NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), poll);
            }
        }
    }
}
