package org.apache.flink.streaming.api.bundle;

import java.util.concurrent.ScheduledFuture;
import org.apache.flink.streaming.api.bundle.CoBundleTrigger;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/bundle/TimeCoBundleTrigger.class */
public class TimeCoBundleTrigger<L, R> implements CoBundleTrigger<L, R> {
    private final long timeout;
    private transient BundleTriggerCallback callback;
    private transient ProcessingTimeService timeRegistry;
    private transient ScheduledFuture scheduledFuture;

    public TimeCoBundleTrigger(long j) {
        Preconditions.checkArgument(j > 0, "capacity must be greater than 0");
        this.timeout = j;
    }

    @Override // org.apache.flink.streaming.api.bundle.CoBundleTrigger
    public void registerBundleTriggerCallback(BundleTriggerCallback bundleTriggerCallback, CoBundleTrigger.BundleTriggerContext bundleTriggerContext) {
        this.callback = (BundleTriggerCallback) Preconditions.checkNotNull(bundleTriggerCallback, "callback is null");
        this.timeRegistry = bundleTriggerContext.getProcessingTimeRegistry();
        Preconditions.checkNotNull(this.timeRegistry, "timeRegistry is null");
    }

    @Override // org.apache.flink.streaming.api.bundle.CoBundleTrigger
    public void onLeftElement(L l) throws Exception {
    }

    @Override // org.apache.flink.streaming.api.bundle.CoBundleTrigger
    public void onRightElement(R r) throws Exception {
    }

    @Override // org.apache.flink.streaming.api.bundle.CoBundleTrigger
    public void reset() {
        if (this.scheduledFuture != null && !this.scheduledFuture.isDone() && !this.scheduledFuture.isCancelled()) {
            this.scheduledFuture.cancel(false);
            this.scheduledFuture = null;
        }
        if (this.timeRegistry.isTerminated()) {
            throw new IllegalStateException("ProcessingTimeRegistry is terminated.");
        }
        this.scheduledFuture = this.timeRegistry.registerTimer(this.timeRegistry.getCurrentProcessingTime() + this.timeout, j -> {
            this.callback.finishBundle();
        });
    }

    @Override // org.apache.flink.streaming.api.bundle.CoBundleTrigger
    public String explain() {
        return "TimeCoBundleTrigger with timeout is " + this.timeout + " ms";
    }
}
