package com.aliyun.openservices.loghub.stormspout;

import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ClientFetcher;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/aliyun/openservices/loghub/stormspout/ShardStateManager.class */
/**
 * Keeps one {@link LocalShardState} per shard id and persists each shard's
 * latest valid cursor as a checkpoint through the {@link ClientFetcher}.
 *
 * <p>Shard ids are handled as strings here but are parsed as integers when a
 * checkpoint is saved.
 */
public class ShardStateManager {
    // NOTE(review): the logger is registered under LogHubSpout, not this class.
    // Kept as-is so existing log configuration keeps matching - confirm whether intentional.
    private static final Logger logger = Logger.getLogger(LogHubSpout.class);
    private final LogHubSpoutConfig mConfig;
    private final Map<String, LocalShardState> mShardStates = new ConcurrentHashMap<String, LocalShardState>();
    private final ClientFetcher mFetcher;

    /**
     * @param logHubSpoutConfig configuration; its retry limit is passed to every new shard state
     * @param clientFetcher     fetcher used to save checkpoints
     */
    public ShardStateManager(LogHubSpoutConfig logHubSpoutConfig, ClientFetcher clientFetcher) {
        this.mConfig = logHubSpoutConfig;
        this.mFetcher = clientFetcher;
    }

    /** Currently a no-op. */
    public void activate() {
    }

    /** Commits the state of every tracked shard. */
    public void deactivate() {
        commitShardStates();
    }

    /**
     * Creates the local state for a shard unless one is already tracked, in
     * which case a warning is logged and the existing state is kept.
     *
     * @param str shard id
     */
    public void makeLocalState(String str) {
        if (this.mShardStates.containsKey(str)) {
            logger.warn("Already have when trying to create local state for shard " + str);
            return;
        }
        this.mShardStates.put(str, new LocalShardState(str, this.mConfig.getRetryLimit()));
        logger.debug("Successfully make one local state for shard " + str);
    }

    /**
     * Commits the shard's state and then stops tracking it. Logs a warning and
     * does nothing when the shard is not tracked.
     *
     * @param str shard id
     */
    public void commitAndCleanLocalState(String str) {
        if (!this.mShardStates.containsKey(str)) {
            logger.warn("Doesn't have when trying to delete local state for shard " + str);
            return;
        }
        commitShardState(str);
        this.mShardStates.remove(str);
        logger.debug("Successfully clean local state for shard " + str);
    }

    /**
     * Forwards an ack to the shard's local state; ignored when the shard is not tracked.
     *
     * @param str  shard id
     * @param str2 cursor being acked
     */
    public void ack(String str, String str2) {
        LocalShardState state = this.mShardStates.get(str);
        if (state != null) {
            state.ack(str2);
            logger.debug("acked for shardId " + str + " with cursor " + str2);
        }
    }

    /**
     * Forwards a failure to the shard's local state; ignored when the shard is not tracked.
     *
     * @param str  shard id
     * @param str2 cursor being failed
     */
    public void fail(String str, String str2) {
        LocalShardState state = this.mShardStates.get(str);
        if (state != null) {
            state.fail(str2);
            logger.debug("failed for shardId " + str + " with cursor " + str2);
        }
    }

    /**
     * Forwards an emit to the shard's local state. When the shard is not
     * tracked a warning is logged and the call is otherwise ignored.
     *
     * @param str  shard id
     * @param list log groups passed through to {@link LocalShardState#emit}
     * @param str2 cursor passed through to {@link LocalShardState#emit}
     * @param z    flag passed through to {@link LocalShardState#emit}
     */
    public void emit(String str, List<LogGroupData> list, String str2, boolean z) {
        LocalShardState state = safeGetShardState(str);
        if (state != null) {
            state.emit(list, str2, z);
        }
    }

    /**
     * Returns the retry message of the first shard state (in map iteration
     * order) that reports {@code shouldRetry()}.
     *
     * @return the message to retry, or {@code null} when no shard state wants a retry
     */
    public FetchedLogGroup messageToRetry() {
        for (LocalShardState state : this.mShardStates.values()) {
            if (state.shouldRetry()) {
                return state.messageToRetry();
            }
        }
        return null;
    }

    /** Commits the state of every tracked shard, one shard at a time. */
    public void commitShardStates() {
        logger.debug("Current shard list count:" + this.mShardStates.size());
        for (String shardId : this.mShardStates.keySet()) {
            commitShardState(shardId);
        }
    }

    /**
     * Saves the shard's latest valid cursor as a checkpoint and marks it
     * committed on the local state. Nothing is saved when the shard is not
     * tracked, is not dirty, or has no valid cursor. Failures are logged and
     * not rethrown so that one shard cannot stop the others from committing.
     *
     * @param str shard id; must be parseable as an integer for the checkpoint to be saved
     */
    private void commitShardState(String str) {
        LocalShardState state = safeGetShardState(str);
        if (state == null) {
            // safeGetShardState has already logged the missing state.
            return;
        }
        if (!state.isDirty()) {
            logger.debug("Local shard state for " + str + " was not dirty - not doing anything");
            return;
        }
        try {
            String cursor = state.getLatestValidCursor();
            if (cursor != null) {
                this.mFetcher.saveCheckPoint(Integer.parseInt(str), cursor, true);
                state.commit(cursor);
                // Report the cursor that was actually saved rather than re-querying the state.
                logger.debug("Advanced checkpoint for " + str + " to " + cursor);
            }
        } catch (NumberFormatException e) {
            // A malformed shard id must not abort the commit of the remaining shards.
            logger.error(" could not commit state for shardId=" + str + ". The shard id is not a valid integer.", e);
        } catch (LogHubCheckPointException e) {
            logger.error(" could not commit state for shardId=" + str + ". The server-side state is now out of date.", e);
        }
    }

    /**
     * Looks up the local state for a shard, logging a warning when it is missing.
     *
     * @param str shard id
     * @return the tracked state, or {@code null} when the shard is not tracked
     */
    private LocalShardState safeGetShardState(String str) {
        LocalShardState state = this.mShardStates.get(str);
        if (state == null) {
            logger.warn("Shard state map inconsistent with shard assignment (could not get shardId=" + str + ").");
        }
        return state;
    }
}
