package cn.com.duiba.linglong.client.job.consumer;

import cn.com.duiba.boot.event.ContextClosingEvent;
import cn.com.duiba.boot.event.MainContextRefreshedEvent;
import cn.com.duiba.boot.utils.NetUtils;
import cn.com.duiba.linglong.client.constant.JobInvokeType;
import cn.com.duiba.linglong.client.domain.dto.JobKey;
import cn.com.duiba.linglong.client.domain.params.JobInvokerParams;
import cn.com.duiba.linglong.client.job.jobs.WorkerScheduleJobManager;
import cn.com.duiba.linglong.client.remoteservice.RemoteWorkerStatusService;
import cn.com.duiba.linglong.client.service.channel.JobInvoker;
import cn.com.duibaboot.ext.autoconfigure.httpclient.ssre.CanAccessInsideNetwork;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:cn/com/duiba/linglong/client/job/consumer/WorkerJobConsumer.class */
public class WorkerJobConsumer extends AbstractExecutionThreadService {
    private static final Logger log = LoggerFactory.getLogger(WorkerJobConsumer.class);

    @Resource
    private List<JobConsumerAssert> jobConsumerAsserts;

    @Resource
    private WorkerScheduleJobManager workerScheduleJobManager;

    @Resource
    private RemoteWorkerStatusService remoteWorkerStatusService;

    @CanAccessInsideNetwork
    @Resource
    private RestTemplate linglongFetchJobRestTemplate;

    @Value("${spring.application.name}")
    private String appName;

    @Value("${server.port}")
    private int port;
    private final AtomicBoolean isLink = new AtomicBoolean(false);
    private String workerId;
    private Thread heartbeat;

    @PostConstruct
    public void init() {
        this.workerId = NetUtils.getLocalIp() + ":" + this.port;
    }

    @EventListener({MainContextRefreshedEvent.class})
    public void startConsumer() {
        startAsync();
        this.heartbeat = new Thread(() -> {
            this.awaitRunning();
            while (isRunning()) {
                try {
                    this.remoteWorkerStatusService.heartbeat(this.appName, this.workerId);
                    if (this.isLink.compareAndSet(false, true)) {
                        log.info("调度器通信建立完成");
                    }
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    return;
                } catch (Exception e2) {
                    log.error("心跳检查失败", e2);
                    this.isLink.set(false);
                    try {
                        Thread.sleep(5000L);
                    } catch (InterruptedException e3) {
                        return;
                    }
                }
            }
        });
        this.heartbeat.setDaemon(true);
        this.heartbeat.setName("linglong.client.heartbeat");
        this.heartbeat.setPriority(10);
        this.heartbeat.start();
    }

    @EventListener({ContextClosingEvent.class})
    public void preDestroy() {
        log.info("任务消费者关闭");
        if (isRunning()) {
            stopAsync();
            awaitTerminated();
        }
        this.heartbeat.interrupt();
        this.remoteWorkerStatusService.stopWorker(this.appName, this.workerId);
    }

    protected void startUp() {
        log.info("Worker job fetch starting.");
    }

    public void run() {
        while (isRunning()) {
            try {
                if (!canRunJob()) {
                    for (int i = 0; i < 3; i++) {
                        if (!isRunning()) {
                            return;
                        }
                        Thread.sleep(1000L);
                    }
                } else if (this.isLink.get()) {
                    JobInvokerParams jobInvokerParams = new JobInvokerParams();
                    jobInvokerParams.setAppId(this.appName);
                    JobInvoker jobInvoker = (JobInvoker) this.linglongFetchJobRestTemplate.postForObject("http://linglong/worker/fetch/requestJobInvoker", jobInvokerParams, JobInvoker.class, new Object[0]);
                    if (!Objects.isNull(jobInvoker)) {
                        try {
                            this.workerScheduleJobManager.submitScheduleJob(new JobKey(JobInvokeType.ACTION, jobInvoker.getHistoryId()), jobInvoker.getJobLevel());
                        } catch (Exception e) {
                            log.error("任务运行失败", e);
                        }
                    }
                } else {
                    Thread.sleep(3000L);
                }
            } catch (InterruptedException e2) {
            } catch (Exception e3) {
                log.error("任务消费异常", e3);
            }
        }
        if (Thread.currentThread().isInterrupted()) {
            return;
        }
        Thread.currentThread().interrupt();
    }

    private boolean canRunJob() {
        Iterator<JobConsumerAssert> it = this.jobConsumerAsserts.iterator();
        while (it.hasNext()) {
            if (!it.next().canConsumer()) {
                return false;
            }
        }
        return true;
    }
}
