/*
 * Decompiled with CFR 0.152.
 */
package com.dangdang.ddframe.job.internal.sharding;

import com.dangdang.ddframe.job.api.config.JobConfiguration;
import com.dangdang.ddframe.job.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.internal.election.LeaderElectionService;
import com.dangdang.ddframe.job.internal.env.LocalHostService;
import com.dangdang.ddframe.job.internal.execution.ExecutionService;
import com.dangdang.ddframe.job.internal.reg.BlockUtils;
import com.dangdang.ddframe.job.internal.reg.ItemUtils;
import com.dangdang.ddframe.job.internal.server.ServerService;
import com.dangdang.ddframe.job.internal.sharding.ShardingNode;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategy;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyFactory;
import com.dangdang.ddframe.job.internal.sharding.strategy.JobShardingStrategyOption;
import com.dangdang.ddframe.job.internal.storage.JobNodePath;
import com.dangdang.ddframe.job.internal.storage.JobNodeStorage;
import com.dangdang.ddframe.job.internal.storage.TransactionExecutionCallback;
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;
import java.beans.ConstructorProperties;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the assignment of a job's sharding items to servers.
 *
 * <p>Resharding is requested by creating the {@value #NECESSARY_NODE} job node. The
 * instance that is currently leader then recomputes the assignment and stores one
 * sharding node per server; every other instance waits until both the
 * {@value #NECESSARY_NODE} and {@value #PROCESSING_NODE} nodes are gone.
 */
public class ShardingService {
    private static final Logger log = LoggerFactory.getLogger(ShardingService.class);

    /**
     * Job node whose existence signals that the sharding items must be reassigned.
     * NOTE(review): ShardingNode may already expose this path as a constant; it is not
     * visible from this file, so the literal is kept local.
     */
    private static final String NECESSARY_NODE = "leader/sharding/necessary";

    /** Job node the leader fills while it is recomputing the assignment. */
    private static final String PROCESSING_NODE = "leader/sharding/processing";

    private final String jobName;
    private final JobNodeStorage jobNodeStorage;
    private final LocalHostService localHostService = new LocalHostService();
    private final LeaderElectionService leaderElectionService;
    private final ConfigurationService configService;
    private final ServerService serverService;
    private final ExecutionService executionService;
    private final JobNodePath jobNodePath;

    /**
     * Builds the service and its collaborating services from the same registry center
     * and job configuration.
     *
     * @param coordinatorRegistryCenter registry center handed to every collaborating service
     * @param jobConfiguration configuration of the job; its job name is used for node paths
     */
    public ShardingService(CoordinatorRegistryCenter coordinatorRegistryCenter, JobConfiguration jobConfiguration) {
        this.jobName = jobConfiguration.getJobName();
        this.jobNodeStorage = new JobNodeStorage(coordinatorRegistryCenter, jobConfiguration);
        this.leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
        this.configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
        this.serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
        this.executionService = new ExecutionService(coordinatorRegistryCenter, jobConfiguration);
        this.jobNodePath = new JobNodePath(jobConfiguration.getJobName());
    }

    /**
     * Requests a resharding by creating the {@value #NECESSARY_NODE} job node if needed.
     */
    public void setReshardingFlag() {
        jobNodeStorage.createJobNodeIfNeeded(NECESSARY_NODE);
    }

    /**
     * Reports whether a resharding has been requested.
     *
     * @return {@code true} when the {@value #NECESSARY_NODE} job node exists
     */
    public boolean isNeedSharding() {
        return jobNodeStorage.isJobNodeExisted(NECESSARY_NODE);
    }

    /**
     * Reassigns the sharding items when a resharding has been requested.
     *
     * <p>Does nothing when no resharding is pending. A non-leader instance only waits
     * for the leader to finish. The leader optionally waits for running items to end
     * (when execution monitoring is enabled), marks the run as in progress, removes the
     * sharding node of every known server, asks the configured strategy for a new
     * assignment over the available servers and persists it in a single transaction
     * that also removes both flag nodes.
     */
    public void shardingIfNecessary() {
        if (!isNeedSharding()) {
            return;
        }
        if (!leaderElectionService.isLeader()) {
            blockUntilShardingCompleted();
            return;
        }
        if (configService.isMonitorExecution()) {
            waitingOtherJobCompleted();
        }
        log.debug("Elastic job: sharding begin.");
        jobNodeStorage.fillEphemeralJobNode(PROCESSING_NODE, "");
        clearShardingInfo();
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(configService.getJobShardingStrategyClass());
        JobShardingStrategyOption option = new JobShardingStrategyOption(jobName, configService.getShardingTotalCount(), configService.getShardingItemParameters());
        Map<String, List<Integer>> assignment = jobShardingStrategy.sharding(serverService.getAvailableServers(), option);
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(assignment));
        log.debug("Elastic job: sharding completed.");
    }

    /**
     * Polls until neither flag node exists any more, or until this instance has become
     * leader itself (in which case waiting for another leader would never end).
     */
    private void blockUntilShardingCompleted() {
        while (!leaderElectionService.isLeader() && isShardingPendingOrRunning()) {
            log.debug("Elastic job: sleep short time until sharding completed.");
            BlockUtils.waitingShortTime();
        }
    }

    /** Reports whether a resharding is still requested or currently being processed. */
    private boolean isShardingPendingOrRunning() {
        return jobNodeStorage.isJobNodeExisted(NECESSARY_NODE) || jobNodeStorage.isJobNodeExisted(PROCESSING_NODE);
    }

    /** Polls until the execution service reports no running items. */
    private void waitingOtherJobCompleted() {
        while (executionService.hasRunningItems()) {
            log.debug("Elastic job: sleep short time until other job completed.");
            BlockUtils.waitingShortTime();
        }
    }

    /** Removes the sharding node of every server known to the server service. */
    private void clearShardingInfo() {
        for (String server : serverService.getAllServers()) {
            jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getShardingNode(server));
        }
    }

    /**
     * Reads the sharding items currently assigned to the local host.
     *
     * @return the items parsed from the local host's sharding node, or an empty list
     *         when that node does not exist
     */
    public List<Integer> getLocalHostShardingItems() {
        String localShardingNode = ShardingNode.getShardingNode(localHostService.getIp());
        if (!jobNodeStorage.isJobNodeExisted(localShardingNode)) {
            return Collections.emptyList();
        }
        return ItemUtils.toItemList(jobNodeStorage.getJobNodeDataDirectly(localShardingNode));
    }

    /**
     * Transaction body that stores a new sharding assignment and clears both flag nodes.
     *
     * <p>Only adds operations to the supplied transaction; it does not commit here.
     * NOTE(review): committing is presumably done by the caller of {@code execute}
     * (JobNodeStorage.executeInTransaction) — confirm there.
     */
    class PersistShardingInfoTransactionExecutionCallback
    implements TransactionExecutionCallback {
        private final Map<String, List<Integer>> shardingItems;

        /**
         * @param shardingItems sharding items to store per server; each key is passed to
         *        {@code ShardingNode.getShardingNode} to obtain that server's node
         */
        @ConstructorProperties(value={"shardingItems"})
        public PersistShardingInfoTransactionExecutionCallback(Map<String, List<Integer>> shardingItems) {
            this.shardingItems = shardingItems;
        }

        /**
         * Adds one create operation per server plus the two flag deletions.
         *
         * @param curatorTransactionFinal transaction the operations are appended to
         * @throws Exception propagated from the Curator transaction builders
         */
        @Override
        public void execute(CuratorTransactionFinal curatorTransactionFinal) throws Exception {
            for (Map.Entry<String, List<Integer>> entry : shardingItems.entrySet()) {
                String path = jobNodePath.getFullPath(ShardingNode.getShardingNode(entry.getKey()));
                // Encode with a fixed charset so the stored bytes do not depend on the
                // JVM's platform default.
                // NOTE(review): assumes node data is read back as UTF-8 — confirm
                // against the registry center implementation.
                byte[] data = ItemUtils.toItemsString(entry.getValue()).getBytes(StandardCharsets.UTF_8);
                curatorTransactionFinal.create().forPath(path, data).and();
            }
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(NECESSARY_NODE)).and();
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(PROCESSING_NODE)).and();
        }
    }
}

