/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.fault.tolerant;

import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator;
import org.apache.flink.table.runtime.fault.tolerant.NonnegligibleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base stream operator that can optionally wrap the {@link Triggerable} handed to
 * {@link #getInternalTimerService} in a {@link TriggerableProxy}.
 *
 * <p>When fault tolerance has been switched on via {@link #enableFaultTolerant()}, exceptions
 * thrown by timer callbacks are logged and counted in the {@code "ignore_cnt"} metric instead
 * of being propagated, except for {@link NonnegligibleException} and
 * {@link InterruptedException}, which are always rethrown. When fault tolerance is off, the
 * supplied {@link Triggerable} is registered unchanged.
 *
 * @param <K> key type of the timers handled by this operator
 * @param <N> namespace type of the timers handled by this operator
 * @param <OUT> output type of the operator
 */
public abstract class TriggerableOperator<K, N, OUT>
extends AbstractStreamOperator<OUT>
implements Triggerable<K, N>,
FaultTolerantableOperator<OUT> {
    /** Counts timer callback exceptions that were swallowed; created lazily, only in fault-tolerant mode. */
    private transient Counter ignoreCnt;
    /** Whether timer callbacks registered through this operator are wrapped in a tolerant proxy. */
    private boolean faultTolerantEnable = false;

    /**
     * Opens the operator and, in fault-tolerant mode, makes sure the {@code "ignore_cnt"}
     * counter exists.
     *
     * @throws Exception if the superclass fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        if (this.faultTolerantEnable) {
            this.ensureIgnoreCounter();
        }
    }

    /** Switches this operator into fault-tolerant mode. */
    @Override
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }

    /**
     * Obtains a timer service from the superclass, wrapping {@code triggerable} in a
     * {@link TriggerableProxy} when fault tolerance is enabled.
     *
     * <p>Note: the method-level type parameters {@code K} and {@code N} shadow the class-level
     * ones of the same name.
     *
     * @param name name passed through to the superclass
     * @param namespaceSerializer namespace serializer passed through to the superclass
     * @param triggerable callback target for fired timers
     * @return the timer service returned by the superclass
     */
    @Override
    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (!this.faultTolerantEnable) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        // Resolve the counter here as well as in open(): a proxy created before the counter
        // was assigned would otherwise capture null and fail when it tries to count.
        Triggerable<K, N> proxy = new TriggerableProxy<K, N>(this.ensureIgnoreCounter(), triggerable);
        return super.getInternalTimerService(name, namespaceSerializer, proxy);
    }

    /** Returns the ignore counter, registering it on the operator metric group on first use. */
    private Counter ensureIgnoreCounter() {
        if (this.ignoreCnt == null) {
            this.ignoreCnt = this.getMetricGroup().counter("ignore_cnt");
        }
        return this.ignoreCnt;
    }

    /**
     * {@link Triggerable} decorator that delegates to another {@link Triggerable} and
     * tolerates its failures: exceptions are logged and counted rather than propagated,
     * unless they are {@link NonnegligibleException} or {@link InterruptedException}.
     *
     * @param <K> timer key type
     * @param <N> timer namespace type
     */
    static class TriggerableProxy<K, N>
    implements Triggerable<K, N>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private static final Logger LOG = LoggerFactory.getLogger(TriggerableProxy.class);
        private final Counter ignoreCnt;
        private final Triggerable<K, N> triggerable;

        /**
         * @param ignoreCnt counter incremented for every swallowed exception; a {@code null}
         *     counter is tolerated and simply not incremented
         * @param triggerable delegate that receives the timer callbacks
         */
        public TriggerableProxy(Counter ignoreCnt, Triggerable<K, N> triggerable) {
            this.ignoreCnt = ignoreCnt;
            this.triggerable = triggerable;
        }

        /**
         * Forwards an event-time timer to the delegate, tolerating ignorable failures.
         *
         * @param timer the fired timer
         * @throws Exception if the delegate throws a {@link NonnegligibleException} or an
         *     {@link InterruptedException}
         */
        @Override
        public void onEventTime(InternalTimer<K, N> timer) throws Exception {
            try {
                this.triggerable.onEventTime(timer);
            }
            catch (Exception e) {
                this.rethrowOrIgnore(e, "An exception is ignored trigger on EventTime.\nThe output maybe discarded.\nDetail information about timer is: \ntimestamp : [{}], \nkey : [{}], \nnamespace : [{}].", timer);
            }
        }

        /**
         * Forwards a processing-time timer to the delegate, tolerating ignorable failures.
         *
         * @param timer the fired timer
         * @throws Exception if the delegate throws a {@link NonnegligibleException} or an
         *     {@link InterruptedException}
         */
        @Override
        public void onProcessingTime(InternalTimer<K, N> timer) throws Exception {
            try {
                this.triggerable.onProcessingTime(timer);
            }
            catch (Exception e) {
                this.rethrowOrIgnore(e, "An exception is ignored trigger on ProcessingTime.\nThe output maybe discarded.\nDetail information about timer is: \ntimestamp : [{}], \nkey : [{}], \nnamespace : [{}].", timer);
            }
        }

        /**
         * Shared failure policy for both timer callbacks: rethrows exceptions that must not
         * be ignored, otherwise logs the failure with the timer details and counts it.
         */
        private void rethrowOrIgnore(Exception failure, String message, InternalTimer<K, N> timer) throws Exception {
            // An interruption is a signal to the thread, not a data error; swallowing it
            // would hide the interrupt from the caller.
            if (failure instanceof NonnegligibleException || failure instanceof InterruptedException) {
                throw failure;
            }
            // The trailing exception has no matching placeholder, so SLF4J logs it with its stack trace.
            LOG.error(message, new Object[]{timer.getTimestamp(), timer.getKey(), timer.getNamespace(), failure});
            // Guarded so that a missing counter cannot turn a tolerated failure into an NPE.
            if (this.ignoreCnt != null) {
                this.ignoreCnt.inc();
            }
        }
    }
}

