package com.aliyun.openservices.loghub.stormspout;

import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ClientFetcher;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubShardListener;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

/* loaded from: input_file:com/aliyun/openservices/loghub/stormspout/LogHubSpout.class */
public class LogHubSpout implements IRichSpout, Serializable {
    private static final long serialVersionUID = 4842990237818907231L;
    public static final String LOGHUB_INSTANCE_NAME_PREFIX = "LogHubSpout-";
    public String mSpoutInstanceName;
    public static final String FIELD_SHARD_ID = "shardId";
    public static final String FIELD_LOGGROUPS = "logGroups";
    private final LogHubSpoutConfig mConfig;
    private ClientFetcher mClientFetcher;
    private ShardStateManager mStateManager;
    private transient SpoutOutputCollector mCollector;
    private transient TopologyContext mContext;
    private transient long mLastCommitTime;
    private final long mEmptyDataSleepIntervalInMillis = 500;
    private final long mDefualtHeartbeatIntervalMillis = 30000;
    private final String streamName;
    private static final Logger logger = Logger.getLogger(LogHubSpout.class);

    /* loaded from: input_file:com/aliyun/openservices/loghub/stormspout/LogHubSpout$ShardListener.class */
    private class ShardListener implements ILogHubShardListener {
        private final long mWaitingForCheckPointToShutdownInMills = 500;

        private ShardListener() {
            this.mWaitingForCheckPointToShutdownInMills = 500L;
        }

        public void ShardAdded(int i) {
            LogHubSpout.this.mStateManager.makeLocalState(String.valueOf(i));
        }

        public void ShardDeleted(int i) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
            }
            LogHubSpout.this.mStateManager.commitAndCleanLocalState(String.valueOf(i));
        }
    }

    public LogHubSpout(LogHubSpoutConfig logHubSpoutConfig) {
        this.mConfig = logHubSpoutConfig;
        this.streamName = this.mConfig.getStormOutputStreamName();
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.mCollector = spoutOutputCollector;
        this.mContext = topologyContext;
    }

    public void close() {
    }

    public void activate() {
        this.mSpoutInstanceName = LOGHUB_INSTANCE_NAME_PREFIX + this.mContext.getThisTaskIndex();
        try {
            this.mClientFetcher = new ClientFetcher(this.mConfig.getCursorPosition() == LogHubCursorPosition.SPECIAL_TIMER_CURSOR ? new LogHubConfig(this.mConfig.getGroupName(), this.mSpoutInstanceName, this.mConfig.getEndpoint(), this.mConfig.getProject(), this.mConfig.getStreamName(), this.mConfig.getAccessId(), this.mConfig.getAccessKey(), this.mConfig.getStartTimeStampSec(), 30000L, true) : new LogHubConfig(this.mConfig.getGroupName(), this.mSpoutInstanceName, this.mConfig.getEndpoint(), this.mConfig.getProject(), this.mConfig.getStreamName(), this.mConfig.getAccessId(), this.mConfig.getAccessKey(), this.mConfig.getCursorPosition(), 30000L, true));
            this.mClientFetcher.registerShardListener(new ShardListener());
            this.mStateManager = new ShardStateManager(this.mConfig, this.mClientFetcher);
            this.mClientFetcher.start();
            this.mLastCommitTime = System.currentTimeMillis();
            logger.info("spout is activated with instance name" + this.mSpoutInstanceName);
        } catch (LogHubClientWorkerException e) {
            logger.error("failed to start the client to fetch data", e);
        }
    }

    public void deactivate() {
        this.mClientFetcher.shutdown();
        this.mStateManager.deactivate();
        logger.info("spout is deactivated with instance name" + this.mSpoutInstanceName);
    }

    public void nextTuple() {
        boolean z = false;
        FetchedLogGroup messageToRetry = this.mStateManager.messageToRetry();
        if (messageToRetry != null) {
            z = true;
        } else {
            messageToRetry = this.mClientFetcher.nextNoBlock();
        }
        if (messageToRetry != null) {
            String valueOf = String.valueOf(messageToRetry.mShardId);
            String str = messageToRetry.mEndCursor;
            ArrayList arrayList = new ArrayList(messageToRetry.mFetchedData);
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(valueOf);
            arrayList2.add(arrayList);
            this.mStateManager.emit(valueOf, arrayList, str, z);
            this.mCollector.emit(this.streamName, arrayList2, MessageIdUtil.constructMessageId(valueOf, str));
            if (z) {
                logger.warn("Emitted retry data: shardId: " + valueOf + " log groups' size: " + arrayList.size() + " cursor: " + str);
            } else if (logger.isDebugEnabled()) {
                logger.debug("Emitted: shardId: " + valueOf + " log groups' size: " + arrayList.size() + " cursor: " + str + " isRetry: " + z);
            }
        } else {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                logger.info("no-data sleep is interrupted inside spout!");
            }
        }
        if (System.currentTimeMillis() - this.mLastCommitTime >= this.mConfig.getCommitCheckpointIntervalMillis()) {
            logger.info("save checkpoint");
            this.mStateManager.commitShardStates();
            this.mLastCommitTime = System.currentTimeMillis();
        }
    }

    public void ack(Object obj) {
        String cursorOfMessageId = MessageIdUtil.cursorOfMessageId((String) obj);
        String shardIdOfMessageId = MessageIdUtil.shardIdOfMessageId((String) obj);
        this.mStateManager.ack(shardIdOfMessageId, cursorOfMessageId);
        if (logger.isDebugEnabled()) {
            logger.debug("acked for message: " + cursorOfMessageId + " : " + shardIdOfMessageId);
        }
    }

    public void fail(Object obj) {
        this.mStateManager.fail(MessageIdUtil.shardIdOfMessageId((String) obj), MessageIdUtil.cursorOfMessageId((String) obj));
        if (logger.isDebugEnabled()) {
            logger.debug("failed for message: " + obj);
        }
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    private List<LogGroupData> deepcopyLogs(List<LogGroupData> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<LogGroupData> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(new LogGroupData(it.next()));
        }
        return arrayList;
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(this.streamName, new Fields(new String[]{FIELD_SHARD_ID, FIELD_LOGGROUPS}));
    }
}
