package cn.com.duiba.order.center.biz.consumer.ordercreate;

import cn.com.duiba.activity.center.api.dto.manual.ManualLotteryOrderDto;
import cn.com.duiba.order.center.api.dto.OrderCreateContext;
import cn.com.duiba.order.center.api.dto.crecord.ConsumerExchangeRecordDto;
import cn.com.duiba.order.center.biz.bo.ordercreate.ObjectOrderCreateBo;
import cn.com.duiba.order.center.biz.constant.TopicConstant;
import cn.com.duiba.order.center.biz.tool.KafkaClient;
import com.alibaba.fastjson.JSONObject;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Kafka-backed {@link OrderCreateConsumer} that reads serialized {@link OrderCreateContext}
 * messages and hands each one to
 * {@code ObjectOrderCreateBo#innerManualLotteryAsynCreate}.
 *
 * <p>{@link #start()} launches {@link #POOL_SIZE} worker threads; every worker owns its own
 * {@link KafkaConsumer} instance and commits offsets manually, one record at a time.
 * A record's offset is committed whether or not processing succeeded, so a failing record is
 * logged and skipped rather than redelivered.
 *
 * <p>NOTE(review): the subscribed topic ({@code getPhonebillCallback()}), the worker thread name
 * and the error log message all refer to "phonebill", which looks copied from another consumer;
 * confirm that this is the intended topic for this class.
 */
@Service
public class HuichangAlipayCodeOrderCreateConsumer extends OrderCreateContextTool implements OrderCreateConsumer {

    private static final Logger log = LoggerFactory.getLogger(HuichangAlipayCodeOrderCreateConsumer.class);

    /** Number of worker threads, each running its own consumer loop. */
    private static final int POOL_SIZE = 2;

    /** Timeout, in milliseconds, passed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100L;

    @Autowired
    private KafkaClient kafkaClient;

    @Autowired
    private TopicConstant topicConstant;

    @Autowired
    private ObjectOrderCreateBo objectOrderCreateBo;

    /** Worker pool; {@code null} while the consumer is stopped. Guarded by {@code this}. */
    private ExecutorService executorService;

    /**
     * Starts the worker threads. Calling this while the consumer is already running is a no-op.
     */
    @Override
    public synchronized void start() {
        if (this.executorService != null) {
            return;
        }
        this.executorService = Executors.newFixedThreadPool(POOL_SIZE, new ThreadFactory() {
            private int sequence = 0;

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "Thread-phonebill-Consumer-" + (this.sequence++));
            }
        });
        for (int i = 0; i < POOL_SIZE; i++) {
            this.executorService.submit(new Runnable() {
                @Override
                public void run() {
                    HuichangAlipayCodeOrderCreateConsumer.this.consumeLoop();
                }
            });
        }
    }

    /**
     * Stops the worker threads. Calling this while the consumer is not running is a no-op.
     *
     * <p>The workers loop until their thread is interrupted, so the pool has to be shut down with
     * {@code shutdownNow()}; a plain {@code shutdown()} never interrupts running tasks and would
     * leave the consumer loops (and their threads) alive forever.
     */
    @Override
    public synchronized void stop() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.executorService = null;
        }
    }

    /**
     * Body of one worker: creates a consumer, subscribes to the topic and processes records until
     * the current thread is interrupted. The consumer is always closed on exit, including when
     * polling or committing fails.
     */
    private void consumeLoop() {
        KafkaConsumer<String, String> consumer = null;
        try {
            consumer = new KafkaConsumer<String, String>(buildConsumerProperties());
            consumer.subscribe(Arrays.asList(this.topicConstant.getPhonebillCallback()));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT_MS);
                for (TopicPartition partition : batch.partitions()) {
                    for (ConsumerRecord<String, String> record : batch.records(partition)) {
                        handleRecord(consumer, partition, record);
                    }
                }
            }
        } catch (Exception e) {
            log.error("error:", e);
        } finally {
            closeConsumer(consumer);
        }
    }

    /**
     * Builds the consumer configuration: manual offset commits, String key/value deserializers,
     * and reading from the earliest offset when the group has no committed position.
     */
    private Properties buildConsumerProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaClient.getBootstrapServers());
        properties.put("group.id", "default");
        properties.put("enable.auto.commit", "false");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    /**
     * Processes a single record and then commits its offset exactly once.
     *
     * <p>Processing failures are logged and swallowed so that one bad message cannot block the
     * partition; the commit in {@code finally} moves past the record in either case.
     */
    private void handleRecord(KafkaConsumer<String, String> consumer, TopicPartition partition,
            ConsumerRecord<String, String> record) {
        try {
            OrderCreateContext context = JSONObject.parseObject(record.value(), OrderCreateContext.class);
            this.objectOrderCreateBo.innerManualLotteryAsynCreate(
                    this.ordersSimpleService.find(context.getOrderId(), context.getConsumerId()),
                    context.getParams(),
                    this.remoteConsumerService.find(context.getConsumerId()),
                    (ManualLotteryOrderDto) context.getParamsMap().get("manualLotteryOrder"),
                    (ConsumerExchangeRecordDto) context.getParamsMap().get("record"));
        } catch (Exception e) {
            log.error("PhonebillCallback consumer error:", e);
        } finally {
            // The committed offset is the position of the NEXT record to read, hence the +1.
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
        }
    }

    /**
     * Closes the consumer if it was created, logging (not propagating) any failure so that the
     * worker thread always terminates cleanly.
     */
    private void closeConsumer(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        try {
            consumer.close();
        } catch (Exception e) {
            log.error("error:", e);
        }
    }
}
