/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.avro.ipc.Server;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.NettyAvroRpcClient;
import org.apache.flume.api.RpcTestUtils;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JUnit tests for {@link NettyAvroRpcClient}.
 *
 * <p>Most tests delegate to {@link RpcTestUtils#handlerSimpleAppendTest} or
 * {@link RpcTestUtils#handlerBatchAppendTest} with one of the canned Avro handlers; the rest start
 * a server through {@link RpcTestUtils#startServer}, configure a client by hand and always tear
 * both down through {@link #shutdown(Server, NettyAvroRpcClient)}.
 *
 * <p>NOTE(review): the four-argument helper overloads are called as
 * {@code (handler, boolean, boolean, int)}. Judging only by the test names in this class, the
 * first flag appears to enable compression on the server, the second on the client, and the int
 * is the compression level - confirm against {@code RpcTestUtils}.
 */
public class TestNettyAvroRpcClient {
    private static final Logger logger = LoggerFactory.getLogger(TestNettyAvroRpcClient.class);

    /** Address used to reach every server referenced by the hand-configured clients. */
    private static final String LOCALHOST = "127.0.0.1";

    /** Charset applied to every event body built in this class. */
    private static final Charset UTF8 = Charset.forName("UTF8");

    /**
     * Builds the minimal client configuration: a single host alias {@code localhost} that points
     * at {@link #LOCALHOST} on the given port.
     *
     * @param port port to put into the {@code hosts.localhost} entry
     * @return a fresh, mutable {@link Properties} instance holding the two host entries
     */
    private static Properties clientProperties(int port) {
        Properties props = new Properties();
        props.setProperty("hosts", "localhost");
        props.setProperty("hosts.localhost", LOCALHOST + ":" + port);
        return props;
    }

    /**
     * Stops the server and then closes the client, in that order.
     *
     * <p>The close is nested in its own {@code finally} so the client is released even when
     * stopping the server throws.
     *
     * @param server server to stop
     * @param client client to close; {@code null} when the test failed before creating it
     */
    private static void shutdown(Server server, NettyAvroRpcClient client) throws FlumeException {
        try {
            RpcTestUtils.stopServer(server);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /** Single append against the OK handler; expected to complete without an exception. */
    @Test
    public void testOKServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler());
    }

    /** Single append against the OK handler with both flags set and level 6. */
    @Test
    public void testOKServerSimpleCompressionLevel6() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 6);
    }

    /** Single append against the OK handler with both flags set and level 0. */
    @Test
    public void testOKServerSimpleCompressionLevel0() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 0);
    }

    /** Single append with only the second flag set; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testOKServerSimpleCompressionClientOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), false, true, 6);
    }

    /** Single append with only the first flag set; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testOKServerSimpleCompressionServerOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.OKAvroHandler(), true, false, 6);
    }

    /** Batch append against the OK handler; expected to complete without an exception. */
    @Test
    public void testOKServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler());
    }

    /** Batch append against the OK handler with both flags set and level 0. */
    @Test
    public void testOKServerBatchCompressionLevel0() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 0);
    }

    /** Batch append against the OK handler with both flags set and level 6. */
    @Test
    public void testOKServerBatchCompressionLevel6() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, true, 6);
    }

    /** Batch append with only the first flag set; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testOKServerBatchCompressionServerOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), true, false, 6);
    }

    /** Batch append with only the second flag set; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testOKServerBatchCompressionClientOnly() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.OKAvroHandler(), false, true, 6);
    }

    /**
     * Configures a client for port 1 without starting any server in this test; configuration must
     * fail with a FlumeException.
     */
    @Test(expected = FlumeException.class)
    public void testUnableToConnect() throws FlumeException {
        NettyAvroRpcClient client = new NettyAvroRpcClient();
        client.configure(clientProperties(1));
    }

    /**
     * Sends one event more than the configured {@code batch-size} in a single
     * {@code appendBatch} call; the call is expected to complete without an exception.
     */
    @Test
    public void testBatchOverrun() throws FlumeException, EventDeliveryException {
        int batchSize = 10;
        int moreThanBatchSize = batchSize + 1;
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            Properties props = clientProperties(server.getPort());
            props.setProperty("batch-size", Integer.toString(batchSize));
            client = new NettyAvroRpcClient();
            client.configure(props);

            ArrayList<Event> events = new ArrayList<Event>(moreThanBatchSize);
            for (int i = 0; i < moreThanBatchSize; ++i) {
                events.add(EventBuilder.withBody("evt: " + i, UTF8));
            }
            client.appendBatch(events);
        } finally {
            shutdown(server, client);
        }
    }

    /**
     * Closes the server underneath a connected client and then appends; the append must fail with
     * an EventDeliveryException and the client must report itself inactive afterwards.
     */
    @Test(expected = EventDeliveryException.class)
    public void testServerDisconnect() throws FlumeException, EventDeliveryException, InterruptedException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            client = RpcTestUtils.getStockLocalClient(server.getPort());
            server.close();
            // Pause for a second after close() before joining the server.
            Thread.sleep(1000L);
            try {
                server.join();
            } catch (InterruptedException ex) {
                logger.warn("Thread interrupted during join()", ex);
                // Preserve the interrupt status for whoever runs this test.
                Thread.currentThread().interrupt();
            }
            try {
                client.append(EventBuilder.withBody("hello", UTF8));
            } finally {
                // Checked whether or not append threw, so a passing append cannot hide a live client.
                Assert.assertFalse("Client should not be active", client.isActive());
            }
        } finally {
            shutdown(server, client);
        }
    }

    /**
     * Appends through a client that has already been closed; the client must be inactive and the
     * append must fail with an EventDeliveryException.
     */
    @Test(expected = EventDeliveryException.class)
    public void testClientClosedRequest() throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            client = RpcTestUtils.getStockLocalClient(server.getPort());
            client.close();
            Assert.assertFalse("Client should not be active", client.isActive());
            logger.info("Yaya! I am not active after client close!");
            client.append(EventBuilder.withBody("hello", UTF8));
        } finally {
            shutdown(server, client);
        }
    }

    /** Single append against the Failed handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testFailedServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.FailedAvroHandler());
        logger.error("Failed: I should never have gotten here!");
    }

    /** Single append against the Unknown handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testUnknownServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.UnknownAvroHandler());
        logger.error("Unknown: I should never have gotten here!");
    }

    /** Single append against the Throwing handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testThrowingServerSimple() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerSimpleAppendTest(new RpcTestUtils.ThrowingAvroHandler());
        logger.error("Throwing: I should never have gotten here!");
    }

    /** Batch append against the Failed handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testFailedServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.FailedAvroHandler());
        logger.error("Failed: I should never have gotten here!");
    }

    /** Batch append against the Unknown handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testUnknownServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.UnknownAvroHandler());
        logger.error("Unknown: I should never have gotten here!");
    }

    /** Batch append against the Throwing handler; must fail with an EventDeliveryException. */
    @Test(expected = EventDeliveryException.class)
    public void testThrowingServerBatch() throws FlumeException, EventDeliveryException {
        RpcTestUtils.handlerBatchAppendTest(new RpcTestUtils.ThrowingAvroHandler());
        logger.error("Throwing: I should never have gotten here!");
    }

    /**
     * Appends five events through a client configured with {@code maxIoWorkers=2}; every append
     * is expected to complete without an exception.
     */
    @Test
    public void testAppendWithMaxIOWorkers() throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
        try {
            Properties props = clientProperties(server.getPort());
            props.setProperty("maxIoWorkers", Integer.toString(2));
            client = new NettyAvroRpcClient();
            client.configure(props);
            for (int i = 0; i < 5; ++i) {
                client.append(EventBuilder.withBody("evt:" + i, UTF8));
            }
        } finally {
            shutdown(server, client);
        }
    }

    /**
     * Same as {@link #testAppendWithMaxIOWorkers()} but with the client set to
     * {@code compression-type=deflate}, {@code compression-level=0} and the server started via
     * {@code startServer(handler, 0, true)}.
     *
     * <p>NOTE(review): the {@code 0, true} arguments presumably mean "any free port" and
     * "compression enabled" - confirm against {@code RpcTestUtils.startServer}.
     */
    @Test
    public void testAppendWithMaxIOWorkersSimpleCompressionLevel0() throws FlumeException, EventDeliveryException {
        NettyAvroRpcClient client = null;
        Server server = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler(), 0, true);
        try {
            Properties props = clientProperties(server.getPort());
            props.setProperty("maxIoWorkers", Integer.toString(2));
            props.setProperty("compression-type", "deflate");
            props.setProperty("compression-level", "0");
            client = new NettyAvroRpcClient();
            client.configure(props);
            for (int i = 0; i < 5; ++i) {
                client.append(EventBuilder.withBody("evt:" + i, UTF8));
            }
        } finally {
            shutdown(server, client);
        }
    }
}

