package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
@Deprecated
/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamGroupedFold.class */
public class StreamGroupedFold<IN, OUT, KEY> extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1;
    private static final String STATE_NAME = "_op_state";
    private transient ValueState<OUT> values;
    private transient OUT initialValue;
    private byte[] serializedInitialValue;
    private TypeSerializer<OUT> outTypeSerializer;

    public StreamGroupedFold(FoldFunction<IN, OUT> foldFunction, OUT out) {
        super(foldFunction);
        this.initialValue = out;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        if (this.serializedInitialValue == null) {
            throw new RuntimeException("No initial value was serialized for the fold operator. Probably the setOutputType method was not called.");
        }
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(this.serializedInitialValue);
        Throwable th = null;
        try {
            DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(byteArrayInputStream);
            Throwable th2 = null;
            try {
                try {
                    this.initialValue = (OUT) this.outTypeSerializer.deserialize(dataInputViewStreamWrapper);
                    if (dataInputViewStreamWrapper != null) {
                        if (0 != 0) {
                            try {
                                dataInputViewStreamWrapper.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            dataInputViewStreamWrapper.close();
                        }
                    }
                    this.values = getPartitionedState(new ValueStateDescriptor(STATE_NAME, this.outTypeSerializer));
                } finally {
                }
            } catch (Throwable th4) {
                if (dataInputViewStreamWrapper != null) {
                    if (th2 != null) {
                        try {
                            dataInputViewStreamWrapper.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        dataInputViewStreamWrapper.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (byteArrayInputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayInputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    byteArrayInputStream.close();
                }
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Object value = this.values.value();
        if (value != null) {
            Object fold = this.userFunction.fold(this.outTypeSerializer.copy(value), streamRecord.getValue());
            this.values.update(fold);
            this.output.collect(streamRecord.replace(fold));
        } else {
            Object fold2 = this.userFunction.fold(this.outTypeSerializer.copy(this.initialValue), streamRecord.getValue());
            this.values.update(fold2);
            this.output.collect(streamRecord.replace(fold2));
        }
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void endInput() throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.OutputTypeConfigurable
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        this.outTypeSerializer = typeInformation.createSerializer(executionConfig);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            this.outTypeSerializer.serialize(this.initialValue, new DataOutputViewStreamWrapper(byteArrayOutputStream));
            this.serializedInitialValue = byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Unable to serialize initial value of type " + this.initialValue.getClass().getSimpleName() + " of fold operator.", e);
        }
    }
}
