/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.rocketmq.integration;

import cn.com.duibaboot.ext.stream.binder.BinderMessageHandler;
import cn.com.duibaboot.ext.stream.support.TopicOperater;
import java.util.Optional;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import org.springframework.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;

/**
 * Outbound message handler that publishes Spring {@link Message}s through a
 * {@link RocketMQTemplate}.
 *
 * <p>The destination is the topic returned by {@code TopicOperater.findTopic(message)},
 * optionally suffixed with {@code ":" + tags} when the message carries a non-empty
 * {@code TAGS} header. Messages are sent asynchronously unless {@link #setSync(boolean)}
 * has been called with {@code true}.
 */
public class RocketMQMessageHandler
extends AbstractMessageHandler
implements BinderMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(RocketMQMessageHandler.class);

    /** Header holding the tags appended to the topic as {@code topic:tags}. */
    private static final String TAGS_HEADER = "TAGS";
    /** Header holding the delay level handed to the template on synchronous sends. */
    private static final String DELAY_HEADER = "DELAY";

    private final RocketMQTemplate rocketMQTemplate;
    private final String groupName;
    private final InstrumentationManager instrumentationManager;
    /** When {@code true} messages are sent with {@code syncSend}; otherwise with {@code asyncSend}. */
    private boolean sync = false;

    /**
     * @param rocketMQTemplate template used to start/stop the producer and to send messages
     * @param groupName name under which the health instrumentation is registered
     * @param instrumentationManager registry receiving the start-up health status
     */
    public RocketMQMessageHandler(RocketMQTemplate rocketMQTemplate, String groupName, InstrumentationManager instrumentationManager) {
        this.rocketMQTemplate = rocketMQTemplate;
        this.groupName = groupName;
        this.instrumentationManager = instrumentationManager;
    }

    /**
     * Registers a health instrumentation for the group, initializes the template via
     * {@code afterPropertiesSet()} and records whether start-up succeeded.
     *
     * @throws MessagingException if the template fails to initialize; the original
     *         exception is preserved as the cause
     */
    public void start() {
        this.instrumentationManager.addHealthInstrumentation(new Instrumentation(this.groupName));
        try {
            this.rocketMQTemplate.afterPropertiesSet();
            this.instrumentationManager.getHealthInstrumentation(this.groupName).markStartedSuccessfully();
        }
        catch (Exception e) {
            this.instrumentationManager.getHealthInstrumentation(this.groupName).markStartFailed(e);
            String description = "RocketMQTemplate startup failed, Caused by " + e.getMessage();
            // Pass the throwable so the stack trace is not lost in the log.
            log.error(description, e);
            throw new MessagingException(MessageBuilder.withPayload(description).build(), e);
        }
    }

    /** Shuts the template down by calling {@code destroy()}. */
    public void stop() {
        this.rocketMQTemplate.destroy();
    }

    /**
     * Sends the message to the resolved destination, synchronously or asynchronously
     * depending on the {@code sync} flag.
     *
     * @param message the message to publish
     * @throws MessagingException if resolving the destination or sending fails, or if a
     *         synchronous send returns a status other than {@code SEND_OK}
     */
    protected void handleMessageInternal(Message<?> message) {
        SendResult sendRes = null;
        try {
            final String destination = resolveDestination(message);
            int delayLevel = resolveDelayLevel(message);
            if (this.sync) {
                sendRes = this.rocketMQTemplate.syncSend(destination, message, this.rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel);
                log.debug("sync send to topic {} {}", destination, sendRes);
            } else {
                // NOTE(review): the delay level is only handed to the template on the
                // sync path; confirm whether async sends are expected to honor DELAY.
                this.rocketMQTemplate.asyncSend(destination, message, new SendCallback(){

                    public void onSuccess(SendResult sendResult) {
                        log.debug("async send to topic {} {}", destination, sendResult);
                    }

                    public void onException(Throwable e) {
                        log.error("RocketMQ Message hasn't been sent. Caused by {}", e.getMessage(), e);
                    }
                });
            }
        }
        catch (Exception e) {
            log.error("RocketMQ Message hasn't been sent. Caused by {}", e.getMessage(), e);
            throw new MessagingException(message, e);
        }
        // Checked outside the try block so this exception is not caught and wrapped
        // in a second MessagingException by the handler above.
        if (sendRes != null && sendRes.getSendStatus() != SendStatus.SEND_OK) {
            log.error("RocketMQ Message hasn't been sent. Send status: {}", sendRes.getSendStatus());
            throw new MessagingException(message, new MQClientException("message hasn't been sent", null));
        }
    }

    /** Builds the send destination: the topic, plus {@code ":tags"} when a non-empty TAGS header is present. */
    private static String resolveDestination(Message<?> message) {
        StringBuilder topicWithTags = new StringBuilder(TopicOperater.findTopic(message));
        String tags = Optional.ofNullable(message.getHeaders().get(TAGS_HEADER)).map(Object::toString).orElse("");
        if (StringUtils.hasLength(tags)) {
            topicWithTags.append(":").append(tags);
        }
        return topicWithTags.toString();
    }

    /** Reads the DELAY header as an int; returns 0 when it is absent, of another type, or not a valid integer. */
    private static int resolveDelayLevel(Message<?> message) {
        Object raw = message.getHeaders().get(DELAY_HEADER);
        if (raw instanceof Number) {
            return ((Number)raw).intValue();
        }
        if (raw instanceof String) {
            try {
                return Integer.parseInt(((String)raw).trim());
            }
            catch (NumberFormatException e) {
                // Best effort: a malformed header must not block the send, but make it visible.
                log.warn("Ignoring invalid {} header value '{}', falling back to delay level 0", DELAY_HEADER, raw);
            }
        }
        return 0;
    }

    /**
     * @param sync {@code true} to send synchronously and verify the send status,
     *        {@code false} to send asynchronously with a logging callback
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }
}

