package cn.com.duibaboot.ext.stream.support;

import cn.com.duibaboot.ext.stream.binding.BindingService;
import cn.com.duibaboot.ext.stream.channel.ChannelKey;
import cn.com.duibaboot.ext.stream.channel.ChannelType;
import cn.com.duibaboot.ext.stream.config.BindingProperties;
import cn.com.duibaboot.ext.stream.config.BindingServiceProperties;
import com.google.common.collect.Maps;
import java.util.LinkedHashMap;
import java.util.Objects;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:cn/com/duibaboot/ext/stream/support/StreamTemplate.class */
/**
 * Publishes event objects to the output message channel of a named binding.
 *
 * <p>Every outgoing message carries a {@code StreamMessageHeaders.TOPIC} header. The topic is
 * taken from the event itself when it implements {@code StreamMesseage}; otherwise it falls back
 * to the default topic configured on the binding's producer.
 */
public class StreamTemplate {
    private static final Logger log = LoggerFactory.getLogger(StreamTemplate.class);

    /** Converts the event object plus headers into a {@link Message}. */
    @Resource
    private CompositeMessageConverter compositeMessageConverter;

    /** Used to look up the output binding (and its message channel) for a binding name. */
    @Resource
    private BindingService bindingService;

    /** Binding configuration, consulted for the producer's default topic. */
    @Resource
    private BindingServiceProperties bindingServiceProperties;

    /**
     * Converts {@code obj} into a message and sends it through the output channel of the binding
     * named {@code str}.
     *
     * <p>If the message converter returns {@code null} for the event, a warning is logged and
     * nothing is sent.
     *
     * @param str name of the binding whose output channel should receive the message
     * @param obj event object to publish; must not be {@code null}
     * @throws IllegalArgumentException if {@code obj} is {@code null}, if the resolved topic is
     *         empty, or if {@code obj} is not a {@code StreamMesseage} and no binding is
     *         configured for {@code str}
     */
    public void send(String str, Object obj) {
        // Fail fast with a clear message instead of a NullPointerException later on
        // (for example at obj.getClass() in the warning branch below).
        Assert.notNull(obj, "Stream事件对象不能为空");

        LinkedHashMap<String, Object> headers = new LinkedHashMap<>();
        headers.put(StreamMessageHeaders.TOPIC, resolveTopic(str, obj));

        Message<?> message = this.compositeMessageConverter.toMessage(obj, new MessageHeaders(headers));
        if (message == null) {
            // Deliberately best-effort: an unconvertible event is reported, not thrown.
            log.warn("Stream事件[{}]未能正常发布,请提供对应的MessageConverter转化器", obj.getClass().getName());
            return;
        }
        this.bindingService.getBinding(new ChannelKey(str, ChannelType.OUTPUT)).getMessageChannel().send(message);
    }

    /**
     * Determines the topic for an outgoing event: the event's own topic when it implements
     * {@code StreamMesseage}, otherwise the default topic of the binding's producer.
     */
    private String resolveTopic(String bindingName, Object payload) {
        if (payload instanceof StreamMesseage) {
            String topic = ((StreamMesseage) payload).topic();
            Assert.hasLength(topic, "StreamMesseage 的 topic不能为空");
            return topic;
        }
        BindingProperties bindingProperties = this.bindingServiceProperties.getBindings().get(bindingName);
        Assert.notNull(bindingProperties, "未配置[" + bindingName + "]对应的通道");
        String defaultTopic = bindingProperties.getProducer().getDefaultTopic();
        Assert.hasLength(defaultTopic, "未能解析出消息中的topic,请确保事件对象实现StreamMesseage接口或者给binding设置默认topic");
        return defaultTopic;
    }
}
