package org.apache.flink.table.runtime.fault.tolerant;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/* loaded from: input_file:org/apache/flink/table/runtime/fault/tolerant/FaultTolerantableAsyncWaitOperator.class */
public class FaultTolerantableAsyncWaitOperator<IN, OUT> extends AsyncWaitOperator<IN, OUT> implements FaultTolerantableOperator<OUT> {
    private final String operatorName;
    private boolean faultTolerantEnable;
    private Counter ignoreCnt;
    private JobVertexID jobVertexId;

    public FaultTolerantableAsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long j, int i, AsyncDataStream.OutputMode outputMode, String str) {
        super(asyncFunction, j, i, outputMode);
        this.operatorName = str;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        if (!this.faultTolerantEnable) {
            super.setup(streamTask, streamConfig, output);
        } else {
            super.setup(streamTask, streamConfig, new OutputProxy(output));
            this.jobVertexId = streamTask.getEnvironment().getJobVertexId();
        }
    }

    public void open() throws Exception {
        super.open();
        if (this.faultTolerantEnable) {
            this.ignoreCnt = getMetricGroup().counter(FaultTolerantUtil.IGNORE_CNT_METRIC_NAME);
        }
    }

    public void failOperator(Throwable th) {
        if (!this.faultTolerantEnable) {
            super.failOperator(th);
            return;
        }
        if (!(th instanceof Exception)) {
            super.failOperator(th);
        } else if (th instanceof NonnegligibleException) {
            super.failOperator(th);
        } else {
            LOG.error("An exception is ignored in operator [{}] of vertex [{}].\nThe output maybe discarded.", new Object[]{this.operatorName, this.jobVertexId, th});
            this.ignoreCnt.inc();
        }
    }

    @Override // org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }
}
