package cn.com.duiba.nezha.compute.biz.spark.bac;

import cn.com.duiba.nezha.compute.biz.conf.KafkaConf;
import cn.com.duiba.nezha.compute.biz.params.PSModelParams;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies$;
import org.apache.spark.streaming.kafka010.KafkaUtils$;
import org.apache.spark.streaming.kafka010.LocationStrategies$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
import scala.collection.mutable.StringBuilder;

/* compiled from: PsModelStreaming.scala */
/* loaded from: input_file:cn/com/duiba/nezha/compute/biz/spark/bac/PsModelStreaming$.class */
public final class PsModelStreaming$ {
    public static final PsModelStreaming$ MODULE$ = null;

    static {
        new PsModelStreaming$();
    }

    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    public void run(PSModelParams pSModelParams, double d) {
        Logger.getLogger(getClass());
        if (pSModelParams.topic() == null) {
            Predef$.MODULE$.println("get topic null,app break...");
            return;
        }
        Set apply = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{pSModelParams.topic()}));
        String stringBuilder = new StringBuilder().append("ps_model_").append(pSModelParams.psModelId()).append("_on_model_").append(pSModelParams.onLineModelId()).append("_data_").append(pSModelParams.topic()).toString();
        String stringBuilder2 = new StringBuilder().append("g_").append(pSModelParams.onLineModelId()).toString();
        Predef$.MODULE$.println(new StringBuilder().append("init spark context ...,KafkaConf.brokers = ").append(KafkaConf.brokers).toString());
        Map apply2 = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), KafkaConf.brokers), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("group.id"), stringBuilder2), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("key.deserializer"), StringDeserializer.class), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("value.deserializer"), StringDeserializer.class), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("auto.offset.reset"), "latest"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), Predef$.MODULE$.boolean2Boolean(false))}));
        Predef$.MODULE$.println(new StringBuilder().append("init spark context ...,appName= ").append(stringBuilder).toString());
        SparkConf master = new SparkConf().setAppName(stringBuilder).setMaster("local[2]");
        if (!pSModelParams.isLocal()) {
            master = new SparkConf().setAppName(stringBuilder);
        }
        master.set("spark.streaming.kafka.maxRatePerPartition", "1000");
        StreamingContext streamingContext = new StreamingContext(master, Seconds$.MODULE$.apply(pSModelParams.batchInterval()));
        KafkaUtils$.MODULE$.createDirectStream(streamingContext, LocationStrategies$.MODULE$.PreferConsistent(), ConsumerStrategies$.MODULE$.Subscribe(apply, apply2)).foreachRDD(new PsModelStreaming$$anonfun$run$1());
        streamingContext.start();
        streamingContext.awaitTermination();
    }

    private PsModelStreaming$() {
        MODULE$ = this;
        Logger.getLogger("org").setLevel(Level.ERROR);
    }
}
