package org.apache.flink.python.api.streaming.data;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.PythonOptions;
import org.apache.flink.python.api.PythonPlanBinder;
import org.apache.flink.python.api.streaming.data.PythonSender;
import org.apache.flink.python.api.streaming.util.SerializationUtils;
import org.apache.flink.python.api.streaming.util.StreamPrinter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/python/api/streaming/data/PythonStreamer.class */
public class PythonStreamer<S extends PythonSender, OUT> implements Serializable {
    protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
    private static final long serialVersionUID = -2342256613658373170L;
    protected static final int SIGNAL_BUFFER_REQUEST = 0;
    protected static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
    protected static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
    protected static final int SIGNAL_FINISHED = -1;
    protected static final int SIGNAL_ERROR = -2;
    protected static final byte SIGNAL_LAST = 32;
    private final Configuration config;
    private final int envID;
    private final int setID;
    private transient Process process;
    private transient Thread shutdownThread;
    protected transient ServerSocket server;
    protected transient Socket socket;
    protected transient DataInputStream in;
    protected transient DataOutputStream out;
    protected int port;
    protected S sender;
    protected PythonReceiver<OUT> receiver;
    protected AtomicReference<String> msg = new AtomicReference<>();
    protected final AbstractRichFunction function;
    protected transient Thread outPrinter;
    protected transient Thread errorPrinter;

    public PythonStreamer(AbstractRichFunction abstractRichFunction, Configuration configuration, int i, int i2, boolean z, S s) {
        this.config = configuration;
        this.envID = i;
        this.setID = i2;
        this.receiver = new PythonReceiver<>(configuration, z);
        this.function = abstractRichFunction;
        this.sender = s;
    }

    public void open() throws IOException {
        this.server = new ServerSocket(0);
        this.server.setSoTimeout(50);
        startPython();
    }

    private void startPython() throws IOException {
        String string = this.config.getString(PythonOptions.DATA_TMP_DIR);
        if (string == null) {
            string = System.getProperty("java.io.tmpdir");
        }
        File file = new File(string, this.envID + Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA + this.setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output");
        File file2 = new File(string, this.envID + Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA + this.setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input)");
        this.sender.open(file2);
        this.receiver.open(file);
        String str = this.function.getRuntimeContext().getDistributedCache().getFile(PythonPlanBinder.FLINK_PYTHON_DC_ID).getAbsolutePath() + PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
        this.process = Runtime.getRuntime().exec(this.config.getString(PythonOptions.PYTHON_BINARY_PATH) + " -O -B " + str + this.config.getString(PythonPlanBinder.PLAN_ARGUMENTS_KEY, ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
        this.outPrinter = new Thread(new StreamPrinter(this.process.getInputStream()));
        this.outPrinter.start();
        this.errorPrinter = new Thread(new StreamPrinter(this.process.getErrorStream(), this.msg));
        this.errorPrinter.start();
        this.shutdownThread = ShutdownHookUtil.addShutdownHook(() -> {
            destroyProcess(this.process);
        }, getClass().getSimpleName(), LOG);
        OutputStream outputStream = this.process.getOutputStream();
        outputStream.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((this.envID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((this.setID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER + this.server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write(((this.config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10) + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((file2 + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.write((file + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
        outputStream.flush();
        while (true) {
            try {
                this.socket = this.server.accept();
                this.in = new DataInputStream(this.socket.getInputStream());
                this.out = new DataOutputStream(this.socket.getOutputStream());
                return;
            } catch (SocketTimeoutException e) {
                checkPythonProcessHealth();
            }
        }
    }

    private void checkPythonProcessHealth() {
        try {
            int exitValue = this.process.exitValue();
            try {
                this.outPrinter.join();
            } catch (InterruptedException e) {
                this.outPrinter.interrupt();
                Thread.interrupted();
            }
            try {
                this.errorPrinter.join();
            } catch (InterruptedException e2) {
                this.errorPrinter.interrupt();
                Thread.interrupted();
            }
            if (exitValue == 0) {
                throw new RuntimeException("Plan file exited prematurely without an error." + this.msg.get());
            }
            throw new RuntimeException("Plan file caused an error. Check log-files for details." + this.msg.get());
        } catch (IllegalThreadStateException e3) {
        }
    }

    public void close() throws IOException {
        Throwable th = null;
        try {
            this.socket.close();
            this.sender.close();
            this.receiver.close();
        } catch (Throwable th2) {
            th = th2;
        }
        try {
            destroyProcess(this.process);
        } catch (Throwable th3) {
            th = ExceptionUtils.firstOrSuppressed(th3, th);
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownThread, getClass().getSimpleName(), LOG);
        ExceptionUtils.tryRethrowIOException(th);
    }

    public static void destroyProcess(Process process) throws IOException {
        try {
            process.exitValue();
        } catch (IllegalThreadStateException e) {
            if (!process.getClass().getName().equals("java.lang.UNIXProcess")) {
                process.destroy();
                return;
            }
            try {
                Field declaredField = process.getClass().getDeclaredField("pid");
                declaredField.setAccessible(true);
                Runtime.getRuntime().exec(new String[]{"kill", "-9", String.valueOf(declaredField.getInt(process))});
            } catch (Throwable th) {
                process.destroy();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendWriteNotification(int i, boolean z) throws IOException {
        this.out.writeInt(i);
        this.out.writeByte(z ? 0 : 32);
        this.out.flush();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendReadConfirmation() throws IOException {
        this.out.writeByte(1);
        this.out.flush();
    }

    public final void sendBroadCastVariables(Configuration configuration) throws IOException {
        try {
            int integer = configuration.getInteger(PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT, 0);
            String[] strArr = new String[integer];
            for (int i = 0; i < strArr.length; i++) {
                strArr[i] = configuration.getString(PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + i, (String) null);
            }
            this.out.write(new SerializationUtils.IntSerializer().serializeWithoutTypeInfo(Integer.valueOf(integer)));
            SerializationUtils.StringSerializer stringSerializer = new SerializationUtils.StringSerializer();
            for (String str : strArr) {
                Iterator it = this.function.getRuntimeContext().getBroadcastVariable(str).iterator();
                this.out.write(stringSerializer.serializeWithoutTypeInfo(str));
                while (it.hasNext()) {
                    this.out.writeByte(1);
                    this.out.write((byte[]) it.next());
                }
                this.out.writeByte(0);
            }
        } catch (SocketTimeoutException e) {
            throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " stopped responding." + this.msg);
        }
    }
}
