package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.RemoteFencedMessage;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.class */
public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler implements FencedMainThreadExecutable, FencedRpcGateway<F> {
    private final Supplier<F> fencingTokenSupplier;

    public FencedAkkaInvocationHandler(String str, String str2, ActorRef actorRef, Time time, long j, @Nullable CompletableFuture<Void> completableFuture, Supplier<F> supplier) {
        super(str, str2, actorRef, time, j, completableFuture);
        this.fencingTokenSupplier = (Supplier) Preconditions.checkNotNull(supplier);
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, java.lang.reflect.InvocationHandler
    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        return (declaringClass.equals(FencedMainThreadExecutable.class) || declaringClass.equals(FencedRpcGateway.class)) ? method.invoke(this, objArr) : super.invoke(obj, method, objArr);
    }

    @Override // org.apache.flink.runtime.rpc.FencedMainThreadExecutable
    public void runAsyncWithoutFencing(Runnable runnable) {
        Preconditions.checkNotNull(runnable, "runnable");
        if (!this.isLocal) {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " + getActorRef().path() + ". This is not supported.");
        }
        getActorRef().tell(new UnfencedMessage(new RunAsync(runnable, 0L)), ActorRef.noSender());
    }

    @Override // org.apache.flink.runtime.rpc.FencedMainThreadExecutable
    public <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time time) {
        Preconditions.checkNotNull(callable, "callable");
        Preconditions.checkNotNull(time, "timeout");
        if (this.isLocal) {
            return FutureUtils.toJava(Patterns.ask(getActorRef(), new UnfencedMessage(new CallAsync(callable)), time.toMilliseconds()));
        }
        throw new RuntimeException("Trying to send a Runnable to a remote actor at " + getActorRef().path() + ". This is not supported.");
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler
    public void tell(Object obj) {
        super.tell(fenceMessage(obj));
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler
    public CompletableFuture<?> ask(Object obj, Time time) {
        return super.ask(fenceMessage(obj), time);
    }

    @Override // org.apache.flink.runtime.rpc.FencedRpcGateway
    public F getFencingToken() {
        return this.fencingTokenSupplier.get();
    }

    private <P> FencedMessage<F, P> fenceMessage(P p) {
        if (this.isLocal) {
            return new LocalFencedMessage(this.fencingTokenSupplier.get(), p);
        }
        if (p instanceof Serializable) {
            return new RemoteFencedMessage(this.fencingTokenSupplier.get(), (Serializable) p);
        }
        throw new RuntimeException("Trying to send a non-serializable message " + p + " to a remote RpcEndpoint. Please make sure that the message implements java.io.Serializable.");
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.RpcServer
    public /* bridge */ /* synthetic */ CompletableFuture getTerminationFuture() {
        return super.getTerminationFuture();
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.RpcGateway
    public /* bridge */ /* synthetic */ String getHostname() {
        return super.getHostname();
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.RpcGateway
    public /* bridge */ /* synthetic */ String getAddress() {
        return super.getAddress();
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.StartStoppable
    public /* bridge */ /* synthetic */ void stop() {
        super.stop();
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.StartStoppable
    public /* bridge */ /* synthetic */ void start() {
        super.start();
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.MainThreadExecutable
    public /* bridge */ /* synthetic */ CompletableFuture callAsync(Callable callable, Time time) {
        return super.callAsync(callable, time);
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.MainThreadExecutable
    public /* bridge */ /* synthetic */ void scheduleRunAsync(Runnable runnable, long j) {
        super.scheduleRunAsync(runnable, j);
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.MainThreadExecutable
    public /* bridge */ /* synthetic */ void runAsync(Runnable runnable) {
        super.runAsync(runnable);
    }

    @Override // org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler, org.apache.flink.runtime.rpc.akka.AkkaBasedEndpoint
    public /* bridge */ /* synthetic */ ActorRef getActorRef() {
        return super.getActorRef();
    }
}
