/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.data.PythonReceiver;
import org.apache.flink.python.api.streaming.data.PythonSender;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PythonStreamer<S extends PythonSender, OUT>
implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
    private static final long serialVersionUID = -2342256613658373170L;
    protected static final int SIGNAL_BUFFER_REQUEST = 0;
    protected static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    protected static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    protected static final int SIGNAL_FINISHED = -1;
    protected static final int SIGNAL_ERROR = -2;
    protected static final byte SIGNAL_LAST = 32;
    private final Configuration config;
    private final int envID;
    private final int setID;
    private transient Process process;
    private transient Thread shutdownThread;
    protected transient ServerSocket server;
    protected transient Socket socket;
    protected transient DataInputStream in;
    protected transient DataOutputStream out;
    protected int port;
    protected S sender;
    protected PythonReceiver<OUT> receiver;
    protected AtomicReference<String> msg = new AtomicReference();
    protected final AbstractRichFunction function;
    protected transient Thread outPrinter;
    protected transient Thread errorPrinter;

    public PythonStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray, S sender) {
        this.config = config;
        this.envID = envID;
        this.setID = setID;
        this.receiver = new PythonReceiver(config, usesByteArray);
        this.function = function;
        this.sender = sender;
    }

    public void open() throws IOException {
        this.server = new ServerSocket(0);
        this.server.setSoTimeout(50);
        this.startPython();
    }

    private void startPython() throws IOException {
        String tmpDir = this.config.getString(PythonOptions.DATA_TMP_DIR);
        if (tmpDir == null) {
            tmpDir = System.getProperty("java.io.tmpdir");
        }
        File outputFile = new File(tmpDir, this.envID + "_" + this.setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output");
        File inputFile = new File(tmpDir, this.envID + "_" + this.setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input)");
        ((PythonSender)this.sender).open(inputFile);
        this.receiver.open(outputFile);
        String path = this.function.getRuntimeContext().getDistributedCache().getFile("flink").getAbsolutePath();
        String planPath = path + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
        String pythonBinaryPath = this.config.getString(PythonOptions.PYTHON_BINARY_PATH);
        String arguments = this.config.getString("python.plan.arguments", "");
        this.process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + arguments);
        this.outPrinter = new Thread(new StreamPrinter(this.process.getInputStream()));
        this.outPrinter.start();
        this.errorPrinter = new Thread(new StreamPrinter(this.process.getErrorStream(), this.msg));
        this.errorPrinter.start();
        this.shutdownThread = ShutdownHookUtil.addShutdownHook(() -> PythonStreamer.destroyProcess(this.process), this.getClass().getSimpleName(), LOG);
        OutputStream processOutput = this.process.getOutputStream();
        processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write((this.envID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write((this.setID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write(("" + this.server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write(((this.config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10) + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write((inputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.write((outputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        processOutput.flush();
        while (true) {
            try {
                this.socket = this.server.accept();
            }
            catch (SocketTimeoutException ignored) {
                this.checkPythonProcessHealth();
                continue;
            }
            break;
        }
        this.in = new DataInputStream(this.socket.getInputStream());
        this.out = new DataOutputStream(this.socket.getOutputStream());
    }

    private void checkPythonProcessHealth() {
        try {
            int value = this.process.exitValue();
            try {
                this.outPrinter.join();
            }
            catch (InterruptedException ignored) {
                this.outPrinter.interrupt();
                Thread.interrupted();
            }
            try {
                this.errorPrinter.join();
            }
            catch (InterruptedException ignored) {
                this.errorPrinter.interrupt();
                Thread.interrupted();
            }
            if (value != 0) {
                throw new RuntimeException("Plan file caused an error. Check log-files for details." + this.msg.get());
            }
            throw new RuntimeException("Plan file exited prematurely without an error." + this.msg.get());
        }
        catch (IllegalThreadStateException illegalThreadStateException) {
            return;
        }
    }

    public void close() throws IOException {
        Throwable throwable = null;
        try {
            this.socket.close();
            ((PythonSender)this.sender).close();
            this.receiver.close();
        }
        catch (Throwable t) {
            throwable = t;
        }
        try {
            PythonStreamer.destroyProcess(this.process);
        }
        catch (Throwable t) {
            throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownThread, this.getClass().getSimpleName(), LOG);
        ExceptionUtils.tryRethrowIOException(throwable);
    }

    public static void destroyProcess(Process process2) throws IOException {
        try {
            process2.exitValue();
        }
        catch (IllegalThreadStateException ignored) {
            if (process2.getClass().getName().equals("java.lang.UNIXProcess")) {
                int pid;
                try {
                    Field f = process2.getClass().getDeclaredField("pid");
                    f.setAccessible(true);
                    pid = f.getInt(process2);
                }
                catch (Throwable ignore) {
                    process2.destroy();
                    return;
                }
                String[] args = new String[]{"kill", "-9", String.valueOf(pid)};
                Runtime.getRuntime().exec(args);
            }
            process2.destroy();
        }
    }

    protected void sendWriteNotification(int size, boolean hasNext) throws IOException {
        this.out.writeInt(size);
        this.out.writeByte(hasNext ? 0 : 32);
        this.out.flush();
    }

    protected void sendReadConfirmation() throws IOException {
        this.out.writeByte(1);
        this.out.flush();
    }

    public final void sendBroadCastVariables(Configuration config) throws IOException {
        try {
            int broadcastCount = config.getInteger("PLANBINDER_BCVAR_COUNT", 0);
            String[] names = new String[broadcastCount];
            for (int x = 0; x < names.length; ++x) {
                names[x] = config.getString("PLANBINDER_BCVAR_" + x, null);
            }
            this.out.write(new SerializationUtils.IntSerializer().serializeWithoutTypeInfo(broadcastCount));
            SerializationUtils.StringSerializer stringSerializer = new SerializationUtils.StringSerializer();
            for (String name : names) {
                Iterator bcv = this.function.getRuntimeContext().getBroadcastVariable(name).iterator();
                this.out.write(stringSerializer.serializeWithoutTypeInfo(name));
                while (bcv.hasNext()) {
                    this.out.writeByte(1);
                    this.out.write((byte[])bcv.next());
                }
                this.out.writeByte(0);
            }
        }
        catch (SocketTimeoutException ignored) {
            throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " stopped responding." + this.msg);
        }
    }
}

