/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.UpsertExecutor;
import org.apache.phoenix.util.csv.CsvUpsertExecutor;

public class ODPSMapper
extends Mapper<NullWritable, MapWritable, TableRowkeyPair, ImmutableBytesWritable> {
    public static final String ODPS_COLUMN_INFO_CONFKEY = "odps.mapreduce.import.columninfos";
    public static final String PHOENIX_MAPPED_COLUMN_INFO_CONFKEY = "phoenix.mapped.import.columninfos";
    public static final String ACCESS_KEY_ID_CONFKEY = "odps.access.key.id";
    public static final String ACCESS_KEY_SECRET_CONFKEY = "odps.access.key.secret";
    public static final String ODPS_URL_CONFKEY = "odps.url";
    public static final String ODPS_TUNNEL_URL_CONFKEY = "odps.tunnel.url";
    public static final String ODPS_PROJECT_CONFKEY = "odps.project.name";
    public static final String ODPS_TABLE_NAME_CONFKEY = "odps.table.name";
    public static final String ODPS_TABLE_PARTITION_SPEC_CONFKEY = "odps.table.partition.spec";
    public static final String ODPS_PARTITION_NUMBER_CONFKEY = "odps.table.split.number";
    public static final String ODPS_INPUT_CLASS = "odps.input.class";
    public static final String ODPS_ERROR_DATA_PATH = "odps.error.data.path";
    protected PhoenixConnection conn;
    protected UpsertExecutor<Row, String> upsertExecutor;
    protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
    protected List<String> tableNames;
    protected List<String> logicalNames;
    protected UpsertExecutor.UpsertListener<Row> upsertListener;
    protected Map<byte[], Integer> columnIndexes;
    private List<ColumnInfo> mappedColumnInfoList;
    private FSDataOutputStream outputStream;
    private FileSystem fs;
    private Path filePath;

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Properties clientInfos = new Properties();
        for (Map.Entry entry : conf) {
            clientInfos.setProperty((String)entry.getKey(), (String)entry.getValue());
        }
        try {
            this.conn = (PhoenixConnection)QueryUtil.getConnectionOnServer(clientInfos, conf);
            this.conn.setAutoCommit(false);
            String tableNamesConf = conf.get("phoenix.mapreduce.import.tablenames");
            String logicalNamesConf = conf.get("phoenix.mapreduce.import.logicalnames");
            this.tableNames = (List)TargetTableRefFunctions.NAMES_FROM_JSON.apply((Object)tableNamesConf);
            this.logicalNames = (List)TargetTableRefFunctions.NAMES_FROM_JSON.apply((Object)logicalNamesConf);
            this.columnIndexes = PhoenixMapReduceUtil.initColumnIndexes(this.conn, this.logicalNames);
        }
        catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        }
        String targetTableName = conf.get("phoenix.mapreduce.import.tablename");
        this.mappedColumnInfoList = PhoenixMapReduceUtil.buildTargetTableColumns(conf);
        this.upsertListener = this.initUpsertListener(context, conf.getBoolean("phoenix.mapreduce.import.ignoreinvalidrow", true));
        this.preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
        this.upsertExecutor = this.initUpsertExecutor(this.conn, targetTableName, this.mappedColumnInfoList);
        this.filePath = new Path(conf.get(ODPS_ERROR_DATA_PATH) + "/" + context.getTaskAttemptID().toString());
        this.fs = FileSystem.get((Configuration)conf);
    }

    private UpsertExecutor.UpsertListener<Row> initUpsertListener(final Mapper.Context context, final boolean ignoreRecordErrors) {
        return new UpsertExecutor.UpsertListener<Row>(){

            @Override
            public void upsertDone(long upsertCount) {
                context.getCounter("Phoenix MapReduce Import", "Upserts Done").increment(1L);
            }

            @Override
            public void errorOnRecord(Row record, Throwable throwable) {
                FormatToBytesWritableMapper.LOG.error("Error on record " + record, throwable);
                context.getCounter("Phoenix MapReduce Import", "Errors on records").increment(1L);
                try {
                    if (ODPSMapper.this.outputStream == null) {
                        ODPSMapper.this.outputStream = ODPSMapper.this.fs.create(ODPSMapper.this.filePath);
                    }
                    ODPSMapper.this.outputStream.write(record.toString().getBytes("UTF-8"));
                }
                catch (IOException e) {
                    FormatToBytesWritableMapper.LOG.error("write row:" + record.toString() + " into " + ODPSMapper.this.filePath.toString() + " failed", e.getCause());
                    throw new IllegalStateException(e.getMessage());
                }
                if (!ignoreRecordErrors) {
                    Throwables.propagate((Throwable)throwable);
                }
            }
        };
    }

    private UpsertExecutor<Row, String> initUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList) {
        return new UpsertExecutor<Row, String>(conn, tableName, columnInfoList, this.upsertListener){

            @Override
            protected void execute(Row row) {
                try {
                    if (row.size() < this.conversionFunctions.size()) {
                        String message = String.format("record does not have enough values (has %d, but needs %d)", row.size(), this.conversionFunctions.size());
                        throw new IllegalArgumentException(message);
                    }
                    for (int fieldIndex = 0; fieldIndex < this.conversionFunctions.size(); ++fieldIndex) {
                        Object sqlValue = ((Function)this.conversionFunctions.get(fieldIndex)).apply((Object)row.get(fieldIndex));
                        if (sqlValue != null) {
                            this.preparedStatement.setObject(fieldIndex + 1, sqlValue);
                            continue;
                        }
                        this.preparedStatement.setNull(fieldIndex + 1, ((PDataType)this.dataTypes.get(fieldIndex)).getSqlType());
                    }
                    this.preparedStatement.execute();
                    this.upsertListener.upsertDone(++this.upsertCount);
                }
                catch (Exception e) {
                    FormatToBytesWritableMapper.LOG.warn("Error on record " + row, (Throwable)e);
                    this.upsertListener.errorOnRecord(row, e);
                }
            }

            @Override
            protected Function<String, Object> createConversionFunction(PDataType dataType) {
                if (dataType.isArrayType()) {
                    throw new IllegalStateException("Unsupported array type!");
                }
                return new CsvUpsertExecutor.SimpleDatatypeConversionFunction(dataType, this.conn);
            }
        };
    }

    protected void map(NullWritable key, MapWritable value, Mapper.Context context) throws IOException, InterruptedException {
        if (this.conn == null) {
            throw new RuntimeException("Connection not initialized.");
        }
        try {
            HashMap<String, String> columnNamesToStrValues = new HashMap<String, String>(value.size());
            for (Object entry : value.entrySet()) {
                columnNamesToStrValues.put(((Writable)entry.getKey()).toString(), ((Writable)entry.getValue()).toString());
            }
            ArrayList<String> values = new ArrayList<String>(value.size());
            for (ColumnInfo mcol : this.mappedColumnInfoList) {
                values.add((String)columnNamesToStrValues.get(mcol.getColumnName()));
            }
            this.upsertExecutor.execute((Row)ImmutableList.of((Object)new Row(values)));
            HashMap map = new HashMap();
            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(this.conn, true);
            block4: while (uncommittedDataIterator.hasNext()) {
                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
                List<KeyValue> keyValueList = (List<KeyValue>)kvPair.getSecond();
                keyValueList = this.preUpdateProcessor.preUpsert((byte[])kvPair.getFirst(), keyValueList);
                byte[] first = (byte[])kvPair.getFirst();
                for (int i = 0; i < this.tableNames.size(); ++i) {
                    if (Bytes.compareTo((byte[])Bytes.toBytes((String)this.tableNames.get(i)), (byte[])first) != 0) continue;
                    if (!map.containsKey(i)) {
                        map.put(i, new ArrayList());
                    }
                    List list = (List)map.get(i);
                    for (KeyValue kv : keyValueList) {
                        list.add(kv);
                    }
                    continue block4;
                }
            }
            for (Map.Entry rowEntry : map.entrySet()) {
                int tableIndex = (Integer)rowEntry.getKey();
                List lkv = (List)rowEntry.getValue();
                PhoenixMapReduceUtil.writeAggregatedRow(context, this.tableNames.get(tableIndex), lkv, this.columnIndexes);
            }
            this.conn.rollback();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (this.outputStream != null) {
            this.outputStream.close();
        }
    }

    private class Row {
        private List<String> values;

        public Row(List<String> values) {
            this.values = values;
        }

        public String get(int idx) {
            return this.values.get(idx);
        }

        public int size() {
            return this.values.size();
        }

        public String toString() {
            return Joiner.on((String)",").useForNull("").join(this.values).concat("\t\n");
        }
    }
}

