package org.apache.flink.table.runtime.fault.tolerant;

import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/fault/tolerant/TriggerableOperator.class */
public abstract class TriggerableOperator<K, N, OUT> extends AbstractStreamOperator<OUT> implements Triggerable<K, N>, FaultTolerantableOperator<OUT> {
    private transient Counter ignoreCnt;
    private boolean faultTolerantEnable = false;

    /* loaded from: input_file:org/apache/flink/table/runtime/fault/tolerant/TriggerableOperator$TriggerableProxy.class */
    static class TriggerableProxy<K, N> implements Triggerable<K, N>, Serializable {
        private static final long serialVersionUID = 1;
        private static final Logger LOG = LoggerFactory.getLogger(TriggerableProxy.class);
        private final Counter ignoreCnt;
        private final Triggerable<K, N> triggerable;

        public TriggerableProxy(Counter counter, Triggerable<K, N> triggerable) {
            this.ignoreCnt = counter;
            this.triggerable = triggerable;
        }

        public void onEventTime(InternalTimer<K, N> internalTimer) throws Exception {
            try {
                this.triggerable.onEventTime(internalTimer);
            } catch (Exception e) {
                if (e instanceof NonnegligibleException) {
                    throw e;
                }
                LOG.error("An exception is ignored trigger on EventTime.\nThe output maybe discarded.\nDetail information about timer is: \ntimestamp : [{}], \nkey : [{}], \nnamespace : [{}].", new Object[]{Long.valueOf(internalTimer.getTimestamp()), internalTimer.getKey(), internalTimer.getNamespace(), e});
                this.ignoreCnt.inc();
            }
        }

        public void onProcessingTime(InternalTimer<K, N> internalTimer) throws Exception {
            try {
                this.triggerable.onProcessingTime(internalTimer);
            } catch (Exception e) {
                if (e instanceof NonnegligibleException) {
                    throw e;
                }
                LOG.error("An exception is ignored trigger on ProcessingTime.\nThe output maybe discarded.\nDetail information about timer is: \ntimestamp : [{}], \nkey : [{}], \nnamespace : [{}].", new Object[]{Long.valueOf(internalTimer.getTimestamp()), internalTimer.getKey(), internalTimer.getNamespace(), e});
                this.ignoreCnt.inc();
            }
        }
    }

    public void open() throws Exception {
        super.open();
        if (this.faultTolerantEnable) {
            this.ignoreCnt = getMetricGroup().counter(FaultTolerantUtil.IGNORE_CNT_METRIC_NAME);
        }
    }

    @Override // org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }

    public <K, N> InternalTimerService<N> getInternalTimerService(String str, TypeSerializer<N> typeSerializer, Triggerable<K, N> triggerable) {
        return super.getInternalTimerService(str, typeSerializer, this.faultTolerantEnable ? new TriggerableProxy(this.ignoreCnt, triggerable) : triggerable);
    }
}
