/*
 * Decompiled with CFR 0.152.
 */
package com.huawei.openstack4j.openstack.message.queue.internal;

import com.huawei.openstack4j.common.RestService;
import com.huawei.openstack4j.core.transport.Config;
import com.huawei.openstack4j.model.common.ActionResponse;
import com.huawei.openstack4j.openstack.common.AsyncHandler;
import com.huawei.openstack4j.openstack.internal.OSClientSession;
import com.huawei.openstack4j.openstack.message.queue.constant.ConsumeStatus;
import com.huawei.openstack4j.openstack.message.queue.domain.ConsumeConfirmRequest;
import com.huawei.openstack4j.openstack.message.queue.domain.ConsumeConfirmResponse;
import com.huawei.openstack4j.openstack.message.queue.domain.ConsumeRequest;
import com.huawei.openstack4j.openstack.message.queue.domain.ProduceRequest;
import com.huawei.openstack4j.openstack.message.queue.domain.QueueMessage;
import com.huawei.openstack4j.openstack.message.queue.domain.QueueMessageWithHandler;
import com.huawei.openstack4j.openstack.message.queue.internal.BaseMessageQueueServices;
import com.huawei.openstack4j.openstack.message.queue.internal.QueueMessageService;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MessageQueueAsyncService
extends BaseMessageQueueServices
implements RestService {
    private static final int DEFAULT_THREAD_POOL_SIZE = 100;
    private final ExecutorService executorService;
    private QueueMessageService queueMessageService = new QueueMessageService();

    public MessageQueueAsyncService() {
        ExecutorService executor = null;
        Config config = OSClientSession.getCurrent().getConfig();
        if (config != null) {
            executor = config.getExecutor();
        }
        this.executorService = null == executor ? Executors.newFixedThreadPool(100) : executor;
    }

    public Future<ActionResponse> produceAsync(String queueId, QueueMessage message, AsyncHandler<ActionResponse> asyncHandler) {
        ProduceRequest request = ProduceRequest.builder().queueId(queueId).message(message).build();
        return this.submit(request, asyncHandler, new InnerExecutor<ProduceRequest, ActionResponse>(){

            @Override
            public ActionResponse innerExecute(ProduceRequest request) {
                return MessageQueueAsyncService.this.queueMessageService.produce(request.getQueueId(), request.getMessage());
            }
        });
    }

    public Future<List<QueueMessageWithHandler>> consumeAsync(String queueId, String consumerGroupId, Integer maxMessages, Integer timeWait, AsyncHandler<List<QueueMessageWithHandler>> asyncHandler) {
        ConsumeRequest request = ConsumeRequest.builder().queueId(queueId).consumerGroupId(consumerGroupId).maxMessages(maxMessages).timeWait(timeWait).build();
        return this.submit(request, asyncHandler, new InnerExecutor<ConsumeRequest, List<QueueMessageWithHandler>>(){

            @Override
            public List<QueueMessageWithHandler> innerExecute(ConsumeRequest request) {
                return MessageQueueAsyncService.this.queueMessageService.consume(request.getQueueId(), request.getConsumerGroupId(), request.getMaxMessages(), request.getTimeWait());
            }
        });
    }

    public Future<ConsumeConfirmResponse> confirmConsumingAsync(String queueId, String consumerGroupId, Map<String, ConsumeStatus> consumeResult, AsyncHandler<ConsumeConfirmResponse> asyncHandler) {
        ConsumeConfirmRequest request = ConsumeConfirmRequest.builder().queueId(queueId).consumerGroupId(consumerGroupId).consumeResult(consumeResult).build();
        return this.submit(request, asyncHandler, new InnerExecutor<ConsumeConfirmRequest, ConsumeConfirmResponse>(){

            @Override
            public ConsumeConfirmResponse innerExecute(ConsumeConfirmRequest request) {
                return MessageQueueAsyncService.this.queueMessageService.confirmConsuming(request.getQueueId(), request.getConsumerGroupId(), request.getConsumeResult());
            }
        });
    }

    public void closePool() {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    private <REQUEST, RESULT> Future<RESULT> submit(final REQUEST request, final AsyncHandler<RESULT> asyncHandler, final InnerExecutor<REQUEST, RESULT> innerExecutor) {
        final OSClientSession session = OSClientSession.getCurrent();
        return this.executorService.submit(new Callable<RESULT>(){

            @Override
            public RESULT call() throws Exception {
                OSClientSession.setCurrent(session);
                Object result = null;
                try {
                    result = innerExecutor.innerExecute(request);
                }
                catch (Exception ex) {
                    if (asyncHandler != null) {
                        asyncHandler.onError(ex);
                    }
                    throw ex;
                }
                if (asyncHandler != null) {
                    asyncHandler.onSuccess(result);
                }
                return result;
            }
        });
    }

    protected static interface InnerExecutor<REQUEST, RESULT> {
        public RESULT innerExecute(REQUEST var1);
    }
}

