package com.alibaba.blink.streaming.connectors.common.source;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.sources.BatchExecTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;

/* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/source/SourceFunctionTableSource.class */
public class SourceFunctionTableSource<OUT> implements BatchExecTableSource<OUT>, StreamTableSource<OUT>, Serializable {
    private static final long serialVersionUID = 7078000231877526561L;
    private static final int DEFAULT_PARALLELISM = -1;
    public static final String BATCH_TAG = "Batch";
    public static final String STREAM_TAG = "Stream";
    protected TypeInformation<OUT> returnTypeInfo;
    private transient DataStream dataStream;
    private transient DataStreamSource boundedStreamSource;
    private SourceFunction<OUT> sourceFunction;

    public SourceFunctionTableSource() {
    }

    public SourceFunctionTableSource(SourceFunction<OUT> sourceFunction) {
        this.sourceFunction = sourceFunction;
    }

    public SourceFunction<OUT> getSourceFunction() throws UnsupportedSourceReqeustTypeException {
        if (this.sourceFunction != null) {
            return this.sourceFunction;
        }
        throw new UnsupportedSourceReqeustTypeException("SourceFunction has not been setup yet");
    }

    public String explainSource() {
        return getSourceFunction().toString();
    }

    public ResourceSpec getResource() {
        return null;
    }

    public DataStream<OUT> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        if (this.dataStream == null) {
            AbstractParallelSourceBase sourceFunction = getSourceFunction();
            if (sourceFunction instanceof AbstractParallelSourceBase) {
                sourceFunction.disableWatermarkEmitter();
            }
            this.dataStream = streamExecutionEnvironment.addSource(sourceFunction).name(String.format("%s-%s", explainSource(), STREAM_TAG));
        }
        return this.dataStream;
    }

    public DataStream<OUT> getBoundedStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        if (this.boundedStreamSource == null) {
            AbstractParallelSourceBase sourceFunction = getSourceFunction();
            TypeInformation<OUT> producedType = getProducedType();
            int i = DEFAULT_PARALLELISM;
            if (sourceFunction instanceof AbstractParallelSourceBase) {
                AbstractParallelSourceBase abstractParallelSourceBase = sourceFunction;
                abstractParallelSourceBase.disableParallelRead();
                try {
                    List<String> partitionList = abstractParallelSourceBase.getPartitionList();
                    if (partitionList != null) {
                        i = partitionList.size();
                    }
                } catch (Exception e) {
                }
            }
            this.boundedStreamSource = streamExecutionEnvironment.addSource(sourceFunction, String.format("%s-%s", explainSource(), BATCH_TAG), producedType);
            if (i > 0) {
                this.boundedStreamSource.setParallelism(i);
                this.boundedStreamSource.getTransformation().setParallelismLocked(true);
            }
            ResourceSpec resource = getResource();
            if (resource != null) {
                this.boundedStreamSource.getTransformation().setResources(resource, resource);
            }
        }
        return this.boundedStreamSource;
    }

    public TableStats getTableStats() {
        return null;
    }

    public DataType getReturnType() {
        return DataTypes.of(getProducedType());
    }

    public TypeInformation<OUT> getProducedType() {
        if (this.returnTypeInfo != null) {
            return this.returnTypeInfo;
        }
        ResultTypeQueryable sourceFunction = getSourceFunction();
        if (sourceFunction instanceof ResultTypeQueryable) {
            this.returnTypeInfo = sourceFunction.getProducedType();
        } else {
            try {
                this.returnTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, sourceFunction.getClass(), 0, (TypeInformation) null, (TypeInformation) null);
            } catch (InvalidTypesException e) {
                throw new RuntimeException("Fail to get type information of source function", e);
            }
        }
        return this.returnTypeInfo;
    }

    public TableSchema getTableSchema() {
        return TableSchema.fromTableSource(this);
    }
}
