package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.checkpointlock.CheckpointLockDelegate;
import org.apache.flink.util.checkpointlock.FairLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler {
    /** Thread group handed to the thread factory of the default timer service created in {@code invoke()}. */
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    /** Shared logger, also used by the nested checkpointing classes. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

    // NOTE(review): not referenced in the visible part of this file; presumably bounds the size of
    // abortedCheckpointIds. The identifier contains a typo ("ABOTRED") that is kept for compatibility.
    @VisibleForTesting
    static final int DEFAULT_MAX_RECORDED_ABOTRED_CHECKPOINTS = 128;
    /** Wraps the checkpoint lock (a FairLock or a plain monitor object, chosen in the constructor). */
    private CheckpointLockDelegate lockDelegate;
    /** Config of the first chained head node, or an empty StreamConfig when there is no head node. */
    protected final StreamConfig configuration;
    /** Operator chain of this task; created in {@code invoke()} right before {@code init()} is called. */
    protected OperatorChain operatorChain;
    /** Task-level config snapshot, deserialized from the task configuration in the constructor. */
    protected final StreamTaskConfigSnapshot streamTaskConfig;
    /** State backend, created in {@code invoke()}. */
    protected StateBackend stateBackend;
    /** Checkpoint storage obtained from the state backend in {@code invoke()}. */
    private CheckpointStorage checkpointStorage;
    /** Timer service; injected via constructor/setter, otherwise created in {@code invoke()}. */
    protected ProcessingTimeService timerService;
    /** Accumulator registry taken from the environment in {@code invoke()}. */
    private AccumulatorRegistry accumulatorRegistry;
    /** Closeables (e.g. running async checkpoint runnables) that are closed on cancel and on shutdown. */
    private final CloseableRegistry cancelables;
    /** True between the end of initialization in {@code invoke()} and shutdown/cancellation. */
    private volatile boolean isRunning;
    /** Set by {@code cancel()}; checked at several points of {@code invoke()}. */
    private volatile boolean canceled;
    /** Thread pool that executes the asynchronous part of checkpoints; created in {@code invoke()}. */
    private ExecutorService asyncOperationsThreadPool;
    /** Handler for failures in the synchronous checkpoint part; created in {@code invoke()}. */
    private CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
    /** Handler for failures in the asynchronous checkpoint part; wraps the synchronous handler. */
    private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
    /** Record writers created in the constructor and passed to the operator chain. */
    private final List<StreamRecordWriter<StreamRecord<?>>> streamRecordWriters;
    /**
     * In-flight asynchronous checkpoint runnables keyed by checkpoint id. Only maintained when
     * maxConcurrentCheckpoints is positive; the accesses visible here synchronize on the map itself.
     */
    private final TreeMap<Long, AsyncCheckpointRunnable> asyncCheckpointOperations;
    // Ids of aborted checkpoints; presumably consulted by checkpointAlreadyAborted(...), which is
    // defined outside the visible part of this file.
    private final NavigableSet<Long> abortedCheckpointIds;
    /** Value of CHECKPOINTS_MAX_CONCURRENT_NUM; values <= 0 disable cancelling older async checkpoints. */
    protected final int maxConcurrentCheckpoints;
    /** Milliseconds spent initializing in {@code invoke()}; -1 until known. Exposed as gauge "taskInitTimeMs". */
    private long initTime;
    /** Lazily created by {@code createStreamTaskStateInitializer()}. */
    private StreamTaskStateInitializer streamTaskStateInitializer;
    /** Latch for synchronous checkpoints; cancelled in {@code cancel()}. */
    private final SynchronousCheckpointLatch syncCheckpointLatch;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$AsyncCheckpointExceptionHandler.class */
    /**
     * Exception handler for the asynchronous part of a checkpoint. It first delegates to the
     * task's synchronous handler; if that handler itself throws, the failure is reported to the
     * owning task as an asynchronous exception.
     */
    public static final class AsyncCheckpointExceptionHandler implements CheckpointExceptionHandler {
        /** Task that is notified when the delegate handler fails. */
        final StreamTask<?, ?> owner;
        /** Handler that gets the first chance to deal with the checkpoint failure. */
        final CheckpointExceptionHandler synchronousCheckpointExceptionHandler;

        /**
         * @param streamTask owning task; both the task and its synchronous handler must be non-null
         */
        AsyncCheckpointExceptionHandler(StreamTask<?, ?> streamTask) {
            final StreamTask<?, ?> task = Preconditions.checkNotNull(streamTask);
            this.owner = task;
            this.synchronousCheckpointExceptionHandler =
                    Preconditions.checkNotNull(task.synchronousCheckpointExceptionHandler);
        }

        /**
         * Forwards the failure to the synchronous handler and escalates to the owner if the
         * handler throws.
         */
        @Override // org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandler
        public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exc) {
            try {
                synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, exc);
            } catch (Exception unhandled) {
                final AsynchronousException wrapped = new AsynchronousException(unhandled);
                owner.handleAsyncException("Failure in asynchronous checkpoint materialization", wrapped);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runnable that executes the asynchronous part of a checkpoint: it waits for all operator
     * snapshot futures, reports the resulting state to the task state manager and finally
     * deregisters itself from the owning task.
     *
     * <p>The runnable moves through the states RUNNING -> COMPLETED or RUNNING -> DISCARDED.
     * {@link #close()} may be called concurrently (e.g. on task cancellation) to discard a
     * runnable that is still running.
     */
    @VisibleForTesting
    public static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        private final StreamTask<?, ?> owner;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
        /** {@code System.nanoTime()} value taken when the asynchronous part was started. */
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState =
                new AtomicReference<>(CheckpointingOperation.AsyncCheckpointState.RUNNING);
        /** Set when a newer checkpoint asked for this one to be cancelled. */
        private volatile boolean cancelled = false;

        AsyncCheckpointRunnable(
                StreamTask<?, ?> streamTask,
                Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
                CheckpointMetaData checkpointMetaData,
                CheckpointMetrics checkpointMetrics,
                long asyncStartNanos) {
            this.owner = Preconditions.checkNotNull(streamTask);
            this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.asyncStartNanos = asyncStartNanos;
        }

        /**
         * Finalizes all operator snapshots and reports them if the checkpoint was not discarded in
         * the meantime. Failures are routed to {@link #handleExecutionException(Exception)}.
         *
         * <p>Deregistration from the owner and closing of the file system safety net happen in a
         * single {@code finally} block, so they run exactly once and a failure inside them is
         * never treated as a failure of an already completed checkpoint.
         */
        @Override // java.lang.Runnable
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                final int operatorCount = operatorSnapshotsInProgress.size();
                final TaskStateSnapshot jobManagerOwnedStates = new TaskStateSnapshot(operatorCount);
                final TaskStateSnapshot taskLocalStates = new TaskStateSnapshot(operatorCount);

                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    final OperatorID operatorId = entry.getKey();
                    final OperatorSnapshotFinalizer finalizedSnapshots = new OperatorSnapshotFinalizer(entry.getValue());
                    jobManagerOwnedStates.putSubtaskStateByOperatorID(operatorId, finalizedSnapshots.getJobManagerOwnedState());
                    taskLocalStates.putSubtaskStateByOperatorID(operatorId, finalizedSnapshots.getTaskLocalState());
                }

                final long asyncDurationMillis = (System.nanoTime() - asyncStartNanos) / 1_000_000L;
                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

                if (asyncCheckpointState.compareAndSet(
                        CheckpointingOperation.AsyncCheckpointState.RUNNING,
                        CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    reportCompletedSnapshotStates(jobManagerOwnedStates, taskLocalStates, asyncDurationMillis);
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", owner.getName(), checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                LOG.warn("Async checkpoint failed", e);
                handleExecutionException(e);
            } finally {
                owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                // The in-flight map is only maintained when the concurrency limit is enabled.
                if (owner.maxConcurrentCheckpoints > 0) {
                    synchronized (owner.asyncCheckpointOperations) {
                        owner.asyncCheckpointOperations.remove(checkpointMetaData.getCheckpointId());
                    }
                }
            }
        }

        /**
         * Reports the finalized snapshots to the task state manager. Empty snapshots are reported
         * as {@code null}.
         *
         * @param taskStateSnapshot state owned by the job manager
         * @param taskStateSnapshot2 task-local state
         * @param j duration of the asynchronous part in milliseconds (used for logging only)
         */
        private void reportCompletedSnapshotStates(TaskStateSnapshot taskStateSnapshot, TaskStateSnapshot taskStateSnapshot2, long j) {
            final TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
            final boolean hasPrimaryState = taskStateSnapshot.hasState();
            final boolean hasLocalState = taskStateSnapshot2.hasState();
            Preconditions.checkState(hasPrimaryState || !hasLocalState, "Found cached state but no corresponding primary state is reported to the job manager. This indicates a problem.");
            taskStateManager.reportTaskStateSnapshots(
                    checkpointMetaData,
                    checkpointMetrics,
                    hasPrimaryState ? taskStateSnapshot : null,
                    hasLocalState ? taskStateSnapshot2 : null);
            LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), j);
            LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", owner.getName(), checkpointMetaData.getCheckpointId(), taskStateSnapshot);
        }

        /**
         * Moves the runnable to DISCARDED. The thread that wins the transition cancels the
         * snapshot futures and notifies the asynchronous exception handler; any other caller only
         * logs the exception as a follow-up.
         */
        private void handleExecutionException(Exception exc) {
            boolean didCleanup = false;
            CheckpointingOperation.AsyncCheckpointState currentState = asyncCheckpointState.get();
            while (CheckpointingOperation.AsyncCheckpointState.DISCARDED != currentState) {
                if (asyncCheckpointState.compareAndSet(currentState, CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                    didCleanup = true;
                    try {
                        cleanup();
                    } catch (Exception cleanupException) {
                        exc.addSuppressed(cleanupException);
                    }
                    final Exception checkpointException = new Exception("Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + owner.getName() + '.', exc);
                    owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, checkpointException);
                    currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED;
                } else {
                    // Lost the race against a concurrent state change; re-read and retry.
                    currentState = asyncCheckpointState.get();
                }
            }
            if (!didCleanup) {
                LOG.trace("Caught followup exception from a failed checkpoint thread. This can be ignored.", exc);
            }
        }

        /**
         * Discards the checkpoint if it is still running and cancels its snapshot futures. Does
         * nothing (besides logging) when the runnable already completed or was discarded.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (asyncCheckpointState.compareAndSet(
                    CheckpointingOperation.AsyncCheckpointState.RUNNING,
                    CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                try {
                    cleanup();
                } catch (Exception e) {
                    LOG.warn("Could not properly clean up the async checkpoint runnable.", e);
                }
            } else {
                logFailedCleanupAttempt();
            }
        }

        /** Flags this runnable as cancelled by a newer checkpoint. */
        public void markCancel() {
            this.cancelled = true;
        }

        /** @return whether {@link #markCancel()} has been called */
        public boolean isCancelled() {
            return this.cancelled;
        }

        /**
         * Cancels every operator snapshot future. All futures are attempted; the first failure is
         * rethrown with later ones attached as suppressed exceptions.
         */
        private void cleanup() throws Exception {
            LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
            Exception firstFailure = null;
            for (OperatorSnapshotFutures snapshotFutures : operatorSnapshotsInProgress.values()) {
                if (snapshotFutures == null) {
                    continue;
                }
                try {
                    snapshotFutures.cancel();
                } catch (Exception cancelException) {
                    firstFailure = ExceptionUtils.firstOrSuppressed(cancelException, firstFailure);
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        private void logFailedCleanupAttempt() {
            LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", owner.getName(), checkpointMetaData.getCheckpointId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$CheckpointingOperation.class */
    /**
     * One checkpoint of a task: runs the synchronous snapshot of every operator and then hands
     * the resulting snapshot futures to an {@link AsyncCheckpointRunnable} on the owner's thread
     * pool.
     */
    public static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;
        /** Operators in reverse order of the chain's topologically sorted list. */
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        /** True when the owner limits the number of concurrently running async checkpoints. */
        private final boolean needToCancelCheckpoints;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        /** Life-cycle states of the asynchronous checkpoint part. */
        public enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }

        public CheckpointingOperation(StreamTask<?, ?> streamTask, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory, CheckpointMetrics checkpointMetrics) {
            this.owner = Preconditions.checkNotNull(streamTask);
            this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
            this.storageLocation = Preconditions.checkNotNull(checkpointStreamFactory);

            final List<StreamOperator<?>> reversedOperators =
                    new ArrayList<StreamOperator<?>>(streamTask.operatorChain.getAllOperatorsTopologySorted());
            Collections.reverse(reversedOperators);
            this.allOperators = reversedOperators.toArray(new StreamOperator[0]);

            this.operatorSnapshotsInProgress = new HashMap<>(this.allOperators.length);
            this.needToCancelCheckpoints = streamTask.maxConcurrentCheckpoints > 0;
        }

        /**
         * Executes the checkpoint unless it was already reported as aborted.
         *
         * @throws Exception if the synchronous part fails and the checkpoint type is synchronous,
         *     or if the synchronous exception handler rethrows
         */
        public void executeCheckpointing() throws Exception {
            final long checkpointId = checkpointMetaData.getCheckpointId();
            owner.removeAbortedCheckpointsOnTriggerOrExecution(checkpointId);
            if (owner.checkpointAlreadyAborted(checkpointId)) {
                LOG.info("Checkpoint {} has been notified as aborted, would not execute any checkpointing.", checkpointId);
                return;
            }

            if (needToCancelCheckpoints) {
                discardOldestAsyncCheckpoints();
            }

            startSyncPartNano = System.nanoTime();
            try {
                for (StreamOperator<?> operator : allOperators) {
                    checkpointStreamOperator(operator);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();
                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1000000);

                final AsyncCheckpointRunnable asyncRunnable = new AsyncCheckpointRunnable(
                        owner, operatorSnapshotsInProgress, checkpointMetaData, checkpointMetrics, startAsyncPartNano);
                if (needToCancelCheckpoints) {
                    synchronized (owner.asyncCheckpointOperations) {
                        owner.asyncCheckpointOperations.put(checkpointMetaData.getCheckpointId(), asyncRunnable);
                    }
                }
                owner.cancelables.registerCloseable(asyncRunnable);
                owner.asyncOperationsThreadPool.submit(asyncRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1000000, checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception failure) {
                cancelSnapshotsInProgress();
                if (needToCancelCheckpoints) {
                    synchronized (owner.asyncCheckpointOperations) {
                        owner.asyncCheckpointOperations.remove(checkpointMetaData.getCheckpointId());
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", owner.getName(), checkpointMetaData.getCheckpointId(), checkpointMetrics.getAlignmentDurationNanos() / 1000000, checkpointMetrics.getSyncDurationMillis());
                }
                // Synchronous checkpoint types propagate the failure to the caller directly.
                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    throw failure;
                }
                owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, failure);
            }
        }

        /**
         * Closes the oldest in-flight async checkpoints until there is room for this one under
         * the owner's concurrency limit.
         */
        private void discardOldestAsyncCheckpoints() {
            synchronized (owner.asyncCheckpointOperations) {
                while (owner.asyncCheckpointOperations.size() >= owner.maxConcurrentCheckpoints) {
                    final Map.Entry<Long, AsyncCheckpointRunnable> oldest = owner.asyncCheckpointOperations.pollFirstEntry();
                    final long oldestId = oldest.getKey();
                    final AsyncCheckpointRunnable oldestRunnable = oldest.getValue();
                    Preconditions.checkState(oldestId < checkpointMetaData.getCheckpointId(), "Unexpected checkpoint " + oldestId + " to discard before executing checkpoint " + checkpointMetaData.getCheckpointId());
                    LOG.info("{} - Cancel the asynchronous part of previous checkpoint {} before executing checkpoint {}.", owner.getName(), oldestId, checkpointMetaData.getCheckpointId());
                    oldestRunnable.markCancel();
                    oldestRunnable.close();
                }
            }
        }

        /** Best-effort cancellation of all snapshot futures collected so far. */
        private void cancelSnapshotsInProgress() {
            for (OperatorSnapshotFutures snapshotFutures : operatorSnapshotsInProgress.values()) {
                if (snapshotFutures == null) {
                    continue;
                }
                try {
                    snapshotFutures.cancel();
                } catch (Exception cancelFailure) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", cancelFailure);
                }
            }
        }

        /** Snapshots a single operator (null entries are skipped) and records its futures. */
        private void checkpointStreamOperator(StreamOperator<?> streamOperator) throws Exception {
            if (streamOperator == null) {
                return;
            }
            final OperatorSnapshotFutures snapshotFutures = streamOperator.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
            operatorSnapshotsInProgress.put(streamOperator.getOperatorID(), snapshotFutures);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a task without a pre-configured timer service; a default one is then created in
     * {@code invoke()}.
     *
     * @param environment the task environment
     */
    public StreamTask(Environment environment) {
        this(environment, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a task, reading its configuration from the environment.
     *
     * @param environment the task environment
     * @param processingTimeService timer service to use, or {@code null} to have one created in
     *     {@code invoke()}
     */
    public StreamTask(Environment environment, @Nullable ProcessingTimeService processingTimeService) {
        super(environment);
        this.cancelables = new CloseableRegistry();
        this.initTime = -1L;
        this.streamTaskStateInitializer = null;
        this.timerService = processingTimeService;

        final StreamTaskConfig serializedTaskConfig = new StreamTaskConfig(super.getTaskConfiguration());
        this.streamTaskConfig = StreamTaskConfigCache.deserializeFrom(serializedTaskConfig, getUserCodeClassLoader());
        this.streamRecordWriters = createStreamRecordWriters(this.streamTaskConfig, environment);

        // Use the first chained head node's config; fall back to an empty one if there is none.
        final List<StreamConfig> headNodeConfigs = this.streamTaskConfig.getChainedHeadNodeConfigs();
        if (headNodeConfigs.isEmpty()) {
            this.configuration = new StreamConfig(new Configuration());
        } else {
            this.configuration = headNodeConfigs.get(0);
        }

        final Configuration headConfiguration = this.configuration.getConfiguration();
        final int defaultMaxConcurrent = ((Integer) CheckpointingOptions.CHECKPOINTS_MAX_CONCURRENT_NUM.defaultValue()).intValue();
        this.maxConcurrentCheckpoints = headConfiguration.getInteger(CheckpointingOptions.CHECKPOINTS_MAX_CONCURRENT_NUM, defaultMaxConcurrent);

        this.asyncCheckpointOperations = new TreeMap<>();
        this.abortedCheckpointIds = new ConcurrentSkipListSet<>();
        this.syncCheckpointLatch = new SynchronousCheckpointLatch();

        final boolean fairLocking = environment.getTaskManagerInfo().getConfiguration().getBoolean(CoreOptions.CHECKPOINT_LOCK_ENABLE_FAIRNESS);
        final Object checkpointLock = fairLocking ? new FairLock() : new Object();
        this.lockDelegate = new CheckpointLockDelegate(checkpointLock);
        final String lockTypeName = this.lockDelegate.isFair() ? "fair lock" : "synchronized lock";
        LOG.info("Use checkpoint lock type: " + lockTypeName);
    }

    /** Task-specific initialization; called by {@code invoke()} after the operator chain was created. */
    protected abstract void init() throws Exception;

    /** Main work of the task; called by {@code invoke()} after state was initialized and all operators were opened. */
    protected abstract void run() throws Exception;

    /** Task-specific cleanup; called during shutdown in {@code invoke()}, on success as well as on failure. */
    protected abstract void cleanup() throws Exception;

    /** Task-specific cancellation logic; called by {@code cancel()}. */
    protected abstract void cancelTask() throws Exception;

    /**
     * Replaces the timer service of this task.
     *
     * @param processingTimeService the new timer service, must not be {@code null}
     * @throws RuntimeException if {@code processingTimeService} is {@code null}
     */
    @VisibleForTesting
    public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
        if (processingTimeService != null) {
            this.timerService = processingTimeService;
            return;
        }
        throw new RuntimeException("The timeProvider cannot be set to null.");
    }

    /**
     * Returns the state initializer of this task, creating it on first use from the environment,
     * the state backend and the timer service.
     */
    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        StreamTaskStateInitializer initializer = this.streamTaskStateInitializer;
        if (initializer == null) {
            initializer = new StreamTaskStateInitializerImpl(getEnvironment(), this.stateBackend, this.timerService);
            this.streamTaskStateInitializer = initializer;
        }
        return initializer;
    }

    /** Returns the latch used for synchronous checkpoints (exposed for tests). */
    @VisibleForTesting
    SynchronousCheckpointLatch getSyncCheckpointLatch() {
        return this.syncCheckpointLatch;
    }

    /**
     * Hook with an empty default implementation; subclasses may override it.
     * NOTE(review): judging by the name it is meant to advance event time to its end — confirm
     * against the callers, which are outside the visible part of this file.
     */
    protected void advanceToEndOfEventTime() throws Exception {
    }

    /**
     * Hook with an empty default implementation; subclasses may override it.
     * NOTE(review): the callers are outside the visible part of this file — confirm when it is invoked.
     */
    protected void finishTask() throws Exception {
    }

    /**
     * Runs the complete life cycle of this task: creates the runtime services (thread pool,
     * exception handlers, state backend, timer service, operator chain), initializes state and
     * opens all operators under the checkpoint lock, executes {@link #run()}, and finally closes
     * and disposes the operators.
     *
     * <p>All shutdown work lives in a single {@code finally} block, so it is executed exactly
     * once, no matter whether the task finished normally, failed or was cancelled. Operators are
     * only disposed there if the regular disposal on the success path did not complete.
     *
     * @throws CancelTaskException if the task was cancelled while it was starting or running
     * @throws Exception any failure of the task; it is logged before being rethrown
     */
    public final void invoke() throws Exception {
        // Becomes true once the regular path disposed every operator, so that the shutdown code
        // below does not dispose them a second time.
        boolean disposed = false;
        try {
            LOG.debug("Initializing {}.", getName());
            final long initStartMillis = System.currentTimeMillis();
            this.initTime = -1L;
            getEnvironment().getMetricGroup().gauge("taskInitTimeMs", () -> {
                return Long.valueOf(this.initTime);
            });

            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            this.synchronousCheckpointExceptionHandler = createCheckpointExceptionHandlerFactory().createCheckpointExceptionHandler(getExecutionConfig().isFailTaskOnCheckpointError(), getEnvironment());
            this.asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
            this.stateBackend = createStateBackend();
            this.checkpointStorage = this.stateBackend.createCheckpointStorage(getEnvironment().getJobID());
            this.accumulatorRegistry = getEnvironment().getAccumulatorRegistry();

            // Only create a timer service if none was injected via constructor or setter.
            if (this.timerService == null) {
                this.timerService = new SystemProcessingTimeService(this, getCheckpointLock(), new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName(), getUserCodeClassLoader()));
            }

            this.operatorChain = new OperatorChain(this, this.streamRecordWriters);
            init();
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Invoking {}", getName());
            this.lockDelegate.lockAndRun(() -> {
                initializeState();
                openAllOperators();
            });
            if (this.canceled) {
                throw new CancelTaskException();
            }

            this.initTime = System.currentTimeMillis() - initStartMillis;
            this.isRunning = true;
            run();
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());
            this.lockDelegate.lockAndRun(() -> {
                closeAllOperators();
                this.timerService.quiesce();
                this.isRunning = false;
            });
            this.timerService.awaitPendingAfterQuiesce();
            LOG.debug("Closed operators for task {}", getName());

            this.operatorChain.flushOutputs();
            tryDisposeAllOperators();
            disposed = true;
        } catch (Throwable t) {
            LOG.error("Could not execute the task " + getName() + ", aborting the execution", t);
            throw t;
        } finally {
            this.isRunning = false;
            tryShutdownTimerService();

            // Failures while shutting down must not mask the outcome of the task itself.
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }
            try {
                cleanup();
            } catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }

            if (!disposed) {
                disposeAllOperators();
            }
            // The chain is null if the task failed before it could be created.
            if (this.operatorChain != null) {
                this.lockDelegate.lockAndRun(() -> {
                    this.operatorChain.releaseOutputs();
                });
            }
        }
    }

    /**
     * Cancels this task: clears the running flag, sets the canceled flag, cancels the synchronous
     * checkpoint latch and invokes the task-specific {@code cancelTask()}.
     *
     * @throws Exception if the cancellation logic or closing the registered closeables fails
     */
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            this.syncCheckpointLatch.cancelCheckpointLatch();
            cancelTask();
        } finally {
            // Always close registered closeables, even if the cancellation logic above failed.
            this.cancelables.close();
        }
    }

    /** Returns the current value of the volatile running flag. */
    public final boolean isRunning() {
        return this.isRunning;
    }

    /** Returns whether {@code cancel()} has been called on this task. */
    public final boolean isCanceled() {
        return this.canceled;
    }

    /**
     * Opens every non-null operator of the chain, walking the topologically sorted operator list
     * from its last element to its first.
     */
    private void openAllOperators() throws Exception {
        for (Iterator<StreamOperator<?>> operators = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator();
                operators.hasNext(); ) {
            final StreamOperator<?> operator = operators.next();
            if (operator == null) {
                continue;
            }
            operator.open();
        }
    }

    /**
     * Closes every non-null operator of the chain in the order of the topologically sorted
     * operator list.
     */
    private void closeAllOperators() throws Exception {
        final Iterator<StreamOperator<?>> operators = this.operatorChain.getAllOperatorsTopologySorted().iterator();
        while (operators.hasNext()) {
            final StreamOperator<?> operator = operators.next();
            if (operator == null) {
                continue;
            }
            operator.close();
        }
    }

    /**
     * Disposes every non-null operator of the chain, walking the topologically sorted
     * operators in descending order. Unlike {@code disposeAllOperators()}, a failure is
     * propagated to the caller immediately.
     *
     * @throws Exception if any operator fails to dispose; remaining operators are skipped
     */
    private void tryDisposeAllOperators() throws Exception {
        for (Iterator<StreamOperator<?>> it = operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            operator.dispose();
        }
    }

    /**
     * Shuts down the thread pool used for asynchronous operations via
     * {@code shutdownNow()}, unless it has already been shut down.
     *
     * @throws Exception declared for callers; see the pool implementation for failure modes
     */
    private void shutdownAsyncThreads() throws Exception {
        if (!asyncOperationsThreadPool.isShutdown()) {
            asyncOperationsThreadPool.shutdownNow();
        }
    }

    /**
     * Best-effort disposal of all operators in the chain.
     *
     * <p>Does nothing when there is no operator chain. Otherwise every non-null operator is
     * disposed in descending topological order; a failure in one operator is logged and
     * does not prevent the remaining operators from being disposed.
     */
    private void disposeAllOperators() {
        if (operatorChain == null) {
            return;
        }
        for (Iterator<StreamOperator<?>> it = operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            try {
                operator.dispose();
            } catch (Throwable t) {
                // Keep going: one misbehaving operator must not block disposal of the others.
                LOG.error("Error during disposal of stream operator.", t);
            }
        }
    }

    /**
     * Last-resort cleanup executed by the garbage collector.
     *
     * <p>Shuts down the timer service if it exists and has not terminated yet, and closes
     * the cancelables registry. The registry is closed in a {@code finally} block so that a
     * failure while shutting down the timer service cannot leave registered resources open.
     *
     * @throws Throwable if the superclass finalizer, the timer shutdown, or closing the
     *     registry fails
     */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        try {
            if (this.timerService != null && !this.timerService.isTerminated()) {
                LOG.info("Timer service is shutting down.");
                this.timerService.shutdownService();
            }
        } finally {
            // Must run even if the timer shutdown above throws.
            this.cancelables.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the time characteristic in the stream task configuration is event time
     * or ingestion time.
     *
     * @return {@code true} for {@code EventTime} or {@code IngestionTime}, {@code false} otherwise
     */
    public boolean isSerializingTimestamps() {
        final TimeCharacteristic characteristic = streamTaskConfig.getTimeCharacteristic();
        // Reference comparisons have no side effects, so short-circuit evaluation is equivalent.
        return characteristic == TimeCharacteristic.EventTime
                || characteristic == TimeCharacteristic.IngestionTime;
    }

    /**
     * Returns the task name including subtask information, as reported by the
     * environment's {@code TaskInfo}.
     *
     * @return the task name with subtasks
     */
    public String getName() {
        final TaskInfo taskInfo = getEnvironment().getTaskInfo();
        return taskInfo.getTaskNameWithSubtasks();
    }

    /**
     * Returns the lock object exposed by the lock delegate.
     *
     * @return the lock obtained from {@code lockDelegate.getLock()}
     */
    public Object getCheckpointLock() {
        return lockDelegate.getLock();
    }

    /**
     * Returns the checkpoint storage held by this task.
     *
     * @return the {@code checkpointStorage} field
     */
    public CheckpointStorage getCheckpointStorage() {
        return checkpointStorage;
    }

    /**
     * Returns the stream configuration held by this task.
     *
     * @return the {@code configuration} field
     */
    public StreamConfig getConfiguration() {
        return configuration;
    }

    /**
     * Returns the accumulator registry held by this task.
     *
     * @return the {@code accumulatorRegistry} field
     */
    public AccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }

    /**
     * Returns the stream task configuration snapshot held by this task.
     *
     * @return the {@code streamTaskConfig} field
     */
    public StreamTaskConfigSnapshot getStreamTaskConfig() {
        return streamTaskConfig;
    }

    /**
     * Returns the stream status maintainer of this task; the operator chain itself is
     * handed out in that role.
     *
     * @return the operator chain, typed as {@code StreamStatusMaintainer}
     */
    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return operatorChain;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the stream outputs of the operator chain.
     *
     * @return the array obtained from {@code operatorChain.getStreamOutputs()}
     */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return operatorChain.getStreamOutputs();
    }

    /**
     * Returns how many aborted checkpoint ids are currently being tracked.
     *
     * @return the size of {@code abortedCheckpointIds}
     */
    @VisibleForTesting
    int getAbortedCheckpointSize() {
        return abortedCheckpointIds.size();
    }

    /**
     * Triggers a checkpoint on this task.
     *
     * <p>Tracked aborted checkpoint ids smaller than the requested id are pruned first. If
     * the requested checkpoint itself was already recorded as aborted, nothing is
     * triggered and {@code true} is returned. Otherwise the checkpoint is performed with
     * fresh metrics whose alignment figures are zero.
     *
     * @param checkpointMetaData meta data (id, timestamp) of the checkpoint
     * @param checkpointOptions options for performing the checkpoint
     * @param z forwarded to {@code performCheckpoint}, where for synchronous checkpoint types
     *     it decides whether {@code advanceToEndOfEventTime()} is invoked
     * @return the result of {@code performCheckpoint}; {@code true} if the checkpoint was
     *     already aborted; {@code false} if performing it failed while the task was not running
     * @throws Exception wrapping the failure if the checkpoint fails while the task is running
     */
    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean z) throws Exception {
        final long id = checkpointMetaData.getCheckpointId();
        removeAbortedCheckpointsOnTriggerOrExecution(id);
        if (checkpointAlreadyAborted(id)) {
            LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", Long.valueOf(id));
            return true;
        }
        try {
            CheckpointMetrics metrics = new CheckpointMetrics()
                    .setBytesBufferedInAlignment(0L)
                    .setAlignmentDurationNanos(0L);
            return performCheckpoint(checkpointMetaData, checkpointOptions, metrics, z);
        } catch (Exception e) {
            if (!isRunning) {
                // Failures after the task stopped running are expected and only logged.
                LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(id), getName(), e});
                return false;
            }
            throw new Exception("Could not perform checkpoint " + id + " for operator " + getName() + '.', e);
        }
    }

    /**
     * Performs a checkpoint in reaction to checkpoint barriers.
     *
     * <p>If the checkpoint was performed and the synchronous checkpoint latch is set, this
     * call blocks until the checkpoint has been acknowledged.
     *
     * <p>The handler for {@link CancelTaskException} is deliberately listed before the
     * generic {@link Exception} handler: it is the narrower type, so placing it second
     * would make it unreachable and a task cancellation would be wrapped into a generic
     * checkpoint failure.
     *
     * @param checkpointMetaData meta data (id, timestamp) of the checkpoint
     * @param checkpointOptions options for performing the checkpoint
     * @param checkpointMetrics metrics gathered for this checkpoint so far
     * @throws CancelTaskException rethrown unchanged if the task was cancelled meanwhile
     * @throws Exception wrapping any other failure, naming the checkpoint id and operator
     */
    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        try {
            if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false) && this.syncCheckpointLatch.isSet()) {
                this.syncCheckpointLatch.blockUntilCheckpointIsAcknowledged();
            }
        } catch (CancelTaskException e) {
            // Cancellation is not a checkpoint failure; log and propagate it as-is.
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(checkpointMetaData.getCheckpointId()));
            throw e;
        } catch (Exception e) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
        }
    }

    /**
     * Aborts a checkpoint in reaction to a cancel-barrier.
     *
     * <p>The checkpoint is declined through the environment, and afterwards a checkpoint
     * cancel marker is broadcast through the operator chain while holding the lock.
     *
     * @param j id of the checkpoint to abort
     * @param th cause passed to the environment when declining the checkpoint
     * @throws Exception if declining or broadcasting fails
     */
    public void abortCheckpointOnBarrier(long j, Throwable th) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", Long.valueOf(j), getName());

        getEnvironment().declineCheckpoint(j, th);

        lockDelegate.lockAndRun(() -> {
            operatorChain.broadcastCheckpointCancelMarker(j);
        });
    }

    /**
     * Performs the checkpoint while holding the lock.
     *
     * <p>When the task is running: for synchronous checkpoint types the checkpoint id is
     * registered with the sync latch (and, if {@code z} is set, {@code advanceToEndOfEventTime()}
     * is called); then the operator chain prepares the pre-barrier snapshot, the barrier is
     * broadcast, and the state is checkpointed.
     *
     * <p>When the task is not running: a {@link CancelCheckpointMarker} is broadcast on all
     * stream record writers instead. Failures of individual writers are collected and
     * thrown after every writer has been attempted.
     *
     * @param checkpointMetaData meta data (id, timestamp) of the checkpoint
     * @param checkpointOptions options for performing the checkpoint
     * @param checkpointMetrics metrics gathered for this checkpoint so far
     * @param z whether to call {@code advanceToEndOfEventTime()} for synchronous checkpoints
     * @return {@code true} if the checkpoint was performed, {@code false} if the task was
     *     not running and only cancel markers were sent
     * @throws Exception if the checkpoint or sending the cancel markers fails
     */
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics, boolean z) throws Exception {
        long checkpointId = checkpointMetaData.getCheckpointId();
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{Long.valueOf(checkpointId), checkpointOptions.getCheckpointType(), getName()});
        return ((Boolean) this.lockDelegate.lockAndRun(() -> {
            if (!this.isRunning) {
                // Not running any more: tell downstream tasks not to wait for this barrier.
                CancelCheckpointMarker marker = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception failure = null;
                for (StreamRecordWriter<StreamRecord<?>> writer : this.streamRecordWriters) {
                    try {
                        writer.broadcastEvent(marker);
                    } catch (Exception e) {
                        failure = (Exception) ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), failure);
                    }
                }
                if (failure != null) {
                    throw failure;
                }
                return false;
            }

            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                this.syncCheckpointLatch.setCheckpointId(checkpointId);
                if (z) {
                    advanceToEndOfEventTime();
                }
            }
            this.operatorChain.prepareSnapshotPreBarrier(checkpointId);
            this.operatorChain.broadcastCheckpointBarrier(checkpointId, checkpointMetaData.getTimestamp(), checkpointOptions);
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        })).booleanValue();
    }

    /**
     * Returns the executor used for asynchronous operations of this task.
     *
     * @return the {@code asyncOperationsThreadPool} field
     */
    public ExecutorService getAsyncOperationsThreadPool() {
        return asyncOperationsThreadPool;
    }

    /**
     * Notifies all operators that a checkpoint has completed.
     *
     * <p>Under the lock, and only while the task is running, every non-null operator is
     * notified in descending topological order. If the operators were notified, the
     * synchronous checkpoint latch is acknowledged afterwards (outside the lock) with
     * {@code finishTask} as the action to trigger.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if notifying an operator or acknowledging the latch fails
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        Boolean operatorsNotified = (Boolean) this.lockDelegate.lockAndRun(() -> {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint {} for task {}", Long.valueOf(j), getName());
                for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
                    StreamOperator<?> operator = it.next();
                    if (operator != null) {
                        operator.notifyCheckpointComplete(j);
                    }
                }
                return true;
            }
            LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            return false;
        });
        if (operatorsNotified.booleanValue()) {
            this.syncCheckpointLatch.acknowledgeCheckpointAndTrigger(j, this::finishTask);
        }
    }

    /**
     * Notifies all operators that a checkpoint has been subsumed.
     *
     * <p>Runs under the lock. While the task is running, every non-null operator is
     * notified in descending topological order; otherwise the notification is only logged
     * and ignored.
     *
     * @param j id of the subsumed checkpoint
     * @throws Exception if notifying an operator fails
     */
    public void notifyCheckpointSubsume(long j) throws Exception {
        this.lockDelegate.lockAndRun(() -> {
            if (this.isRunning) {
                LOG.info("Notification of subsumed checkpoint {} for task {}", Long.valueOf(j), getName());
                for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
                    StreamOperator<?> operator = it.next();
                    if (operator != null) {
                        operator.notifyCheckpointSubsume(j);
                    }
                }
            } else {
                LOG.info("Ignoring notification of subsumed checkpoint for not-running task {}", getName());
            }
        });
    }

    /**
     * Handles the notification that a checkpoint has been aborted.
     *
     * <p>Under the lock, and only while the task is running, the id is recorded in the set
     * of aborted checkpoints (evicting the first entry once 128 ids are tracked), any
     * registered async checkpoint runnable for that id is removed, and every non-null
     * operator is notified in descending topological order. A removed runnable is then
     * cancelled and closed outside the lock.
     *
     * @param j id of the aborted checkpoint
     * @throws Exception if notifying an operator or closing the async runnable fails
     */
    public void notifyCheckpointAbort(long j) throws Exception {
        AsyncCheckpointRunnable pendingAsync = (AsyncCheckpointRunnable) this.lockDelegate.lockAndRun(() -> {
            AsyncCheckpointRunnable removed = null;
            if (!this.isRunning) {
                LOG.info("Ignoring notification of aborted checkpoint for not-running task {}", getName());
            } else {
                LOG.info("Notification of aborted checkpoint {} for task {}", Long.valueOf(j), getName());
                // Bound the bookkeeping: drop the first tracked id once the cap is reached.
                if (this.abortedCheckpointIds.size() >= 128) {
                    this.abortedCheckpointIds.pollFirst();
                }
                this.abortedCheckpointIds.add(Long.valueOf(j));
                synchronized (this.asyncCheckpointOperations) {
                    removed = this.asyncCheckpointOperations.remove(Long.valueOf(j));
                }
                for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
                    StreamOperator<?> operator = it.next();
                    if (operator != null) {
                        operator.notifyCheckpointAbort(j);
                    }
                }
            }
            return removed;
        });
        if (pendingAsync == null) {
            return;
        }
        LOG.info("Abort running async checkpoint {}.", Long.valueOf(j));
        pendingAsync.markCancel();
        pendingAsync.close();
    }

    /**
     * Best-effort shutdown of the timer service.
     *
     * <p>Does nothing if there is no timer service or it has already terminated. Otherwise
     * the service is shut down uninterruptibly with the timeout read from
     * {@code TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS}; exceeding the timeout is
     * logged as a warning and any failure is logged as an error. Nothing is thrown.
     */
    private void tryShutdownTimerService() {
        if (timerService != null && !timerService.isTerminated()) {
            try {
                final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
                final boolean finishedInTime = timerService.shutdownServiceUninterruptible(timeoutMs);
                if (!finishedInTime) {
                    LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", Long.valueOf(timeoutMs));
                }
            } catch (Throwable t) {
                // Shutdown must continue even if the timer service misbehaves.
                LOG.error("Could not shut down timer service", t);
            }
        }
    }

    /**
     * Resolves the storage location for the checkpoint and runs a
     * {@code CheckpointingOperation} against it.
     *
     * @param checkpointMetaData meta data (id, timestamp) of the checkpoint
     * @param checkpointOptions options for the checkpoint; supplies the target location
     * @param checkpointMetrics metrics gathered for this checkpoint so far
     * @throws Exception if resolving the location or executing the checkpoint fails
     */
    private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        final CheckpointingOperation operation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                checkpointStorage.resolveCheckpointStorageLocation(
                        checkpointMetaData.getCheckpointId(),
                        checkpointOptions.getTargetLocation()),
                checkpointMetrics);
        operation.executeCheckpointing();
    }

    /**
     * Initializes the state of every non-null operator of the chain, walking the
     * topologically sorted operators in descending order.
     *
     * @throws Exception if any operator fails to initialize its state
     */
    private void initializeState() throws Exception {
        for (Iterator<StreamOperator<?>> it = operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            operator.initializeState();
        }
    }

    /**
     * Creates the state backend by delegating to
     * {@code StateBackendLoader.fromApplicationOrConfigOrDefault}, passing the backend from
     * the stream task configuration, the task manager configuration, the user-code class
     * loader and this class's logger.
     *
     * @return the state backend chosen by the loader
     * @throws Exception if the loader fails
     */
    private StateBackend createStateBackend() throws Exception {
        return StateBackendLoader.fromApplicationOrConfigOrDefault(
                streamTaskConfig.getStateBackend(),
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getUserCodeClassLoader(),
                LOG);
    }

    /**
     * Creates the factory for checkpoint exception handlers. Being {@code protected}, it
     * can be overridden by subclasses to supply a different factory.
     *
     * @return a new {@code CheckpointExceptionHandlerFactory}
     */
    protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
        final CheckpointExceptionHandlerFactory factory = new CheckpointExceptionHandlerFactory();
        return factory;
    }

    /**
     * Builds a textual identifier for an operator subtask from the operator id, the
     * operator's simple class name, and this subtask's index and parallelism.
     *
     * @param streamOperator operator to describe
     * @return the string form of the resulting {@code OperatorSubtaskDescriptionText}
     */
    private String createOperatorIdentifier(StreamOperator<?> streamOperator) {
        final TaskInfo taskInfo = getEnvironment().getTaskInfo();
        final OperatorSubtaskDescriptionText description = new OperatorSubtaskDescriptionText(
                streamOperator.getOperatorID(),
                streamOperator.getClass().getSimpleName(),
                taskInfo.getIndexOfThisSubtask(),
                taskInfo.getNumberOfParallelSubtasks());
        return description.toString();
    }

    /**
     * Returns the timer service of this task.
     *
     * @return the timer service
     * @throws IllegalStateException if the timer service has not been set yet
     */
    public ProcessingTimeService getProcessingTimeService() {
        if (timerService != null) {
            return timerService;
        }
        throw new IllegalStateException("The timer service has not been initialized.");
    }

    /**
     * Exposes the registered async checkpoint runnables for tests.
     *
     * @return an unmodifiable view of {@code asyncCheckpointOperations}, keyed by {@code Long}
     */
    @VisibleForTesting
    public Map<Long, AsyncCheckpointRunnable> getAsyncCheckpointOperations() {
        final Map<Long, AsyncCheckpointRunnable> readOnlyView = Collections.unmodifiableMap(asyncCheckpointOperations);
        return readOnlyView;
    }

    /**
     * Handles an exception raised by an asynchronous operation: while the task is
     * running, the environment is asked to fail externally with the given cause.
     * When the task is no longer running the exception is dropped.
     *
     * @param str message accompanying the exception; not used by this implementation
     * @param th the asynchronous failure
     */
    @Override // org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler
    public void handleAsyncException(String str, Throwable th) {
        if (!isRunning) {
            return;
        }
        getEnvironment().failExternally(th);
    }

    /**
     * Uses the task name (see {@code getName()}) as the string form of this task.
     *
     * @return the task name with subtasks
     */
    @Override
    public String toString() {
        final String name = getName();
        return name;
    }

    /**
     * Returns the registry of closeables that is closed when the task is cancelled.
     *
     * @return the {@code cancelables} field
     */
    public CloseableRegistry getCancelables() {
        return cancelables;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Prunes tracked aborted checkpoint ids in iteration order, removing each id that is
     * smaller than the given one and stopping at the first id that is not.
     *
     * @param j checkpoint id being triggered or executed; ids below it are discarded
     */
    public void removeAbortedCheckpointsOnTriggerOrExecution(long j) {
        final Iterator<Long> ids = abortedCheckpointIds.iterator();
        while (ids.hasNext()) {
            if (ids.next().longValue() >= j) {
                // First id that is not older than j: stop pruning here.
                break;
            }
            ids.remove();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether the given checkpoint id is tracked as aborted, removing it from the
     * tracked ids as part of the check.
     *
     * @param j checkpoint id to look up
     * @return the result of removing the id from {@code abortedCheckpointIds}
     */
    public boolean checkpointAlreadyAborted(long j) {
        final boolean wasTracked = abortedCheckpointIds.remove(Long.valueOf(j));
        return wasTracked;
    }

    /**
     * Creates one stream record writer per outgoing edge of the operator chain.
     *
     * <p>The writer for the edge at position {@code i} is bound to output {@code i} of the
     * environment and uses the buffer timeout from the chained node configuration of the
     * edge's source id.
     *
     * @param streamTaskConfigSnapshot configuration providing the outgoing edges and the
     *     chained node configurations
     * @param environment environment providing the task name and the output writers
     * @return the writers, in the order of the outgoing edges
     */
    @VisibleForTesting
    public static List<StreamRecordWriter<StreamRecord<?>>> createStreamRecordWriters(StreamTaskConfigSnapshot streamTaskConfigSnapshot, Environment environment) {
        List<StreamEdge> outStreamEdgesOfChain = streamTaskConfigSnapshot.getOutStreamEdgesOfChain();
        Map<Integer, StreamConfig> chainedNodeConfigs = streamTaskConfigSnapshot.getChainedNodeConfigs();
        // Typed (not raw) list so the return needs no unchecked conversion; sized up front
        // because the number of writers is known.
        List<StreamRecordWriter<StreamRecord<?>>> recordWriters = new ArrayList<>(outStreamEdgesOfChain.size());
        for (int i = 0; i < outStreamEdgesOfChain.size(); i++) {
            StreamEdge streamEdge = outStreamEdgesOfChain.get(i);
            long bufferTimeout = chainedNodeConfigs.get(Integer.valueOf(streamEdge.getSourceId())).getBufferTimeout();
            recordWriters.add(createStreamRecordWriter(streamEdge, i, environment, environment.getTaskInfo().getTaskName(), bufferTimeout));
        }
        return recordWriters;
    }

    /**
     * Creates the stream record writer for a single outgoing edge.
     *
     * <p>If the edge's partitioner is a {@code ConfigurableStreamPartitioner} and the
     * output writer reports a positive number of target key groups, the partitioner is
     * configured with that number. The new writer is also wired to the environment's IO
     * metric group and tracing-metrics settings.
     *
     * @param streamEdge edge whose partitioner is used
     * @param i index of the output in the environment
     * @param environment environment providing the output writer, metrics and execution config
     * @param str task name, used for logging and passed to the writer
     * @param j buffer timeout passed to the writer
     * @return the configured stream record writer
     */
    private static StreamRecordWriter<StreamRecord<?>> createStreamRecordWriter(StreamEdge streamEdge, int i, Environment environment, String str, long j) {
        Serializable partitioner = streamEdge.getPartitioner();
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{partitioner, Integer.valueOf(i), str});

        ResultPartitionWriter writer = environment.getWriter(i);
        if (partitioner instanceof ConfigurableStreamPartitioner) {
            // Only query the key-group count when the partitioner can actually use it.
            int numTargetKeyGroups = writer.getNumTargetKeyGroups();
            if (numTargetKeyGroups > 0) {
                ((ConfigurableStreamPartitioner) partitioner).configure(numTargetKeyGroups);
            }
        }

        StreamRecordWriter<StreamRecord<?>> recordWriter = new StreamRecordWriter<>(writer, partitioner, j, str);
        recordWriter.setMetricGroup(
                environment.getMetricGroup().getIOMetricGroup(),
                environment.getExecutionConfig().isTracingMetricsEnabled(),
                environment.getExecutionConfig().getTracingMetricsInterval());
        return recordWriter;
    }
}
