/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.python.api.streaming.data;

import java.net.SocketTimeoutException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.api.streaming.data.PythonDualInputSender;
import org.apache.flink.python.api.streaming.data.PythonStreamer;
import org.apache.flink.python.api.streaming.data.SingleElementPushBackIterator;
import org.apache.flink.util.Collector;

public class PythonDualInputStreamer<IN1, IN2, OUT>
extends PythonStreamer<PythonDualInputSender<IN1, IN2>, OUT> {
    private static final long serialVersionUID = -607175070491761873L;

    public PythonDualInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) {
        super(function, config, envID, setID, usesByteArray, new PythonDualInputSender(config));
    }

    public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) {
        block14: {
            SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<IN1>(iterator1);
            SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<IN2>(iterator2);
            try {
                if (!i1.hasNext() && !i2.hasNext()) break block14;
                block13: while (true) {
                    int sig = this.in.readInt();
                    switch (sig) {
                        case -3: {
                            if (!i1.hasNext()) continue block13;
                            int size = ((PythonDualInputSender)this.sender).sendBuffer1(i1);
                            this.sendWriteNotification(size, i1.hasNext());
                            continue block13;
                        }
                        case -4: {
                            if (!i2.hasNext()) continue block13;
                            int size = ((PythonDualInputSender)this.sender).sendBuffer2(i2);
                            this.sendWriteNotification(size, i2.hasNext());
                            continue block13;
                        }
                        case -1: {
                            return;
                        }
                        case -2: {
                            try {
                                this.outPrinter.join();
                            }
                            catch (InterruptedException e2) {
                                this.outPrinter.interrupt();
                            }
                            try {
                                this.errorPrinter.join();
                            }
                            catch (InterruptedException e3) {
                                this.errorPrinter.interrupt();
                            }
                            throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + this.msg);
                        }
                    }
                    this.receiver.collectBuffer(c, sig);
                    this.sendReadConfirmation();
                }
            }
            catch (SocketTimeoutException ignored) {
                throw new RuntimeException("External process for task " + this.function.getRuntimeContext().getTaskName() + " stopped responding." + this.msg);
            }
            catch (Exception e4) {
                throw new RuntimeException("Critical failure for task " + this.function.getRuntimeContext().getTaskName() + ". " + (String)this.msg.get(), e4);
            }
        }
    }
}

