/*
 * Decompiled with CFR 0.152.
 */
package cn.com.duibaboot.ext.autoconfigure.batch;

import cn.com.duiba.wolf.threadpool.NamedThreadFactory;
import cn.com.duibaboot.ext.autoconfigure.batch.BatchHandler;
import cn.com.duibaboot.ext.autoconfigure.batch.BatchProperties;
import cn.com.duibaboot.ext.autoconfigure.batch.ReqBucket;
import cn.com.duibaboot.ext.autoconfigure.batch.ReqContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.event.EventListener;

/**
 * Drains pending requests from the {@link ReqBucket} on a fixed 1 ms schedule and
 * hands each bucket's batch to the {@link BatchHandler} on a worker thread pool.
 *
 * <p>When a batch finishes (or is rejected by the pool), every request in it is
 * marked and its monitor is notified, unless the request is declared one-way.
 *
 * <p>NOTE(review): neither the scheduler nor the worker pool is shut down anywhere
 * in this class; confirm whether the thread factory creates daemon threads or
 * whether a destroy hook is needed.
 */
public class ReqPacker {
    private static final Logger logger = LoggerFactory.getLogger(ReqPacker.class);
    /** Single thread that polls the bucket; started in {@link #init()}. */
    private ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("merge-request-scheduled"));
    /** Fixed-size worker pool that runs the batches; created in {@link #init()}. */
    private ThreadPoolExecutor processPool;
    @Resource
    private ReqBucket reqBucket;
    @Resource
    private BatchHandler handler;
    @Resource
    private BatchProperties properties;

    /**
     * Resizes the worker pool to the currently configured pool size whenever the
     * environment changes.
     *
     * <p>Both the core and the maximum size are set to the configured value so the
     * pool stays fixed-size. The two setters are ordered so that the invariant
     * {@code core <= max} holds at every step: {@link ThreadPoolExecutor} rejects a
     * maximum below the core size, and on newer JDKs also a core size above the
     * maximum.
     *
     * <p>A non-positive configured size is ignored (with a warning) and the pool
     * keeps its current size.
     */
    @EventListener(value={EnvironmentChangeEvent.class})
    public void refresh() {
        ThreadPoolExecutor pool = this.processPool;
        if (pool == null) {
            // init() has not created the pool yet; it will pick up the configured size itself.
            return;
        }
        int newSize = this.properties.getPoolsize();
        if (newSize <= 0) {
            // Applying it would throw half-way through and leave core/max inconsistent.
            logger.warn("merge-request-thread-pool refresh ignored, invalid poolSize:{}", newSize);
            return;
        }
        if (newSize > pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first so the core size never exceeds it.
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            // Shrinking (or unchanged): lower the core first so the maximum never drops below it.
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
        logger.warn("merge-request-thread-pool refresh poolSize:{}", newSize);
    }

    /**
     * Creates the worker pool with the configured size and starts the polling task,
     * which runs every millisecond after an initial 1 ms delay.
     */
    @PostConstruct
    public void init() {
        this.processPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(this.properties.getPoolsize(), new NamedThreadFactory("merge-request"));
        this.scheduled.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    Map<String, List<ReqContext>> reqs = ReqPacker.this.reqBucket.poll(ReqPacker.this.properties.getBatchsize());
                    if (!reqs.isEmpty()) {
                        ReqPacker.this.process(reqs);
                    }
                }
                catch (Exception e) {
                    // Swallow after logging: an exception escaping a fixed-rate task
                    // cancels all of its future executions.
                    logger.error("ReqBucket.poll()", e);
                }
            }
        }, 1L, 1L, TimeUnit.MILLISECONDS);
        logger.warn("merge-request-scheduled started poolSize:{} batchSize:{}", this.properties.getPoolsize(), this.properties.getBatchsize());
    }

    /**
     * Submits every non-empty bucket of the polled batch to the worker pool.
     *
     * <p>If the pool rejects a submission, the requests of that bucket are notified
     * immediately with the rejection as their failure.
     *
     * @param batch polled requests grouped by bucket id
     */
    private void process(Map<String, List<ReqContext>> batch) {
        for (Map.Entry<String, List<ReqContext>> entry : batch.entrySet()) {
            try {
                List<ReqContext> reqs = entry.getValue();
                if (reqs.isEmpty()) {
                    continue;
                }
                this.doBatch(entry.getKey(), reqs);
            }
            catch (RejectedExecutionException rejected) {
                this.doNotify(entry.getValue(), rejected);
            }
        }
    }

    /**
     * Queues one bucket's requests for execution by the {@link BatchHandler} on the
     * worker pool. The requests are always notified afterwards, with the handler's
     * exception if it threw one.
     *
     * @param bucketId id of the bucket the requests belong to
     * @param reqs requests to process together
     * @throws RejectedExecutionException if the worker pool does not accept the task
     */
    private void doBatch(final String bucketId, final List<ReqContext> reqs) {
        this.processPool.submit(new Runnable(){

            @Override
            public void run() {
                Exception exception = null;
                try {
                    ReqPacker.this.handler.doBatch(bucketId, reqs);
                }
                catch (Exception e) {
                    exception = e;
                    logger.error(bucketId, e);
                }
                finally {
                    ReqPacker.this.doNotify(reqs, exception);
                }
            }
        });
    }

    /**
     * Completes the given requests.
     *
     * <p>When {@code exception} is non-null it is stored as the result of every
     * request that has no result yet. Requests whose annotation is one-way are not
     * signalled; all others get state {@code 2} and a {@code notify()} on their
     * monitor (presumably waking the thread waiting for the result; confirm
     * against the code that waits on {@link ReqContext}).
     *
     * @param reqs requests to complete
     * @param exception failure to propagate, or {@code null} if the batch succeeded
     */
    private void doNotify(List<ReqContext> reqs, Exception exception) {
        for (ReqContext req : reqs) {
            if (exception != null && req.getVolatileResult() == null) {
                req.setResult(exception);
            }
            if (req.getAnnotation().oneway()) {
                continue;
            }
            synchronized (req) {
                req.setState(2);
                req.notify();
            }
        }
    }
}

