/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.python;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.functions.UserDefinedFunction;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.errorcode.TableErrors;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.functions.BuildInScalarFunctions;
import org.apache.flink.table.runtime.functions.python.PythonScalarFunction;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.types.InternalType;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonUDFUtil {
    /** SLF4J logger shared by the static helpers of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(PythonUDFUtil.class);
    // NOTE(review): the six int constants below look like the protocol version and the
    // message-type codes exchanged with the python side — confirm against the python worker.
    public static final int PROTOCAL_VER = 1;
    public static final int SCALAR_UDF = 1;
    public static final int SCALAR_UDF_RESULT = 2;
    public static final int PYTHON_EXCEPTION_THROWN = 3;
    public static final int EXIT_STREAM = 4;
    public static final int EXIT_PYTHON_PROCESS = 5;
    /** Job-parameter key whose value is a comma-separated list of distributed-cache file names. */
    public static final String PYFLINK_CACHED_USR_LIB_IDS = "PYFLINK_SQL_CACHED_USR_LIB_IDS";
    /** Class-path resource copied into every python working directory and put on PYTHONPATH. */
    public static final String PYFLINK_LIB_ZIP_FILENAME = "python-source.zip";
    /** User files whose name ends with this are unzipped into the working directory instead of copied. */
    public static final String VIRTUALEVN_ZIP_FILENAME = "venv.zip";
    /** Working-directory entry whose presence switches the interpreter to {@code venv/bin/python}. */
    public static final String VIRTUALEVN_FOLDER_NAME = "venv";
    /** Module passed to {@code python -m} when starting the UDF worker process. */
    public static final String PYFLINK_SQL_WORKER = "flink.sql.worker";
    /** Module passed to {@code python -m} when starting the UDF registration process. */
    public static final String PYFLINK_SQL_REGISTER = "flink.sql.register";
    // NOTE(review): not referenced inside this class; presumably a marker prefix for python UDF
    // definitions used by callers — confirm.
    public static final String PYUDF_PREFIX = "python ";
    /** Environment variable whose mere presence makes {@link #getTmpFilesDir} use {@code user.dir}. */
    public static final String CLUSTER_ENVIRONMENT_MARK = "_FLINK_CONTAINER_ID";
    /** Environment variable that, when non-empty, is used as the base of the temporary directory. */
    public static final String TMPDIR_CLEAN_ON_EXIT = "_TMPDIR_CLEAN_ON_EXIT";
    /** Working directory of the worker process; created lazily by setUpPythonServerIfNeeded. */
    private static File pyWorkingDir;
    /** The currently running python worker process, or {@code null} when none has been started. */
    private static Process pyProcess;
    /** Port announced by the python worker during the handshake; {@code -1} when unknown. */
    private static volatile int pySockPort;
    // NOTE(review): presumably the bound of the retry loops in createWorkerSocket — confirm.
    private static final int RETRY = 2;
    /**
     * Opens a keep-alive TCP connection to the python worker on the loopback interface.
     *
     * <p>Up to {@code RETRY} rounds are made. Each round makes sure a worker process is running
     * (starting one if necessary) and then tries {@code RETRY} times to connect to the port the
     * worker announced. When every connection attempt of a round fails, the worker process is
     * killed so that the next round starts a fresh one.
     *
     * <p>Previously the "give up" check compared the round counter with a value it could never
     * reach, so the method silently returned {@code null} once all rounds had failed. It now
     * throws, as the error message always intended.
     *
     * @param ctx function context used to locate the user's python files when a worker must be started
     * @return a connected socket with keep-alive enabled; never {@code null}
     * @throws RuntimeException if the worker cannot be started or no connection could be
     *     established after all rounds
     */
    public static Socket createWorkerSocket(FunctionContext ctx) {
        for (int round = 0; round < RETRY; ++round) {
            PythonUDFUtil.setUpPythonServerIfNeeded(ctx);
            for (int attempt = 0; attempt < RETRY; ++attempt) {
                Socket workerSocket = null;
                try {
                    workerSocket = new Socket("127.0.0.1", pySockPort);
                    workerSocket.setKeepAlive(true);
                    return workerSocket;
                }
                catch (Exception e) {
                    String err = TableErrors.INST.sqlPythonCreateSocketError(e.getMessage());
                    LOG.error(err, (Throwable)e);
                    // The socket may already be connected when setKeepAlive fails; do not leak it.
                    if (workerSocket != null) {
                        try {
                            workerSocket.close();
                        }
                        catch (IOException ignored) {
                            // Nothing useful can be done about a failed close of a broken socket.
                        }
                    }
                }
            }
            // The worker is unreachable: kill it so the next round (if any) starts a new one.
            if (pyProcess != null) {
                pyProcess.destroyForcibly();
                pyProcess = null;
                pySockPort = -1;
            }
        }
        throw new RuntimeException(TableErrors.INST.sqlPythonCreateSocketError("Have tried twice, and can't connect to python. "));
    }

    /**
     * Starts the python worker process unless one is already alive, and records the port it
     * announces.
     *
     * <p>The handshake works as follows: a temporary server socket is opened on a free loopback
     * port, that port is passed to the python process on its command line, and the process is
     * expected to connect back within 10 seconds and send the port of its own server as one int.
     *
     * <p>If anything fails after the process was started, the process is destroyed again.
     * Otherwise a later call would find it alive and return early although no valid port was
     * ever recorded.
     *
     * @param ctx function context used to fetch the user's python files from the distributed cache
     * @throws RuntimeException wrapping any failure while preparing files, starting the process
     *     or performing the handshake
     */
    private static synchronized void setUpPythonServerIfNeeded(FunctionContext ctx) {
        if (pyProcess != null && pyProcess.isAlive()) {
            return;
        }
        ServerSocket serverSocket = null;
        Socket shakeHandSock = null;
        try {
            if (pyWorkingDir == null || !pyWorkingDir.exists()) {
                HashMap<String, File> usrFiles = PythonUDFUtil.getUserFilesFromDistributedCache(ctx);
                pyWorkingDir = PythonUDFUtil.preparePythonWorkingDir(usrFiles);
            }
            serverSocket = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
            int localPort = serverSocket.getLocalPort();
            pyProcess = PythonUDFUtil.startPythonProcess(pyWorkingDir, PYFLINK_SQL_WORKER, localPort);
            serverSocket.setSoTimeout(10000);
            shakeHandSock = serverSocket.accept();
            DataInputStream in = new DataInputStream(new BufferedInputStream(shakeHandSock.getInputStream()));
            pySockPort = in.readInt();
        }
        catch (Exception e) {
            // Do not keep a worker around whose port we never learned.
            if (pyProcess != null) {
                pyProcess.destroyForcibly();
                pyProcess = null;
            }
            pySockPort = -1;
            String err = TableErrors.INST.sqlPythonProcessError(e.getMessage());
            LOG.error(err, (Throwable)e);
            throw new RuntimeException(err, e);
        }
        finally {
            // Both sockets are only needed for the handshake.
            if (shakeHandSock != null) {
                try {
                    shakeHandSock.close();
                }
                catch (IOException e) {
                    LOG.warn(e.getMessage(), (Throwable)e);
                }
            }
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                }
                catch (IOException e) {
                    LOG.warn(e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    /**
     * Asks a short-lived python process for the result types of the given python UDFs and
     * registers one {@link PythonScalarFunction} per UDF in the table environment.
     *
     * @param tEnv table environment that receives the functions
     * @param usrFilePaths local paths of the user's python files; they are copied into a fresh
     *     working directory for the python process
     * @param pyUDFs mapping from SQL function name to python function name
     * @return the registered functions keyed by SQL function name
     * @throws RuntimeException wrapping any failure ("Can't compile the python UDFs.")
     */
    public static Map<String, UserDefinedFunction> registerPyUdfsToTableEnvironment(TableEnvironment tEnv, ArrayList<String> usrFilePaths, Map<String, String> pyUDFs) throws RuntimeException {
        HashMap<String, UserDefinedFunction> registered = new HashMap<String, UserDefinedFunction>();
        try {
            byte[] udfTypesData = PythonUDFUtil.fetchPythonUdfTypesData(usrFilePaths, pyUDFs);
            ArrayList<InternalType> internalTypes = PythonUDFUtil.parserPythonUdfTypesInfo(udfTypesData);
            // Types are matched to UDFs by position, so a count mismatch would bind wrong types.
            if (internalTypes.size() != pyUDFs.size()) {
                throw new IllegalStateException("Python returned " + internalTypes.size() + " result types for " + pyUDFs.size() + " UDFs.");
            }
            int idx = 0;
            // Same map, unmodified, so the iteration order matches the order the names were sent in.
            for (Map.Entry<String, String> entry : pyUDFs.entrySet()) {
                String sqlName = entry.getKey();
                String pyName = entry.getValue();
                PythonScalarFunction scalarFunction = new PythonScalarFunction(sqlName, pyName, internalTypes.get(idx));
                tEnv.registerFunction(sqlName, scalarFunction);
                registered.put(sqlName, scalarFunction);
                ++idx;
            }
        }
        catch (Exception ex) {
            throw new RuntimeException("Can't compile the python UDFs.", ex);
        }
        return registered;
    }

    /**
     * Starts the registration python process, sends it the python function names and returns the
     * raw type description it answers with. The sockets and the process are always released.
     */
    private static byte[] fetchPythonUdfTypesData(ArrayList<String> usrFilePaths, Map<String, String> pyUDFs) throws Exception {
        ServerSocket serverSocket = null;
        Socket sock = null;
        Process regPyProcess = null;
        try {
            HashMap<String, File> usrFiles = new HashMap<String, File>();
            for (String path : usrFilePaths) {
                File f = new File(path);
                usrFiles.put(f.getName(), f);
            }
            File pyWorkDir = PythonUDFUtil.preparePythonWorkingDir(usrFiles);
            serverSocket = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
            int localPort = serverSocket.getLocalPort();
            regPyProcess = PythonUDFUtil.startPythonProcess(pyWorkDir, PYFLINK_SQL_REGISTER, localPort);
            serverSocket.setSoTimeout(1000);
            sock = serverSocket.accept();

            // Request: [long length][short count][UTF python name]*
            ByteArrayOutputStream buff = new ByteArrayOutputStream();
            DataOutputStream outBuff = new DataOutputStream(buff);
            outBuff.writeShort(pyUDFs.size());
            for (Map.Entry<String, String> entry : pyUDFs.entrySet()) {
                outBuff.writeUTF(entry.getValue());
            }
            DataOutputStream out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream(), 8192));
            out.writeLong(buff.size());
            out.write(buff.toByteArray());
            out.flush();

            // Response: [long length][payload]
            DataInputStream in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            long dataLength = in.readLong();
            if (dataLength < 0L || dataLength > Integer.MAX_VALUE) {
                throw new IOException("Invalid length of python UDF type data: " + dataLength);
            }
            byte[] udfTypesData = new byte[(int)dataLength];
            // read(byte[]) may return before the array is full; readFully blocks until it is.
            in.readFully(udfTypesData);
            return udfTypesData;
        }
        finally {
            if (sock != null) {
                try {
                    sock.close();
                }
                catch (IOException e) {
                    LOG.warn(e.getMessage(), (Throwable)e);
                }
            }
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                }
                catch (IOException e) {
                    LOG.warn(e.getMessage(), (Throwable)e);
                }
            }
            // The registration process is only needed for this one exchange.
            if (regPyProcess != null) {
                regPyProcess.destroyForcibly();
            }
        }
    }

    /**
     * Decodes the type description returned by the python registration process.
     *
     * <p>Layout: a short entry count, then per entry one unsigned byte. A value above 128 marks
     * an error and is followed by a UTF message; otherwise the value is the ordinal of a
     * {@link PythonSerDesTypes} constant, and {@code DECIMAL} is followed by two shorts
     * (precision, scale).
     *
     * <p>A type code that maps to no result type (including {@code NONE}) used to be skipped
     * silently, which left the returned list shorter than the UDF list and shifted all later
     * types. It is now reported as an error, and parsing stops there because the rest of the
     * stream can no longer be interpreted reliably.
     *
     * @param udfTypesData raw bytes received from python
     * @return one type per entry, in stream order
     * @throws RuntimeException carrying all collected error messages if any entry was an error
     * @throws Exception on malformed or truncated data
     */
    private static ArrayList<InternalType> parserPythonUdfTypesInfo(byte[] udfTypesData) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(udfTypesData));
        int nums = in.readShort();
        ArrayList<InternalType> types = new ArrayList<InternalType>();
        ArrayList<String> errorMessages = new ArrayList<String>();
        PythonSerDesTypes[] knownTypes = PythonSerDesTypes.values();
        for (int i = 0; i < nums; ++i) {
            int typeCode = in.readUnsignedByte();
            if (typeCode > 128) {
                errorMessages.add(in.readUTF());
                continue;
            }
            InternalType resolved = null;
            if (typeCode < knownTypes.length) {
                switch (knownTypes[typeCode]) {
                    case DECIMAL: {
                        short precision = in.readShort();
                        short scale = in.readShort();
                        resolved = DataTypes.createDecimalType(precision, scale);
                        break;
                    }
                    case STRING:
                        resolved = DataTypes.STRING;
                        break;
                    case BOOLEAN:
                        resolved = DataTypes.BOOLEAN;
                        break;
                    case SHORT:
                        resolved = DataTypes.SHORT;
                        break;
                    case BYTE:
                        resolved = DataTypes.BYTE;
                        break;
                    case INT:
                        resolved = DataTypes.INT;
                        break;
                    case LONG:
                        resolved = DataTypes.LONG;
                        break;
                    case FLOAT:
                        resolved = DataTypes.FLOAT;
                        break;
                    case DOUBLE:
                        resolved = DataTypes.DOUBLE;
                        break;
                    case BYTES:
                        resolved = DataTypes.BYTE_ARRAY;
                        break;
                    case DATE:
                        resolved = DataTypes.DATE;
                        break;
                    case TIME:
                        resolved = DataTypes.TIME;
                        break;
                    case TIMESTAMP:
                        resolved = DataTypes.TIMESTAMP;
                        break;
                    default:
                        // NONE is not a valid result type; handled below.
                        break;
                }
            }
            if (resolved == null) {
                errorMessages.add("Unsupported python UDF result type code " + typeCode + " for UDF #" + i + ".");
                break;
            }
            types.add(resolved);
        }
        if (errorMessages.size() > 0) {
            StringBuilder sb = new StringBuilder();
            for (String errMsg : errorMessages) {
                sb.append(errMsg);
            }
            throw new RuntimeException(sb.toString());
        }
        return types;
    }

    /**
     * Resolves the user's python files from the distributed cache.
     *
     * <p>The file names are read from the job parameter {@code PYFLINK_CACHED_USR_LIB_IDS} as a
     * comma-separated list. Blank entries are skipped: the parameter defaults to the empty
     * string, and {@code "".split(",")} yields one empty name, which previously made a job
     * without any user library fail with a lookup of the empty file name.
     *
     * @param ctx function context giving access to job parameters and cached files
     * @return cached files keyed by their registered name; empty when no library was registered
     * @throws FileNotFoundException if a listed file is missing from the cache
     */
    private static HashMap<String, File> getUserFilesFromDistributedCache(FunctionContext ctx) throws FileNotFoundException {
        HashMap<String, File> usrFiles = new HashMap<String, File>();
        String usrLibIds = ctx.getJobParameter(PYFLINK_CACHED_USR_LIB_IDS, "");
        for (String fileName : usrLibIds.split(",")) {
            if (fileName.trim().isEmpty()) {
                continue;
            }
            File dcFile = ctx.getCachedFile(fileName);
            if (dcFile == null || !dcFile.exists()) {
                throw new FileNotFoundException("User's python lib file " + fileName + " does not exist.");
            }
            usrFiles.put(fileName, dcFile);
        }
        return usrFiles;
    }

    /**
     * Builds the path of a new, uniquely named temporary directory for python files.
     *
     * <p>The base directory is chosen in this order: the non-empty value of
     * {@code _TMPDIR_CLEAN_ON_EXIT}; else the JVM's {@code user.dir} when
     * {@code _FLINK_CONTAINER_ID} is present; else {@code java.io.tmpdir}. The directory itself
     * is not created.
     *
     * @param environmentVariable environment variables to consult
     * @return {@code <base>/pyflink_tmp_<random UUID>}
     */
    public static String getTmpFilesDir(Map<String, String> environmentVariable) {
        String baseDir;
        if (environmentVariable.containsKey(TMPDIR_CLEAN_ON_EXIT) && !environmentVariable.get(TMPDIR_CLEAN_ON_EXIT).isEmpty()) {
            baseDir = environmentVariable.get(TMPDIR_CLEAN_ON_EXIT);
        } else if (environmentVariable.containsKey(CLUSTER_ENVIRONMENT_MARK)) {
            baseDir = System.getProperty("user.dir");
        } else {
            baseDir = System.getProperty("java.io.tmpdir");
        }
        StringBuilder dir = new StringBuilder(baseDir);
        if (!baseDir.endsWith(File.separator)) {
            dir.append(File.separator);
        }
        return dir.append("pyflink_tmp_").append(UUID.randomUUID()).toString();
    }

    /**
     * Creates a fresh working directory for a python process and fills it with the user's files
     * and the bundled {@code python-source.zip}.
     *
     * <p>User files whose name ends with {@code venv.zip} are unzipped into the directory; all
     * others are copied under their map key. The library zip is taken from the class path.
     *
     * @param usrFiles user files keyed by the name they should have in the working directory
     * @return the working directory
     * @throws IOException if the directory cannot be prepared or the library resource is missing
     */
    private static File preparePythonWorkingDir(Map<String, File> usrFiles) throws IOException {
        String tmpFilesDir = PythonUDFUtil.getTmpFilesDir(System.getenv());
        LOG.info("Using " + tmpFilesDir + " as working dir.");
        Path tmpFileDirPath = new Path(tmpFilesDir);
        FileSystem fs = tmpFileDirPath.getFileSystem();
        if (fs.exists(tmpFileDirPath)) {
            fs.delete(tmpFileDirPath, true);
        }
        fs.mkdirs(tmpFileDirPath);
        for (Map.Entry<String, File> entry : usrFiles.entrySet()) {
            if (entry.getKey().endsWith(VIRTUALEVN_ZIP_FILENAME)) {
                PythonUDFUtil.prepareVirtualEnvFiles(entry.getValue().getAbsolutePath(), tmpFileDirPath.toUri().toString());
                continue;
            }
            Path targetFilePath = new Path(tmpFileDirPath, entry.getKey());
            FileCache.copy(new Path(entry.getValue().toURI().toString()), targetFilePath, false);
        }
        ClassLoader classLoader = PythonOptions.class.getClassLoader();
        InputStream in = classLoader.getResourceAsStream(PYFLINK_LIB_ZIP_FILENAME);
        if (in == null) {
            String err = "Can't extract python library files from resource..";
            LOG.error(err);
            throw new IOException(err);
        }
        try {
            File targetFile = new File(tmpFileDirPath.getPath() + File.separator + PYFLINK_LIB_ZIP_FILENAME);
            Files.copy(in, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        finally {
            // Close the resource stream even when the copy fails.
            IOUtils.closeQuietly(in);
        }
        return new File(tmpFileDirPath.toUri().toString());
    }

    /**
     * Unpacks the user's virtualenv archive into the python working directory by running
     * {@code unzip -qq -o <zip> -d <dir>} and verifies that {@code venv/bin/python} exists
     * afterwards.
     *
     * <p>The extraction gets one minute. The original code had the liveness check inverted: it
     * destroyed the process only when it had already exited, and let a timed-out {@code unzip}
     * keep running. A timeout now kills the process and is reported as a failure.
     *
     * @param venvZipFilePath absolute path of the virtualenv zip
     * @param pythonDir directory to extract into
     * @throws RuntimeException if the archive cannot be extracted in time or does not contain
     *     {@code venv/bin/python}
     */
    private static void prepareVirtualEnvFiles(String venvZipFilePath, String pythonDir) {
        Process unzip = null;
        try {
            ProcessBuilder pb = new ProcessBuilder("unzip", "-qq", "-o", venvZipFilePath, "-d", pythonDir);
            unzip = pb.start();
            PythonUDFUtil.redirectStreamsToStderr(unzip.getInputStream(), unzip.getErrorStream());
            boolean finished = unzip.waitFor(1L, TimeUnit.MINUTES);
            if (!finished) {
                unzip.destroyForcibly();
                throw new RuntimeException("Unzipping " + venvZipFilePath + " did not finish within one minute.");
            }
            File pyExec = new File(pythonDir + "/venv/bin/python");
            if (!pyExec.exists()) {
                throw new RuntimeException("Can not setup virtualenv, try to unzip venv file but '/venv/bin/python' is not in the unzip results !");
            }
        }
        catch (InterruptedException ex) {
            // Stop the extraction and keep the interrupt visible to the caller's thread.
            if (unzip != null) {
                unzip.destroyForcibly();
            }
            Thread.currentThread().interrupt();
            throw new RuntimeException("Can't prepare virtualenv for python, please check your venv.zip. ", ex);
        }
        catch (Exception ex) {
            throw new RuntimeException("Can't prepare virtualenv for python, please check your venv.zip. ", ex);
        }
    }

    /**
     * Launches {@code python -m <pyWorker> <javaPort>} inside the given working directory setup.
     *
     * <p>{@code PYTHONPATH} is built from the inherited value (if any), {@code python-source.zip},
     * every other {@code *.zip} in the working directory except {@code venv.zip}, and the working
     * directory itself. If the directory contains a {@code venv} entry, its interpreter is used.
     * {@code venv/bin/} is always prepended to {@code PATH}.
     *
     * <p>The process is given one second to fail fast. If it is still alive afterwards, a JVM
     * shutdown hook is registered that kills it and deletes the working directory.
     *
     * @param pyWorkDir prepared python working directory
     * @param pyWorker python module to run
     * @param javaPort port of the java-side server socket the python process connects back to
     * @return the running process
     * @throws Exception if the library zip is missing, the directory cannot be listed, or the
     *     process cannot be started or exits within the first second
     */
    private static synchronized Process startPythonProcess(File pyWorkDir, String pyWorker, int javaPort) throws Exception {
        String pythonDir = pyWorkDir.getAbsolutePath();
        File pyflinkZip = new File(pythonDir, PYFLINK_LIB_ZIP_FILENAME);
        if (!pyflinkZip.exists()) {
            // The original message lacked the space between file name and text.
            String err = PYFLINK_LIB_ZIP_FILENAME + " doesn't exist!";
            LOG.error(err);
            throw new Exception(err);
        }
        // File.list() returns null when the directory cannot be read.
        String[] fileList = pyWorkDir.list();
        if (fileList == null) {
            throw new IOException("Can't list the files of python working dir " + pythonDir);
        }

        StringBuilder pythonPathEnv = new StringBuilder();
        String inheritedPythonPath = System.getenv("PYTHONPATH");
        if (inheritedPythonPath != null) {
            pythonPathEnv.append(inheritedPythonPath);
            pythonPathEnv.append(File.pathSeparator);
        }
        pythonPathEnv.append(pyflinkZip.getAbsolutePath());
        boolean virtualenvEnabled = false;
        for (String f : fileList) {
            if (f.endsWith(".zip") && !f.endsWith(PYFLINK_LIB_ZIP_FILENAME) && !f.endsWith(VIRTUALEVN_ZIP_FILENAME)) {
                pythonPathEnv.append(File.pathSeparator);
                pythonPathEnv.append(new File(pythonDir, f).getAbsolutePath());
                continue;
            }
            if (VIRTUALEVN_FOLDER_NAME.equals(f)) {
                virtualenvEnabled = true;
            }
        }
        pythonPathEnv.append(File.pathSeparator);
        pythonPathEnv.append(pythonDir);

        String pythonExec = virtualenvEnabled ? pythonDir + "/venv/bin/python" : "python";
        // The port argument keeps its leading space exactly as before.
        String[] commands = new String[]{pythonExec, "-m", pyWorker, " " + javaPort};
        ProcessBuilder pb = new ProcessBuilder(commands);
        Map<String, String> env = pb.environment();
        StringBuilder pathVar = new StringBuilder();
        pathVar.append(pythonDir + "/venv/bin/");
        String inheritedPath = env.get("PATH");
        // Without this check a missing PATH would be appended as the text "null".
        if (inheritedPath != null) {
            pathVar.append(File.pathSeparator);
            pathVar.append(inheritedPath);
        }
        env.put("PATH", pathVar.toString());
        env.put("PYTHONPATH", pythonPathEnv.toString());

        Process p = pb.start();
        Tuple2<RedirectThread, RedirectThread> redirects = PythonUDFUtil.redirectStreamsToStderr(p.getInputStream(), p.getErrorStream());
        p.waitFor(1000L, TimeUnit.MILLISECONDS);
        if (!p.isAlive()) {
            String message = "Can't start python process!";
            if (redirects != null) {
                // Include the last chunk of stdout and stderr to show why python exited.
                message = message + "\n" + redirects.f0.tailMessage + "\n";
                message = message + redirects.f1.tailMessage + "\n";
            }
            throw new RuntimeException(message);
        }
        Runtime.getRuntime().addShutdownHook(new ShutDownPythonHook(p, pythonDir));
        return p;
    }

    /**
     * Starts two daemon threads that copy a child process's stdout and stderr to this JVM's
     * {@code System.err}.
     *
     * @param stdout the child's standard output
     * @param stderr the child's standard error
     * @return the (stdout, stderr) pump threads, or {@code null} if they could not be started
     */
    private static Tuple2<RedirectThread, RedirectThread> redirectStreamsToStderr(InputStream stdout, InputStream stderr) {
        Tuple2<RedirectThread, RedirectThread> pumps;
        try {
            RedirectThread stdoutPump = new RedirectThread(stdout, System.err);
            stdoutPump.start();
            RedirectThread stderrPump = new RedirectThread(stderr, System.err);
            stderrPump.start();
            pumps = new Tuple2<RedirectThread, RedirectThread>(stdoutPump, stderrPump);
        }
        catch (Exception failure) {
            LOG.warn(failure.getMessage(), (Throwable)failure);
            pumps = null;
        }
        return pumps;
    }

    /**
     * Serializes one scalar-UDF call and writes it to the worker as a single length-prefixed
     * message.
     *
     * <p>Message layout: {@code [long total length][int 1][int 1][UTF function name]
     * [short arg count][type tags][argument data]}. The two ints are presumably the protocol
     * version followed by the message kind. The message is assembled in memory first, so
     * nothing reaches {@code out} when serializing an argument fails.
     *
     * @param pyFunctionName name of the python function to call
     * @param out stream to the python worker; flushed after the message is written
     * @param args call arguments, see {@link #serializeArgData}
     * @throws IOException if writing fails
     */
    public static void sendCallRequest(String pyFunctionName, DataOutputStream out, Object ... args) throws IOException {
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        ByteArrayOutputStream typeTagBytes = new ByteArrayOutputStream();
        ByteArrayOutputStream payloadBytes = new ByteArrayOutputStream();

        DataOutputStream header = new DataOutputStream(headerBytes);
        header.writeInt(PROTOCAL_VER);
        header.writeInt(SCALAR_UDF);
        header.writeUTF(pyFunctionName);
        header.writeShort(args.length);

        DataOutputStream typeTags = new DataOutputStream(typeTagBytes);
        DataOutputStream payload = new DataOutputStream(payloadBytes);
        for (int i = 0; i < args.length; ++i) {
            PythonUDFUtil.serializeArgData(typeTags, payload, args[i]);
        }

        int totalLength = headerBytes.size() + typeTagBytes.size() + payloadBytes.size();
        out.writeLong(totalLength);
        headerBytes.writeTo(out);
        typeTagBytes.writeTo(out);
        payloadBytes.writeTo(out);
        out.flush();
    }

    /**
     * Appends one call argument to the request: a one-byte type tag (the ordinal of the matching
     * {@link PythonSerDesTypes} constant) goes to {@code argsTypesOut} and the encoded value to
     * {@code argsDataOut}. {@code null} is encoded as the {@code NONE} tag without any data.
     *
     * <p>Two cases that used to corrupt the message silently are now rejected before anything is
     * written: an argument of an unsupported type (no tag was written although the argument had
     * already been counted), and a {@code byte[]} longer than 65535 bytes (its length was
     * truncated to an unsigned short).
     *
     * @param argsTypesOut receives the type tag
     * @param argsDataOut receives the value
     * @param arg the argument; may be {@code null}
     * @throws IOException if writing fails
     * @throws IllegalArgumentException if the argument cannot be represented in the protocol
     */
    public static void serializeArgData(DataOutputStream argsTypesOut, DataOutputStream argsDataOut, Object arg) throws IOException {
        if (arg == null) {
            argsTypesOut.writeByte(PythonSerDesTypes.NONE.ordinal());
            return;
        }
        if (arg instanceof String) {
            argsTypesOut.writeByte(PythonSerDesTypes.STRING.ordinal());
            byte[] bytes = ((String)arg).getBytes(Charset.forName("utf-8"));
            argsDataOut.writeInt(bytes.length);
            argsDataOut.write(bytes);
        } else if (arg instanceof Boolean) {
            argsTypesOut.writeByte(PythonSerDesTypes.BOOLEAN.ordinal());
            argsDataOut.writeBoolean((Boolean)arg);
        } else if (arg instanceof Short) {
            argsTypesOut.writeByte(PythonSerDesTypes.SHORT.ordinal());
            argsDataOut.writeShort(((Short)arg).shortValue());
        } else if (arg instanceof Byte) {
            argsTypesOut.writeByte(PythonSerDesTypes.BYTE.ordinal());
            argsDataOut.writeByte(((Byte)arg).byteValue());
        } else if (arg instanceof Integer) {
            argsTypesOut.writeByte(PythonSerDesTypes.INT.ordinal());
            argsDataOut.writeInt((Integer)arg);
        } else if (arg instanceof Long) {
            argsTypesOut.writeByte(PythonSerDesTypes.LONG.ordinal());
            argsDataOut.writeLong((Long)arg);
        } else if (arg instanceof Float) {
            argsTypesOut.writeByte(PythonSerDesTypes.FLOAT.ordinal());
            argsDataOut.writeFloat(((Float)arg).floatValue());
        } else if (arg instanceof Double) {
            argsTypesOut.writeByte(PythonSerDesTypes.DOUBLE.ordinal());
            argsDataOut.writeDouble((Double)arg);
        } else if (arg instanceof byte[]) {
            byte[] bytes = (byte[])arg;
            // The length travels as an unsigned short (see getResult), so it must fit into 16 bits.
            if (bytes.length > 65535) {
                throw new IllegalArgumentException("byte[] argument of " + bytes.length + " bytes exceeds the protocol limit of 65535 bytes.");
            }
            argsTypesOut.writeByte(PythonSerDesTypes.BYTES.ordinal());
            argsDataOut.writeShort(bytes.length);
            argsDataOut.write(bytes);
        } else if (arg instanceof Date) {
            argsTypesOut.writeByte(PythonSerDesTypes.DATE.ordinal());
            argsDataOut.writeInt(BuildInScalarFunctions.toInt((Date)arg));
        } else if (arg instanceof Time) {
            argsTypesOut.writeByte(PythonSerDesTypes.TIME.ordinal());
            argsDataOut.writeInt(BuildInScalarFunctions.toInt((Time)arg));
        } else if (arg instanceof Timestamp) {
            argsTypesOut.writeByte(PythonSerDesTypes.TIMESTAMP.ordinal());
            argsDataOut.writeLong(((Timestamp)arg).getTime());
        } else if (arg instanceof BigDecimal) {
            argsTypesOut.writeByte(PythonSerDesTypes.DECIMAL.ordinal());
            argsDataOut.writeUTF(arg.toString());
        } else {
            throw new IllegalArgumentException("Unsupported python UDF argument type: " + arg.getClass().getName());
        }
    }

    /**
     * Reads one UDF result value: a one-byte type tag (the ordinal of a {@link PythonSerDesTypes}
     * constant) followed by the value in the encoding of that type.
     *
     * <p>{@code STRING} yields a {@link BinaryString}; {@code DATE} and {@code TIME} yield the
     * raw {@code Integer} and {@code TIMESTAMP} the raw {@code Long}; {@code NONE} and any
     * unknown tag yield {@code null}.
     *
     * <p>Variable-length values are read with {@code readFully}. The former
     * {@code read(byte[])} may return before the array is filled, which left the tail of the
     * value zeroed and the unread bytes in the stream for the next message.
     *
     * @param in stream from the python worker, positioned at the type tag
     * @return the decoded value, possibly {@code null}
     * @throws IOException if reading fails, the stream ends early, or a length is invalid
     */
    public static Object getResult(DataInputStream in) throws IOException {
        byte resType = in.readByte();
        Object res = null;
        if (resType == PythonSerDesTypes.NONE.ordinal()) {
            res = null;
        } else if (resType == PythonSerDesTypes.STRING.ordinal()) {
            res = BinaryString.fromBytes(PythonUDFUtil.readExactly(in, in.readInt()));
        } else if (resType == PythonSerDesTypes.BOOLEAN.ordinal()) {
            res = in.readBoolean();
        } else if (resType == PythonSerDesTypes.SHORT.ordinal()) {
            res = in.readShort();
        } else if (resType == PythonSerDesTypes.BYTE.ordinal()) {
            res = in.readByte();
        } else if (resType == PythonSerDesTypes.INT.ordinal()) {
            res = in.readInt();
        } else if (resType == PythonSerDesTypes.LONG.ordinal()) {
            res = in.readLong();
        } else if (resType == PythonSerDesTypes.FLOAT.ordinal()) {
            res = Float.valueOf(in.readFloat());
        } else if (resType == PythonSerDesTypes.DOUBLE.ordinal()) {
            res = in.readDouble();
        } else if (resType == PythonSerDesTypes.BYTES.ordinal()) {
            res = PythonUDFUtil.readExactly(in, in.readUnsignedShort());
        } else if (resType == PythonSerDesTypes.DATE.ordinal()) {
            res = in.readInt();
        } else if (resType == PythonSerDesTypes.TIME.ordinal()) {
            res = in.readInt();
        } else if (resType == PythonSerDesTypes.TIMESTAMP.ordinal()) {
            res = in.readLong();
        } else if (resType == PythonSerDesTypes.DECIMAL.ordinal()) {
            res = new BigDecimal(in.readUTF());
        }
        return res;
    }

    /** Reads exactly {@code len} bytes from the stream, blocking until all of them arrived. */
    private static byte[] readExactly(DataInputStream in, int len) throws IOException {
        if (len < 0) {
            throw new IOException("Invalid negative length in python result: " + len);
        }
        byte[] data = new byte[len];
        in.readFully(data);
        return data;
    }

    static {
        // Initial state: no worker has been started, so there is no port to connect to.
        pySockPort = -1;
        pyProcess = null;
    }

    /**
     * JVM shutdown hook that kills a python process and removes its working directory.
     */
    static class ShutDownPythonHook
    extends Thread {
        private final Process pythonProcess;
        private final String workingDirPath;

        /**
         * @param p the python process to kill on shutdown
         * @param pyFileDir directory to delete on shutdown; {@code null} to delete nothing
         */
        public ShutDownPythonHook(Process p, String pyFileDir) {
            this.pythonProcess = p;
            this.workingDirPath = pyFileDir;
        }

        @Override
        public void run() {
            this.pythonProcess.destroyForcibly();
            if (this.workingDirPath == null) {
                return;
            }
            FileUtils.deleteDirectoryQuietly(new File(this.workingDirPath));
        }
    }

    /**
     * Daemon thread that copies everything from an input stream to an output stream until the
     * input ends, remembering the most recently copied chunk.
     */
    static class RedirectThread
    extends Thread {
        InputStream in;
        OutputStream out;
        /**
         * The last chunk copied, decoded as UTF-8; empty until something was read. It is written
         * by this thread and read by the thread that started the process, hence {@code volatile}.
         * A chunk may start or end in the middle of a multi-byte character.
         */
        volatile String tailMessage = "";

        /**
         * @param in stream to drain
         * @param out stream that receives the data; it is flushed after every chunk and never closed
         */
        public RedirectThread(InputStream in, OutputStream out) {
            this.setDaemon(true);
            this.in = in;
            this.out = out;
        }

        @Override
        public void run() {
            Charset utf8 = Charset.forName("utf-8");
            try {
                byte[] buf = new byte[65536];
                int len = this.in.read(buf);
                while (len != -1) {
                    this.tailMessage = new String(buf, 0, len, utf8);
                    this.out.write(buf, 0, len);
                    this.out.flush();
                    len = this.in.read(buf);
                }
            }
            catch (Exception ignored) {
                // Best effort: forwarding simply stops when either stream breaks, which is
                // expected when the child process is destroyed.
            }
        }
    }

    /**
     * Value types understood by the java/python exchange.
     *
     * <p>The {@code ordinal()} of each constant is written and compared as the one-byte type tag
     * in serializeArgData, getResult and parserPythonUdfTypesInfo, so the declaration order is
     * part of the wire format: do not reorder constants or insert new ones in the middle.
     * NOTE(review): the python side presumably hard-codes the same numbering — confirm before
     * appending new constants.
     */
    static enum PythonSerDesTypes {
        NONE,
        STRING,
        BOOLEAN,
        SHORT,
        BYTE,
        INT,
        LONG,
        FLOAT,
        DOUBLE,
        BYTES,
        DATE,
        TIME,
        TIMESTAMP,
        DECIMAL;

    }
}

