package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask, AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    protected OP headOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    private StreamConfig configuration;
    private StateBackend stateBackend;
    private AbstractKeyedStateBackend<?> keyedStateBackend;
    private ProcessingTimeService timerService;
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    private TaskStateHandles restoreStateHandles;
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private ExecutorService asyncOperationsThreadPool;
    private final Object lock = new Object();
    private final CloseableRegistry cancelables = new CloseableRegistry();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$AsyncCheckpointRunnable.class */
    public static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        private final StreamTask<?, ?> owner;
        private final List<OperatorSnapshotResult> snapshotInProgressList;
        private RunnableFuture<KeyedStateHandle> futureKeyedBackendStateHandles;
        private RunnableFuture<KeyedStateHandle> futureKeyedStreamStateHandles;
        private List<StreamStateHandle> nonPartitionedStateHandles;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<>(CheckpointingOperation.AsynCheckpointState.RUNNING);

        AsyncCheckpointRunnable(StreamTask<?, ?> streamTask, List<StreamStateHandle> list, List<OperatorSnapshotResult> list2, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long j) {
            OperatorSnapshotResult operatorSnapshotResult;
            this.owner = (StreamTask) Preconditions.checkNotNull(streamTask);
            this.snapshotInProgressList = (List) Preconditions.checkNotNull(list2);
            this.checkpointMetaData = (CheckpointMetaData) Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointMetrics = (CheckpointMetrics) Preconditions.checkNotNull(checkpointMetrics);
            this.nonPartitionedStateHandles = list;
            this.asyncStartNanos = j;
            if (list2.isEmpty() || null == (operatorSnapshotResult = list2.get(list2.size() - 1))) {
                return;
            }
            this.futureKeyedBackendStateHandles = operatorSnapshotResult.getKeyedStateManagedFuture();
            this.futureKeyedStreamStateHandles = operatorSnapshotResult.getKeyedStateRawFuture();
        }

        @Override // java.lang.Runnable
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                try {
                    KeyedStateHandle keyedStateHandle = (KeyedStateHandle) FutureUtil.runIfNotDoneAndGet(this.futureKeyedBackendStateHandles);
                    KeyedStateHandle keyedStateHandle2 = (KeyedStateHandle) FutureUtil.runIfNotDoneAndGet(this.futureKeyedStreamStateHandles);
                    ArrayList arrayList = new ArrayList(this.snapshotInProgressList.size());
                    ArrayList arrayList2 = new ArrayList(this.snapshotInProgressList.size());
                    for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                        if (null != operatorSnapshotResult) {
                            arrayList.add(FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getOperatorStateManagedFuture()));
                            arrayList2.add(FutureUtil.runIfNotDoneAndGet(operatorSnapshotResult.getOperatorStateRawFuture()));
                        } else {
                            arrayList.add(null);
                            arrayList2.add(null);
                        }
                    }
                    long nanoTime = (System.nanoTime() - this.asyncStartNanos) / 1000000;
                    this.checkpointMetrics.setAsyncDurationMillis(nanoTime);
                    SubtaskState createSubtaskStateFromSnapshotStateHandles = createSubtaskStateFromSnapshotStateHandles(new ChainedStateHandle<>(this.nonPartitionedStateHandles), new ChainedStateHandle<>(arrayList), new ChainedStateHandle<>(arrayList2), keyedStateHandle, keyedStateHandle2);
                    if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
                        this.owner.getEnvironment().acknowledgeCheckpoint(this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics, createSubtaskStateFromSnapshotStateHandles);
                        if (StreamTask.LOG.isDebugEnabled()) {
                            StreamTask.LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(nanoTime)});
                        }
                    } else {
                        StreamTask.LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()));
                    }
                    ((StreamTask) this.owner).cancelables.unregisterClosable(this);
                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                } catch (Exception e) {
                    this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.COMPLETED, CheckpointingOperation.AsynCheckpointState.RUNNING);
                    try {
                        cleanup();
                    } catch (Exception e2) {
                        e.addSuppressed(e2);
                    }
                    this.owner.handleAsyncException("Failure in asynchronous checkpoint materialization", new AsynchronousException(new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.owner.getName() + '.', e)));
                    ((StreamTask) this.owner).cancelables.unregisterClosable(this);
                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                }
            } catch (Throwable th) {
                ((StreamTask) this.owner).cancelables.unregisterClosable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                throw th;
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                cleanup();
            } catch (Exception e) {
                StreamTask.LOG.warn("Could not properly clean up the async checkpoint runnable.", e);
            }
        }

        private SubtaskState createSubtaskStateFromSnapshotStateHandles(ChainedStateHandle<StreamStateHandle> chainedStateHandle, ChainedStateHandle<OperatorStateHandle> chainedStateHandle2, ChainedStateHandle<OperatorStateHandle> chainedStateHandle3, KeyedStateHandle keyedStateHandle, KeyedStateHandle keyedStateHandle2) {
            if ((keyedStateHandle == null && keyedStateHandle2 == null && chainedStateHandle2.isEmpty() && chainedStateHandle3.isEmpty() && chainedStateHandle.isEmpty()) ? false : true) {
                return new SubtaskState(chainedStateHandle, chainedStateHandle2, chainedStateHandle3, keyedStateHandle, keyedStateHandle2);
            }
            return null;
        }

        private void cleanup() throws Exception {
            if (!this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
                StreamTask.LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()));
                return;
            }
            StreamTask.LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", Long.valueOf(this.checkpointMetaData.getCheckpointId()), this.owner.getName());
            Exception exc = null;
            for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                if (operatorSnapshotResult != null) {
                    try {
                        operatorSnapshotResult.cancel();
                    } catch (Exception e) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
                    }
                }
            }
            try {
                StateUtil.bestEffortDiscardAllStateObjects(this.nonPartitionedStateHandles);
            } catch (Exception e2) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
            }
            if (null != exc) {
                throw exc;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$CheckpointingOperation.class */
    public static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        private final List<StreamStateHandle> nonPartitionedStates;
        private final List<OperatorSnapshotResult> snapshotInProgressList;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$CheckpointingOperation$AsynCheckpointState.class */
        public enum AsynCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }

        public CheckpointingOperation(StreamTask<?, ?> streamTask, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) {
            this.owner = (StreamTask) Preconditions.checkNotNull(streamTask);
            this.checkpointMetaData = (CheckpointMetaData) Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = (CheckpointOptions) Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = (CheckpointMetrics) Preconditions.checkNotNull(checkpointMetrics);
            this.allOperators = streamTask.operatorChain.getAllOperators();
            this.nonPartitionedStates = new ArrayList(this.allOperators.length);
            this.snapshotInProgressList = new ArrayList(this.allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            this.startSyncPartNano = System.nanoTime();
            boolean z = true;
            try {
                for (StreamOperator<?> streamOperator : this.allOperators) {
                    checkpointStreamOperator(streamOperator);
                }
                if (StreamTask.LOG.isDebugEnabled()) {
                    StreamTask.LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", Long.valueOf(this.checkpointMetaData.getCheckpointId()), this.owner.getName());
                }
                this.startAsyncPartNano = System.nanoTime();
                this.checkpointMetrics.setSyncDurationMillis((this.startAsyncPartNano - this.startSyncPartNano) / 1000000);
                runAsyncCheckpointingAndAcknowledge();
                z = false;
                if (StreamTask.LOG.isDebugEnabled()) {
                    StreamTask.LOG.debug("{} - finished synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(this.checkpointMetrics.getAlignmentDurationNanos() / 1000000), Long.valueOf(this.checkpointMetrics.getSyncDurationMillis())});
                }
                if (0 != 0) {
                    for (OperatorSnapshotResult operatorSnapshotResult : this.snapshotInProgressList) {
                        if (null != operatorSnapshotResult) {
                            try {
                                operatorSnapshotResult.cancel();
                            } catch (Exception e) {
                                StreamTask.LOG.warn("Could not properly cancel an operator snapshot result.", e);
                            }
                        }
                    }
                    for (StreamStateHandle streamStateHandle : this.nonPartitionedStates) {
                        if (streamStateHandle != null) {
                            try {
                                streamStateHandle.discardState();
                            } catch (Exception e2) {
                                StreamTask.LOG.warn("Could not properly discard a non partitioned state. This might leave some orphaned files behind.", e2);
                            }
                        }
                    }
                    if (StreamTask.LOG.isDebugEnabled()) {
                        StreamTask.LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(this.checkpointMetrics.getAlignmentDurationNanos() / 1000000), Long.valueOf(this.checkpointMetrics.getSyncDurationMillis())});
                    }
                }
            } catch (Throwable th) {
                if (z) {
                    for (OperatorSnapshotResult operatorSnapshotResult2 : this.snapshotInProgressList) {
                        if (null != operatorSnapshotResult2) {
                            try {
                                operatorSnapshotResult2.cancel();
                            } catch (Exception e3) {
                                StreamTask.LOG.warn("Could not properly cancel an operator snapshot result.", e3);
                            }
                        }
                    }
                    for (StreamStateHandle streamStateHandle2 : this.nonPartitionedStates) {
                        if (streamStateHandle2 != null) {
                            try {
                                streamStateHandle2.discardState();
                            } catch (Exception e4) {
                                StreamTask.LOG.warn("Could not properly discard a non partitioned state. This might leave some orphaned files behind.", e4);
                            }
                        }
                    }
                    if (StreamTask.LOG.isDebugEnabled()) {
                        StreamTask.LOG.debug("{} - did NOT finish synchronous part of checkpoint {}.Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(this.checkpointMetrics.getAlignmentDurationNanos() / 1000000), Long.valueOf(this.checkpointMetrics.getSyncDurationMillis())});
                    }
                }
                throw th;
            }
        }

        private void checkpointStreamOperator(StreamOperator<?> streamOperator) throws Exception {
            if (null != streamOperator) {
                this.nonPartitionedStates.add(streamOperator.snapshotLegacyOperatorState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.checkpointOptions));
                this.snapshotInProgressList.add(streamOperator.snapshotState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.checkpointOptions));
            } else {
                this.nonPartitionedStates.add(null);
                this.snapshotInProgressList.add(new OperatorSnapshotResult());
            }
        }

        public void runAsyncCheckpointingAndAcknowledge() throws IOException {
            AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(this.owner, this.nonPartitionedStates, this.snapshotInProgressList, this.checkpointMetaData, this.checkpointMetrics, this.startAsyncPartNano);
            ((StreamTask) this.owner).cancelables.registerClosable(asyncCheckpointRunnable);
            ((StreamTask) this.owner).asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
        }
    }

    protected abstract void init() throws Exception;

    protected abstract void run() throws Exception;

    protected abstract void cleanup() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
        if (processingTimeService == null) {
            throw new RuntimeException("The timeProvider cannot be set to null.");
        }
        this.timerService = processingTimeService;
    }

    public final void invoke() throws Exception {
        try {
            LOG.debug("Initializing {}.", getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            this.configuration = new StreamConfig(getTaskConfiguration());
            this.stateBackend = createStateBackend();
            this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
            if (this.timerService == null) {
                this.timerService = new SystemProcessingTimeService(this, getCheckpointLock(), new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
            }
            this.operatorChain = new OperatorChain<>(this);
            this.headOperator = this.operatorChain.getHeadOperator();
            init();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Invoking {}", getName());
            synchronized (this.lock) {
                initializeState();
                openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.isRunning = true;
            run();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.timerService.quiesceAndAwaitPending();
            LOG.debug("Finished task {}", getName());
            synchronized (this.lock) {
                this.isRunning = false;
                closeAllOperators();
            }
            LOG.debug("Closed operators for task {}", getName());
            this.operatorChain.flushOutputs();
            tryDisposeAllOperators();
            this.isRunning = false;
            if (this.timerService != null) {
                try {
                    this.timerService.shutdownService();
                } catch (Throwable th) {
                    LOG.error("Could not shut down timer service", th);
                }
            }
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable th2) {
                LOG.error("Could not shut down async checkpoint threads", th2);
            }
            try {
                cleanup();
            } catch (Throwable th3) {
                LOG.error("Error during cleanup of stream task", th3);
            }
            if (1 == 0) {
                disposeAllOperators();
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
        } catch (Throwable th4) {
            this.isRunning = false;
            if (this.timerService != null) {
                try {
                    this.timerService.shutdownService();
                } catch (Throwable th5) {
                    LOG.error("Could not shut down timer service", th5);
                }
            }
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable th6) {
                LOG.error("Could not shut down async checkpoint threads", th6);
            }
            try {
                cleanup();
            } catch (Throwable th7) {
                LOG.error("Error during cleanup of stream task", th7);
            }
            if (0 == 0) {
                disposeAllOperators();
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
            throw th4;
        }
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            cancelTask();
        } finally {
            this.cancelables.close();
        }
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.open();
            }
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int length = allOperators.length - 1; length >= 0; length--) {
            StreamOperator<?> streamOperator = allOperators[length];
            if (streamOperator != null) {
                streamOperator.close();
            }
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.dispose();
            }
        }
    }

    private void shutdownAsyncThreads() throws Exception {
        if (this.asyncOperationsThreadPool.isShutdown()) {
            return;
        }
        this.asyncOperationsThreadPool.shutdownNow();
    }

    private void disposeAllOperators() {
        if (this.operatorChain != null) {
            for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                if (streamOperator != null) {
                    try {
                        streamOperator.dispose();
                    } catch (Throwable th) {
                        LOG.error("Error during disposal of stream operator.", th);
                    }
                }
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null && !this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isSerializingTimestamps() {
        TimeCharacteristic timeCharacteristic = this.configuration.getTimeCharacteristic();
        return (timeCharacteristic == TimeCharacteristic.EventTime) | (timeCharacteristic == TimeCharacteristic.IngestionTime);
    }

    public String getName() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    Output<StreamRecord<OUT>> getHeadOutput() {
        return this.operatorChain.getChainEntryPoint();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public void setInitialState(TaskStateHandles taskStateHandles) {
        this.restoreStateHandles = taskStateHandles;
    }

    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            return performCheckpoint(checkpointMetaData, checkpointOptions, new CheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L));
        } catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), getName(), e});
            return false;
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        try {
            performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        } catch (CancelTaskException e) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(checkpointMetaData.getCheckpointId()));
            throw e;
        } catch (Exception e2) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e2);
        }
    }

    public void abortCheckpointOnBarrier(long j, Throwable th) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", Long.valueOf(j), getName());
        getEnvironment().declineCheckpoint(j, th);
        synchronized (this.lock) {
            this.operatorChain.broadcastCheckpointCancelMarker(j);
        }
    }

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), checkpointOptions.getCheckpointType(), getName()});
        synchronized (this.lock) {
            if (this.isRunning) {
                this.operatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exc = null;
            for (ResultPartitionWriter resultPartitionWriter : getEnvironment().getAllWriters()) {
                try {
                    resultPartitionWriter.writeBufferToAllChannels(EventSerializer.toBuffer(cancelCheckpointMarker));
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exc);
                }
            }
            if (exc != null) {
                throw exc;
            }
            return false;
        }
    }

    public ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        synchronized (this.lock) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                    if (streamOperator != null) {
                        streamOperator.notifyOfCompletedCheckpoint(j);
                    }
                }
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            }
        }
    }

    private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        new CheckpointingOperation(this, checkpointMetaData, checkpointOptions, checkpointMetrics).executeCheckpointing();
    }

    private void initializeState() throws Exception {
        if (!(null != this.restoreStateHandles)) {
            initializeOperators(false);
            return;
        }
        checkRestorePreconditions(this.operatorChain.getChainLength());
        initializeOperators(true);
        this.restoreStateHandles = null;
    }

    private void initializeOperators(boolean z) throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int i = 0; i < allOperators.length; i++) {
            StreamOperator<?> streamOperator = allOperators[i];
            if (null != streamOperator) {
                if (!z || this.restoreStateHandles == null) {
                    streamOperator.initializeState(null);
                } else {
                    streamOperator.initializeState(new OperatorStateHandles(this.restoreStateHandles, i));
                }
            }
        }
    }

    private void checkRestorePreconditions(int i) {
        ChainedStateHandle legacyOperatorState = this.restoreStateHandles.getLegacyOperatorState();
        List managedOperatorState = this.restoreStateHandles.getManagedOperatorState();
        if (legacyOperatorState != null) {
            Preconditions.checkState(legacyOperatorState.getLength() == i, "Invalid Invalid number of operator states. Found :" + legacyOperatorState.getLength() + ". Expected: " + i);
        }
        if (CollectionUtil.isNullOrEmpty(managedOperatorState)) {
            return;
        }
        Preconditions.checkArgument(managedOperatorState.size() == i, "Invalid number of operator states. Found :" + managedOperatorState.size() + ". Expected: " + i);
    }

    private StateBackend createStateBackend() throws Exception {
        AbstractStateBackend stateBackend = this.configuration.getStateBackend(getUserCodeClassLoader());
        if (stateBackend == null) {
            return AbstractStateBackend.loadStateBackendFromConfigOrCreateDefault(getEnvironment().getTaskManagerInfo().getConfiguration(), getUserCodeClassLoader(), LOG);
        }
        LOG.info("Using user-defined state backend: {}.", stateBackend);
        return stateBackend;
    }

    public OperatorStateBackend createOperatorStateBackend(StreamOperator<?> streamOperator, Collection<OperatorStateHandle> collection) throws Exception {
        OperatorStateBackend createOperatorStateBackend = this.stateBackend.createOperatorStateBackend(getEnvironment(), createOperatorIdentifier(streamOperator, getConfiguration().getVertexID().intValue()));
        this.cancelables.registerClosable(createOperatorStateBackend);
        if (null != collection) {
            createOperatorStateBackend.restore(collection);
        }
        return createOperatorStateBackend;
    }

    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange) throws Exception {
        if (this.keyedStateBackend != null) {
            throw new RuntimeException("The keyed state backend can only be created once.");
        }
        this.keyedStateBackend = this.stateBackend.createKeyedStateBackend(getEnvironment(), getEnvironment().getJobID(), createOperatorIdentifier(this.headOperator, this.configuration.getVertexID().intValue()), typeSerializer, i, keyGroupRange, getEnvironment().getTaskKvStateRegistry());
        this.cancelables.registerClosable(this.keyedStateBackend);
        this.keyedStateBackend.restore(this.restoreStateHandles == null ? null : this.restoreStateHandles.getManagedKeyedState());
        return (AbstractKeyedStateBackend<K>) this.keyedStateBackend;
    }

    public CheckpointStreamFactory createCheckpointStreamFactory(StreamOperator<?> streamOperator) throws IOException {
        return this.stateBackend.createStreamFactory(getEnvironment().getJobID(), createOperatorIdentifier(streamOperator, this.configuration.getVertexID().intValue()));
    }

    public CheckpointStreamFactory createSavepointStreamFactory(StreamOperator<?> streamOperator, String str) throws IOException {
        return this.stateBackend.createSavepointStreamFactory(getEnvironment().getJobID(), createOperatorIdentifier(streamOperator, this.configuration.getVertexID().intValue()), str);
    }

    private String createOperatorIdentifier(StreamOperator<?> streamOperator, int i) {
        return streamOperator.getClass().getSimpleName() + "_" + i + "_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
    }

    public ProcessingTimeService getProcessingTimeService() {
        if (this.timerService == null) {
            throw new IllegalStateException("The timer service has not been initialized.");
        }
        return this.timerService;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler
    public void handleAsyncException(String str, Throwable th) {
        if (this.isRunning) {
            getEnvironment().failExternally(th);
        }
    }

    public String toString() {
        return getName();
    }

    public CloseableRegistry getCancelables() {
        return this.cancelables;
    }
}
