/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hbase.index;

import com.google.common.collect.Multimap;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
import org.apache.phoenix.hbase.index.builder.IndexBuilder;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.VersionUtil;
import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.ServerUtil;

/**
 * Region observer that builds secondary-index updates for mutations on a table and writes them
 * to the index tables.
 *
 * <p>Index updates are computed in {@code preBatchMutate} and either written immediately (when the
 * batch skips the WAL) or attached to the batch's {@link WALEdit} as {@link IndexedKeyValue}s and
 * written in {@code postBatchMutateIndispensably}. Index updates found in WAL edits during
 * {@code preWALRestore} are written through a separate recovery writer.
 */
public class Indexer
extends BaseRegionObserver {
    private static final Log LOG = LogFactory.getLog(Indexer.class);
    /** Writer used for regular (non-recovery) index updates; created in {@code start}. */
    protected IndexWriter writer;
    /** Computes the index updates for incoming mutations; created in {@code start}. */
    protected IndexBuildManager builder;
    /** Environment captured in {@code start}; used to look up the hosting region's table name. */
    private RegionCoprocessorEnvironment environment;
    /** Coprocessor property under which {@code enableIndexing} stores the index builder class name. */
    public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
    /** Boolean configuration key (default {@code true}) controlling the HBase version check in {@code start}. */
    public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
    /** Configuration key naming the failure policy class for the recovery writer; {@code start} reads this same key string. */
    private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
    /** Cache handed to the recovery failure policy in {@code start} and queried per region in {@code postOpen}. */
    private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
    /** Writer used for index updates recovered from the WAL in {@code preWALRestore}. */
    private IndexWriter recoveryWriter;
    /** Set once {@code stop} has shut down the builder and writers, so repeated calls are no-ops. */
    private boolean stopped;
    /** Set when {@code start} hits a {@link NoSuchMethodError}; most hooks then only delegate to the superclass. */
    private boolean disabled;
    /** Same key string as {@code INDEX_RECOVERY_FAILURE_POLICY_KEY}, exposed publicly (per its name, for tests). */
    public static final String RecoveryFailurePolicyKeyForTesting = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
    /** Encoded version threshold: {@code validateVersion} accepts any HBase version above this without further checks. */
    public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil.encodeMaxPatchVersion(0, 94);
    /** Encoded version below which {@code validateVersion} reports indexing as unsupported. */
    public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil.encodeVersion("0.94.0");
    /** Encoded version below which {@code validateVersion} rejects indexing when WAL compression is enabled. */
    private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil.encodeVersion("0.94.9");
    /** Status assigned to atomic operations in {@code preBatchMutateWithExceptions}. */
    private static final OperationStatus SUCCESS = new OperationStatus(HConstants.OperationStatusCode.SUCCESS);

    /**
     * Sets up the index builder and the regular and recovery index writers for this region.
     *
     * <p>Unless turned off via {@code CHECK_VERSION_CONF_KEY}, the HBase version is validated first;
     * on a validation error the region server services are asked to abort and the error is thrown.
     * A {@link NoSuchMethodError} raised anywhere during setup marks this coprocessor as disabled
     * rather than failing.
     *
     * @param e the coprocessor environment; cast to {@link RegionCoprocessorEnvironment}
     * @throws IOException if version validation fails or the recovery failure policy cannot be instantiated
     */
    public void start(CoprocessorEnvironment e) throws IOException {
        try {
            RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment)e;
            this.environment = regionEnv;
            regionEnv.getConfiguration().setClass("hbase.rpc.controllerfactory.class", ServerRpcControllerFactory.class, RpcControllerFactory.class);
            String serverName = regionEnv.getRegionServerServices().getServerName().getServerName();

            // Validate the HBase version only when the check is enabled in the configuration.
            if (regionEnv.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
                String versionProblem = Indexer.validateVersion(regionEnv.getHBaseVersion(), regionEnv.getConfiguration());
                if (versionProblem != null) {
                    IOException failure = new IOException(versionProblem);
                    regionEnv.getRegionServerServices().abort(versionProblem, failure);
                    throw failure;
                }
            }

            this.builder = new IndexBuildManager(regionEnv);
            this.writer = new IndexWriter(regionEnv, serverName + "-index-writer");

            try {
                // The policy class must expose a constructor taking the per-region failure cache.
                Class<? extends IndexFailurePolicy> policyType = regionEnv.getConfiguration().getClass("org.apache.hadoop.hbase.index.recovery.failurepolicy", StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
                IndexFailurePolicy failurePolicy = policyType.getConstructor(PerRegionIndexWriteCache.class).newInstance(this.failedIndexEdits);
                LOG.debug("Setting up recovery writter with failure policy: " + failurePolicy.getClass());
                this.recoveryWriter = new RecoveryIndexWriter(failurePolicy, regionEnv, serverName + "-recovery-writer");
            }
            catch (Exception cause) {
                throw new IOException("Could not instantiate recovery failure policy!", cause);
            }
        }
        catch (NoSuchMethodError missingApi) {
            // A required method is missing at runtime: fall back to plain observer behavior.
            this.disabled = true;
            super.start(e);
            LOG.error("Must be too early a version of HBase. Disabled coprocessor ", missingApi);
        }
    }

    /**
     * Shuts down the index builder and both index writers.
     *
     * <p>Does nothing if already stopped. When the coprocessor is disabled, only the superclass
     * {@code stop} is invoked.
     *
     * @param e the coprocessor environment, forwarded to the superclass when disabled
     * @throws IOException if the superclass or one of the components fails while stopping
     */
    public void stop(CoprocessorEnvironment e) throws IOException {
        if (!this.stopped) {
            if (this.disabled) {
                super.stop(e);
            } else {
                this.stopped = true;
                final String reason = "Indexer is being stopped";
                this.builder.stop(reason);
                this.writer.stop(reason);
                this.recoveryWriter.stop(reason);
            }
        }
    }

    /**
     * Intercepts an {@link Increment} and, when the index builder turns it into an atomic operation,
     * applies the resulting mutations directly instead of running the increment.
     *
     * <p>When the coprocessor is disabled the call is delegated to the superclass, matching the other
     * hooks of this class; the builder may not have been created in that state.
     *
     * @param e   the observer context; bypassed and completed when the builder handles the increment
     * @param inc the increment being processed
     * @return {@code null} when the builder does not handle the increment (normal processing continues),
     *         otherwise {@link Result#EMPTY_RESULT}; when disabled, whatever the superclass returns
     * @throws IOException wrapping any failure raised while building or applying the mutations
     */
    public Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> e, Increment inc) throws IOException {
        if (this.disabled) {
            return super.preIncrementAfterRowLock(e, inc);
        }
        try {
            List<Mutation> mutations = this.builder.executeAtomicOp(inc);
            if (mutations == null) {
                return null;
            }
            // The mutations are committed below, so the increment itself must be skipped.
            e.bypass();
            e.complete();
            // An empty list means there is nothing to write for this operation.
            if (!mutations.isEmpty()) {
                Region region = e.getEnvironment().getRegion();
                region.mutateRowsWithLocks(mutations, Collections.<byte[]>emptyList(), 0L, 0L);
            }
            return Result.EMPTY_RESULT;
        }
        catch (Throwable t) {
            throw ServerUtil.createIOException("Unable to process ON DUPLICATE IGNORE for " + e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
        }
    }

    /**
     * Entry point for building index updates before a mini-batch is applied.
     *
     * <p>Delegates to the superclass when disabled; otherwise runs
     * {@code preBatchMutateWithExceptions} and hands any failure to
     * {@code IndexManagementUtil.rethrowIndexingException}.
     *
     * @param c           the observer context
     * @param miniBatchOp the batch of mutations about to be applied
     * @throws IOException if building the index updates fails
     */
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        if (this.disabled) {
            super.preBatchMutate(c, miniBatchOp);
        } else {
            try {
                this.preBatchMutateWithExceptions(c, miniBatchOp);
            }
            catch (Throwable failure) {
                IndexManagementUtil.rethrowIndexingException(failure);
                // Only reached if the rethrow helper returned normally.
                throw new RuntimeException("Somehow didn't return an index update but also didn't propagate the failure to the client!");
            }
        }
    }

    /**
     * Groups the batch's indexable mutations by row, builds the index updates for them and passes
     * those updates to {@code doPre}.
     *
     * <p>Atomic operations are marked as successful and skipped, as are mutations for which the
     * builder is not enabled. The durability handed to {@code doPre} is the highest-ordinal effective
     * durability among the indexed mutations, starting from {@code SKIP_WAL}.
     *
     * @param c           the observer context, used to read the table's default durability
     * @param miniBatchOp the batch of mutations about to be applied
     * @throws Throwable any failure raised while building or pre-writing the index updates
     */
    public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
        Map<ImmutableBytesPtr, MultiMutation> byRow = new HashMap<ImmutableBytesPtr, MultiMutation>();

        // Resolve what USE_DEFAULT means for this table; fall back to SYNC_WAL.
        Durability tableDurability = Durability.SYNC_WAL;
        if (c.getEnvironment().getRegion() != null) {
            tableDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
            if (tableDurability == Durability.USE_DEFAULT) {
                tableDurability = Durability.SYNC_WAL;
            }
        }

        Durability strongest = Durability.SKIP_WAL;
        for (int idx = 0; idx < miniBatchOp.size(); idx++) {
            Mutation op = miniBatchOp.getOperation(idx);
            if (this.builder.isAtomicOp(op)) {
                miniBatchOp.setOperationStatus(idx, SUCCESS);
                continue;
            }
            if (!this.builder.isEnabled(op)) {
                continue;
            }

            Durability requested = op.getDurability();
            Durability effective = requested == Durability.USE_DEFAULT ? tableDurability : requested;
            if (effective.ordinal() > strongest.ordinal()) {
                strongest = effective;
            }

            // Merge every mutation that targets the same row into one MultiMutation.
            ImmutableBytesPtr rowKey = new ImmutableBytesPtr(op.getRow());
            MultiMutation merged = byRow.get(rowKey);
            if (merged == null) {
                merged = new MultiMutation(rowKey);
                byRow.put(rowKey, merged);
            }
            merged.addAll(op);
        }

        if (byRow.isEmpty()) {
            return;
        }

        // All index updates are attached to the WAL edit of the first operation.
        WALEdit walEdit = miniBatchOp.getWalEdit(0);
        if (walEdit == null) {
            walEdit = new WALEdit();
            miniBatchOp.setWalEdit(0, walEdit);
        }

        try (TraceScope traceScope = Trace.startSpan("Starting to build index updates")) {
            Span span = traceScope.getSpan();
            if (span == null) {
                span = NullSpan.INSTANCE;
            }
            Collection<Pair<Mutation, byte[]>> updates = this.builder.getIndexUpdate(miniBatchOp, byRow.values());
            span.addTimelineAnnotation("Built index updates, doing preStep");
            TracingUtils.addAnnotation(span, "index update count", updates.size());
            this.doPre(updates, walEdit, strongest);
        }
    }

    /**
     * Either writes the index updates immediately or stages them in the WAL edit.
     *
     * <p>With {@code SKIP_WAL} durability the updates are written right away through the index
     * writer. Otherwise each update is added to {@code edit} as an {@link IndexedKeyValue}.
     *
     * @param indexUpdates the index mutations paired with their index table names; may be null or empty
     * @param edit         the WAL edit that receives the staged updates
     * @param durability   the effective durability of the batch
     * @return {@code true} if updates were added to the WAL edit, {@code false} if there was nothing
     *         to do or the updates were written directly
     * @throws IOException if the direct index write fails
     */
    private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, WALEdit edit, Durability durability) throws IOException {
        boolean nothingToWrite = indexUpdates == null || indexUpdates.size() == 0;
        if (nothingToWrite) {
            return false;
        }

        if (durability == Durability.SKIP_WAL) {
            try {
                this.writer.write(indexUpdates, false);
                return false;
            }
            catch (Throwable failure) {
                LOG.error("Failed to update index with entries:" + indexUpdates, failure);
                IndexManagementUtil.rethrowIndexingException(failure);
                // If the helper returns normally, execution continues with staging into the WAL edit.
            }
        }

        for (Pair<Mutation, byte[]> update : indexUpdates) {
            byte[] indexTable = update.getSecond();
            Mutation indexMutation = update.getFirst();
            edit.add((Cell)new IndexedKeyValue(indexTable, indexMutation));
        }
        return true;
    }

    /**
     * Completes index processing after a mini-batch has been applied.
     *
     * <p>Always notifies the builder that the batch completed. On success, the first operation of
     * the batch and its WAL edit are handed to {@code doPost} to write the staged index updates.
     *
     * @param c           the observer context
     * @param miniBatchOp the batch that was applied
     * @param success     whether the batch succeeded
     * @throws IOException if writing the index updates fails
     */
    public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
        if (this.disabled) {
            super.postBatchMutateIndispensably(c, miniBatchOp, success);
            return;
        }
        this.builder.batchCompleted(miniBatchOp);
        if (!success) {
            return;
        }
        Mutation firstOp = miniBatchOp.getOperation(0);
        WALEdit firstEdit = miniBatchOp.getWalEdit(0);
        this.doPost(firstEdit, firstOp, firstOp.getDurability());
    }

    /**
     * Runs {@code doPostWithExceptions} and hands any failure to
     * {@code IndexManagementUtil.rethrowIndexingException}.
     *
     * @param edit       the WAL edit that may carry staged index updates
     * @param m          the mutation the edit belongs to
     * @param durability the durability of that mutation
     * @throws IOException if the index write fails
     */
    private void doPost(WALEdit edit, Mutation m, Durability durability) throws IOException {
        try {
            this.doPostWithExceptions(edit, m, durability);
        }
        catch (Throwable failure) {
            IndexManagementUtil.rethrowIndexingException(failure);
            // Only reached if the rethrow helper returned normally.
            throw new RuntimeException("Somehow didn't complete the index update, but didn't return succesfully either!");
        }
    }

    /**
     * Writes the index updates staged in {@code edit}, at most once per batch.
     *
     * <p>Returns immediately when the durability is {@code SKIP_WAL}, the builder is not enabled for
     * {@code m}, or {@code edit} is null. Otherwise the first {@link IndexedKeyValue} in the edit
     * acts as the "batch finished" marker: if it is not yet marked, the updates are split into those
     * targeting this region's own table and all others, written, and the marker is set even when
     * the write throws.
     *
     * @param edit       the WAL edit that may carry staged index updates
     * @param m          the mutation the edit belongs to
     * @param durability the durability of that mutation
     * @throws Exception any failure raised by the index writer
     */
    private void doPostWithExceptions(WALEdit edit, Mutation m, Durability durability) throws Exception {
        if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
            return;
        }
        try (TraceScope traceScope = Trace.startSpan("Completing index writes")) {
            Span span = traceScope.getSpan();
            if (span == null) {
                span = NullSpan.INSTANCE;
            }

            IndexedKeyValue marker = this.getFirstIndexedKeyValue(edit);
            if (marker == null) {
                // No staged index updates in this edit.
                return;
            }

            if (!marker.getBatchFinished()) {
                Collection<Pair<Mutation, byte[]>> staged = this.extractIndexUpdate(edit);
                try {
                    span.addTimelineAnnotation("Actually doing index update for first time");
                    List<Pair<Mutation, byte[]>> sameTable = new ArrayList<Pair<Mutation, byte[]>>();
                    List<Pair<Mutation, byte[]>> otherTables = new ArrayList<Pair<Mutation, byte[]>>();
                    for (Pair<Mutation, byte[]> update : staged) {
                        String targetTable = Bytes.toString(update.getSecond());
                        if (targetTable.equals(this.environment.getRegion().getTableDesc().getNameAsString())) {
                            sameTable.add(update);
                        } else {
                            otherTables.add(update);
                        }
                    }
                    // Updates for other tables are written first, then the ones for this region's table.
                    if (!otherTables.isEmpty()) {
                        this.writer.writeAndKillYourselfOnFailure(otherTables, false);
                    }
                    if (!sameTable.isEmpty()) {
                        this.writer.writeAndKillYourselfOnFailure(sameTable, true);
                    }
                }
                finally {
                    // Mark the batch as written even on failure so it is not attempted again.
                    marker.markBatchFinished();
                }
            }
        }
    }

    /**
     * Finds the first cell in the edit that is an {@link IndexedKeyValue}.
     *
     * @param edit the WAL edit to scan
     * @return the first {@link IndexedKeyValue} in cell order, or {@code null} if there is none
     */
    private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
        IndexedKeyValue first = null;
        for (Cell cell : edit.getCells()) {
            if (cell instanceof IndexedKeyValue) {
                first = (IndexedKeyValue)cell;
                break;
            }
        }
        return first;
    }

    /**
     * Collects the index updates carried by the {@link IndexedKeyValue} cells of a WAL edit.
     *
     * @param edit the WAL edit to scan
     * @return a new list holding, in cell order, each indexed cell's mutation paired with its index
     *         table; empty if the edit has no indexed cells
     */
    private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
        List<Pair<Mutation, byte[]>> extracted = new ArrayList<Pair<Mutation, byte[]>>();
        for (Cell cell : edit.getCells()) {
            if (cell instanceof IndexedKeyValue) {
                IndexedKeyValue indexed = (IndexedKeyValue)cell;
                extracted.add(new Pair<Mutation, byte[]>(indexed.getMutation(), indexed.getIndexTable()));
            }
        }
        return extracted;
    }

    /**
     * After the region opens, replays index updates that failed during WAL replay and were stored
     * in the failed-edit cache for this region.
     *
     * <p>The disabled check runs before anything else, as in the other hooks of this class, so a
     * disabled coprocessor neither touches the region nor queries the failed-edit cache.
     * An {@link IOException} from the replay write is logged and not propagated.
     *
     * @param c the observer context of the region that was opened
     */
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
        if (this.disabled) {
            super.postOpen(c);
            return;
        }
        Multimap<HTableInterfaceReference, Mutation> updates = this.failedIndexEdits.getEdits(c.getEnvironment().getRegion());
        if (updates == null || updates.size() == 0) {
            return;
        }
        LOG.info("Found some outstanding index updates that didn't succeed during WAL replay - attempting to replay now.");
        try {
            this.writer.writeAndKillYourselfOnFailure(updates, true);
        }
        catch (IOException e) {
            // This hook cannot throw a checked exception, so the failure is only logged.
            LOG.error("During WAL replay of outstanding index updates, Exception is thrown instead of killing server during index writing", e);
        }
    }

    /**
     * Writes the index updates found in a WAL edit that is being replayed, using the recovery writer.
     *
     * <p>Delegates to the superclass when the coprocessor is disabled.
     *
     * @param env     the observer context
     * @param info    the region the edit belongs to
     * @param logKey  the WAL key of the edit
     * @param logEdit the WAL edit being restored
     * @throws IOException if the recovery write fails
     */
    public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
        if (!this.disabled) {
            Collection<Pair<Mutation, byte[]>> recovered = this.extractIndexUpdate(logEdit);
            this.recoveryWriter.write(recovered, true);
        } else {
            super.preWALRestore(env, info, logKey, logEdit);
        }
    }

    /**
     * Runs the superclass {@code preCompactScannerOpen} as the login user.
     *
     * @param c             the observer context
     * @param store         the store being compacted
     * @param scanners      the scanners over the store files
     * @param scanType      the scan type of the compaction
     * @param earliestPutTs timestamp of the earliest put
     * @param s             the scanner supplied by a previous coprocessor, if any
     * @return whatever the superclass implementation returns
     * @throws IOException if the privileged call fails
     */
    public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs, final InternalScanner s) throws IOException {
        PrivilegedExceptionAction<InternalScanner> superCall = new PrivilegedExceptionAction<InternalScanner>(){

            @Override
            public InternalScanner run() throws Exception {
                return Indexer.super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
            }
        };
        return User.runAsLoginUser(superCall);
    }

    /**
     * Exposes the builder held by the index build manager.
     *
     * @return the result of the build manager's {@code getBuilderForTesting()}
     */
    public IndexBuilder getBuilderForTesting() {
        IndexBuilder delegate = this.builder.getBuilderForTesting();
        return delegate;
    }

    /**
     * Checks whether indexing is supported on the given HBase version and configuration.
     *
     * <p>Versions above {@code INDEXING_SUPPORTED_MAJOR_VERSION} are accepted without further
     * checks, and the configuration is only consulted for versions below the WAL-compression minimum.
     *
     * @param hbaseVersion the HBase version string to validate
     * @param conf         configuration read for the WAL compression setting
     * @return {@code null} if the combination is supported, otherwise an error message
     */
    public static String validateVersion(String hbaseVersion, Configuration conf) {
        final int encoded = VersionUtil.encodeVersion(hbaseVersion);
        if (encoded > INDEXING_SUPPORTED_MAJOR_VERSION) {
            return null;
        }
        if (encoded < INDEXING_SUPPORTED__MIN_MAJOR_VERSION) {
            return "Indexing not supported for versions older than 0.94.X";
        }
        boolean predatesWalCompressionSupport = encoded < INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION;
        if (predatesWalCompressionSupport && conf.getBoolean("hbase.regionserver.wal.enablecompression", false)) {
            return "Indexing not supported with WAL Compression for versions of HBase older than 0.94.9 - found version:" + hbaseVersion;
        }
        return null;
    }

    /**
     * Registers this coprocessor on a table descriptor with the given index builder.
     *
     * <p>The builder class name is stored under {@code INDEX_BUILDER_CONF_KEY} in the property map;
     * a caller-supplied map is modified in place.
     *
     * @param desc       the table descriptor to add the coprocessor to
     * @param builder    the index builder class to use
     * @param properties coprocessor properties; a new map is used when {@code null}
     * @param priority   the coprocessor priority
     * @throws IOException if the coprocessor cannot be added to the descriptor
     */
    public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder, Map<String, String> properties, int priority) throws IOException {
        Map<String, String> coprocessorProps = properties != null ? properties : new HashMap<String, String>();
        coprocessorProps.put(INDEX_BUILDER_CONF_KEY, builder.getName());
        desc.addCoprocessor(Indexer.class.getName(), null, priority, coprocessorProps);
    }
}

