package cn.com.duiba.nezha.compute.biz.app.streaming;

import cn.com.duiba.nezha.compute.api.enums.LogTopicEnum;
import cn.com.duiba.nezha.compute.biz.conf.KafkaConf;
import cn.com.duiba.nezha.compute.biz.support.Topic;
import cn.com.duiba.nezha.compute.common.params.Params;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;

/* compiled from: DirectKafkaTuiaAdvertLog2.scala */
/* loaded from: input_file:cn/com/duiba/nezha/compute/biz/app/streaming/DirectKafkaTuiaAdvertLog2$.class */
public final class DirectKafkaTuiaAdvertLog2$ {
    public static final DirectKafkaTuiaAdvertLog2$ MODULE$ = null;

    static {
        new DirectKafkaTuiaAdvertLog2$();
    }

    public void run(Params.AdvertLogParams advertLogParams) {
        String str = KafkaConf.brokers;
        LogTopicEnum topic = Topic.getTopic(advertLogParams.topic());
        if (topic == null) {
            Predef$.MODULE$.println("get topic null,app break...");
            return;
        }
        Set set = Predef$.MODULE$.refArrayOps(topic.getTopic().split(",")).toSet();
        String stringBuilder = new StringBuilder().append("DirectKafkaTuiaAdvertLog_Topic_").append(topic.getTopic()).toString();
        Map apply = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("metadata.broker.list"), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("group.id"), new StringBuilder().append("DirectKafkaTuiaAdvertLog_Topic_").append(topic.getTopic()).toString())}));
        Predef$.MODULE$.println(new StringBuilder().append("init spark context ...,appName= ").append(stringBuilder).toString());
        SparkConf master = new SparkConf().setAppName(stringBuilder).setMaster("local[2]");
        if (!advertLogParams.localRun()) {
            master = new SparkConf().setAppName(stringBuilder);
        }
        StreamingContext streamingContext = new StreamingContext(master, Seconds$.MODULE$.apply(advertLogParams.interval()));
        KafkaUtils$.MODULE$.createDirectStream(streamingContext, apply, set, ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(String.class), ClassTag$.MODULE$.apply(StringDecoder.class), ClassTag$.MODULE$.apply(StringDecoder.class)).map(new DirectKafkaTuiaAdvertLog2$$anonfun$1(), ClassTag$.MODULE$.apply(String.class)).filter(new DirectKafkaTuiaAdvertLog2$$anonfun$2()).foreachRDD(new DirectKafkaTuiaAdvertLog2$$anonfun$run$1(advertLogParams, topic));
        streamingContext.start();
        streamingContext.awaitTermination();
    }

    private DirectKafkaTuiaAdvertLog2$() {
        MODULE$ = this;
    }
}
