/*
 * Decompiled with CFR 0.152.
 */
package com.taobao.api.internal.stream;

import com.taobao.api.internal.stream.StreamImplementation;
import com.taobao.api.internal.stream.connect.HttpResponse;
import com.taobao.api.internal.stream.message.StreamMsgConsumeFactory;
import com.taobao.api.internal.stream.message.TopCometMessageListener;
import java.io.IOException;
import java.util.concurrent.RejectedExecutionException;
import org.apache.log4j.Logger;

public abstract class AbstractStreamImpl
implements StreamImplementation {
    private static final Logger log = Logger.getLogger(AbstractStreamImpl.class);
    protected HttpResponse response;
    private StreamMsgConsumeFactory msgConsumeFactory;
    protected boolean streamAlive = true;

    public AbstractStreamImpl(StreamMsgConsumeFactory msgConsumeFactory, HttpResponse response) {
        this.msgConsumeFactory = msgConsumeFactory;
        this.response = response;
    }

    public void nextMsg() throws IOException {
        if (!this.streamAlive) {
            throw new IOException("Stream closed");
        }
        try {
            String line = this.response.getMsg();
            if (line == null) {
                this.streamAlive = false;
                this.response.close();
                return;
            }
            this.msgConsumeFactory.consume(new StreamEvent(line));
        }
        catch (IOException e) {
            this.response.close();
            this.streamAlive = false;
            throw e;
        }
        catch (RejectedExecutionException rejectException) {
            log.error((Object)"Message consume thread pool is full:", (Throwable)rejectException);
        }
        catch (NullPointerException npe) {
            log.error((Object)"Null point exception:", (Throwable)npe);
        }
    }

    public boolean isAlive() {
        return this.streamAlive;
    }

    public abstract TopCometMessageListener getMessageListener();

    class StreamEvent
    implements Runnable {
        String msg;

        StreamEvent(String msg) {
            this.msg = msg;
        }

        public void run() {
            String line = null;
            try {
                line = AbstractStreamImpl.this.parseLine(this.msg);
            }
            catch (Exception e) {
                log.error((Object)("parse error line:" + this.msg), (Throwable)e);
            }
            if (line != null) {
                AbstractStreamImpl.this.getMessageListener().onReceiveMsg(line);
            }
        }
    }
}

