package org.apache.tez.http.async.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.Request;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.Response;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URL;
import org.apache.commons.io.IOUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.http.SSLFactory;
import org.apache.tez.http.async.netty.TezBodyDeferringAsyncHandler;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/http/async/netty/AsyncHttpConnection.class */
/**
 * {@link BaseHttpConnection} implementation that issues its request through a
 * process-wide, lazily created {@link AsyncHttpClient}.
 *
 * <p>The response body is handed to a {@link TezBodyDeferringAsyncHandler} that
 * writes into a {@link PipedOutputStream}; callers read it back through the
 * stream returned by {@link #getInputStream()}.
 *
 * <p>The underlying client is shared by all instances of this class (static
 * field); {@link #close()} therefore affects every instance.
 */
public class AsyncHttpConnection extends BaseHttpConnection {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);

    /** HTTP status code that {@link #connect()} treats as success. */
    private static final int HTTP_OK = 200;

    /** Shared client; created on first use and guarded by the class lock. */
    private static volatile AsyncHttpClient httpAsyncClient;

    private final JobTokenSecretManager jobTokenSecretMgr;
    private final HttpConnectionParams httpConnParams;
    private final Stopwatch stopWatch = new Stopwatch();
    private final URL url;
    private final TezBodyDeferringAsyncHandler handler;
    private final PipedOutputStream pos;
    private final PipedInputStream pis;

    private String encHash;
    private String msgToEncode;
    private Response response;
    private ListenableFuture<Response> responseFuture;
    private TezBodyDeferringAsyncHandler.BodyDeferringInputStream dis;

    /**
     * Creates the shared {@link AsyncHttpClient} if it does not exist yet.
     *
     * <p>Uses double-checked locking on the volatile static field so that only
     * the first caller pays for the construction.
     *
     * @param httpConnectionParams connection settings (SSL, keep-alive, timeouts)
     * @throws IOException declared for callers; configuring SSL may fail
     * @throws IllegalArgumentException if SSL shuffle is enabled but no
     *         {@link SSLFactory} is available
     */
    private void initClient(HttpConnectionParams httpConnectionParams) throws IOException {
        if (httpAsyncClient != null) {
            return;
        }
        synchronized (AsyncHttpConnection.class) {
            if (httpAsyncClient != null) {
                return;
            }
            LOG.info("Initializing AsyncClient (TezBodyDeferringAsyncHandler)");
            AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
            if (httpConnectionParams.isSslShuffle()) {
                SSLFactory sslFactory = httpConnectionParams.getSslFactory();
                Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
                sslFactory.configure(builder);
            }
            boolean keepAlive = httpConnectionParams.isKeepAlive();
            builder.setAllowPoolingConnection(keepAlive)
                    .setAllowSslConnectionPool(keepAlive)
                    .setCompressionEnabled(false)
                    .setMaximumConnectionsPerHost(1)
                    .setConnectionTimeoutInMs(httpConnectionParams.getConnectionTimeout())
                    .setRequestTimeoutInMs(httpConnectionParams.getReadTimeout())
                    .setUseRawUrl(true);
            // Build the configuration exactly once, when handing it to the client.
            httpAsyncClient = new AsyncHttpClient(builder.build());
        }
    }

    /**
     * Prepares a connection to {@code url}; no request is sent until
     * {@link #connect()} is called.
     *
     * @param url target URL of the request
     * @param httpConnectionParams connection settings; also supplies the pipe buffer size
     * @param str not used by this implementation
     * @param jobTokenSecretManager secret manager used to hash the request and verify the reply
     * @throws IOException if the shared client or the piped streams cannot be set up
     */
    public AsyncHttpConnection(URL url, HttpConnectionParams httpConnectionParams, String str, JobTokenSecretManager jobTokenSecretManager) throws IOException {
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.httpConnParams = httpConnectionParams;
        this.url = url;
        if (LOG.isDebugEnabled()) {
            LOG.debug("MapOutput URL :" + url.toString());
        }
        initClient(this.httpConnParams);
        this.pos = new PipedOutputStream();
        this.pis = new PipedInputStream(this.pos, this.httpConnParams.getBufferSize());
        // NOTE(review): 60000 is presumably a timeout in milliseconds; confirm
        // against the TezBodyDeferringAsyncHandler constructor.
        this.handler = new TezBodyDeferringAsyncHandler(this.pos, url, 60000);
    }

    /**
     * Builds the message for {@link #url} and stores both the message and its
     * hash for use by {@link #connect()} and {@link #validate()}.
     *
     * @throws IOException if the hash cannot be computed
     */
    @VisibleForTesting
    public void computeEncHash() throws IOException {
        this.msgToEncode = SecureShuffleUtils.buildMsgFrom(this.url);
        this.encHash = SecureShuffleUtils.hashFromString(this.msgToEncode, this.jobTokenSecretMgr);
    }

    /**
     * Sends the request and waits for the response headers.
     *
     * @return {@code true} when the server answered with status 200
     * @throws IOException if no response is obtained or the status code is not 200
     * @throws InterruptedException declared by the base class contract
     */
    @Override // org.apache.tez.http.BaseHttpConnection
    public boolean connect() throws IOException, InterruptedException {
        computeEncHash();
        RequestBuilder requestBuilder = new RequestBuilder();
        requestBuilder.setHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH, this.encHash);
        requestBuilder.setHeader(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
        requestBuilder.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
        Request request = requestBuilder.setUrl(this.url.toString()).build();
        LOG.debug("Request url={}, encHash={}", this.url, this.encHash);

        // Start timing here so validate() can report the real round-trip time;
        // reset() first so that a repeated connect() does not fail on an
        // already-running stopwatch.
        this.stopWatch.reset().start();

        this.responseFuture = httpAsyncClient.executeRequest(request, this.handler);
        this.dis = new TezBodyDeferringAsyncHandler.BodyDeferringInputStream(this.responseFuture, this.handler, this.pis);
        this.response = this.dis.getAsapResponse();
        if (this.response == null) {
            throw new IOException("Response is null");
        }

        int statusCode = this.response.getStatusCode();
        if (statusCode != HTTP_OK) {
            LOG.debug("Request url={}", this.response.getUri());
            throw new IOException("Got invalid response code " + statusCode + " from " + this.url + ": " + this.response.getStatusText());
        }
        return true;
    }

    /**
     * Checks the shuffle name/version headers of the response and verifies the
     * reply hash against the hash that was sent.
     *
     * @throws IOException if the headers do not match, the reply hash header is
     *         missing, or hash verification fails
     * @throws IllegalStateException if there is no response to validate
     */
    @Override // org.apache.tez.http.BaseHttpConnection
    public void validate() throws IOException {
        Preconditions.checkState(this.response != null, "Response can not be null");

        boolean nameMatches = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(this.response.getHeader(ShuffleHeader.HTTP_HEADER_NAME));
        boolean versionMatches = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(this.response.getHeader(ShuffleHeader.HTTP_HEADER_VERSION));
        if (!nameMatches || !versionMatches) {
            throw new IOException("Incompatible shuffle response version");
        }

        String replyHash = this.response.getHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
        if (replyHash == null) {
            throw new IOException("security validation of TT Map output failed");
        }
        LOG.debug("url={};encHash={};replyHash={}", new Object[]{this.msgToEncode, this.encHash, replyHash});
        SecureShuffleUtils.verifyReply(replyHash, this.encHash, this.jobTokenSecretMgr);
        LOG.info("for url={} sent hash and receievd reply {} ms", this.url, Long.valueOf(this.stopWatch.elapsedMillis()));
    }

    /**
     * Returns the response body wrapped in a {@link DataInputStream}.
     *
     * @return stream over the deferred response body
     * @throws IllegalStateException if no response is currently held
     */
    @Override // org.apache.tez.http.BaseHttpConnection
    public DataInputStream getInputStream() throws IOException, InterruptedException {
        Preconditions.checkState(this.response != null, "Response can not be null");
        return new DataInputStream(this.dis);
    }

    /**
     * Closes and discards the shared client. Safe to call when no client
     * exists; a later instance will create a fresh one.
     */
    @VisibleForTesting
    public void close() {
        // Same lock as initClient so creation and teardown cannot interleave.
        synchronized (AsyncHttpConnection.class) {
            if (httpAsyncClient != null) {
                httpAsyncClient.close();
                httpAsyncClient = null;
            }
        }
    }

    /**
     * Releases the per-request streams and forgets the response.
     *
     * @param z not used by this implementation
     * @throws IOException if closing the response body stream fails; the piped
     *         streams are still closed and the response is still cleared
     */
    @Override // org.apache.tez.http.BaseHttpConnection
    public void cleanup(boolean z) throws IOException {
        try {
            if (this.response != null) {
                this.dis.close();
            }
        } finally {
            // Always release the pipe, even when closing the body stream failed.
            IOUtils.closeQuietly(this.pos);
            IOUtils.closeQuietly(this.pis);
            this.response = null;
        }
    }
}
