/*
 * Decompiled with CFR 0.152.
 */
package cn.com.duiba.nezha.compute.biz.spark.ffm;

import cn.com.duiba.nezha.compute.biz.bo.ModelBo;
import cn.com.duiba.nezha.compute.biz.bo.SampleBo;
import cn.com.duiba.nezha.compute.biz.bo.SyncBo;
import cn.com.duiba.nezha.compute.biz.bo.TrainOpt;
import cn.com.duiba.nezha.compute.biz.params.PSModelParams;
import cn.com.duiba.nezha.compute.biz.spark.test.PsFMModelTest$;
import cn.com.duiba.nezha.compute.core.enums.DateStyle;
import cn.com.duiba.nezha.compute.core.util.DateUtil;
import java.util.List;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.IntRef;

public final class PsModelBasedOnHbaseMsgFFM$ {
    public static final PsModelBasedOnHbaseMsgFFM$ MODULE$;

    static {
        new PsModelBasedOnHbaseMsgFFM$();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void run(PSModelParams params) {
        Logger logger = Logger.getLogger(PsFMModelTest$.MODULE$.getClass());
        String appName = new StringBuilder().append((Object)"ps_model_").append((Object)params.psModelId()).append((Object)"_on_model_").append((Object)params.onLineModelId()).toString();
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"init spark context ...,appName= ").append((Object)appName).toString());
        SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster("local[3]");
        if (!params.isLocal()) {
            sparkConf = new SparkConf().setAppName(appName);
        }
        SparkContext sc = new SparkContext(sparkConf);
        try {
            String lastParseTime = null;
            int spaceCnt = 0;
            boolean continues = true;
            boolean trainStatus = false;
            int delayRT = 10000000;
            String currentTime = DateUtil.getCurrentTime((DateStyle)DateStyle.YYYY_MM_DD_HH_MM);
            String currentParseTime = null;
            while (continues) {
                try {
                    Thread.sleep(1000L);
                    if (spaceCnt > 1000) {
                        continues = false;
                    }
                    trainStatus = false;
                }
                catch (Exception exception) {
                    logger.error((Object)exception);
                    continue;
                }
                currentTime = DateUtil.getCurrentTime((DateStyle)DateStyle.YYYY_MM_DD_HH_MM);
                delayRT = Predef$.MODULE$.Integer2int(DateUtil.getDiffMinutes((String)currentTime, (String)(currentParseTime = lastParseTime == null ? SampleBo.getTimeMinuteAdd(currentTime, -1 * params.delay()) : SampleBo.getTimeMinuteAdd(lastParseTime, params.stepSize())), (DateStyle)DateStyle.YYYY_MM_DD_HH_MM));
                if (delayRT > params.delay() * 2) {
                    Predef$.MODULE$.println((Object)"delay too long \uff0creset train status ");
                    currentParseTime = SampleBo.getTimeMinuteAdd(currentTime, -1 * params.delay());
                    delayRT = params.delay();
                }
                if (delayRT < params.delay()) {
                    int sleepTime = package$.MODULE$.min(params.delay() - delayRT, params.delay());
                    Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"sleep=", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)sleepTime)})));
                    Thread.sleep(10000 * sleepTime);
                }
                if (delayRT >= params.delay()) {
                    trainStatus = true;
                }
                if (trainStatus) {
                    int limitPerPart;
                    Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"currentTime=", ",lastParseTime=", ",currentParseTime=", ",stepSize=", ",delayRT=", " minute"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{currentTime, lastParseTime, currentParseTime, BoxesRunTime.boxToInteger((int)params.stepSize()), BoxesRunTime.boxToInteger((int)delayRT)})));
                    spaceCnt = 0;
                    lastParseTime = currentParseTime;
                    List<String> secondInterval = SampleBo.getMinuteSecondInterval(currentParseTime, params.stepSize());
                    String[] orderIds = SampleBo.getOrderList(secondInterval, params.isDisplayAds());
                    if (orderIds == null || Predef$.MODULE$.refArrayOps((Object[])orderIds).size() <= 100) continue;
                    Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"sleep=0, orderSize =", " "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)Predef$.MODULE$.refArrayOps((Object[])orderIds).size())})));
                    IntRef partNums = IntRef.create((int)params.partNums());
                    DoubleRef sampleRatio = DoubleRef.create((double)params.sampleRatio());
                    int n = limitPerPart = params.isDisplayAds() ? 60000 : 3000;
                    if (params.dynamicPartNums()) {
                        partNums.elem = TrainOpt.getParNumsFFM(params.partNums(), Predef$.MODULE$.refArrayOps((Object[])orderIds).size(), limitPerPart);
                        sampleRatio.elem = TrainOpt.getSampleRatioFFM(partNums.elem, params.isCtr(), Predef$.MODULE$.refArrayOps((Object[])orderIds).size(), limitPerPart);
                    }
                    sc.parallelize((Seq)Predef$.MODULE$.wrapRefArray((Object[])orderIds), sc.parallelize$default$2(), ClassTag$.MODULE$.apply(String.class)).repartition(partNums.elem, (Ordering)Ordering.String$.MODULE$).foreachPartition((Function1)new Serializable(params, partNums, sampleRatio){
                        public static final long serialVersionUID = 0L;
                        private final PSModelParams params$1;
                        private final IntRef partNums$1;
                        private final DoubleRef sampleRatio$1;

                        public final void apply(Iterator<String> partitionOfRecords) {
                            ModelBo.runOnMsg(this.params$1.model(), partitionOfRecords, this.sampleRatio$1.elem, this.params$1.isReplay(), this.partNums$1.elem);
                        }
                        {
                            this.params$1 = params$1;
                            this.partNums$1 = partNums$1;
                            this.sampleRatio$1 = sampleRatio$1;
                        }
                    });
                    SyncBo.syncModel(params.model(), params.isSync());
                    continue;
                }
                ++spaceCnt;
            }
            return;
        }
        catch (Exception exception) {
            logger.error((Object)exception);
        }
    }

    private PsModelBasedOnHbaseMsgFFM$() {
        MODULE$ = this;
        Logger.getLogger((String)"org").setLevel(Level.ERROR);
    }
}

