/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.fault.tolerant;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator;
import org.apache.flink.table.runtime.fault.tolerant.TriggerableOperator;

/**
 * A {@link KeyedCoProcessOperator} whose timer callbacks can optionally be routed through a
 * {@link TriggerableOperator.TriggerableProxy}.
 *
 * <p>The proxying is off by default and is switched on with {@link #enableFaultTolerant()}.
 * Once enabled, every timer service requested through
 * {@link #getInternalTimerService(String, TypeSerializer, Triggerable)} receives a proxy around
 * the supplied {@link Triggerable} instead of the {@code Triggerable} itself.
 *
 * @param <K> key type of the operator
 * @param <IN1> type of the first input
 * @param <IN2> type of the second input
 * @param <OUT> output type
 */
public class FaultTolerantableKeyedCoProcessOperator<K, IN1, IN2, OUT>
        extends KeyedCoProcessOperator<K, IN1, IN2, OUT>
        implements FaultTolerantableOperator<OUT> {
    private static final long serialVersionUID = 1L;

    /** Name under which the counter handed to the proxy is registered in the operator metric group. */
    private static final String IGNORE_COUNTER_NAME = "ignore_cnt";

    /** Whether timer callbacks are wrapped in a proxy; {@code false} until explicitly enabled. */
    private boolean faultTolerantEnable;

    /**
     * Creates the operator around the given user function.
     *
     * @param flatMapper the co-process function executed by this operator (the parameter name is
     *     kept as-is from the original source)
     */
    public FaultTolerantableKeyedCoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
        super(flatMapper);
    }

    /** Turns on proxying of timer callbacks for timer services requested afterwards. */
    @Override
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }

    /**
     * Returns the timer service from the superclass, substituting a
     * {@link TriggerableOperator.TriggerableProxy} for {@code triggerable} when fault tolerance
     * has been enabled.
     *
     * <p>NOTE(review): judging by the counter name, the proxy presumably counts timer callbacks
     * whose failures are ignored - confirm against {@code TriggerableProxy}.
     *
     * @param name name of the timer service, passed through unchanged
     * @param namespaceSerializer serializer for the timer namespace, passed through unchanged
     * @param triggerable callback target for fired timers
     * @param <K2> key type of the triggerable (independent of this operator's {@code K})
     * @param <N> namespace type of the timers
     * @return the timer service created by the superclass
     */
    @Override
    public <K2, N> InternalTimerService<N> getInternalTimerService(
            String name, TypeSerializer<N> namespaceSerializer, Triggerable<K2, N> triggerable) {
        if (!this.faultTolerantEnable) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        // The counter is only registered when proxying is enabled, so operators without fault
        // tolerance do not expose an unused metric.
        Counter ignoreCnt = this.getMetricGroup().counter(IGNORE_COUNTER_NAME);
        Triggerable<K2, N> proxied =
                new TriggerableOperator.TriggerableProxy<K2, N>(ignoreCnt, triggerable);
        return super.getInternalTimerService(name, namespaceSerializer, proxied);
    }
}

