package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint2.CheckpointBackend;
import org.apache.flink.runtime.checkpoint2.InitialSubtaskSnapshot;
import org.apache.flink.runtime.checkpoint2.SubtaskCheckpointInfoCollector;
import org.apache.flink.runtime.checkpoint2.SubtaskSnapshot;
import org.apache.flink.runtime.checkpoint2.TaskCheckpointConfiguration;
import org.apache.flink.runtime.checkpoint2.file.FileCheckpointBackend;
import org.apache.flink.runtime.checkpoint2.file.FileSegmentCheckpointBackend;
import org.apache.flink.runtime.checkpoint2.heap.HeapCheckpointBackend;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamTaskConfig;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.configuration.TimerServiceOptions;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.LockAndCondition;
import org.apache.flink.util.LockGetReleaseWrapper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask, AsyncExceptionHandler {
    /** Thread group handed to the thread factory of the default timer service created in {@code invoke()}. */
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    /** Chain of operators executed by this task; created in {@code invoke()} before {@code init()} is called. */
    protected OperatorChain operatorChain;
    /** Task configuration, built in {@code invoke()} from {@code getTaskConfiguration()}. */
    private StreamTaskConfig configuration;
    /** Checkpoint settings obtained from {@link #configuration} in {@code invoke()}. */
    private TaskCheckpointConfiguration checkpointConfiguration;
    /** Result of {@code createStateBackend()}, assigned in {@code invoke()}. */
    private StateBackend stateBackend;
    /** Result of {@code createCheckpointBackend()}, assigned in {@code invoke()} and closed in {@code cancel()}. */
    private CheckpointBackend checkpointBackend;
    // NOTE(review): not assigned anywhere in the visible part of this class - confirm where it is set.
    private AbstractKeyedStateBackend<?> keyedStateBackend;
    /** Timer service; either injected via {@code setProcessingTimeService} or defaulted in {@code invoke()}. */
    private ProcessingTimeService timerService;
    /** User accumulator map taken from the environment's accumulator registry in {@code invoke()}. */
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    /** Initial operator state set through {@code setInitialState}; read by {@code restoreAllOperators()}. */
    private TaskStateSnapshot taskStateSnapshot;
    /** Initial snapshot set through {@code setInitialSnapshot}; read by {@code restoreAllOperators()}. */
    private InitialSubtaskSnapshot initialSnapshot;
    /** Set to true right before {@code run()} is called; reset when operators are closed, on shutdown and on cancel. */
    private volatile boolean isRunning;
    /** Set to true by {@code cancel()}; checked by {@code invoke()} between its phases. */
    private volatile boolean canceled;
    /** Cached thread pool created in {@code invoke()}; the asynchronous part of a checkpoint is submitted to it. */
    private ExecutorService asyncOperationsThreadPool;
    /** Handler created in {@code invoke()} from the "fail task on checkpoint error" setting. */
    private CheckpointExceptionHandler checkpointExceptionHandler;
    /** Lock returned by {@code getCheckpointLock()}; wrapped while operators are restored, opened, closed, checkpointed or notified. */
    private final LockAndCondition lock = new LockAndCondition();
    /** Registry of closeables that is closed on cancel, on shutdown and in {@code finalize()}. */
    private final CloseableRegistry cancelables = new CloseableRegistry();
    /** Id of the checkpoint most recently passed to {@code performCheckpoint}. */
    private long currentCheckpointID = 0;
    // Accessed under synchronized (taskSnapshotFutures) in snapshotAllOperators.
    // NOTE(review): initialised with a raw ArrayList (decompiler output); a diamond would avoid the unchecked warning.
    private List<TaskSnapshotFuture> taskSnapshotFutures = new ArrayList();

    /**
     * Task-specific initialisation hook.
     *
     * <p>Called by {@code invoke()} after the operator chain has been created and before the
     * operators are restored and opened.
     *
     * @throws Exception if initialisation fails; the exception propagates out of {@code invoke()}
     */
    protected abstract void init() throws Exception;

    /**
     * Task-specific main work.
     *
     * <p>Called by {@code invoke()} after all operators have been opened and the running flag has
     * been set to {@code true}.
     *
     * @throws Exception if the task fails; the exception propagates out of {@code invoke()}
     */
    protected abstract void run() throws Exception;

    /**
     * Task-specific cleanup hook.
     *
     * <p>Called by {@code invoke()} during shutdown on both the success and the failure path.
     * Anything thrown from here is logged by {@code invoke()} and not propagated.
     *
     * @throws Exception if cleanup fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Task-specific cancellation hook.
     *
     * <p>Called by {@code cancel()} after the running flag has been cleared and the canceled flag
     * has been set.
     *
     * @throws Exception if cancellation fails; the exception propagates out of {@code cancel()}
     */
    protected abstract void cancelTask() throws Exception;

    /**
     * Injects the timer service this task should use instead of the default one that
     * {@code invoke()} would otherwise create.
     *
     * @param processingTimeService the timer service to use; must not be {@code null}
     * @throws RuntimeException if {@code processingTimeService} is {@code null}
     */
    @VisibleForTesting
    public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
        if (processingTimeService != null) {
            this.timerService = processingTimeService;
            return;
        }
        throw new RuntimeException("The timeProvider cannot be set to null.");
    }

    /**
     * Runs the complete life cycle of this task.
     *
     * <ol>
     *   <li>Initialise configuration, backends, timer service and operator chain, then call {@code init()}.</li>
     *   <li>Restore and open all operators while the checkpoint lock wrapper is held.</li>
     *   <li>Mark the task as running and call {@code run()}.</li>
     *   <li>Close all operators and quiesce the timer service while the lock wrapper is held, wait
     *       for pending timers, flush outputs and dispose the operators.</li>
     *   <li>Always: shut down the timer service and the async thread pool, close the cancelables,
     *       call {@code cleanup()}, dispose operators if that has not happened yet, and release the
     *       outputs.</li>
     * </ol>
     *
     * <p>The lock wrappers are managed with try-with-resources so that each one is closed exactly
     * once, and the shutdown sequence lives in a single {@code finally} block so that it runs
     * exactly once.
     *
     * @throws CancelTaskException if the task was canceled between two phases
     * @throws Exception any failure of the phases above, after it has been logged
     */
    public final void invoke() throws Exception {
        // Becomes true only after the regular disposal on the success path has completed.
        boolean disposed = false;
        try {
            LOG.info("Initializing {}.", getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            this.configuration = new StreamTaskConfig(getTaskConfiguration());
            this.checkpointConfiguration = this.configuration.getCheckpointConfig(getUserCodeClassLoader());
            this.checkpointExceptionHandler = createCheckpointExceptionHandlerFactory().createCheckpointExceptionHandler(this.checkpointConfiguration.isFailTaskOnCheckpointError(), getEnvironment());
            this.checkpointBackend = createCheckpointBackend();
            this.stateBackend = createStateBackend();
            this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
            // A timer service may already have been injected via setProcessingTimeService.
            if (this.timerService == null) {
                this.timerService = new SystemProcessingTimeService(this, getCheckpointLock(), new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
            }
            this.operatorChain = new OperatorChain(this);
            init();
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.info("Invoking {}", getName());
            try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
                restoreAllOperators();
                openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }

            this.isRunning = true;
            run();
            if (this.canceled) {
                throw new CancelTaskException();
            }

            LOG.info("Finished task {}", getName());
            try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
                closeAllOperators();
                this.timerService.quiesce();
                this.isRunning = false;
            }
            this.timerService.awaitPendingAfterQuiesce();
            LOG.info("Closed operators for task {}", getName());
            this.operatorChain.flushOutputs();
            tryDisposeAllOperators();
            disposed = true;
        } catch (Throwable t) {
            LOG.error("Could not execute the task " + getName() + ", aborting the execution", t);
            throw t;
        } finally {
            this.isRunning = false;

            // Each shutdown step is best-effort: failures are logged so the remaining steps still run.
            if (this.timerService != null && !this.timerService.isTerminated()) {
                try {
                    long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
                    if (!this.timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS)) {
                        LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", Long.valueOf(timeoutMs));
                    }
                } catch (Throwable t) {
                    LOG.error("Could not shut down timer service", t);
                }
            }
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }
            try {
                cleanup();
            } catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }
            // On the failure path the operators have not been disposed yet.
            if (!disposed) {
                disposeAllOperators();
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
        }
    }

    /**
     * Cancels this task.
     *
     * <p>Clears the running flag, sets the canceled flag and calls {@code cancelTask()}. If the
     * checkpoint configuration enables file sharing, earlier checkpoints are canceled as well.
     * The checkpoint backend is closed if it exists. The cancelables registry is closed in all
     * cases.
     *
     * <p>Both the checkpoint configuration and the checkpoint backend are only assigned inside
     * {@code invoke()}, so either may still be {@code null} when this method runs; both are
     * therefore null-checked.
     *
     * @throws Exception if {@code cancelTask()}, checkpoint cancellation or closing a resource fails
     */
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            cancelTask();
            final TaskCheckpointConfiguration checkpointConfig = this.checkpointConfiguration;
            if (checkpointConfig != null && checkpointConfig.isUseSharingFile()) {
                cancelEarlierCheckpoints();
            }
            final CheckpointBackend backend = this.checkpointBackend;
            if (backend != null) {
                backend.close();
            }
        } finally {
            this.cancelables.close();
        }
    }

    /**
     * Returns the current value of the volatile running flag.
     *
     * @return {@code true} while the flag is set, i.e. from just before {@code run()} is called
     *     until operators are closed, the task shuts down or it is canceled
     */
    public final boolean isRunning() {
        return this.isRunning;
    }

    /**
     * Returns the current value of the volatile canceled flag.
     *
     * @return {@code true} once {@code cancel()} has been called
     */
    public final boolean isCanceled() {
        return this.canceled;
    }

    /**
     * Opens every non-null operator of the chain, walking the topologically sorted operators in
     * descending order.
     *
     * @throws Exception if an operator fails to open; remaining operators are not opened
     */
    private void openAllOperators() throws Exception {
        for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            operator.open();
        }
    }

    /**
     * Closes every non-null operator of the chain, walking the topologically sorted operators in
     * their forward iteration order.
     *
     * @throws Exception if an operator fails to close; remaining operators are not closed
     */
    private void closeAllOperators() throws Exception {
        Iterator<StreamOperator<?>> forward = this.operatorChain.getAllOperatorsTopologySorted().iterator();
        while (forward.hasNext()) {
            StreamOperator<?> operator = forward.next();
            if (operator == null) {
                continue;
            }
            operator.close();
        }
    }

    /**
     * Disposes every non-null operator of the chain in descending topological order, letting the
     * first failure propagate to the caller.
     *
     * @throws Exception if an operator fails to dispose; remaining operators are not disposed here
     */
    private void tryDisposeAllOperators() throws Exception {
        for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            operator.dispose();
        }
    }

    /**
     * Shuts down the thread pool used for asynchronous operations via {@code shutdownNow()},
     * unless it has already been shut down.
     *
     * @throws Exception declared for callers; a {@code NullPointerException} occurs if the pool
     *     was never created
     */
    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    /**
     * Best-effort disposal of every non-null operator in descending topological order.
     *
     * <p>Does nothing when no operator chain exists. A failure while disposing one operator is
     * logged and does not prevent the remaining operators from being disposed.
     */
    private void disposeAllOperators() {
        if (this.operatorChain == null) {
            return;
        }
        for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            if (operator == null) {
                continue;
            }
            try {
                operator.dispose();
            } catch (Throwable failure) {
                LOG.error("Error during disposal of stream operator.", failure);
            }
        }
    }

    /**
     * Last-resort resource release: shuts down a timer service that is still alive and closes the
     * cancelables registry.
     *
     * @throws Throwable if the super finalizer, the timer shutdown or closing the registry fails
     */
    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null) {
            if (!this.timerService.isTerminated()) {
                LOG.info("Timer service is shutting down.");
                this.timerService.shutdownService();
            }
        }
        this.cancelables.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the configured time characteristic is event time or ingestion time.
     *
     * @return {@code true} for {@code EventTime} or {@code IngestionTime}, {@code false} otherwise
     */
    public boolean isSerializingTimestamps() {
        final TimeCharacteristic characteristic = this.configuration.getTimeCharacteristic();
        if (characteristic == TimeCharacteristic.EventTime) {
            return true;
        }
        return characteristic == TimeCharacteristic.IngestionTime;
    }

    /**
     * Returns the name of this task as reported by the environment's task info.
     *
     * @return the task name including subtask information
     */
    public String getName() {
        final TaskInfo taskInfo = getEnvironment().getTaskInfo();
        return taskInfo.getTaskNameWithSubtasks();
    }

    /**
     * Returns the lock object that this task wraps while restoring, opening, closing,
     * checkpointing or notifying its operators.
     *
     * @return the task's lock, never {@code null}
     */
    public LockAndCondition getCheckpointLock() {
        return this.lock;
    }

    /**
     * Returns the task configuration.
     *
     * @return the configuration created in {@code invoke()}; in the visible code it is not
     *     assigned anywhere else, so it is {@code null} before that point
     */
    public StreamTaskConfig getConfiguration() {
        return this.configuration;
    }

    /**
     * Returns the checkpoint configuration of this task.
     *
     * @return the checkpoint configuration obtained in {@code invoke()}; in the visible code it is
     *     not assigned anywhere else, so it is {@code null} before that point
     */
    public TaskCheckpointConfiguration getCheckpointConfiguration() {
        return this.checkpointConfiguration;
    }

    /**
     * Returns the user accumulator map that {@code invoke()} took from the environment's
     * accumulator registry.
     *
     * @return the stored map reference itself (no copy is made)
     */
    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    /**
     * Returns the stream status maintainer of this task, which is the operator chain itself.
     *
     * @return the operator chain, viewed as a {@code StreamStatusMaintainer}
     */
    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the stream outputs of the operator chain.
     *
     * @return whatever {@code operatorChain.getStreamOutputs()} returns; fails with a
     *     {@code NullPointerException} if the operator chain has not been created yet
     */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    /**
     * Stores the state snapshot from which {@code restoreAllOperators()} initialises each
     * operator's state.
     *
     * @param taskStateSnapshot the snapshot to restore from; {@code null} makes every operator
     *     receive {@code null} as its initial state
     */
    public void setInitialState(TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshot = taskStateSnapshot;
    }

    /**
     * Stores the initial subtask snapshot from which {@code restoreAllOperators()} restores each
     * operator.
     *
     * @param initialSubtaskSnapshot the snapshot to restore from; {@code null} makes every
     *     operator's {@code restore} receive {@code null}
     */
    public void setInitialSnapshot(InitialSubtaskSnapshot initialSubtaskSnapshot) {
        this.initialSnapshot = initialSubtaskSnapshot;
    }

    /**
     * Performs a checkpoint with a fresh {@code SubtaskCheckpointInfoCollector}.
     *
     * @param j id of the checkpoint
     * @param j2 second numeric argument forwarded unchanged to {@code performCheckpoint}
     *     (presumably the checkpoint timestamp - TODO confirm)
     * @param checkpointOptions options of the checkpoint
     * @return the result of {@code performCheckpoint}, or {@code false} if the checkpoint failed
     *     while the task was not running
     * @throws Exception wrapping the original failure if the checkpoint failed while the task was
     *     running
     */
    public boolean triggerCheckpoint(long j, long j2, CheckpointOptions checkpointOptions) throws Exception {
        try {
            final SubtaskCheckpointInfoCollector collector = new SubtaskCheckpointInfoCollector();
            return performCheckpoint(j, j2, checkpointOptions, collector);
        } catch (Exception cause) {
            if (!this.isRunning) {
                // A failure on a task that is no longer running is expected; report it without failing.
                LOG.info("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(j), getName(), cause});
                return false;
            }
            throw new Exception("Could not perform checkpoint " + j + " for operator " + getName() + '.', cause);
        }
    }

    /**
     * Performs a checkpoint using the collector supplied by the caller.
     *
     * @param j id of the checkpoint
     * @param j2 second numeric argument forwarded unchanged to {@code performCheckpoint}
     *     (presumably the checkpoint timestamp - TODO confirm)
     * @param checkpointOptions options of the checkpoint
     * @param subtaskCheckpointInfoCollector collector that receives timing information
     * @throws CancelTaskException rethrown unchanged after being logged
     * @throws Exception wrapping any other failure of the checkpoint
     */
    public void triggerCheckpointOnBarrier(long j, long j2, CheckpointOptions checkpointOptions, SubtaskCheckpointInfoCollector subtaskCheckpointInfoCollector) throws Exception {
        try {
            performCheckpoint(j, j2, checkpointOptions, subtaskCheckpointInfoCollector);
        } catch (CancelTaskException cancellation) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(j));
            throw cancellation;
        } catch (Exception failure) {
            final String message = "Could not perform checkpoint " + j + " for operator " + getName() + '.';
            throw new Exception(message, failure);
        }
    }

    /**
     * Aborts a checkpoint after a cancel-barrier was received.
     *
     * <p>Declines the checkpoint at the environment and then, while the lock wrapper is held,
     * broadcasts a checkpoint cancel marker through the operator chain. The wrapper is managed
     * with try-with-resources so that it is closed exactly once on every path.
     *
     * @param j id of the checkpoint to abort
     * @param th the reason passed on to {@code declineCheckpoint}
     * @throws Exception if declining, broadcasting or releasing the lock wrapper fails
     */
    public void abortCheckpointOnBarrier(long j, Throwable th) throws Exception {
        LOG.info("Aborting checkpoint via cancel-barrier {} for task {}", Long.valueOf(j), getName());
        getEnvironment().declineCheckpoint(j, th);
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
            this.operatorChain.broadcastCheckpointCancelMarker(j);
        }
    }

    /**
     * Notifies every non-null operator, in descending topological order, that a checkpoint has
     * completed. The notification is ignored (and logged) when the task is not running.
     *
     * <p>The lock wrapper is managed with try-with-resources so that it is closed exactly once and
     * a failure while closing does not replace a failure raised by an operator.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if an operator fails to process the notification or the lock wrapper
     *     cannot be released
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
            if (!this.isRunning) {
                LOG.info("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return;
            }
            LOG.info("Notification of complete checkpoint for task {}", getName());
            Iterator<StreamOperator<?>> descendingIterator = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator();
            while (descendingIterator.hasNext()) {
                StreamOperator<?> next = descendingIterator.next();
                if (next != null) {
                    next.notifyOfCompletedCheckpoint(j);
                }
            }
        }
    }

    /**
     * Notifies every non-null operator, in descending topological order, that a checkpoint has
     * been subsumed. The notification is ignored (and logged) when the task is not running.
     *
     * <p>The lock wrapper is managed with try-with-resources so that it is closed exactly once and
     * a failure while closing does not replace a failure raised by an operator.
     *
     * @param j id of the subsumed checkpoint
     * @throws Exception if an operator fails to process the notification or the lock wrapper
     *     cannot be released
     */
    public void notifyCheckpointSubsume(long j) throws Exception {
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
            if (!this.isRunning) {
                LOG.info("Ignoring notification of subsumed checkpoint for not-running task {}", getName());
                return;
            }
            LOG.info("Notification of subsumed checkpoint for task {}", getName());
            Iterator<StreamOperator<?>> descendingIterator = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator();
            while (descendingIterator.hasNext()) {
                StreamOperator<?> next = descendingIterator.next();
                if (next != null) {
                    next.notifyOfSubsumedCheckpoint(j);
                }
            }
        }
    }

    /**
     * Notifies every non-null operator, in descending topological order, that a checkpoint has
     * been aborted. The notification is ignored (and logged) when the task is not running.
     *
     * <p>The lock wrapper is managed with try-with-resources so that it is closed exactly once and
     * a failure while closing does not replace a failure raised by an operator.
     *
     * @param j id of the aborted checkpoint
     * @throws Exception if an operator fails to process the notification or the lock wrapper
     *     cannot be released
     */
    public void notifyCheckpointAbort(long j) throws Exception {
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
            if (!this.isRunning) {
                LOG.info("Ignoring notification of aborted checkpoint for not-running task {}", getName());
                return;
            }
            LOG.info("Notification of aborted checkpoint for task {}", getName());
            Iterator<StreamOperator<?>> descendingIterator = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator();
            while (descendingIterator.hasNext()) {
                StreamOperator<?> next = descendingIterator.next();
                if (next != null) {
                    next.notifyOfAbortedCheckpoint(j);
                }
            }
        }
    }

    /**
     * Runs the synchronous part of a checkpoint while the lock wrapper is held.
     *
     * <p>If the task is running: prepares the pre-barrier snapshot, broadcasts the checkpoint
     * barrier, cancels earlier checkpoints when file sharing is enabled, and snapshots all
     * operators. If the task is not running: broadcasts a {@code CancelCheckpointMarker} on every
     * result partition writer so that downstream tasks do not wait for this checkpoint.
     *
     * <p>The lock wrapper is managed with try-with-resources so that it is closed exactly once on
     * every path, including the successful one.
     *
     * @param j id of the checkpoint
     * @param j2 second numeric argument forwarded unchanged to the barrier broadcast and the
     *     operator snapshots (presumably the checkpoint timestamp - TODO confirm)
     * @param checkpointOptions options of the checkpoint
     * @param subtaskCheckpointInfoCollector collector that receives timing information
     * @return {@code true} if the checkpoint was performed, {@code false} if the task was not
     *     running and cancel markers were sent instead
     * @throws Exception if any checkpoint step fails, or if at least one cancel marker could not
     *     be sent (further failures are attached as suppressed exceptions)
     */
    private boolean performCheckpoint(long j, long j2, CheckpointOptions checkpointOptions, SubtaskCheckpointInfoCollector subtaskCheckpointInfoCollector) throws Exception {
        LOG.info("Starting checkpoint ({}) {} on task {}", new Object[]{Long.valueOf(j), checkpointOptions.getCheckpointType(), getName()});
        this.currentCheckpointID = j;
        try (LockGetReleaseWrapper ignored = new LockGetReleaseWrapper(this.lock.getLock())) {
            if (this.isRunning) {
                this.operatorChain.prepareSnapshotPreBarrier(j);
                this.operatorChain.broadcastCheckpointBarrier(j, j2, checkpointOptions);
                if (this.checkpointConfiguration.isUseSharingFile()) {
                    cancelEarlierCheckpoints();
                }
                snapshotAllOperators(j, j2, checkpointOptions, subtaskCheckpointInfoCollector);
                return true;
            }

            // Not running: tell downstream tasks to give up on this checkpoint.
            CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(j);
            Exception exc = null;
            for (ResultPartitionWriter resultPartitionWriter : getEnvironment().getAllWriters()) {
                try {
                    resultPartitionWriter.broadcastEvent(cancelCheckpointMarker);
                } catch (Exception e) {
                    // Keep going so every writer gets a chance; collect failures into one exception.
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exc);
                }
            }
            if (exc != null) {
                throw exc;
            }
            return false;
        }
    }

    /**
     * Initialises and restores every operator of the chain in descending topological order.
     *
     * <p>Each operator first receives its entry from the initial task state snapshot (or
     * {@code null} when no snapshot was set) and is then restored from its entry in the initial
     * subtask snapshot (or {@code null} when none was set).
     *
     * @throws Exception if an operator fails to initialise or restore
     */
    private void restoreAllOperators() throws Exception {
        for (Iterator<StreamOperator<?>> it = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator(); it.hasNext(); ) {
            StreamOperator<?> operator = it.next();
            operator.initializeState(
                    this.taskStateSnapshot != null
                            ? this.taskStateSnapshot.getSubtaskStateByOperatorID(operator.getOperatorID())
                            : null);
            operator.restore(
                    this.initialSnapshot != null
                            ? this.initialSnapshot.getOperatorSnapshot(operator.getOperatorID())
                            : null);
        }
    }

    private void snapshotAllOperators(final long j, long j2, CheckpointOptions checkpointOptions, final SubtaskCheckpointInfoCollector subtaskCheckpointInfoCollector) throws Exception {
        CheckpointBackend createCheckpointBackend;
        if (checkpointOptions.getCheckpointType() == CheckpointOptions.CheckpointType.CHECKPOINT) {
            Preconditions.checkState(this.checkpointBackend != null);
            createCheckpointBackend = this.checkpointBackend;
        } else {
            Preconditions.checkState(checkpointOptions.getTargetLocation() != null);
            createCheckpointBackend = createCheckpointBackend();
        }
        subtaskCheckpointInfoCollector.setSnapshotStartInstant(System.currentTimeMillis());
        final HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        try {
            Iterator<StreamOperator<?>> descendingIterator = this.operatorChain.getAllOperatorsTopologySorted().descendingIterator();
            while (descendingIterator.hasNext()) {
                StreamOperator<?> next = descendingIterator.next();
                hashMap.put(next.getOperatorID(), next.snapshotState(j, j2, checkpointOptions));
                hashMap2.put(next.getOperatorID(), next.snapshot(j, checkpointOptions, createCheckpointBackend));
            }
            subtaskCheckpointInfoCollector.setSnapshotEndInstant(System.currentTimeMillis());
            final TaskSnapshotFuture taskSnapshotFuture = new TaskSnapshotFuture(hashMap2);
            this.cancelables.registerCloseable(taskSnapshotFuture);
            synchronized (this.taskSnapshotFutures) {
                this.taskSnapshotFutures.add(taskSnapshotFuture);
            }
            LOG.info("Successfully complete the synchronous part of checkpoint {}, {} ms", Long.valueOf(j), Long.valueOf(subtaskCheckpointInfoCollector.getSnapshotDuration()));
            this.asyncOperationsThreadPool.submit(new Runnable() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTask.1
                @Override // java.lang.Runnable
                public void run() {
                    HashMap hashMap3 = new HashMap();
                    try {
                        subtaskCheckpointInfoCollector.setMaterializeStartInstant(System.currentTimeMillis());
                        for (Map.Entry entry : hashMap.entrySet()) {
                            OperatorID operatorID = (OperatorID) entry.getKey();
                            OperatorSnapshotResult operatorSnapshotResult = (OperatorSnapshotResult) entry.getValue();
                            OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState((OperatorStateHandle) FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getOperatorStateManagedFuture()), (OperatorStateHandle) FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getOperatorStateRawFuture()), (KeyedStateHandle) FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getKeyedStateManagedFuture()), (KeyedStateHandle) FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getKeyedStateRawFuture()));
                            if (operatorSubtaskState.hasState()) {
                                hashMap3.put(operatorID, operatorSubtaskState);
                            }
                        }
                        Map map = (Map) FutureUtil.runIfNotDoneAndGet(taskSnapshotFuture);
                        StreamTask.this.cancelables.unregisterCloseable(taskSnapshotFuture);
                        synchronized (StreamTask.this.taskSnapshotFutures) {
                            StreamTask.this.taskSnapshotFutures.remove(taskSnapshotFuture);
                        }
                        subtaskCheckpointInfoCollector.setMaterializeEndInstant(System.currentTimeMillis());
                        StreamTask.LOG.info("Successfully complete the asynchronous part of checkpoint {}, {} ms", Long.valueOf(j), Long.valueOf(subtaskCheckpointInfoCollector.getMaterializeDuration()));
                        StreamTask.LOG.info("Successfully complete checkpoint {}, {} ms", Long.valueOf(j), Long.valueOf(subtaskCheckpointInfoCollector.getDuration()));
                        StreamTask.this.getEnvironment().acknowledgeCheckpoint(j, hashMap3.isEmpty() ? null : new TaskStateSnapshot(hashMap3), new SubtaskSnapshot(subtaskCheckpointInfoCollector.getAlignStartInstant(), subtaskCheckpointInfoCollector.getAlignEndInstant(), subtaskCheckpointInfoCollector.getSnapshotStartInstant(), subtaskCheckpointInfoCollector.getSnapshotEndInstant(), subtaskCheckpointInfoCollector.getMaterializeStartInstant(), subtaskCheckpointInfoCollector.getMaterializeEndInstant(), map));
                    } catch (Exception e) {
                        StreamTask.LOG.warn("Could not properly complete the asynchronous part of checkpoint " + j + ".", e);
                        Iterator it = hashMap.values().iterator();
                        while (it.hasNext()) {
                            try {
                                ((OperatorSnapshotResult) it.next()).cancel();
                            } catch (Throwable th) {
                                StreamTask.LOG.warn("Could not properly cancel the snapshot of the operator.", th);
                            }
                        }
                        taskSnapshotFuture.cancel(true);
                        StreamTask.this.cancelables.unregisterCloseable(taskSnapshotFuture);
                        synchronized (StreamTask.this.taskSnapshotFutures) {
                            StreamTask.this.taskSnapshotFutures.remove(taskSnapshotFuture);
                            try {
                                StreamTask.this.checkpointExceptionHandler.tryHandleCheckpointException(j, e);
                            } catch (Exception e2) {
                                StreamTask.this.handleAsyncException("Failure in asynchronous checkpoint materialization", new AsynchronousException(e2));
                            }
                        }
                    }
                }
            });
        } catch (Exception e) {
            LOG.warn("Could not properly complete the synchronous part of checkpoint " + j + ".", e);
            Iterator it = hashMap.values().iterator();
            while (it.hasNext()) {
                try {
                    ((OperatorSnapshotResult) it.next()).cancel();
                } catch (Throwable th) {
                    LOG.warn("Could not properly cancel the snapshot of the operator.", th);
                }
            }
            Iterator it2 = hashMap2.values().iterator();
            while (it2.hasNext()) {
                ((RunnableFuture) it2.next()).cancel(true);
            }
            this.checkpointExceptionHandler.tryHandleCheckpointException(j, e);
        }
    }

    /**
     * Closes every {@link TaskSnapshotFuture} currently held in {@code taskSnapshotFutures} and
     * removes each one that closed cleanly.
     *
     * <p>A future whose {@code close()} throws an {@link IOException} stays in the collection; the
     * failure is logged with its cause and the loop continues with the next future.
     */
    @VisibleForTesting
    void cancelEarlierCheckpoints() {
        synchronized (this.taskSnapshotFutures) {
            // Formatting the collection iterates it, so the log call must hold the same monitor
            // that other code in this class takes before adding to or removing from it.
            LOG.info("Start to cancel earlier checkpoints for checkpoint [{}] [{}]", Long.valueOf(this.currentCheckpointID), this.taskSnapshotFutures);
            Iterator<TaskSnapshotFuture> it = this.taskSnapshotFutures.iterator();
            while (it.hasNext()) {
                try {
                    it.next().close();
                    it.remove();
                    LOG.info("{} - Canceled the asynchronous part of checkpoint before [{}]", Thread.currentThread().getName(), Long.valueOf(this.currentCheckpointID));
                } catch (IOException e) {
                    // Trailing throwable argument: SLF4J logs it as the exception, keeping the cause.
                    LOG.warn("{} - Could not properly cancel the asynchronous part of checkpoint before [{}].", Thread.currentThread().getName(), Long.valueOf(this.currentCheckpointID), e);
                }
            }
        }
    }

    /**
     * Builds the {@link CheckpointBackend} selected by {@code checkpointConfiguration}.
     *
     * <ul>
     *   <li>No checkpoint path configured: a {@link HeapCheckpointBackend}.</li>
     *   <li>Sharing files enabled: a {@link FileSegmentCheckpointBackend} rooted at the
     *       {@code "data"} child of the checkpoint path.</li>
     *   <li>Otherwise: a {@link FileCheckpointBackend} rooted at the same directory.</li>
     * </ul>
     *
     * @return a newly constructed checkpoint backend
     * @throws Exception declared by the signature; propagated from the backend constructors
     */
    private CheckpointBackend createCheckpointBackend() throws Exception {
        final Path basePath = this.checkpointConfiguration.getCheckpointPath();
        if (basePath == null) {
            return new HeapCheckpointBackend();
        }

        final Path dataDirectory = new Path(basePath, "data");
        final int directoryLevels = this.checkpointConfiguration.getNumDirectoryLevels();
        final int directoriesPerLevel = this.checkpointConfiguration.getNumDirsPerLevel();

        if (!this.checkpointConfiguration.isUseSharingFile()) {
            return new FileCheckpointBackend(
                    dataDirectory,
                    this.checkpointConfiguration.getMaxHeapSize(),
                    directoryLevels,
                    directoriesPerLevel);
        }
        return new FileSegmentCheckpointBackend(
                dataDirectory,
                this.checkpointConfiguration.getMinFileSize(),
                this.checkpointConfiguration.getFileSegmentWriteBufferSize(),
                directoryLevels,
                directoriesPerLevel,
                this.checkpointConfiguration.getRetryNumWhenCloseFile());
    }

    /**
     * Resolves the state backend for this task.
     *
     * <p>A backend obtained from the task {@code configuration} wins and is logged as
     * user-defined. When the configuration yields none, the backend is loaded through
     * {@code AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault} using the task
     * manager configuration and the configured checkpoint path.
     *
     * @return the state backend to use
     * @throws Exception if looking up or loading the backend fails
     */
    private StateBackend createStateBackend() throws Exception {
        final AbstractStateBackend userDefinedBackend = this.configuration.getStateBackend(getUserCodeClassLoader());
        if (userDefinedBackend != null) {
            LOG.info("Using user-defined state backend: {}.", userDefinedBackend);
            return userDefinedBackend;
        }
        return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getUserCodeClassLoader(),
                this.checkpointConfiguration.getCheckpointPath(),
                LOG);
    }

    /**
     * Creates an operator state backend for the given operator, registers it with
     * {@code cancelables} and, when handles are supplied, restores it from them.
     *
     * @param streamOperator operator the backend is created for
     * @param collection state handles to restore from; {@code null} skips the restore step
     * @return the newly created operator state backend
     * @throws Exception if creating, registering or restoring the backend fails
     */
    public OperatorStateBackend createOperatorStateBackend(StreamOperator<?> streamOperator, Collection<OperatorStateHandle> collection) throws Exception {
        final OperatorStateBackend operatorBackend =
                this.stateBackend.createOperatorStateBackend(
                        getEnvironment(),
                        createOperatorIdentifier(streamOperator, streamOperator.getOperatorContext().getNodeID()));

        this.cancelables.registerCloseable(operatorBackend);

        if (collection != null) {
            operatorBackend.restore(collection);
        }
        return operatorBackend;
    }

    /**
     * Returns the keyed state backend of this task, creating it on the first call.
     *
     * <p>On creation the backend is registered with {@code cancelables} and restored from the
     * managed keyed state recorded for the operator in {@code taskStateSnapshot}; when there is no
     * snapshot, or no entry for the operator, {@code restore} receives {@code null}. Later calls
     * return the already created backend and ignore the arguments.
     *
     * @param streamOperator operator used for the backend identifier and the state lookup
     * @param typeSerializer key serializer passed to the backend factory
     * @param i passed through to the backend factory unchanged
     * @param keyGroupRange key group range passed to the backend factory
     * @return the task's keyed state backend
     * @throws Exception if creating, registering or restoring the backend fails
     */
    public <K, O> AbstractKeyedStateBackend<K> createKeyedStateBackend(StreamOperator<O> streamOperator, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange) throws Exception {
        if (this.keyedStateBackend == null) {
            this.keyedStateBackend =
                    this.stateBackend.createKeyedStateBackend(
                            getEnvironment(),
                            getEnvironment().getJobID(),
                            createOperatorIdentifier(streamOperator, streamOperator.getOperatorContext().getNodeID()),
                            typeSerializer,
                            i,
                            keyGroupRange,
                            getEnvironment().getTaskKvStateRegistry());
            this.cancelables.registerCloseable(this.keyedStateBackend);

            // Stays null unless a snapshot exists and holds an entry for this operator.
            Collection managedKeyedState = null;
            if (this.taskStateSnapshot != null) {
                final OperatorSubtaskState operatorState =
                        this.taskStateSnapshot.getSubtaskStateByOperatorID(streamOperator.getOperatorID());
                if (operatorState != null) {
                    managedKeyedState = operatorState.getManagedKeyedState();
                }
            }
            this.keyedStateBackend.restore(managedKeyedState);
        }
        return (AbstractKeyedStateBackend<K>) this.keyedStateBackend;
    }

    /**
     * Asks the state backend for a checkpoint stream factory scoped to this job and the given
     * operator's identifier.
     *
     * @param streamOperator operator the factory is created for
     * @return the stream factory supplied by the state backend
     * @throws IOException if the state backend cannot create the factory
     */
    public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> streamOperator) throws IOException {
        final CheckpointStreamFactory streamFactory =
                this.stateBackend.createStreamFactory(
                        getEnvironment().getJobID(),
                        createOperatorIdentifier(streamOperator, streamOperator.getOperatorContext().getNodeID()));
        return streamFactory;
    }

    /**
     * Asks the state backend for a savepoint stream factory scoped to this job and the given
     * operator's identifier.
     *
     * @param streamOperator operator the factory is created for
     * @param str forwarded unchanged as the last argument of the backend's
     *     {@code createSavepointStreamFactory}
     * @return the stream factory supplied by the state backend
     * @throws IOException if the state backend cannot create the factory
     */
    public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> streamOperator, String str) throws IOException {
        final CheckpointStreamFactory savepointFactory =
                this.stateBackend.createSavepointStreamFactory(
                        getEnvironment().getJobID(),
                        createOperatorIdentifier(streamOperator, streamOperator.getOperatorContext().getNodeID()),
                        str);
        return savepointFactory;
    }

    /**
     * Supplies the factory for checkpoint exception handlers. Being {@code protected}, it can be
     * overridden by subclasses to provide a different factory.
     *
     * @return a new {@link CheckpointExceptionHandlerFactory}
     */
    protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
        final CheckpointExceptionHandlerFactory handlerFactory = new CheckpointExceptionHandlerFactory();
        return handlerFactory;
    }

    /**
     * Builds an identifier of the form
     * {@code <OperatorSimpleClassName>_<operatorID>_(<subtaskIndex>/<parallelism>)}.
     *
     * @param streamOperator operator whose class name and operator ID go into the identifier
     * @param i not used when building the identifier
     * @return the identifier string
     */
    private String createOperatorIdentifier(StreamOperator<?> streamOperator, int i) {
        final TaskInfo subtaskInfo = getEnvironment().getTaskInfo();
        final StringBuilder identifier = new StringBuilder();
        identifier.append(streamOperator.getClass().getSimpleName());
        identifier.append("_");
        identifier.append(streamOperator.getOperatorID());
        identifier.append("_(");
        identifier.append(subtaskInfo.getIndexOfThisSubtask());
        identifier.append("/");
        identifier.append(subtaskInfo.getNumberOfParallelSubtasks());
        identifier.append(")");
        return identifier.toString();
    }

    /**
     * Returns the timer service of this task.
     *
     * @return the processing time service
     * @throws IllegalStateException if the timer service has not been set yet
     */
    public ProcessingTimeService getProcessingTimeService() {
        if (this.timerService != null) {
            return this.timerService;
        }
        throw new IllegalStateException("The timer service has not been initialized.");
    }

    /**
     * Fails the task externally with the given throwable, but only while the task is flagged as
     * running; otherwise the call does nothing.
     *
     * @param str description of the failure; not forwarded to the environment
     * @param th the asynchronous failure to report
     */
    @Override // org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler
    public void handleAsyncException(String str, Throwable th) {
        if (!this.isRunning) {
            return;
        }
        getEnvironment().failExternally(th);
    }

    /**
     * Uses the value of {@code getName()} as the string form of this task.
     *
     * @return the result of {@code getName()}
     */
    @Override
    public String toString() {
        final String taskName = getName();
        return taskName;
    }

    /**
     * Exposes the registry in which this task records its closeable resources.
     *
     * @return the {@code cancelables} registry; the same instance the task itself uses
     */
    public CloseableRegistry getCancelables() {
        final CloseableRegistry registry = this.cancelables;
        return registry;
    }
}
