/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.kite.DatasetSinkConstants;
import org.apache.flume.sink.kite.NonRecoverableEventException;
import org.apache.flume.sink.kite.parser.EntityParser;
import org.apache.flume.sink.kite.parser.EntityParserFactory;
import org.apache.flume.sink.kite.policy.FailurePolicy;
import org.apache.flume.sink.kite.policy.FailurePolicyFactory;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatasetSink
extends AbstractSink
implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetSink.class);
    private Context context = null;
    private PrivilegedExecutor privilegedExecutor;
    private String datasetName = null;
    private URI datasetUri = null;
    private Schema datasetSchema = null;
    private DatasetWriter<GenericRecord> writer = null;
    private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE;
    private int rollIntervalSeconds = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL;
    private boolean commitOnBatch = DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH;
    private boolean syncOnBatch = DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH;
    private long lastRolledMillis = 0L;
    private long bytesParsed = 0L;
    private EntityParser<GenericRecord> parser = null;
    private FailurePolicy failurePolicy = null;
    private SinkCounter counter = null;
    private GenericRecord entity = null;
    private boolean reuseEntity = true;
    private Transaction transaction = null;
    private boolean committedBatch = false;
    private static final EntityParserFactory ENTITY_PARSER_FACTORY = new EntityParserFactory();
    private static final FailurePolicyFactory FAILURE_POLICY_FACTORY = new FailurePolicyFactory();

    protected List<String> allowedFormats() {
        return Lists.newArrayList((Object[])new String[]{"avro", "parquet"});
    }

    public void configure(Context context) {
        this.context = context;
        String principal = context.getString("auth.kerberosPrincipal");
        String keytab = context.getString("auth.kerberosKeytab");
        String effectiveUser = context.getString("auth.proxyUser");
        this.privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator((String)principal, (String)keytab).proxyAs(effectiveUser);
        String datasetURI = context.getString("kite.dataset.uri");
        if (datasetURI != null) {
            this.datasetUri = URI.create(datasetURI);
            this.datasetName = DatasetSink.uriToName(this.datasetUri);
        } else {
            String repositoryURI = context.getString("kite.repo.uri");
            Preconditions.checkNotNull((Object)repositoryURI, (Object)"No dataset configured. Setting kite.dataset.uri is required.");
            this.datasetName = context.getString("kite.dataset.name");
            Preconditions.checkNotNull((Object)this.datasetName, (Object)"No dataset configured. Setting kite.dataset.uri is required.");
            String namespace = context.getString("kite.dataset.namespace", "default");
            this.datasetUri = new URIBuilder(repositoryURI, namespace, this.datasetName).build();
        }
        this.setName(this.datasetUri.toString());
        if (context.getBoolean("kite.syncable.syncOnBatch", Boolean.valueOf(DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH)).booleanValue()) {
            Preconditions.checkArgument((boolean)context.getBoolean("kite.flushable.commiteOnBatch", Boolean.valueOf(DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH)), (Object)"Configuration error: kite.flushable.commiteOnBatch must be set to true when kite.syncable.syncOnBatch is set to true.");
        }
        this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(context);
        this.batchSize = context.getLong("kite.batchSize", Long.valueOf(DatasetSinkConstants.DEFAULT_BATCH_SIZE));
        this.rollIntervalSeconds = context.getInteger("kite.rollInterval", Integer.valueOf(DatasetSinkConstants.DEFAULT_ROLL_INTERVAL));
        this.counter = new SinkCounter(this.datasetName);
    }

    public synchronized void start() {
        this.lastRolledMillis = System.currentTimeMillis();
        this.counter.start();
        LOG.info("Started DatasetSink " + this.getName());
        super.start();
    }

    @VisibleForTesting
    void roll() {
        this.lastRolledMillis = 0L;
    }

    @VisibleForTesting
    DatasetWriter<GenericRecord> getWriter() {
        return this.writer;
    }

    @VisibleForTesting
    void setWriter(DatasetWriter<GenericRecord> writer) {
        this.writer = writer;
    }

    @VisibleForTesting
    void setParser(EntityParser<GenericRecord> parser) {
        this.parser = parser;
    }

    @VisibleForTesting
    void setFailurePolicy(FailurePolicy failurePolicy) {
        this.failurePolicy = failurePolicy;
    }

    public synchronized void stop() {
        this.counter.stop();
        try {
            this.closeWriter();
            this.commitTransaction();
        }
        catch (EventDeliveryException ex) {
            this.rollbackTransaction();
            LOG.warn("Closing the writer failed: " + ex.getLocalizedMessage());
            LOG.debug("Exception follows.", (Throwable)ex);
        }
        LOG.info("Stopped dataset sink: " + this.getName());
        super.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        long processedEvents = 0L;
        try {
            Event event;
            if (this.shouldRoll()) {
                this.closeWriter();
                this.commitTransaction();
                this.createWriter();
            }
            Preconditions.checkNotNull(this.writer, (Object)"Can't process events with a null writer. This is likely a bug.");
            Channel channel = this.getChannel();
            this.enterTransaction(channel);
            while (processedEvents < this.batchSize && (event = channel.take()) != null) {
                this.write(event);
                ++processedEvents;
            }
            if (this.commitOnBatch) {
                if (this.syncOnBatch && this.writer instanceof Syncable) {
                    ((Syncable)this.writer).sync();
                } else if (this.writer instanceof Flushable) {
                    ((Flushable)this.writer).flush();
                }
                boolean committed = this.commitTransaction();
                Preconditions.checkState((boolean)committed, (Object)"Tried to commit a batch when there was no transaction");
                this.committedBatch |= committed;
            }
        }
        catch (Throwable th) {
            this.rollbackTransaction();
            if (this.commitOnBatch && this.committedBatch) {
                try {
                    this.closeWriter();
                }
                catch (EventDeliveryException ex) {
                    LOG.warn("Error closing writer there may be temp files that need to be manually recovered: " + ex.getLocalizedMessage());
                    LOG.debug("Exception follows.", (Throwable)ex);
                }
            } else {
                this.writer = null;
            }
            Throwables.propagateIfInstanceOf((Throwable)th, Error.class);
            Throwables.propagateIfInstanceOf((Throwable)th, EventDeliveryException.class);
            throw new EventDeliveryException(th);
        }
        if (processedEvents == 0L) {
            this.counter.incrementBatchEmptyCount();
            return Sink.Status.BACKOFF;
        }
        if (processedEvents < this.batchSize) {
            this.counter.incrementBatchUnderflowCount();
        } else {
            this.counter.incrementBatchCompleteCount();
        }
        this.counter.addToEventDrainSuccessCount(processedEvents);
        return Sink.Status.READY;
    }

    @VisibleForTesting
    void write(Event event) throws EventDeliveryException {
        try {
            this.entity = this.parser.parse(event, this.reuseEntity ? this.entity : null);
            this.bytesParsed += (long)event.getBody().length;
            this.writer.write((Object)this.entity);
        }
        catch (NonRecoverableEventException ex) {
            this.failurePolicy.handle(event, ex);
        }
        catch (DataFileWriter.AppendWriteException ex) {
            this.failurePolicy.handle(event, ex);
        }
        catch (RuntimeException ex) {
            Throwables.propagateIfInstanceOf((Throwable)ex, EventDeliveryException.class);
            throw new EventDeliveryException((Throwable)ex);
        }
    }

    @VisibleForTesting
    void createWriter() throws EventDeliveryException {
        this.committedBatch = false;
        try {
            View view = (View)this.privilegedExecutor.execute((PrivilegedAction)new PrivilegedAction<Dataset<GenericRecord>>(){

                @Override
                public Dataset<GenericRecord> run() {
                    return (Dataset)Datasets.load((URI)DatasetSink.this.datasetUri);
                }
            });
            DatasetDescriptor descriptor = view.getDataset().getDescriptor();
            Format format = descriptor.getFormat();
            Preconditions.checkArgument((boolean)this.allowedFormats().contains(format.getName()), (Object)("Unsupported format: " + format.getName()));
            Schema newSchema = descriptor.getSchema();
            if (this.datasetSchema == null || !newSchema.equals((Object)this.datasetSchema)) {
                this.datasetSchema = descriptor.getSchema();
                this.parser = ENTITY_PARSER_FACTORY.newParser(this.datasetSchema, this.context);
            }
            this.reuseEntity = !Formats.PARQUET.equals((Object)format);
            this.commitOnBatch = this.context.getBoolean("kite.flushable.commiteOnBatch", Boolean.valueOf(DatasetSinkConstants.DEFAULT_FLUSHABLE_COMMIT_ON_BATCH)) != false && Formats.AVRO.equals((Object)format);
            this.syncOnBatch = this.context.getBoolean("kite.syncable.syncOnBatch", Boolean.valueOf(DatasetSinkConstants.DEFAULT_SYNCABLE_SYNC_ON_BATCH)) != false && Formats.AVRO.equals((Object)format);
            this.datasetName = view.getDataset().getName();
            this.writer = view.newWriter();
            this.lastRolledMillis = System.currentTimeMillis();
            this.bytesParsed = 0L;
        }
        catch (DatasetNotFoundException ex) {
            throw new EventDeliveryException("Dataset " + this.datasetUri + " not found." + " The dataset must be created before Flume can write to it.", (Throwable)ex);
        }
        catch (RuntimeException ex) {
            throw new EventDeliveryException("Error trying to open a new writer for dataset " + this.datasetUri, (Throwable)ex);
        }
    }

    private boolean shouldRoll() {
        long currentTimeMillis = System.currentTimeMillis();
        long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - this.lastRolledMillis);
        LOG.debug("Current time: {}, lastRolled: {}, diff: {} sec", new Object[]{currentTimeMillis, this.lastRolledMillis, elapsedTimeSeconds});
        return elapsedTimeSeconds >= (long)this.rollIntervalSeconds || this.writer == null;
    }

    @VisibleForTesting
    void closeWriter() throws EventDeliveryException {
        if (this.writer != null) {
            try {
                this.writer.close();
                long elapsedTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - this.lastRolledMillis);
                LOG.info("Closed writer for {} after {} seconds and {} bytes parsed", new Object[]{this.datasetUri, elapsedTimeSeconds, this.bytesParsed});
            }
            catch (DatasetIOException ex) {
                throw new EventDeliveryException("Check HDFS permissions/health. IO error trying to close the  writer for dataset " + this.datasetUri, (Throwable)ex);
            }
            catch (RuntimeException ex) {
                throw new EventDeliveryException("Error trying to close the  writer for dataset " + this.datasetUri, (Throwable)ex);
            }
            finally {
                this.writer = null;
                this.failurePolicy.close();
            }
        }
    }

    private void enterTransaction(Channel channel) throws EventDeliveryException {
        if (this.transaction == null) {
            this.transaction = channel.getTransaction();
            this.transaction.begin();
            this.failurePolicy = FAILURE_POLICY_FACTORY.newPolicy(this.context);
        }
    }

    @VisibleForTesting
    boolean commitTransaction() throws EventDeliveryException {
        if (this.transaction != null) {
            this.failurePolicy.sync();
            this.transaction.commit();
            this.transaction.close();
            this.transaction = null;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void rollbackTransaction() {
        if (this.transaction != null) {
            try {
                this.transaction.rollback();
            }
            catch (RuntimeException ex) {
                LOG.error("Transaction rollback failed: " + ex.getLocalizedMessage());
                LOG.debug("Exception follows.", (Throwable)ex);
            }
            finally {
                this.transaction.close();
                this.transaction = null;
            }
        }
    }

    private static String uriToName(URI uri) {
        return (String)((Map)Registration.lookupDatasetUri((URI)URI.create(uri.getRawSchemeSpecificPart())).second()).get("dataset");
    }
}

