/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.UpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FormatToBytesWritableMapper<RECORD>
extends Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable> {
    protected static final Logger LOG = LoggerFactory.getLogger(FormatToBytesWritableMapper.class);
    protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
    public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
    public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
    public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
    protected PhoenixConnection conn;
    protected UpsertExecutor<RECORD, ?> upsertExecutor;
    protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
    protected List<String> tableNames;
    protected List<String> logicalNames;
    protected MapperUpsertListener<RECORD> upsertListener;
    protected Map<byte[], Integer> columnIndexes;

    protected abstract UpsertExecutor<RECORD, ?> buildUpsertExecutor(Configuration var1);

    protected abstract LineParser<RECORD> getLineParser();

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Properties clientInfos = new Properties();
        for (Map.Entry entry : conf) {
            clientInfos.setProperty((String)entry.getKey(), (String)entry.getValue());
        }
        try {
            this.conn = (PhoenixConnection)QueryUtil.getConnectionOnServer(clientInfos, conf);
            this.conn.setAutoCommit(false);
            String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
            String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
            this.tableNames = (List)TargetTableRefFunctions.NAMES_FROM_JSON.apply((Object)tableNamesConf);
            this.logicalNames = (List)TargetTableRefFunctions.NAMES_FROM_JSON.apply((Object)logicalNamesConf);
            this.columnIndexes = PhoenixMapReduceUtil.initColumnIndexes(this.conn, this.logicalNames);
        }
        catch (ClassNotFoundException | SQLException e) {
            throw new RuntimeException(e);
        }
        this.upsertListener = new MapperUpsertListener(context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
        this.upsertExecutor = this.buildUpsertExecutor(conf);
        this.preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
    }

    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        if (this.conn == null) {
            throw new RuntimeException("Connection not initialized.");
        }
        try {
            Object record = null;
            try {
                record = this.getLineParser().parse(value.toString());
            }
            catch (IOException e) {
                context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
                return;
            }
            if (record == null) {
                context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
                return;
            }
            this.upsertExecutor.execute(ImmutableList.of(record));
            HashMap map = new HashMap();
            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(this.conn, true);
            block4: while (uncommittedDataIterator.hasNext()) {
                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
                List<KeyValue> keyValueList = (List<KeyValue>)kvPair.getSecond();
                keyValueList = this.preUpdateProcessor.preUpsert((byte[])kvPair.getFirst(), keyValueList);
                byte[] first = (byte[])kvPair.getFirst();
                for (int i = 0; i < this.tableNames.size(); ++i) {
                    if (Bytes.compareTo((byte[])Bytes.toBytes((String)this.tableNames.get(i)), (byte[])first) != 0) continue;
                    if (!map.containsKey(i)) {
                        map.put(i, new ArrayList());
                    }
                    List list = (List)map.get(i);
                    for (KeyValue kv : keyValueList) {
                        list.add(kv);
                    }
                    continue block4;
                }
            }
            for (Map.Entry rowEntry : map.entrySet()) {
                int tableIndex = (Integer)rowEntry.getKey();
                List lkv = (List)rowEntry.getValue();
                PhoenixMapReduceUtil.writeAggregatedRow(context, this.tableNames.get(tableIndex), lkv, this.columnIndexes);
            }
            this.conn.rollback();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
        try {
            if (this.conn != null) {
                this.conn.close();
            }
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
        conf.set(COLUMN_INFO_CONFKEY, Joiner.on((String)"|").useForNull("").join(columnInfoList));
    }

    @VisibleForTesting
    static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
        return Lists.newArrayList((Iterable)Iterables.transform((Iterable)Splitter.on((String)"|").split((CharSequence)conf.get(COLUMN_INFO_CONFKEY)), (Function)new Function<String, ColumnInfo>(){

            @Nullable
            public ColumnInfo apply(@Nullable String input) {
                if (input == null || input.isEmpty()) {
                    return null;
                }
                return ColumnInfo.fromString(input);
            }
        }));
    }

    public static class DefaultImportPreUpsertKeyValueProcessor
    implements ImportPreUpsertKeyValueProcessor {
        @Override
        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
            return keyValues;
        }
    }

    @VisibleForTesting
    static class MapperUpsertListener<T>
    implements UpsertExecutor.UpsertListener<T> {
        private final Mapper.Context context;
        private final boolean ignoreRecordErrors;

        MapperUpsertListener(Mapper.Context context, boolean ignoreRecordErrors) {
            this.context = context;
            this.ignoreRecordErrors = ignoreRecordErrors;
        }

        @Override
        public void upsertDone(long upsertCount) {
            this.context.getCounter(FormatToBytesWritableMapper.COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
        }

        @Override
        public void errorOnRecord(T record, Throwable throwable) {
            LOG.error("Error on record " + record, throwable);
            this.context.getCounter(FormatToBytesWritableMapper.COUNTER_GROUP_NAME, "Errors on records").increment(1L);
            if (!this.ignoreRecordErrors) {
                Throwables.propagate((Throwable)throwable);
            }
        }
    }

    public static interface LineParser<T> {
        public T parse(String var1) throws IOException;
    }
}

