/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.fault.tolerant;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.runtime.fault.tolerant.FaultTolerantableOperator;
import org.apache.flink.table.runtime.fault.tolerant.NonnegligibleException;

public abstract class ProcessingTimeCallbackOperator<OUT>
extends AbstractStreamOperator<OUT>
implements ProcessingTimeCallback,
FaultTolerantableOperator<OUT> {
    private transient Counter ignoreCnt;
    private boolean faultTolerantEnable = false;

    public void open() throws Exception {
        super.open();
        if (this.faultTolerantEnable) {
            this.ignoreCnt = this.getMetricGroup().counter("ignore_cnt");
        }
    }

    @Override
    public void enableFaultTolerant() {
        this.faultTolerantEnable = true;
    }

    protected ProcessingTimeService getProcessingTimeService() {
        final ProcessingTimeService processingTimeService = super.getProcessingTimeService();
        if (!this.faultTolerantEnable) {
            return processingTimeService;
        }
        return new ProcessingTimeService(){

            public long getCurrentProcessingTime() {
                return processingTimeService.getCurrentProcessingTime();
            }

            public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
                ProcessingTimeCallback t = timestamp1 -> {
                    try {
                        target.onProcessingTime(timestamp1);
                    }
                    catch (Exception e2) {
                        if (e2 instanceof NonnegligibleException) {
                            throw e2;
                        }
                        LOG.error("An exception is ignored trigger on ProcessingTime.\nThe output maybe discarded.\nTimestamp : [{}].", (Object)timestamp1, (Object)e2);
                        ProcessingTimeCallbackOperator.this.ignoreCnt.inc();
                    }
                };
                return processingTimeService.registerTimer(timestamp, t);
            }

            public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
                return processingTimeService.scheduleAtFixedRate(callback, initialDelay, period);
            }

            public boolean isTerminated() {
                return processingTimeService.isTerminated();
            }

            public void quiesce() throws InterruptedException {
                processingTimeService.quiesce();
            }

            public void awaitPendingAfterQuiesce() throws InterruptedException {
                processingTimeService.awaitPendingAfterQuiesce();
            }

            public void shutdownService() {
                processingTimeService.shutdownService();
            }

            public boolean shutdownServiceUninterruptible(long timeoutMs) {
                return processingTimeService.shutdownServiceUninterruptible(timeoutMs);
            }

            public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
                return processingTimeService.shutdownAndAwaitPending(time, timeUnit);
            }
        };
    }
}

