/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.sinks.parquet;

import java.io.IOException;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.SafetyNetWrapperFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.sinks.parquet.RowWritableWriteSupport;
import org.apache.flink.table.sources.parquet.ParquetSchemaConverter;
import org.apache.flink.table.types.InternalType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowParquetOutputFormat
implements OutputFormat<BaseRow> {
    public static final Logger LOG = LoggerFactory.getLogger(RowParquetOutputFormat.class);
    private static final int DEFAULT_BLOCK_SIZE = 0x8000000;
    private static final String FILE_PREFIX_NAME = "parquet-";
    private final InternalType[] fieldTypes;
    private final String[] fieldNames;
    private final String dir;
    private int blockSize;
    private boolean enableDictionary;
    private CompressionCodecName compression;
    private String filePrefixName;
    private RecordWriter<Void, BaseRow> realWriter;
    private TaskAttemptContext taskContext;

    public RowParquetOutputFormat(String dir, InternalType[] fieldTypes2, String[] fieldNames, CompressionCodecName compression) {
        this(dir, fieldTypes2, fieldNames, compression, 0x8000000, false);
    }

    public RowParquetOutputFormat(String dir, InternalType[] fieldTypes2, String[] fieldNames) {
        this(dir, fieldTypes2, fieldNames, CompressionCodecName.UNCOMPRESSED, 0x8000000, false);
    }

    public RowParquetOutputFormat(String dir, InternalType[] fieldTypes2, String[] fieldNames, CompressionCodecName compression, int blockSize, boolean enableDictionary) {
        this(dir, fieldTypes2, fieldNames, compression, blockSize, enableDictionary, FILE_PREFIX_NAME);
    }

    public RowParquetOutputFormat(String dir, InternalType[] fieldTypes2, String[] fieldNames, CompressionCodecName compression, int blockSize, boolean enableDictionary, String filePrefixName) {
        Preconditions.checkArgument(fieldNames != null && fieldNames.length > 0);
        Preconditions.checkArgument(fieldTypes2 != null && fieldTypes2.length == fieldNames.length);
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes2;
        this.dir = dir;
        this.blockSize = blockSize;
        this.enableDictionary = enableDictionary;
        this.compression = compression;
        this.filePrefixName = filePrefixName;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        JobConf jobConf = new JobConf();
        String fileName = this.filePrefixName + numTasks + "-" + taskNumber + ".parquet";
        Path path = new Path(this.dir, fileName);
        FileSystem fs = path.getFileSystem();
        if (fs instanceof SafetyNetWrapperFileSystem) {
            fs = ((SafetyNetWrapperFileSystem)fs).getWrappedDelegate();
        }
        if (fs instanceof HadoopFileSystem) {
            jobConf.addResource(((HadoopFileSystem)fs).getConfig());
        }
        if (!(fs instanceof LocalFileSystem) && !(fs instanceof HadoopFileSystem)) {
            throw new RuntimeException("FileSystem: " + fs.getClass().getCanonicalName() + " is not supported.");
        }
        fs.delete(path, true);
        ParquetOutputFormat realOutputFormat = new ParquetOutputFormat((WriteSupport)new RowWritableWriteSupport(this.fieldTypes));
        LOG.info("creating new record writer..." + this);
        RowWritableWriteSupport.setSchema(ParquetSchemaConverter.convert(this.fieldNames, this.fieldTypes), (org.apache.hadoop.conf.Configuration)jobConf);
        try {
            TaskAttemptID taskAttemptID = new TaskAttemptID();
            this.taskContext = ContextUtil.newTaskAttemptContext((org.apache.hadoop.conf.Configuration)jobConf, (TaskAttemptID)taskAttemptID);
            LOG.info("initialize serde with table properties.");
            this.initializeSerProperties((JobContext)this.taskContext);
            LOG.info("creating real writer to write at " + this.dir);
            this.realWriter = realOutputFormat.getRecordWriter(this.taskContext, new org.apache.hadoop.fs.Path(path.toUri()));
            LOG.info("real writer: " + this.realWriter);
        }
        catch (InterruptedException e2) {
            throw new IOException(e2);
        }
    }

    private void initializeSerProperties(JobContext job) {
        org.apache.hadoop.conf.Configuration conf = ContextUtil.getConfiguration((JobContext)job);
        if (this.blockSize > 0) {
            LOG.info("get override parquet.block.size property with: {}", (Object)this.blockSize);
            conf.setInt("parquet.block.size", this.blockSize);
            LOG.info("get override dfs.blocksize property with: {}", (Object)this.blockSize);
            conf.setInt("dfs.blocksize", this.blockSize);
        }
        LOG.info("get override parquet.enable.dictionary property with: {}", (Object)this.enableDictionary);
        conf.setBoolean("parquet.enable.dictionary", this.enableDictionary);
        if (this.compression != null) {
            LOG.info("get override compression properties with {}", (Object)this.compression.name());
            conf.set("parquet.compression", this.compression.name());
        }
    }

    @Override
    public void writeRecord(BaseRow record) throws IOException {
        try {
            this.realWriter.write(null, (Object)record);
        }
        catch (InterruptedException e2) {
            throw new IOException(e2);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            this.realWriter.close(this.taskContext);
        }
        catch (InterruptedException e2) {
            throw new IOException(e2);
        }
    }
}

