/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.cat.message.storage.clean;

import com.dianping.cat.Cat;
import com.dianping.cat.config.server.ServerConfigManager;
import com.dianping.cat.message.Transaction;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.AccessControlException;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.cat.message.storage.hdfs.HdfsSystemManager;
import org.unidal.helper.Files;
import org.unidal.helper.Formats;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named
public class HdfsUploader
implements LogEnabled,
Initializable {
    @Inject
    private HdfsSystemManager m_fileSystemManager;
    @Inject
    private ServerConfigManager m_serverConfigManager;
    private ThreadPoolExecutor m_executors;
    private File m_localBaseDir;
    private Logger m_logger;

    private void deleteFile(String path) {
        File file = new File(this.m_localBaseDir, path);
        File parent = file.getParentFile();
        file.delete();
        parent.delete();
        parent.getParentFile().delete();
    }

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    public void initialize() throws InitializationException {
        int thread = this.m_serverConfigManager.getHdfsUploadThreadsCount();
        this.m_localBaseDir = new File(this.m_serverConfigManager.getHdfsLocalBaseDir("dump"));
        this.m_executors = new ThreadPoolExecutor(thread, thread, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5000), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private FSDataOutputStream makeHdfsOutputStream(String path) throws IOException {
        FSDataOutputStream out;
        FileSystem fs = this.m_fileSystemManager.getFileSystem();
        String baseDir = this.m_fileSystemManager.getBaseDir();
        Path file = new Path(baseDir, path);
        try {
            out = fs.create(file, true);
        }
        catch (RemoteException re) {
            fs.delete(file, false);
            out = fs.create(file);
        }
        catch (AlreadyBeingCreatedException e) {
            fs.delete(file, false);
            out = fs.create(file);
        }
        return out;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean upload(String path, File file) {
        if (!file.exists()) return false;
        Transaction t = Cat.newTransaction((String)"System", (String)"UploadDump");
        t.addData("file", (Object)path);
        FSDataOutputStream fdos = null;
        FileInputStream fis = null;
        try {
            fdos = this.makeHdfsOutputStream(path);
            fis = new FileInputStream(file);
            long start = System.currentTimeMillis();
            Files.forIO().copy((InputStream)fis, (OutputStream)fdos, Files.AutoClose.INPUT_OUTPUT);
            double sec = (double)(System.currentTimeMillis() - start) / 1000.0;
            String size = Formats.forNumber().format((Number)file.length(), "0.#", "B");
            String speed = sec <= 0.0 ? "N/A" : Formats.forNumber().format((Number)((double)file.length() / sec), "0.0", "B/s");
            t.addData("size", (Object)size);
            t.addData("speed", (Object)speed);
            t.setStatus("0");
            this.deleteFile(path);
            boolean bl = true;
            return bl;
        }
        catch (AlreadyBeingCreatedException e) {
            Cat.logError((Throwable)e);
            t.setStatus((Throwable)e);
            this.deleteFile(path);
            this.m_logger.error(String.format("Already being created (%s)!", path), (Throwable)e);
            return false;
        }
        catch (AccessControlException e) {
            Cat.logError((Throwable)e);
            t.setStatus((Throwable)e);
            this.deleteFile(path);
            this.m_logger.error(String.format("No permission to create HDFS file(%s)!", path), (Throwable)e);
            return false;
        }
        catch (Exception e) {
            Cat.logError((Throwable)e);
            t.setStatus((Throwable)e);
            this.m_logger.error(String.format("Uploading file(%s) to HDFS(%s) failed!", file, path), (Throwable)e);
            return false;
        }
        finally {
            try {
                if (fdos != null) {
                    fdos.close();
                }
            }
            catch (Exception e) {
                Cat.logError((Throwable)e);
            }
            finally {
                t.complete();
            }
        }
    }

    public void uploadLogviewFile(String path, File file) {
        try {
            this.m_executors.submit((Runnable)((Object)new Uploader(path, file)));
        }
        catch (Exception e) {
            Cat.logError((Throwable)e);
        }
    }

    public class Uploader
    implements Threads.Task {
        private String m_path;
        private File m_file;

        public Uploader(String path, File file) {
            this.m_path = path;
            this.m_file = file;
        }

        public String getName() {
            return "hdfs-uploader";
        }

        public void run() {
            HdfsUploader.this.upload(this.m_path, this.m_file);
        }

        public void shutdown() {
        }
    }
}

