/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.fault.tolerant;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator;
import org.apache.flink.table.runtime.fault.tolerant.NonnegligibleException;
import org.apache.flink.table.runtime.fault.tolerant.OutputProxy;

public class FaultTolerantableAsyncWaitOperator<IN, OUT>
extends AsyncWaitOperator<IN, OUT>
implements FaultTolerantableOperator<OUT> {
    private final String operatorName;
    private boolean faultTolerantEnable;
    private Counter ignoreCnt;
    private JobVertexID jobVertexId;

    public FaultTolerantableAsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode, String operatorName) {
        super(asyncFunction, timeout, capacity, outputMode);
        this.operatorName = operatorName;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output2) {
        if (this.faultTolerantEnable) {
            super.setup(containingTask, config, new OutputProxy<StreamRecord<OUT>>(output2));
            this.jobVertexId = containingTask.getEnvironment().getJobVertexId();
        } else {
            super.setup(containingTask, config, output2);
        }
    }

    public void open() throws Exception {
        super.open();
        if (this.faultTolerantEnable) {
            this.ignoreCnt = this.getMetricGroup().counter("ignore_cnt");
        }
    }

    public void failOperator(Throwable throwable) {
        if (this.faultTolerantEnable) {
            if (throwable instanceof Exception) {
                if (throwable instanceof NonnegligibleException) {
                    super.failOperator(throwable);
                } else {
                    LOG.error("An exception is ignored in operator [{}] of vertex [{}].\nThe output maybe discarded.", new Object[]{this.operatorName, this.jobVertexId, throwable});
                    this.ignoreCnt.inc();
                }
            } else {
                super.failOperator(throwable);
            }
        } else {
            super.failOperator(throwable);
        }
    }

    @Override
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }
}

