/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.legacy;

import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
 * Legacy REST handlers for cancelling a job with a savepoint.
 *
 * <p>Two cooperating handlers share the state kept in this class:
 * <ul>
 *   <li>{@link TriggerHandler} starts (or joins) a cancel-with-savepoint operation for a job and
 *       answers with the location under which its progress can be polled.</li>
 *   <li>{@link InProgressHandler} answers those polls with "in-progress", "success" or "failed".</li>
 * </ul>
 *
 * <p>All shared mutable state ({@code inProgress}, {@code completed}, {@code requestCounter} and
 * the in-progress handler's recently-completed queue) is read and written only while holding
 * {@code lock}.
 */
public class JobCancellationWithSavepointHandlers {
    private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint";
    private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
    private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";

    /** Charset used to encode the JSON response bodies. */
    private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;

    /** Monitor guarding every access to the shared request bookkeeping below. */
    private final Object lock = new Object();

    /** Job ID -> request ID of the cancellation currently running for that job. */
    private final Map<JobID, Long> inProgress = new HashMap<>();

    /** Request ID -> outcome: a {@code String} savepoint path on success, a {@code Throwable} on failure. */
    private final Map<Long, Object> completed = new HashMap<>();

    /** Source of request IDs; incremented before use, so the first ID handed out is 1. */
    private long requestCounter;

    private final TriggerHandler triggerHandler;
    private final InProgressHandler inProgressHandler;

    /** Fallback savepoint directory used when a trigger request does not name one; may be {@code null}. */
    private final String defaultSavepointDirectory;

    /**
     * Creates the handlers without a default savepoint directory.
     *
     * @param currentGraphs cache used to look up the execution graph of the job to cancel
     * @param executor executor running the asynchronous trigger and completion callbacks
     */
    public JobCancellationWithSavepointHandlers(ExecutionGraphCache currentGraphs, Executor executor) {
        this(currentGraphs, executor, null);
    }

    /**
     * Creates the handlers.
     *
     * @param currentGraphs cache used to look up the execution graph of the job to cancel
     * @param executor executor running the asynchronous trigger and completion callbacks
     * @param defaultSavepointDirectory directory used when the request path carries no
     *     {@code targetDirectory}; {@code null} if none is configured
     */
    public JobCancellationWithSavepointHandlers(ExecutionGraphCache currentGraphs, Executor executor, @Nullable String defaultSavepointDirectory) {
        this.triggerHandler = new TriggerHandler(currentGraphs, executor);
        this.inProgressHandler = new InProgressHandler();
        this.defaultSavepointDirectory = defaultSavepointDirectory;
    }

    /** Returns the handler that triggers a cancel-with-savepoint operation. */
    public TriggerHandler getTriggerHandler() {
        return this.triggerHandler;
    }

    /** Returns the handler that reports the state of a previously triggered operation. */
    public InProgressHandler getInProgressHandler() {
        return this.inProgressHandler;
    }

    /**
     * Wraps an already serialized JSON document into a full HTTP/1.1 response.
     *
     * @param status HTTP status of the response
     * @param location value of the {@code Location} header, or {@code null} to omit the header
     * @param json serialized JSON body
     * @return response carrying the body plus {@code Content-Type} and {@code Content-Length} headers
     */
    private static FullHttpResponse createJsonResponse(HttpResponseStatus status, @Nullable String location, String json) {
        byte[] bytes = json.getBytes(ENCODING);
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.wrappedBuffer(bytes));
        if (location != null) {
            response.headers().set("Location", location);
        }
        response.headers().set("Content-Type", "application/json; charset=" + ENCODING.name());
        response.headers().set("Content-Length", response.content().readableBytes());
        return response;
    }

    /**
     * Handler answering status polls for a cancel-with-savepoint request.
     */
    class InProgressHandler
    implements RequestHandler {
        /** Number of already delivered results kept around so that repeated polls can still be answered. */
        private static final int NUM_GHOST_REQUEST_IDS = 16;

        /** Most recently delivered results as (request ID, outcome) pairs, oldest first; guarded by {@code lock}. */
        private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);

        InProgressHandler() {
        }

        @Override
        public String[] getPaths() {
            return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
        }

        /**
         * Looks up the state of the request identified by the {@code jobid} and {@code requestId}
         * path parameters.
         *
         * <p>The path parameters are parsed on the calling thread, so a malformed value makes this
         * method throw instead of returning a failed future. The lookup itself runs asynchronously.
         *
         * @return future holding the status response; it completes exceptionally with a
         *     {@link CompletionException} wrapping a {@link FlinkException} if the lookup fails
         */
        @Override
        public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
            JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
            long requestId = Long.parseLong(pathParams.get("requestId"));
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return handleCheckRequest(jobId, requestId);
                }
                catch (Exception e) {
                    throw new CompletionException(new FlinkException("Could not handle in progress request.", e));
                }
            });
        }

        /**
         * Resolves the current state of a request while holding the shared lock.
         *
         * <p>Lookup order: a freshly completed result (which is then moved to the bounded
         * recently-completed queue), the in-progress table of the job, and finally the
         * recently-completed queue. Anything else is reported as an unknown job/request ID.
         */
        private FullHttpResponse handleCheckRequest(JobID jobId, long requestId) throws IOException {
            synchronized (lock) {
                Object result = completed.remove(requestId);
                if (result != null) {
                    // Keep the outcome for follow-up polls, evicting the oldest entry when full.
                    recentlyCompleted.add(new Tuple2<Long, Object>(requestId, result));
                    if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
                        recentlyCompleted.remove();
                    }
                    return createCompletedResponse(requestId, result);
                }

                Long inProgressRequestId = inProgress.get(jobId);
                if (inProgressRequestId != null) {
                    if (inProgressRequestId.longValue() == requestId) {
                        return createInProgressResponse(requestId);
                    }
                    String msg = "Request ID does not belong to JobID";
                    return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
                }

                for (Tuple2<Long, Object> recent : recentlyCompleted) {
                    if (recent.f0.longValue() == requestId) {
                        return createCompletedResponse(requestId, recent.f1);
                    }
                }
                return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
            }
        }

        /**
         * Turns a stored outcome into a response: a {@code String} is the savepoint path of a
         * successful run, anything else is the {@code Throwable} that made it fail.
         */
        private FullHttpResponse createCompletedResponse(long requestId, Object result) throws IOException {
            if (result instanceof String) {
                return createSuccessResponse(requestId, (String) result);
            }
            Throwable cause = (Throwable) result;
            return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
        }

        /** Builds the {@code 201 Created} response announcing the savepoint path. */
        private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
            StringWriter writer = new StringWriter();
            try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
                gen.writeStartObject();
                gen.writeStringField("status", "success");
                gen.writeNumberField("request-id", requestId);
                gen.writeStringField("savepoint-path", savepointPath);
                gen.writeEndObject();
            }
            // The generator must be closed (flushed) before the writer content is read.
            return createJsonResponse(HttpResponseStatus.CREATED, null, writer.toString());
        }

        /** Builds the {@code 202 Accepted} response for a request that is still running. */
        private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
            StringWriter writer = new StringWriter();
            try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
                gen.writeStartObject();
                gen.writeStringField("status", "in-progress");
                gen.writeNumberField("request-id", requestId);
                gen.writeEndObject();
            }
            return createJsonResponse(HttpResponseStatus.ACCEPTED, null, writer.toString());
        }

        /** Builds a failure response with the given status code and cause message. */
        private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
            StringWriter writer = new StringWriter();
            try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
                gen.writeStartObject();
                gen.writeStringField("status", "failed");
                gen.writeNumberField("request-id", requestId);
                gen.writeStringField("cause", errMsg);
                gen.writeEndObject();
            }
            return createJsonResponse(code, null, writer.toString());
        }
    }

    /**
     * Handler triggering the cancel-with-savepoint operation.
     */
    class TriggerHandler
    implements RequestHandler {
        private final ExecutionGraphCache currentGraphs;
        private final Executor executor;

        /**
         * @param currentGraphs cache used to resolve the job's execution graph; must not be null
         * @param executor executor for the asynchronous callbacks; must not be null
         */
        public TriggerHandler(ExecutionGraphCache currentGraphs, Executor executor) {
            this.currentGraphs = Preconditions.checkNotNull(currentGraphs);
            this.executor = Preconditions.checkNotNull(executor);
        }

        @Override
        public String[] getPaths() {
            return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
        }

        /**
         * Triggers cancellation with a savepoint for the job named by the {@code jobid} path
         * parameter. The savepoint goes to the {@code targetDirectory} path parameter if present,
         * otherwise to the configured default directory.
         *
         * @return future holding the "accepted" response; it completes exceptionally if there is
         *     no gateway, the execution graph or its checkpoint configuration cannot be found,
         *     no savepoint directory is available, or the response cannot be written
         */
        @Override
        public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
            if (jobManagerGateway == null) {
                return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
            }

            JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
            CompletableFuture<AccessExecutionGraph> graphFuture = this.currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
            return graphFuture.handleAsync((graph, throwable) -> {
                if (throwable != null) {
                    throw new CompletionException(new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
                }
                CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration();
                if (jobCheckpointingConfiguration == null) {
                    throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job."));
                }

                String targetDirectory = pathParams.get("targetDirectory");
                if (targetDirectory == null) {
                    if (defaultSavepointDirectory == null) {
                        throw new IllegalStateException("No savepoint directory configured. You can either specify a directory when triggering this savepoint or configure a cluster-wide default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
                    }
                    targetDirectory = defaultSavepointDirectory;
                }

                try {
                    return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout());
                }
                catch (IOException e) {
                    throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
                }
            }, this.executor);
        }

        /**
         * Registers the request and, if no cancellation is running for the job yet, starts one.
         *
         * <p>A second trigger for a job that is already being cancelled does not start another
         * operation; it is answered with the request ID of the running one.
         *
         * @param checkpointTimeout timeout in milliseconds passed to the gateway call
         * @return {@code 202 Accepted} response carrying the request ID and the polling location
         * @throws IOException if the JSON body cannot be generated
         */
        private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
            final long requestId;
            final boolean isNewRequest;
            synchronized (lock) {
                Long runningRequestId = inProgress.get(jobId);
                if (runningRequestId != null) {
                    requestId = runningRequestId;
                    isNewRequest = false;
                } else {
                    requestId = ++requestCounter;
                    inProgress.put(jobId, requestId);
                    isNewRequest = true;
                }
            }

            if (isNewRequest) {
                boolean success = false;
                try {
                    jobManagerGateway
                        .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout))
                        .whenCompleteAsync((path, throwable) -> {
                            // Publish the outcome and retire the in-progress entry in a single
                            // critical section: 'completed' is a plain HashMap that pollers access
                            // under the same lock, and they must never observe the request as
                            // neither in progress nor completed.
                            synchronized (lock) {
                                try {
                                    if (throwable != null) {
                                        completed.put(requestId, throwable);
                                    } else {
                                        completed.put(requestId, path);
                                    }
                                }
                                finally {
                                    inProgress.remove(jobId);
                                }
                            }
                        }, this.executor);
                    success = true;
                }
                finally {
                    if (!success) {
                        // Triggering failed synchronously; allow a later request to retry.
                        synchronized (lock) {
                            inProgress.remove(jobId);
                        }
                    }
                }
            }

            String location = CANCELLATION_IN_PROGRESS_REST_PATH
                .replace(":jobid", jobId.toString())
                .replace(":requestId", Long.toString(requestId));

            StringWriter writer = new StringWriter();
            try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
                gen.writeStartObject();
                gen.writeStringField("status", "accepted");
                gen.writeNumberField("request-id", requestId);
                gen.writeStringField("location", location);
                gen.writeEndObject();
            }
            return createJsonResponse(HttpResponseStatus.ACCEPTED, location, writer.toString());
        }
    }
}

