/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.taskmanager;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
import org.apache.flink.runtime.rest.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.FileReadCountQueryParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.FileReadStartQueryParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.LogFileNamePathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.util.FileOffsetRange;
import org.apache.flink.runtime.util.FileReadDetail;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMessage;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
 * Base REST handler that streams a file (or a range of a file) belonging to a TaskExecutor to the
 * HTTP client.
 *
 * <p>Each request is translated into a {@link FileReadDetail} built from the TaskExecutor id, the
 * optional file name and the optional start/count query parameters. That detail object is the key
 * of a cache whose values are futures of {@link TransientBlobKey}s. On a cache miss the subclass
 * supplies the future through {@link #requestFileUpload(ResourceManagerGateway, FileReadDetail)}.
 * Once the key is available the file is fetched from the {@link TransientBlobService} and written
 * to the channel.
 *
 * <p>Cache entries expire a fixed duration after being written; when an entry is removed, the
 * blob it refers to is deleted from the transient blob service cache.
 *
 * @param <M> type of the message parameters accepted by the concrete handler
 */
public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessageParameters>
extends AbstractHandler<RestfulGateway, EmptyRequestBody, M> {
    /** Retriever used to obtain the ResourceManager gateway when a cache entry has to be loaded. */
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    /** Blob service from which the requested files are read and deleted. */
    private final TransientBlobService transientBlobService;

    /** Cache from the requested file range to the (future) blob key under which it is stored. */
    private final LoadingCache<FileReadDetail, CompletableFuture<TransientBlobKey>> fileBlobKeys;

    /**
     * Creates the handler and its blob-key cache.
     *
     * @param localAddressFuture forwarded unchanged to {@link AbstractHandler}
     * @param leaderRetriever forwarded unchanged to {@link AbstractHandler}
     * @param timeout forwarded unchanged to {@link AbstractHandler}
     * @param responseHeaders forwarded unchanged to {@link AbstractHandler}
     * @param untypedResponseMessageHeaders forwarded unchanged to {@link AbstractHandler}
     * @param resourceManagerGatewayRetriever retriever for the ResourceManager gateway; must not be null
     * @param transientBlobService blob service holding the files; must not be null
     * @param cacheEntryDuration time after which a cached blob key expires (counted from the write)
     */
    protected AbstractTaskManagerFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, M> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders);
        this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever);
        this.transientBlobService = Preconditions.checkNotNull(transientBlobService);
        this.fileBlobKeys = CacheBuilder
            .newBuilder()
            .expireAfterWrite(cacheEntryDuration.toMilliseconds(), TimeUnit.MILLISECONDS)
            .removalListener(this::removeBlob)
            .build(new CacheLoader<FileReadDetail, CompletableFuture<TransientBlobKey>>() {
                @Override
                public CompletableFuture<TransientBlobKey> load(FileReadDetail fd) throws Exception {
                    return AbstractTaskManagerFileHandler.this.loadTaskManagerFile(fd);
                }
            });
    }

    /**
     * Resolves the requested file range to a blob key (through the cache) and transfers the
     * corresponding file to the client on the channel's executor.
     *
     * <p>If anything fails after the blob-key future was obtained, the cache entry for this exact
     * request is invalidated and an error response is sent: {@code 404} when the failure is an
     * {@link UnknownTaskExecutorException}, {@code 500} otherwise.
     *
     * @param ctx channel context used for writing the response
     * @param httpRequest the original HTTP request
     * @param handlerRequest the parsed request carrying path and query parameters
     * @param gateway leader gateway; not used by this handler
     * @return future that completes once the transfer was initiated or the error response was sent
     * @throws RestHandlerException if the blob-key future cannot be obtained from the cache
     */
    @Override
    protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
        final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);

        String filename;
        try {
            filename = handlerRequest.getPathParameter(LogFileNamePathParameter.class);
        } catch (IllegalStateException ignored) {
            // The request carries no usable file name path parameter; fall back to "no file name".
            filename = null;
        }

        // Start/count are only evaluated when a file name was given; otherwise they stay null.
        Long fileStart = null;
        Long fileReadCount = null;
        if (filename != null) {
            final List<Long> fileReadCountsTmp = handlerRequest.getQueryParameter(FileReadCountQueryParameter.class);
            fileReadCount = fileReadCountsTmp.isEmpty() ? FileOffsetRange.getSizeDefaultValue() : Math.abs(fileReadCountsTmp.get(0));
            final List<Long> fileReadStartsTmp = handlerRequest.getQueryParameter(FileReadStartQueryParameter.class);
            fileStart = fileReadStartsTmp.isEmpty() ? Long.valueOf(FileOffsetRange.getStartDefaultValue()) : fileReadStartsTmp.get(0);
        }

        final FileReadDetail fd = new FileReadDetail(taskManagerId, filename, fileStart, fileReadCount);

        final CompletableFuture<TransientBlobKey> blobKeyFuture;
        try {
            blobKeyFuture = this.fileBlobKeys.get(fd);
        } catch (ExecutionException e) {
            final Throwable cause = ExceptionUtils.stripExecutionException(e);
            if (cause instanceof RestHandlerException) {
                throw (RestHandlerException) cause;
            }
            throw new RestHandlerException("Could not retrieve file blob key future.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
        }

        final CompletableFuture<Void> resultFuture = blobKeyFuture.thenAcceptAsync(
            (TransientBlobKey blobKey) -> {
                final File file;
                try {
                    file = this.transientBlobService.getFile(blobKey);
                } catch (IOException e) {
                    throw new CompletionException(new FlinkException("Could not retrieve file from transient blob store.", e));
                }

                try {
                    this.transferFile(ctx, file, httpRequest);
                } catch (FlinkException e) {
                    throw new CompletionException(new FlinkException("Could not transfer file to client.", e));
                }
            },
            ctx.executor());

        return resultFuture.whenComplete(
            (Void ignored, Throwable throwable) -> {
                if (throwable == null) {
                    return;
                }

                this.log.debug("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
                // The cache is keyed by FileReadDetail, so the entry must be dropped using the same
                // key; invalidating with the ResourceID would never match and the failed future
                // would keep being served until it expires.
                this.fileBlobKeys.invalidate(fd);

                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                final ErrorResponseBody errorResponseBody;
                final HttpResponseStatus httpResponseStatus;
                if (strippedThrowable instanceof UnknownTaskExecutorException) {
                    errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.');
                    httpResponseStatus = HttpResponseStatus.NOT_FOUND;
                } else {
                    errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.');
                    httpResponseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
                }

                HandlerUtils.sendErrorResponse(ctx, httpRequest, errorResponseBody, httpResponseStatus, this.responseHeaders);
            });
    }

    /**
     * Supplies the future blob key for the requested file range. Called on a cache miss.
     *
     * @param resourceManagerGateway gateway obtained from the ResourceManager gateway retriever
     * @param fileReadDetail description of the TaskExecutor, file and range being requested
     * @return future of the blob key under which the requested content can be read
     */
    protected abstract CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, FileReadDetail fileReadDetail);

    /**
     * Cache loader body: obtains the ResourceManager gateway and delegates to
     * {@link #requestFileUpload(ResourceManagerGateway, FileReadDetail)}.
     *
     * @param fd the cache key describing the requested file range
     * @return future of the blob key for {@code fd}
     * @throws RestHandlerException if the ResourceManager gateway cannot be obtained
     */
    private CompletableFuture<TransientBlobKey> loadTaskManagerFile(FileReadDetail fd) throws RestHandlerException {
        this.log.debug("Load file range from FileReadDetail:[{}].", fd);
        final ResourceManagerGateway resourceManagerGateway = this.getResourceManagerGateway(this.resourceManagerGatewayRetriever);
        return this.requestFileUpload(resourceManagerGateway, fd);
    }

    /**
     * Cache removal listener: once the removed entry's future completes normally, deletes the
     * referenced blob from the transient blob service cache.
     *
     * @param removalNotification notification describing the removed cache entry
     */
    private void removeBlob(RemovalNotification<FileReadDetail, CompletableFuture<TransientBlobKey>> removalNotification) {
        this.log.debug("Remove cached file for TaskExecutor {}.", removalNotification.getKey());
        final CompletableFuture<TransientBlobKey> value = removalNotification.getValue();
        if (value != null) {
            value.thenAccept(this.transientBlobService::deleteFromCache);
        }
    }

    /**
     * Writes an HTTP 200 response header followed by the full content of {@code file} to the
     * channel. A zero-copy file region is used when no {@link SslHandler} is in the pipeline;
     * otherwise the file is sent as chunked input. The file is closed by a listener once the
     * content write completes.
     *
     * @param ctx channel context to write to
     * @param file file whose content is sent
     * @param httpRequest request being answered; consulted for keep-alive handling
     * @throws FlinkException if the file cannot be opened or an I/O error occurs while setting up
     *     the transfer
     */
    private void transferFile(ChannelHandlerContext ctx, File file, HttpRequest httpRequest) throws FlinkException {
        final RandomAccessFile randomAccessFile;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            throw new FlinkException("Can not find file " + file + ".", e);
        }

        try {
            final long fileLength = randomAccessFile.length();
            final FileChannel fileChannel = randomAccessFile.getChannel();
            final boolean keepAlive = HttpHeaders.isKeepAlive(httpRequest);

            final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.headers().set("Content-Type", "text/plain");
            if (keepAlive) {
                response.headers().set("Connection", "keep-alive");
            }
            HttpHeaders.setContentLength(response, fileLength);
            ctx.write(response);

            // Releases the file once the content write has finished (successfully or not).
            final ChannelFutureListener completionListener = future -> {
                fileChannel.close();
                randomAccessFile.close();
            };

            final ChannelFuture lastContentFuture;
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(fileChannel, 0L, fileLength), ctx.newProgressivePromise())
                    .addListener(completionListener);
                lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            } else {
                // HttpChunkedInput emits the terminating last-content marker itself.
                lastContentFuture = ctx
                    .writeAndFlush(new HttpChunkedInput(new ChunkedFile(randomAccessFile, 0L, fileLength, 8192)), ctx.newProgressivePromise())
                    .addListener(completionListener);
            }

            if (!keepAlive) {
                lastContentFuture.addListener(ChannelFutureListener.CLOSE);
            }
        } catch (IOException ioe) {
            // Closing the RandomAccessFile also closes its channel. A failure while closing must not
            // hide the original I/O error, so it is recorded as a suppressed exception instead.
            try {
                randomAccessFile.close();
            } catch (IOException closeFailure) {
                ioe.addSuppressed(closeFailure);
            }
            throw new FlinkException("Could not transfer file " + file + " to the client.", ioe);
        }
    }
}

