/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;

/**
 * {@link StreamTask} whose single head operator is a {@link StreamSource}.
 *
 * <p>The task runs the source operator in {@link #run()}, handing it the lock obtained from a
 * {@link CheckpointLockDelegate} that wraps this task's checkpoint lock. If the user function is
 * an {@link ExternallyInducedSource}, checkpoints are not started through the regular
 * {@link #triggerCheckpoint(CheckpointMetaData, CheckpointOptions, boolean)} path; instead the
 * source receives a trigger hook during {@link #init()} and starts them itself.
 *
 * @param <OUT> type of the records emitted by the source
 * @param <SRC> type of the source function
 * @param <OP> type of the stream source operator wrapping the source function
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {

    /** Set in {@link #init()} when the user function implements {@link ExternallyInducedSource}. */
    private volatile boolean externallyInducedCheckpoints;

    /** The source operator at the head of the chain; assigned in {@link #init()}. */
    protected OP headOperator;

    /** Wraps this task's checkpoint lock; assigned in {@link #init()}. */
    private CheckpointLockDelegate lockDelegate;

    /**
     * Set by {@link #finishTask()} before the source is cancelled, so that {@link #run()} can tell
     * an exception caused by finishing apart from a genuine failure.
     */
    private volatile boolean isFinished = false;

    public SourceStreamTask(Environment env) {
        super(env);
    }

    /**
     * Resolves the head operator, creates the lock delegate and, for an
     * {@link ExternallyInducedSource}, installs the hook through which the source triggers
     * checkpoints on this task.
     */
    @Override
    protected void init() {
        headOperator = getHeadOperator();
        lockDelegate = new CheckpointLockDelegate(getCheckpointLock());

        SourceFunction<?> source = headOperator.getUserFunction();
        if (!(source instanceof ExternallyInducedSource)) {
            return;
        }

        externallyInducedCheckpoints = true;
        ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {

            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
                final long timestamp = System.currentTimeMillis();
                final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
                try {
                    // Call the parent implementation directly: the override in this class does
                    // not start a checkpoint when checkpoints are externally induced.
                    SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions, false);
                } catch (RuntimeException | FlinkException e) {
                    throw e;
                } catch (Exception e) {
                    // Any other checked exception is wrapped, keeping the original as the cause.
                    throw new FlinkException(e.getMessage(), e);
                }
            }
        };
        ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
    }

    /** Delegates to the head operator's {@code advanceToEndOfEventTime()}. */
    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        headOperator.advanceToEndOfEventTime();
    }

    /**
     * Marks the task as finished and then cancels the source. The flag is set first so that an
     * exception surfacing in {@link #run()} as a result of the cancellation is not rethrown.
     */
    @Override
    protected void finishTask() throws Exception {
        isFinished = true;
        cancelTask();
    }

    /** Nothing to clean up. */
    @Override
    protected void cleanup() {
    }

    /**
     * Runs the source operator. Afterwards, unless the source reports that it was cancelled or
     * stopped, calls {@code endInput()} on every {@link OneInputStreamOperator} in the chain
     * while holding the lock managed by the lock delegate.
     *
     * @throws Exception any exception thrown by the source, unless {@link #finishTask()} has been
     *     called, in which case the exception is dropped
     */
    @Override
    protected void run() throws Exception {
        try {
            headOperator.run(lockDelegate.getLock(), getStreamStatusMaintainer());
        } catch (Exception e) {
            if (!isFinished) {
                throw e;
            }
            // Deliberately ignored: finishTask() cancelled the source, so a failure here is
            // treated as part of finishing rather than as an error.
        }

        if (!headOperator.isCanceledOrStopped()) {
            lockDelegate.lockAndRun(() -> {
                for (StreamOperator<?> operator : operatorChain.getAllOperatorsTopologySorted()) {
                    if (operator instanceof OneInputStreamOperator) {
                        ((OneInputStreamOperator<?, ?>) operator).endInput();
                    }
                }
            });
        }
    }

    /** Cancels the source operator, if {@link #init()} has already assigned it. */
    @Override
    protected void cancelTask() throws Exception {
        if (headOperator != null) {
            headOperator.cancel();
        }
    }

    /**
     * Returns the single head operator of the operator chain.
     *
     * @return the head operator
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, when the
     *     chain does not have exactly one head operator
     */
    @SuppressWarnings("unchecked") // a source task's head operator is expected to be of type OP
    protected OP getHeadOperator() {
        final int headCount = operatorChain.getHeadOperators().length;
        Preconditions.checkState(headCount == 1, "There should only one head operator, not " + headCount);
        return (OP) operatorChain.getHeadOperators()[0];
    }

    /**
     * Triggers a checkpoint through the parent implementation, unless checkpoints are externally
     * induced. In that case no checkpoint is started here; the method only reports, evaluated
     * under the lock managed by the lock delegate, whether the task is running.
     *
     * @param checkpointMetaData meta data of the checkpoint to trigger
     * @param checkpointOptions options for performing the checkpoint
     * @param advanceToEndOfEventTime passed through unchanged to the parent implementation
     * @return the parent implementation's result, or {@code isRunning()} for externally induced
     *     checkpoints
     */
    @Override
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) throws Exception {
        if (externallyInducedCheckpoints) {
            return (Boolean) lockDelegate.lockAndRun(() -> this.isRunning());
        }
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
    }
}

