package org.apache.flink.table.plan.nodes.physical.stream;

import java.util.List;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.table.api.StreamTableEnvironment;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.codegen.CodeGeneratorContext$;
import org.apache.flink.table.codegen.SinkCodeGenerator$;
import org.apache.flink.table.plan.nodes.FlinkRelNode;
import org.apache.flink.table.plan.nodes.calcite.Sink;
import org.apache.flink.table.plan.nodes.exec.BaseStreamExecNode;
import org.apache.flink.table.plan.nodes.exec.ExecNode;
import org.apache.flink.table.plan.nodes.exec.ExecNodeVisitor;
import org.apache.flink.table.plan.nodes.exec.ExecNodeWriter;
import org.apache.flink.table.plan.nodes.exec.NodeResource;
import org.apache.flink.table.plan.nodes.exec.RowStreamExecNode;
import org.apache.flink.table.plan.nodes.exec.StreamExecNode;
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel;
import org.apache.flink.table.plan.trait.AccMode$;
import org.apache.flink.table.plan.trait.AccModeTrait;
import org.apache.flink.table.plan.trait.AccModeTraitDef$;
import org.apache.flink.table.plan.util.UpdatingPlanChecker$;
import org.apache.flink.table.runtime.AbstractProcessStreamOperator;
import org.apache.flink.table.runtime.OneInputSubstituteStreamOperator;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.BaseRetractStreamTableSink;
import org.apache.flink.table.sinks.BaseUpsertStreamTableSink;
import org.apache.flink.table.sinks.DataStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.typeutils.BaseRowTypeInfo;
import org.apache.flink.table.typeutils.TypeUtils$;
import org.apache.flink.table.util.Logging;
import org.apache.flink.table.util.NodeResourceUtil;
import org.slf4j.Logger;
import scala.Array$;
import scala.Enumeration;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;

/* compiled from: StreamExecSink.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%e\u0001B\u0001\u0003\u0001M\u0011ab\u0015;sK\u0006lW\t_3d'&t7N\u0003\u0002\u0004\t\u000511\u000f\u001e:fC6T!!\u0002\u0004\u0002\u0011AD\u0017p]5dC2T!a\u0002\u0005\u0002\u000b9|G-Z:\u000b\u0005%Q\u0011\u0001\u00029mC:T!a\u0003\u0007\u0002\u000bQ\f'\r\\3\u000b\u00055q\u0011!\u00024mS:\\'BA\b\u0011\u0003\u0019\t\u0007/Y2iK*\t\u0011#A\u0002pe\u001e\u001c\u0001!\u0006\u0002\u0015\u0015N!\u0001!F\u000e !\t1\u0012$D\u0001\u0018\u0015\tAb!A\u0004dC2\u001c\u0017\u000e^3\n\u0005i9\"\u0001B*j].\u0004\"\u0001H\u000f\u000e\u0003\tI!A\b\u0002\u0003#M#(/Z1n!\"L8/[2bYJ+G\u000eE\u0002!G\u0015j\u0011!\t\u0006\u0003E\u0019\tA!\u001a=fG&\u0011A%\t\u0002\u0013\u0005\u0006\u001cXm\u0015;sK\u0006lW\t_3d\u001d>$W\r\u0005\u0002'S5\tqEC\u0001)\u0003\u0015\u00198-\u00197b\u0013\tQsEA\u0002B]fD\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!L\u0001\bG2,8\u000f^3s!\tq\u0013'D\u00010\u0015\tI\u0001G\u0003\u0002\u0019\u001d%\u0011!g\f\u0002\u000e%\u0016dw\n\u001d;DYV\u001cH/\u001a:\t\u0011Q\u0002!\u0011!Q\u0001\nU\n\u0001\u0002\u001e:bSR\u001cV\r\u001e\t\u0003]YJ!aN\u0018\u0003\u0017I+G\u000e\u0016:bSR\u001cV\r\u001e\u0005\ts\u0001\u0011\t\u0011)A\u0005u\u0005)\u0011N\u001c9viB\u00111HP\u0007\u0002y)\u0011Q\bM\u0001\u0004e\u0016d\u0017BA =\u0005\u001d\u0011V\r\u001c(pI\u0016D\u0011\"\u0011\u0001\u0003\u0002\u0003\u0006IA\u0011)\u0002\tMLgn\u001b\t\u0004\u0007\u001aCU\"\u0001#\u000b\u0005\u0015S\u0011!B:j].\u001c\u0018BA$E\u0005%!\u0016M\u00197f'&t7\u000e\u0005\u0002J\u00152\u0001A!B&\u0001\u0005\u0004a%!\u0001+\u0012\u00055+\u0003C\u0001\u0014O\u0013\tyuEA\u0004O_RD\u0017N\\4\n\u0005\u0005K\u0002\"\u0003*\u0001\u0005\u0003\u0005\u000b\u0011B*[\u0003!\u0019\u0018N\\6OC6,\u0007C\u0001+X\u001d\t1S+\u0003\u0002WO\u00051\u0001K]3eK\u001aL!\u0001W-\u0003\rM#(/\u001b8h\u0015\t1v%\u0003\u0002S3!)A\f\u0001C\u0001;\u00061A(\u001b8jiz\"bAX0aC\n\u001c\u0007c\u0001\u000f\u0001\u0011\")Af\u0017a\u0001[!)Ag\u0017a\u0001k!)\u0011h\u0017a\u0001u!)\u0011i\u0017a\u0001\u0005\")!k\u0017a\u0001'\")Q\r\u0001C!M\u0006!1m\u001c9z)\rQt\r\u001b\u0005\u0006i\u0011\u0004\r!\u000e\u0005\u0006S\u0012\u0004\rA[\u0001\u0007S:\u0004X\u000f^:\u0011\u0007-\u0004((D\u0001m\u0015\tig.\u0001\u0003vi&d'\"A8\u0002\t)\fg/Y\u0005\u0003c2\u0014A\u0001T5ti\")1\u000f\u0001C!i\u0006y\u0011n\u001d#fi\u0016\u0014X.\u001b8jgRL7-F\u0001v!\t1c/\u0003\u0002xO\t9!i\\8mK\u0006t\u0007\"B=\u0001\t\u0003R\u0018\u0001\u00078fK\u0012\u001cX\u000b\u001d3bi\u0016\u001c\u0018i\u001d*fiJ\f7\r^5p]R\u0011Qo\u001f\u0005\u0006sa\u0004\rA\u000f\u0005\b{\u0002\u0011\r\u0011\"\u0003u\u0003UI7\u000fR1uCN#(/Z1n)\u0006\u0014G.Z*j].Daa \u0001!\u0002\u0013)\u0018AF5t\t\u0006$\u0018m\u0015;sK\u0006lG+\u00192mKNKgn\u001b\u0011\t\u000f\u0005\r\u0001\u0001\"\u0011\u0002\u0006\u0005\u0019r-\u001a;GY&t7\u000e\u00155zg&\u001c\u0017\r\u001c*fYV\u0011\u0011q\u0001\t\u0005\u0003\u0013\tY!D\u0001\u0005\u0013\r\ti\u0001\u0002\u0002\u0011\r2Lgn\u001b)isNL7-\u00197SK2Dq!!\u0005\u0001\t\u0003\n\u0019\"\u0001\bhKR\u001cF/\u0019;f\t&<Wm\u001d;\u0015\t\u0005U\u00111\u0004\t\u0004A\u0005]\u0011bAA\rC\tqQ\t_3d\u001d>$Wm\u0016:ji\u0016\u0014\b\u0002CA\u000f\u0003\u001f\u0001\r!!\u0006\u0002\u0005A<\bbBA\u0011\u0001\u0011\u0005\u00131E\u0001\u0018iJ\fgn\u001d7bi\u0016$v\u000e\u00157b]&sG/\u001a:oC2$B!!\n\u0002:A)\u0011qEA\u001bK5\u0011\u0011\u0011\u0006\u0006\u0005\u0003W\ti#A\bue\u0006t7OZ8s[\u0006$\u0018n\u001c8t\u0015\u0011\ty#!\r\u0002\u0007\u0005\u0004\u0018NC\u0002\u000241\t\u0011b\u001d;sK\u0006l\u0017N\\4\n\t\u0005]\u0012\u0011\u0006\u0002\u0015'R\u0014X-Y7Ue\u0006t7OZ8s[\u0006$\u0018n\u001c8\t\u0011\u0005m\u0012q\u0004a\u0001\u0003{\t\u0001\u0002^1cY\u0016,eN\u001e\t\u0005\u0003\u007f\t\u0019%\u0004\u0002\u0002B)\u0019\u0011q\u0006\u0006\n\t\u0005\u0015\u0013\u0011\t\u0002\u0017'R\u0014X-Y7UC\ndW-\u00128wSJ|g.\\3oi\"9\u0011\u0011\n\u0001\u0005\n\u0005-\u0013!\u0003;sC:\u001cH.\u0019;f)\u0019\ti%a\u0014\u0002TA)\u0011qEA\u001b\u0011\"9\u0011\u0011KA$\u0001\u0004)\u0018AD<ji\"\u001c\u0005.\u00198hK\u001ac\u0017m\u001a\u0005\t\u0003w\t9\u00051\u0001\u0002>!9\u0011q\u000b\u0001\u0005\n\u0005e\u0013AD3nSR$\u0015\r^1TiJ,\u0017-\u001c\u000b\u0007\u00037\ny'a 1\t\u0005u\u00131\u000e\t\u0007\u0003?\n)'!\u001b\u000e\u0005\u0005\u0005$\u0002BA2\u0003[\t!\u0002Z1uCN$(/Z1n\u0013\u0011\t9'!\u0019\u0003\u001d\u0011\u000bG/Y*ue\u0016\fWnU5oWB\u0019\u0011*a\u001b\u0005\u0017\u00055\u0014QKA\u0001\u0002\u0003\u0015\t\u0001\u0014\u0002\u0004?\u0012\u001a\u0004\u0002CA9\u0003+\u0002\r!a\u001d\u0002\u0013Q\f'\r\\3D_:4\u0007\u0003BA;\u0003wj!!a\u001e\u000b\u0007\u0005eD\"A\u0007d_:4\u0017nZ;sCRLwN\\\u0005\u0005\u0003{\n9HA\u0007D_:4\u0017nZ;sCRLwN\u001c\u0005\t\u0003\u0003\u000b)\u00061\u0001\u0002\u0004\u0006QA-\u0019;b'R\u0014X-Y7\u0011\u000b\u0005}\u0013Q\u0011%\n\t\u0005\u001d\u0015\u0011\r\u0002\u000b\t\u0006$\u0018m\u0015;sK\u0006l\u0007")
/* loaded from: input_file:org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.class */
public class StreamExecSink<T> extends Sink implements StreamPhysicalRel, BaseStreamExecNode<Object> {
    private final RelOptCluster cluster;
    private final boolean isDataStreamTableSink;
    private final List<ExecNode<StreamTableEnvironment, ?>> org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes;
    private final transient Logger LOG;
    private final NodeResource org$apache$flink$table$plan$nodes$exec$ExecNode$$resource;
    private StreamTransformation<Object> org$apache$flink$table$plan$nodes$exec$ExecNode$$transformation;
    private volatile boolean bitmap$0;
    private volatile transient boolean bitmap$trans$0;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private List org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes = BaseStreamExecNode.Cclass.org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes;
        }
    }

    @Override // org.apache.flink.table.plan.nodes.exec.BaseStreamExecNode
    public List<ExecNode<StreamTableEnvironment, ?>> org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes() {
        return this.bitmap$0 ? this.org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes : org$apache$flink$table$plan$nodes$exec$BaseStreamExecNode$$inputNodes$lzycompute();
    }

    @Override // org.apache.flink.table.plan.nodes.exec.BaseStreamExecNode, org.apache.flink.table.plan.nodes.exec.ExecNode
    public List<ExecNode<StreamTableEnvironment, ?>> getInputNodes() {
        return BaseStreamExecNode.Cclass.getInputNodes(this);
    }

    @Override // org.apache.flink.table.plan.nodes.exec.BaseStreamExecNode, org.apache.flink.table.plan.nodes.exec.ExecNode
    public void replaceInputNode(int i, ExecNode<StreamTableEnvironment, ?> execNode) {
        BaseStreamExecNode.Cclass.replaceInputNode(this, i, execNode);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger LOG$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$trans$0) {
                this.LOG = Logging.Cclass.LOG(this);
                this.bitmap$trans$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.LOG;
        }
    }

    @Override // org.apache.flink.table.util.Logging
    public Logger LOG() {
        return this.bitmap$trans$0 ? this.LOG : LOG$lzycompute();
    }

    @Override // org.apache.flink.table.plan.nodes.exec.StreamExecNode
    public /* synthetic */ StreamTransformation org$apache$flink$table$plan$nodes$exec$StreamExecNode$$super$translateToPlan(StreamTableEnvironment streamTableEnvironment) {
        return ExecNode.Cclass.translateToPlan(this, streamTableEnvironment);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public StreamTransformation<Object> translateToPlan(StreamTableEnvironment streamTableEnvironment) {
        return StreamExecNode.Cclass.translateToPlan(this, streamTableEnvironment);
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public NodeResource org$apache$flink$table$plan$nodes$exec$ExecNode$$resource() {
        return this.org$apache$flink$table$plan$nodes$exec$ExecNode$$resource;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public StreamTransformation<Object> org$apache$flink$table$plan$nodes$exec$ExecNode$$transformation() {
        return this.org$apache$flink$table$plan$nodes$exec$ExecNode$$transformation;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    @TraitSetter
    public void org$apache$flink$table$plan$nodes$exec$ExecNode$$transformation_$eq(StreamTransformation<Object> streamTransformation) {
        this.org$apache$flink$table$plan$nodes$exec$ExecNode$$transformation = streamTransformation;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public void org$apache$flink$table$plan$nodes$exec$ExecNode$_setter_$org$apache$flink$table$plan$nodes$exec$ExecNode$$resource_$eq(NodeResource nodeResource) {
        this.org$apache$flink$table$plan$nodes$exec$ExecNode$$resource = nodeResource;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public NodeResource getResource() {
        return ExecNode.Cclass.getResource(this);
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public void accept(ExecNodeVisitor execNodeVisitor) {
        ExecNode.Cclass.accept(this, execNodeVisitor);
    }

    @Override // org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
    public boolean producesUpdates() {
        return StreamPhysicalRel.Cclass.producesUpdates(this);
    }

    @Override // org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
    public boolean consumesRetractions() {
        return StreamPhysicalRel.Cclass.consumesRetractions(this);
    }

    @Override // org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
    public boolean producesRetractions() {
        return StreamPhysicalRel.Cclass.producesRetractions(this);
    }

    @Override // org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
    public boolean requireWatermark() {
        return StreamPhysicalRel.Cclass.requireWatermark(this);
    }

    @Override // org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
    public RelNode satisfyTraitsByInput(RelTraitSet relTraitSet) {
        return FlinkPhysicalRel.Cclass.satisfyTraitsByInput(this, relTraitSet);
    }

    @Override // org.apache.flink.table.plan.nodes.FlinkRelNode
    public String getExpressionString(RexNode rexNode, scala.collection.immutable.List<String> list, Option<scala.collection.immutable.List<RexNode>> option) {
        return FlinkRelNode.Cclass.getExpressionString(this, rexNode, list, option);
    }

    @Override // org.apache.flink.table.plan.nodes.FlinkRelNode
    public String getExpressionString(RexNode rexNode, scala.collection.immutable.List<String> list, Option<scala.collection.immutable.List<RexNode>> option, Enumeration.Value value) {
        return FlinkRelNode.Cclass.getExpressionString(this, rexNode, list, option, value);
    }

    @Override // org.apache.calcite.rel.AbstractRelNode, org.apache.calcite.rel.RelNode
    public RelNode copy(RelTraitSet relTraitSet, List<RelNode> list) {
        return new StreamExecSink(this.cluster, relTraitSet, list.get(0), super.sink(), super.sinkName());
    }

    @Override // org.apache.flink.table.plan.nodes.FlinkRelNode
    public boolean isDeterministic() {
        return true;
    }

    @Override // org.apache.flink.table.plan.nodes.physical.stream.StreamPhysicalRel
    public boolean needsUpdatesAsRetraction(RelNode relNode) {
        return (super.sink() instanceof BaseRetractStreamTableSink) || (isDataStreamTableSink() && ((DataStreamTableSink) super.sink()).updatesAsRetraction());
    }

    private boolean isDataStreamTableSink() {
        return this.isDataStreamTableSink;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public FlinkPhysicalRel getFlinkPhysicalRel() {
        return this;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.StreamExecNode
    public ExecNodeWriter getStateDigest(ExecNodeWriter execNodeWriter) {
        return execNodeWriter;
    }

    @Override // org.apache.flink.table.plan.nodes.exec.ExecNode
    public StreamTransformation<Object> translateToPlanInternal(StreamTableEnvironment streamTableEnvironment) {
        StreamTransformation<T> translate;
        StreamTransformation<T> transformation;
        TableSink<?> sink = super.sink();
        if (sink instanceof BaseRetractStreamTableSink) {
            translate = translate(true, streamTableEnvironment);
        } else if (sink instanceof BaseUpsertStreamTableSink) {
            ((BaseUpsertStreamTableSink) sink).setIsAppendOnly(Predef$.MODULE$.boolean2Boolean(UpdatingPlanChecker$.MODULE$.isAppendOnly(this)));
            translate = translate(true, streamTableEnvironment);
        } else if (sink instanceof AppendStreamTableSink) {
            if (!UpdatingPlanChecker$.MODULE$.isAppendOnly(this)) {
                throw new TableException("AppendStreamTableSink requires that Table has only insert changes.");
            }
            Enumeration.Value accMode = ((AccModeTrait) getTraitSet().getTrait(AccModeTraitDef$.MODULE$.INSTANCE())).getAccMode();
            Enumeration.Value AccRetract = AccMode$.MODULE$.AccRetract();
            if (accMode != null ? accMode.equals(AccRetract) : AccRetract == null) {
                throw new TableException("AppendStreamTableSink can not be used to output retraction messages.");
            }
            translate = translate(false, streamTableEnvironment);
        } else {
            if (!(sink instanceof DataStreamTableSink)) {
                throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.");
            }
            translate = translate(((DataStreamTableSink) sink).withChangeFlag(), streamTableEnvironment);
        }
        StreamTransformation<T> streamTransformation = translate;
        if (isDataStreamTableSink()) {
            transformation = streamTransformation;
        } else {
            transformation = emitDataStream(streamTableEnvironment.getConfig().getConf(), new DataStream<>(streamTableEnvironment.execEnv(), streamTransformation)).getTransformation();
        }
        return transformation;
    }

    private StreamTransformation<T> translate(boolean z, StreamTableEnvironment streamTableEnvironment) {
        Object outputType;
        StreamTransformation<T> streamTransformation;
        RelNode input = getInput();
        DataType outputType2 = super.sink().getOutputType();
        if (!z && !UpdatingPlanChecker$.MODULE$.isAppendOnly(input)) {
            throw new TableException("Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.");
        }
        if (!(input instanceof RowStreamExecNode)) {
            throw new TableException("Cannot generate DataStream due to an invalid logical plan. This is a bug and should not happen. Please file an issue.");
        }
        StreamTransformation<T> translateToPlan = ((StreamExecNode) input).translateToPlan(streamTableEnvironment);
        RelDataType rowType = input.getRowType();
        Buffer buffer = (Buffer) JavaConversions$.MODULE$.asScalaBuffer(rowType.getFieldList()).filter(new StreamExecSink$$anonfun$1(this));
        if (buffer.size() > 1) {
            throw new TableException(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Found more than one rowtime field: [", "] in "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{((TraversableOnce) buffer.map(new StreamExecSink$$anonfun$2(this), Buffer$.MODULE$.canBuildFrom())).mkString(", ")}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"the table that should be converted to a DataStream.\\n"})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Please select the rowtime field that should be used as event-time timestamp for the "})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"DataStream by casting all other fields to TIMESTAMP."})).s(Nil$.MODULE$)).toString());
        }
        if (buffer.size() == 1) {
            BaseRowTypeInfo baseRowTypeInfo = (BaseRowTypeInfo) translateToPlan.getOutputType();
            outputType = new BaseRowTypeInfo((TypeInformation[]) Predef$.MODULE$.refArrayOps(baseRowTypeInfo.getFieldTypes()).map(new StreamExecSink$$anonfun$3(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(TypeInformation.class))), baseRowTypeInfo.getFieldNames());
        } else {
            outputType = translateToPlan.getOutputType();
        }
        Object obj = outputType;
        TableConfig config = streamTableEnvironment.getConfig();
        Tuple2 generateRowConverterOperator = SinkCodeGenerator$.MODULE$.generateRowConverterOperator(config, CodeGeneratorContext$.MODULE$.apply(config, true).setOperatorBaseClass(AbstractProcessStreamOperator.class), (BaseRowTypeInfo) obj, rowType, "DataStreamSinkConversion", buffer.isEmpty() ? None$.MODULE$ : new Some(BoxesRunTime.boxToInteger(((RelDataTypeField) buffer.head()).getIndex())), z, outputType2, super.sink());
        if (generateRowConverterOperator == null) {
            throw new MatchError(generateRowConverterOperator);
        }
        Tuple2 tuple2 = new Tuple2((Option) generateRowConverterOperator._1(), (TypeInformation) generateRowConverterOperator._2());
        Some some = (Option) tuple2._1();
        TypeInformation typeInformation = (TypeInformation) tuple2._2();
        if (None$.MODULE$.equals(some)) {
            streamTransformation = translateToPlan;
        } else {
            if (!(some instanceof Some)) {
                throw new MatchError(some);
            }
            StreamTransformation<T> oneInputTransformation = new OneInputTransformation<>(translateToPlan, new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"SinkConversion to ", ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER})).s(Predef$.MODULE$.genericWrapArray(new Object[]{TypeUtils$.MODULE$.getExternalClassForType(outputType2).getSimpleName()})), (OneInputSubstituteStreamOperator) some.x(), typeInformation, translateToPlan.getParallelism());
            ResourceSpec defaultResourceSpec = NodeResourceUtil.getDefaultResourceSpec(streamTableEnvironment.getConfig().getConf());
            oneInputTransformation.setResources(defaultResourceSpec, defaultResourceSpec);
            streamTransformation = oneInputTransformation;
        }
        return streamTransformation;
    }

    private DataStreamSink<?> emitDataStream(Configuration configuration, DataStream<T> dataStream) {
        DataStreamSink<?> emitDataStream;
        TableSink<?> sink = super.sink();
        if (sink instanceof BaseRetractStreamTableSink) {
            emitDataStream = ((BaseRetractStreamTableSink) sink).emitDataStream(dataStream);
        } else if (sink instanceof BaseUpsertStreamTableSink) {
            emitDataStream = ((BaseUpsertStreamTableSink) sink).emitDataStream(dataStream);
        } else {
            if (!(sink instanceof AppendStreamTableSink)) {
                throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.");
            }
            emitDataStream = ((AppendStreamTableSink) sink).emitDataStream(dataStream);
        }
        DataStreamSink<?> dataStreamSink = emitDataStream;
        SinkTransformation transformation = dataStreamSink.getTransformation();
        ResourceSpec minResources = transformation.getMinResources();
        if (minResources != null) {
            ResourceSpec resourceSpec = ResourceSpec.DEFAULT;
            if (minResources != null) {
            }
            return dataStreamSink;
        }
        ResourceSpec resourceSpec2 = NodeResourceUtil.getResourceSpec(configuration, NodeResourceUtil.getSinkMem(configuration), NodeResourceUtil.getSinkDirectMem(configuration));
        transformation.setResources(resourceSpec2, resourceSpec2);
        return dataStreamSink;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public StreamExecSink(RelOptCluster relOptCluster, RelTraitSet relTraitSet, RelNode relNode, TableSink<T> tableSink, String str) {
        super(relOptCluster, relTraitSet, relNode, tableSink, str);
        this.cluster = relOptCluster;
        FlinkRelNode.Cclass.$init$(this);
        FlinkPhysicalRel.Cclass.$init$(this);
        StreamPhysicalRel.Cclass.$init$(this);
        org$apache$flink$table$plan$nodes$exec$ExecNode$_setter_$org$apache$flink$table$plan$nodes$exec$ExecNode$$resource_$eq(new NodeResource());
        StreamExecNode.Cclass.$init$(this);
        Logging.Cclass.$init$(this);
        BaseStreamExecNode.Cclass.$init$(this);
        this.isDataStreamTableSink = super.sink() instanceof DataStreamTableSink;
    }
}
