package zipkin2.collector.kafka;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.kafka.KafkaCollector;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:zipkin2/collector/kafka/KafkaCollectorWorker.class */
public final class KafkaCollectorWorker implements Runnable {
    static final Logger LOG = LoggerFactory.getLogger(KafkaCollectorWorker.class);
    static final Callback<Void> NOOP = new Callback<Void>() { // from class: zipkin2.collector.kafka.KafkaCollectorWorker.1
        public void onSuccess(Void r2) {
        }

        public void onError(Throwable th) {
        }
    };
    final Properties properties;
    final List<String> topics;
    final Collector collector;
    final CollectorMetrics metrics;
    final AtomicReference<List<TopicPartition>> assignedPartitions = new AtomicReference<>(Collections.emptyList());
    final AtomicBoolean running = new AtomicBoolean(true);

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaCollectorWorker(KafkaCollector.Builder builder) {
        this.properties = builder.properties;
        this.topics = Arrays.asList(builder.topic.split(","));
        this.collector = builder.delegate.build();
        this.metrics = builder.metrics;
    }

    /* JADX WARN: Finally extract failed */
    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                KafkaConsumer kafkaConsumer = new KafkaConsumer(this.properties);
                Throwable th = null;
                try {
                    kafkaConsumer.subscribe(this.topics, new ConsumerRebalanceListener() { // from class: zipkin2.collector.kafka.KafkaCollectorWorker.2
                        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                            KafkaCollectorWorker.this.assignedPartitions.set(Collections.emptyList());
                        }

                        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                            KafkaCollectorWorker.this.assignedPartitions.set(Collections.unmodifiableList(new ArrayList(collection)));
                        }
                    });
                    LOG.info("Kafka consumer starting polling loop.");
                    while (this.running.get()) {
                        ConsumerRecords poll = kafkaConsumer.poll(Duration.of(1000L, ChronoUnit.MILLIS));
                        LOG.debug("Kafka polling returned batch of {} messages.", Integer.valueOf(poll.count()));
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            byte[] bArr = (byte[]) ((ConsumerRecord) it.next()).value();
                            this.metrics.incrementMessages();
                            this.metrics.incrementBytes(bArr.length);
                            if (bArr.length != 0) {
                                if (bArr.length < 2) {
                                    this.metrics.incrementMessagesDropped();
                                } else if (protobuf3(bArr) || bArr[0] > 16 || bArr[0] == 12) {
                                    this.collector.acceptSpans(bArr, NOOP);
                                } else {
                                    try {
                                        this.collector.accept(Collections.singletonList((Span) SpanBytesDecoder.THRIFT.decodeOne(bArr)), NOOP);
                                    } catch (RuntimeException e) {
                                        this.metrics.incrementMessagesDropped();
                                    }
                                }
                            }
                        }
                    }
                    if (kafkaConsumer != null) {
                        if (0 != 0) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    LOG.info("Kafka consumer polling loop stopped. Kafka consumer closed.");
                } catch (Throwable th3) {
                    if (kafkaConsumer != null) {
                        if (0 != 0) {
                            try {
                                kafkaConsumer.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            kafkaConsumer.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                LOG.info("Kafka consumer polling loop stopped. Kafka consumer closed.");
                throw th5;
            }
        } catch (Error | RuntimeException e2) {
            LOG.warn("Unexpected error in polling loop spans", e2);
            throw e2;
        }
    }

    public void stop() {
        this.running.set(false);
    }

    static boolean protobuf3(byte[] bArr) {
        return bArr[0] == 10 && bArr[1] != 0;
    }
}
