/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce.index;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.index.IndexToolUtil;
import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectReducer;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
import org.apache.phoenix.mapreduce.index.PhoenixIndexPartialBuildMapper;
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IndexTool
extends Configured
implements Tool {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);

    // Command-line options. Each takes (short name, long name, hasArgument, description);
    // all of them are registered in getOptions() and validated in parseOptions().
    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)");
    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)");
    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name(not required in case of partial rebuilding)");
    private static final Option PARTIAL_REBUILD_OPTION = new Option("pr", "partial-rebuild", false, "To build indexes for a data table from least disabledTimeStamp");
    private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false, "If specified, we avoid the bulk load (optional)");
    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, "Applicable on top of -direct option.If specified, runs index build in Foreground. Default - Runs the build in background.");
    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written");
    private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false, "If specified, uses Snapshots for async index building (optional)");
    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");

    /**
     * {@link String#format} template for the MapReduce job name of a single-index build;
     * it is filled with the data table name followed by the index table name.
     */
    public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";

    /**
     * Assembles the command-line options this tool understands.
     *
     * @return a fresh {@link Options} instance containing every supported option
     */
    private Options getOptions() {
        // Registration order is kept stable on purpose.
        final Option[] supported = {
            SCHEMA_NAME_OPTION,
            DATA_TABLE_OPTION,
            INDEX_TABLE_OPTION,
            PARTIAL_REBUILD_OPTION,
            DIRECT_API_OPTION,
            RUN_FOREGROUND_OPTION,
            OUTPUT_PATH_OPTION,
            SNAPSHOT_OPTION,
            HELP_OPTION,
        };
        final Options registry = new Options();
        for (Option candidate : supported) {
            registry.addOption(candidate);
        }
        return registry;
    }

    /**
     * Parses the raw arguments and checks that the option combination is usable.
     *
     * <p>A parse failure or the help flag prints usage and terminates the JVM via
     * {@code printHelpAndExit}.
     *
     * @param args the raw command-line arguments
     * @return the parsed command line
     * @throws IllegalStateException if a mandatory option is missing or two options conflict
     */
    private CommandLine parseOptions(String[] args) {
        final Options supported = this.getOptions();
        CommandLine parsed = null;
        try {
            parsed = new PosixParser().parse(supported, args);
        }
        catch (ParseException parseFailure) {
            this.printHelpAndExit("Error parsing command line options: " + parseFailure.getMessage(), supported);
        }
        if (parsed.hasOption(HELP_OPTION.getOpt())) {
            this.printHelpAndExit(supported, 0);
        }

        final boolean hasDataTable = parsed.hasOption(DATA_TABLE_OPTION.getOpt());
        if (!hasDataTable) {
            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory parameter");
        }

        final boolean partialRebuild = parsed.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
        final boolean directApi = parsed.hasOption(DIRECT_API_OPTION.getOpt());
        final boolean hasOutputPath = parsed.hasOption(OUTPUT_PATH_OPTION.getOpt());
        // The output path may only be omitted for a partial rebuild or a direct-API build.
        if (!partialRebuild && !directApi && !hasOutputPath) {
            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory parameter");
        }

        final boolean hasIndexTable = parsed.hasOption(INDEX_TABLE_OPTION.getOpt());
        if (partialRebuild && hasIndexTable) {
            throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
        }

        final boolean foreground = parsed.hasOption(RUN_FOREGROUND_OPTION.getOpt());
        if (foreground && hasIndexTable && !directApi) {
            throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt() + " is applicable only for " + DIRECT_API_OPTION.getLongOpt());
        }
        return parsed;
    }

    /**
     * Writes an error message to standard error, prints usage and exits with status 1.
     *
     * @param errorMessage the message to show before the usage text
     * @param options the options to describe in the usage text
     */
    private void printHelpAndExit(String errorMessage, Options options) {
        final int failureStatus = 1;
        System.err.println(errorMessage);
        this.printHelpAndExit(options, failureStatus);
    }

    /**
     * Prints usage for the given options and terminates the JVM.
     *
     * @param options the options to describe in the usage text
     * @param exitCode the process exit status
     */
    private void printHelpAndExit(Options options, int exitCode) {
        new HelpFormatter().printHelp("help", options);
        System.exit(exitCode);
    }

    /**
     * Parses the command line, configures the index-build MapReduce job and runs it.
     *
     * <p>With the direct API and no foreground flag the job is only submitted and the method
     * returns immediately. Otherwise the method waits for completion and, for a bulk-load
     * build of a named index, loads the generated HFiles into the index table, marks the
     * index {@code ACTIVE} and removes the output directory.
     *
     * @param args the raw command-line arguments
     * @return {@code 0} on success (or successful background submission), {@code -1} if the
     *         job failed or any exception occurred while setting it up or running it
     * @throws RuntimeException if the Phoenix connection cannot be closed
     */
    @Override
    public int run(String[] args) throws Exception {
        Connection connection = null;
        HTable htable = null;
        try {
            CommandLine cmdLine = null;
            try {
                cmdLine = this.parseOptions(args);
            }
            catch (IllegalStateException e) {
                // Invalid option combination: print usage and terminate the JVM.
                this.printHelpAndExit(e.getMessage(), this.getOptions());
            }
            final Configuration configuration = HBaseConfiguration.addHbaseResources(this.getConf());
            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
            final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            final boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
            final String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
            final boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
            final boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
            connection = ConnectionUtil.getInputConnection(configuration);

            byte[][] splitKeysBeforeJob = null;
            boolean isLocalIndexBuild = false;
            PTable pindexTable = null;
            if (indexTable != null) {
                if (!this.isValidIndexTable(connection, qDataTable, indexTable)) {
                    throw new IllegalArgumentException(String.format(" %s is not an index table for %s ", indexTable, qDataTable));
                }
                final String qIndexTable = schemaName != null && !schemaName.isEmpty()
                        ? SchemaUtil.getQualifiedTableName(schemaName, indexTable)
                        : indexTable;
                pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
                htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices().getTable(pindexTable.getPhysicalName().getBytes());
                if (PTable.IndexType.LOCAL.equals(pindexTable.getIndexType())) {
                    // Remember the region boundaries so a split during the job can be detected later.
                    isLocalIndexBuild = true;
                    splitKeysBeforeJob = htable.getRegionLocator().getStartKeys();
                }
            }

            final PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
            Path outputPath = null;
            FileSystem fs = null;
            if (basePath != null) {
                final String physicalName = pindexTable == null
                        ? pdataTable.getPhysicalName().getString()
                        : pindexTable.getPhysicalName().getString();
                outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), physicalName);
                fs = outputPath.getFileSystem(configuration);
                // Start from a clean output directory.
                fs.delete(outputPath, true);
            }

            final Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, useDirectApi, isPartialBuild, useSnapshot);
            if (!isForeground && useDirectApi) {
                LOG.info("Running Index Build in Background - Submit async and exit");
                job.submit();
                return 0;
            }

            LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!.");
            if (!job.waitForCompletion(true)) {
                LOG.error("IndexTool job failed! Check logs for errors..");
                return -1;
            }
            if (!useDirectApi && indexTable != null) {
                if (isLocalIndexBuild) {
                    this.validateSplitForLocalIndex(splitKeysBeforeJob, htable);
                }
                LOG.info("Loading HFiles from {}", outputPath);
                final LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
                loader.doBulkLoad(outputPath, htable);
                htable.close();
                // Already closed; keep the finally block from closing it a second time.
                htable = null;
                IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE);
                fs.delete(outputPath, true);
            }
            return 0;
        }
        catch (Exception ex) {
            LOG.error("An exception occurred while performing the indexing job: " + ExceptionUtils.getMessage((Throwable)ex) + " at:\n" + ExceptionUtils.getStackTrace((Throwable)ex));
            return -1;
        }
        finally {
            // Close each resource independently so one failure cannot leak the other,
            // and so a table close failure cannot replace the method's return value.
            if (htable != null) {
                try {
                    htable.close();
                }
                catch (IOException ioe) {
                    LOG.error("Failed to close index table", ioe);
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                }
                catch (SQLException sqle) {
                    LOG.error("Failed to close connection", sqle);
                    throw new RuntimeException("Failed to close connection", sqle);
                }
            }
        }
    }

    /**
     * Verifies that the index table's region start keys are the same as before the job ran.
     *
     * @param splitKeysBeforeJob the start keys captured before the job; {@code null} skips the check
     * @param htable the index table whose current start keys are compared
     * @return {@code true} when the check is skipped or the keys match
     * @throws Exception if the start keys differ, or if reading the current keys fails
     */
    private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception {
        if (splitKeysBeforeJob == null) {
            return true;
        }
        final byte[][] splitKeysAfterJob = htable.getRegionLocator().getStartKeys();
        if (IndexUtil.matchingSplitKeys(splitKeysBeforeJob, splitKeysAfterJob)) {
            return true;
        }
        final String mismatch = "The index to build is local index and the split keys are not matching before and after running the job. Please rerun the job otherwise there may be inconsistencies between actual data and index data";
        LOG.error(mismatch);
        throw new Exception(mismatch);
    }

    /**
     * Checks whether {@code indexTable} is an index defined on {@code masterTable}.
     *
     * @param connection the JDBC connection used to read index metadata
     * @param masterTable the (optionally schema-qualified) data table name
     * @param indexTable the index name to look for, compared case-insensitively
     * @return {@code true} if the metadata lists an index with that name, otherwise {@code false}
     * @throws SQLException if the metadata lookup fails
     */
    private boolean isValidIndexTable(Connection connection, String masterTable, String indexTable) throws SQLException {
        final DatabaseMetaData dbMetaData = connection.getMetaData();
        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
        final String tableName = SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));
        // The resource must be initialised in the try header: a try-with-resources variable
        // is implicitly final and cannot be assigned inside the body.
        try (ResultSet rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false)) {
            while (rs.next()) {
                // Column 6 of the getIndexInfo result set is INDEX_NAME.
                final String indexName = rs.getString(6);
                if (indexTable.equalsIgnoreCase(indexName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run((Tool)new IndexTool(), (String[])args);
        System.exit(result);
    }

    class JobFactory {
        // JDBC connection used for Phoenix metadata lookups while a job is being configured.
        Connection connection;
        // Hadoop configuration the job is created from; the configure* methods write settings into it.
        Configuration configuration;
        // Job output directory; may be null (configureJobForPartialBuild checks for null before using it).
        private Path outputPath;
        // File system that owns outputPath; assigned in configureJobForAysncIndex.
        private FileSystem fs;

        /**
         * Creates a factory bound to the given connection, configuration and output location.
         *
         * @param connection the connection used for metadata lookups
         * @param configuration the configuration jobs are created from
         * @param outputPath the job output directory
         */
        public JobFactory(Connection connection, Configuration configuration, Path outputPath) {
            this.outputPath = outputPath;
            this.configuration = configuration;
            this.connection = connection;
        }

        /**
         * Returns a configured job for either a partial rebuild or a single-index build.
         *
         * @param schemaName the schema of the data table
         * @param indexTable the index to build; not used for a partial rebuild
         * @param dataTable the data table name
         * @param useDirectApi whether to write through the direct API instead of bulk load
         * @param isPartialBuild whether to run the partial rebuild instead of a single-index build
         * @param useSnapshot whether the single-index build reads from a snapshot
         * @return the configured job
         * @throws Exception if the job cannot be configured
         */
        public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild, boolean useSnapshot) throws Exception {
            return isPartialBuild
                    ? this.configureJobForPartialBuild(schemaName, dataTable)
                    : this.configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot);
        }

        /**
         * Configures a job that rebuilds, in one pass over the data table, every index of the
         * table that is currently in the {@code BUILDING} state.
         *
         * <p>The scan covers the time range from just before the smallest index disable
         * timestamp up to the value returned by {@code getMaxRebuildAsyncDate}.
         *
         * @param schemaName the schema of the data table
         * @param dataTable the data table name
         * @return the configured job, set up to write through the direct API
         * @throws Exception if no index qualifies, if a qualifying index has a disable
         *         timestamp of 0, or if metadata lookup or job setup fails
         */
        private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception {
            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            final PTable pdataTable = PhoenixRuntime.getTable(this.connection, qDataTable);
            // Reuse the connection handed to the factory. Opening a second one here and
            // overwriting the field left that extra connection open forever, because the
            // caller only closes the connection it created itself.
            final PhoenixConnection phoenixConnection = this.connection.unwrap(PhoenixConnection.class);

            long minDisableTimestamp = Long.MAX_VALUE;
            PTable indexWithMinDisableTimestamp = null;
            final List<String> disableIndexes = new ArrayList<String>();
            final List<PTable> disabledPIndexes = new ArrayList<PTable>();
            for (PTable index : pdataTable.getIndexes()) {
                if (!PIndexState.BUILDING.equals(index.getIndexState())) {
                    continue;
                }
                disableIndexes.add(index.getTableName().getString());
                disabledPIndexes.add(index);
                final long indexDisableTimestamp = Math.abs(index.getIndexDisableTimestamp());
                if (indexDisableTimestamp < minDisableTimestamp) {
                    minDisableTimestamp = indexDisableTimestamp;
                    indexWithMinDisableTimestamp = index;
                }
            }
            if (indexWithMinDisableTimestamp == null) {
                throw new Exception("There is no index for a datatable to be rebuild:" + qDataTable);
            }
            if (minDisableTimestamp == 0L) {
                throw new Exception("It seems Index " + indexWithMinDisableTimestamp + " has disable timestamp as 0 , please run IndexTool with IndexName to build it first");
            }

            final long maxTimestamp = this.getMaxRebuildAsyncDate(schemaName, disableIndexes);
            final List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
            for (PTable index : disabledPIndexes) {
                maintainers.add(index.getIndexMaintainer(pdataTable, phoenixConnection));
            }
            final ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
            IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, phoenixConnection);
            PhoenixConfigurationUtil.setIndexMaintainers(this.configuration, indexMetaDataPtr);

            final Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
            scan.setTimeRange(minDisableTimestamp - 1L, maxTimestamp);
            scan.setRaw(true);
            scan.setCacheBlocks(false);
            if (pdataTable.isTransactional()) {
                final long maxTimeRange = pdataTable.getTimeStamp() + 1L;
                final long txScn = TransactionUtil.convertToNanoseconds(maxTimeRange);
                scan.setAttribute("_TxScn", Bytes.toBytes(txScn));
            }

            final String physicalTableName = pdataTable.getPhysicalName().getString();
            // Plain concatenation: the table name must never be interpreted as a format string.
            final String jobName = "Phoenix Indexes build for " + pdataTable.getName().toString();
            PhoenixConfigurationUtil.setInputTableName(this.configuration, qDataTable);
            PhoenixConfigurationUtil.setPhysicalTableName(this.configuration, physicalTableName);
            PhoenixConfigurationUtil.setDisableIndexes(this.configuration, StringUtils.join((CharSequence)",", disableIndexes));

            final Job job = Job.getInstance(this.configuration, jobName);
            if (this.outputPath != null) {
                FileOutputFormat.setOutputPath(job, this.outputPath);
            }
            job.setJarByClass(IndexTool.class);
            TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, PhoenixIndexPartialBuildMapper.class, null, null, job);
            TableMapReduceUtil.initCredentials(job);
            TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName));
            return this.configureSubmittableJobUsingDirectApi(job, true);
        }

        /**
         * Reads the largest {@code ASYNC_REBUILD_TIMESTAMP} recorded in the system catalog
         * for the given indexes.
         *
         * @param schemaName the schema of the indexes; {@code null} or empty matches rows
         *        whose schema is {@code NULL}
         * @param disableIndexes the index table names to inspect
         * @return {@code 0} if no index names are given, otherwise the largest async rebuild
         *         timestamp
         * @throws SQLException if the catalog query fails
         * @throws RuntimeException if the largest async rebuild timestamp is not greater than
         *         the largest index disable timestamp
         */
        private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException {
            if (disableIndexes == null || disableIndexes.isEmpty()) {
                return 0L;
            }
            final boolean hasSchema = schemaName != null && schemaName.length() > 0;

            // Names are passed as bind parameters instead of being spliced into the SQL text,
            // so a quote inside a schema or index name cannot change the statement.
            final StringBuilder sql = new StringBuilder("SELECT MAX(ASYNC_REBUILD_TIMESTAMP),MAX(INDEX_DISABLE_TIMESTAMP) FROM ")
                    .append(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
                    .append(" (ASYNC_REBUILD_TIMESTAMP BIGINT) WHERE TABLE_SCHEM")
                    .append(hasSchema ? "=?" : " IS NULL")
                    .append(" and TABLE_NAME IN (");
            for (int i = 0; i < disableIndexes.size(); i++) {
                sql.append(i == 0 ? "?" : ",?");
            }
            sql.append(")");

            long maxRebuildAsyncDate = Long.MAX_VALUE;
            long maxDisabledTimestamp = 0L;
            // Statement and result set are closed even when the query fails.
            try (PreparedStatement statement = this.connection.prepareStatement(sql.toString())) {
                int parameterIndex = 1;
                if (hasSchema) {
                    statement.setString(parameterIndex++, schemaName);
                }
                for (String index : disableIndexes) {
                    statement.setString(parameterIndex++, index);
                }
                try (ResultSet rs = statement.executeQuery()) {
                    if (rs.next()) {
                        maxRebuildAsyncDate = rs.getLong(1);
                        maxDisabledTimestamp = rs.getLong(2);
                    }
                }
            }
            if (maxRebuildAsyncDate > maxDisabledTimestamp) {
                return maxRebuildAsyncDate;
            }
            throw new RuntimeException("Inconsistent state we have one or more index tables which are disabled after the async is called!!");
        }

        /**
         * Configures a job that builds a single index from its data table.
         *
         * <p>The index's SELECT and UPSERT statements are derived with
         * {@link PostIndexDDLCompiler} and stored in the configuration. The job reads either
         * from the live data table or, when {@code useSnapshot} is set, from a snapshot that
         * this method creates.
         *
         * @param schemaName the schema of the data and index tables; may be {@code null} or empty
         * @param indexTable the index table name
         * @param dataTable the data table name
         * @param useDirectApi whether to write through the direct API instead of bulk load
         * @param useSnapshot whether to read the data table from a snapshot
         * @return the configured job
         * @throws Exception if metadata lookup, snapshot creation or job setup fails
         */
        private Job configureJobForAysncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot) throws Exception {
            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            final String qIndexTable = schemaName != null && !schemaName.isEmpty() ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable;
            final PTable pdataTable = PhoenixRuntime.getTable(this.connection, qDataTable);
            final PTable pindexTable = PhoenixRuntime.getTable(this.connection, qIndexTable);

            final long maxTimeRange = pindexTable.getTimeStamp() + 1L;
            if (pdataTable.isTransactional()) {
                this.configuration.set("phoenix.mr.txscn.value", Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
            }
            this.configuration.set("phoenix.mr.currentscn.value", Long.toString(maxTimeRange));

            final String physicalIndexTable = pindexTable.getPhysicalName().getString();
            final PhoenixConnection pConnection = this.connection.unwrap(PhoenixConnection.class);
            final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable));
            ddlCompiler.compile(pindexTable);
            final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
            final String selectQuery = ddlCompiler.getSelectQuery();
            final String upsertQuery = QueryUtil.constructUpsertStatement(qIndexTable, indexColumns, HintNode.Hint.NO_INDEX);

            this.configuration.set("phoenix.upsert.stmt", upsertQuery);
            PhoenixConfigurationUtil.setPhysicalTableName(this.configuration, physicalIndexTable);
            PhoenixConfigurationUtil.setDisableIndexes(this.configuration, indexTable);
            PhoenixConfigurationUtil.setUpsertColumnNames(this.configuration, indexColumns.toArray(new String[indexColumns.size()]));
            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(this.connection, qIndexTable, indexColumns);
            ColumnInfoToStringEncoderDecoder.encode(this.configuration, columnMetadataList);

            // The output path is optional for direct-API builds, so guard it the same way
            // configureJobForPartialBuild does instead of dereferencing null.
            if (this.outputPath != null) {
                this.fs = this.outputPath.getFileSystem(this.configuration);
                this.fs.delete(this.outputPath, true);
            }

            final String jobName = String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, pdataTable.getName().toString(), indexTable);
            final Job job = Job.getInstance(this.configuration, jobName);
            job.setJarByClass(IndexTool.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            if (this.outputPath != null) {
                FileOutputFormat.setOutputPath(job, this.outputPath);
            }

            if (!useSnapshot) {
                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery);
            } else {
                final String pdataTableName = pdataTable.getName().getString();
                final String snapshotName = pdataTableName + "-Snapshot";
                // The admin must be initialised in the try header: a try-with-resources
                // variable is implicitly final and cannot be assigned inside the body.
                try (HBaseAdmin admin = pConnection.getQueryServices().getAdmin()) {
                    admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
                }
                // NOTE(review): the snapshot root directory is hard-coded and replaces the
                // root dir in the job configuration - confirm this is intended.
                final Path rootDir = new Path("hdfs:///index-snapshot-dir");
                FSUtils.setRootDir(this.configuration, rootDir);
                final Path restoreDir = new Path(FSUtils.getRootDir(this.configuration), "restore-dir");
                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery);
            }

            TableMapReduceUtil.initCredentials(job);
            if (useDirectApi) {
                return this.configureSubmittableJobUsingDirectApi(job, false);
            }
            return this.configureRunnableJobUsingBulkLoad(job, this.outputPath);
        }

        /**
         * Configures the job to emit HFiles for a later bulk load into the physical index table.
         *
         * @param job the job to configure
         * @param outputPath the job output directory; not read by this method
         * @return the same job, configured for incremental load
         * @throws Exception if the index table cannot be opened or the job cannot be configured
         */
        private Job configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception {
            job.setMapperClass(PhoenixIndexImportMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            final Configuration configuration = job.getConfiguration();
            final String physicalIndexTable = PhoenixConfigurationUtil.getPhysicalTableName(configuration);
            // The table handle is only needed while the job is being configured; close it
            // afterwards so it is not leaked.
            try (HTable htable = new HTable(configuration, physicalIndexTable)) {
                HFileOutputFormat.configureIncrementalLoad(job, htable);
            }
            return job;
        }

        /**
         * Configures the job to write index rows through the direct API with a single reducer.
         *
         * @param job the job to configure
         * @param isPartialRebuild when {@code true} the mapper already set on the job is kept;
         *        otherwise {@link PhoenixIndexImportDirectMapper} is installed
         * @return the same job, ready to be submitted
         * @throws Exception if the job cannot be configured
         */
        private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild) throws Exception {
            final boolean installDirectMapper = !isPartialRebuild;
            if (installDirectMapper) {
                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
            }
            job.setReducerClass(PhoenixIndexImportDirectReducer.class);

            final Configuration jobConf = job.getConfiguration();
            final Configuration hbaseDefaults = HBaseConfiguration.create(jobConf);
            HBaseConfiguration.merge(jobConf, hbaseDefaults);
            final String outputTable = PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration());
            jobConf.set("hbase.mapred.outputtable", outputTable);

            // Map output and final output key/value types.
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(NullWritable.class);

            TableMapReduceUtil.addDependencyJars(job);
            job.setNumReduceTasks(1);
            return job;
        }
    }
}

