/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import junit.framework.Assert;
import org.apache.avro.ipc.Server;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.LoadBalancingRpcClient;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.api.RpcTestUtils;
import org.apache.flume.event.EventBuilder;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestLoadBalancingRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestLoadBalancingRpcClient.class);

    /**
     * Verifies that asking the factory for a load-balancing client with a
     * single configured host fails with a {@link FlumeException}.
     *
     * <p>The host address is registered under {@code hosts.host1}, the same
     * {@code hosts.<name>} key layout the other tests in this class use, so the
     * configuration really describes exactly one host. If the factory
     * unexpectedly returns a client, it is captured and closed during teardown.
     */
    @Test(expected=FlumeException.class)
    public void testCreatingLbClientSingleHost() {
        Server server1 = null;
        RpcClient c = null;
        try {
            server1 = RpcTestUtils.startServer(new RpcTestUtils.OKAvroHandler());
            Properties p = new Properties();
            p.put("hosts.host1", "127.0.0.1:" + server1.getPort());
            p.put("hosts", "host1");
            p.put("client.type", "default_loadbalance");
            // Expected to throw; keep the result so a wrongly created client is not leaked.
            c = RpcClientFactory.getInstance(p);
        }
        finally {
            if (server1 != null) {
                server1.close();
            }
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Exercises failover between two load-balanced hosts for single-event
     * appends.
     *
     * <p>One hundred events are appended. The second handler is switched to its
     * failed state right before event 20 and back to OK right before event 40.
     * The test then expects 60 appends on the first handler and 40 on the
     * second.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testTwoHostFailover() throws Exception {
        Server firstServer = null;
        Server secondServer = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler firstHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler secondHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            firstServer = RpcTestUtils.startServer(firstHandler);
            secondServer = RpcTestUtils.startServer(secondHandler);

            Properties config = new Properties();
            config.setProperty("hosts", "h1 h2");
            config.setProperty("client.type", "default_loadbalance");
            config.setProperty("hosts.h1", "127.0.0.1:" + firstServer.getPort());
            config.setProperty("hosts.h2", "127.0.0.1:" + secondServer.getPort());

            client = RpcClientFactory.getInstance(config);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);

            int sent = 0;
            while (sent < 100) {
                // Toggle the second host's health at fixed points in the run.
                switch (sent) {
                    case 20:
                        secondHandler.setFailed();
                        break;
                    case 40:
                        secondHandler.setOK();
                        break;
                    default:
                        break;
                }
                client.append(getEvent(sent));
                sent++;
            }

            Assert.assertEquals(60, firstHandler.getAppendCount());
            Assert.assertEquals(40, secondHandler.getAppendCount());
        } finally {
            // Tear down the servers first, then the client.
            if (firstServer != null) {
                firstServer.close();
            }
            if (secondServer != null) {
                secondServer.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Verifies that appending to a load-balancing client after it has been
     * closed raises an {@link EventDeliveryException}.
     *
     * <p>The test first runs a 100-event failover scenario (second handler
     * failed from event 20 until event 40) and checks the 60/40 split, then
     * closes the client and appends once more. That final append is expected to
     * throw; reaching {@code Assert.fail} means it did not.
     *
     * <p>The client is closed in the {@code finally} block only if the explicit
     * close inside the test body was not reached, so an earlier failure does
     * not leak the client.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test(expected=EventDeliveryException.class)
    public void testTwoHostFailoverThrowAfterClose() throws Exception {
        Server s1 = null;
        Server s2 = null;
        RpcClient c = null;
        boolean clientClosed = false;
        try {
            RpcTestUtils.LoadBalancedAvroHandler h1 = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler h2 = new RpcTestUtils.LoadBalancedAvroHandler();
            s1 = RpcTestUtils.startServer(h1);
            s2 = RpcTestUtils.startServer(h2);
            Properties p = new Properties();
            p.put("hosts", "h1 h2");
            p.put("client.type", "default_loadbalance");
            p.put("hosts.h1", "127.0.0.1:" + s1.getPort());
            p.put("hosts.h2", "127.0.0.1:" + s2.getPort());
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 100; ++i) {
                if (i == 20) {
                    h2.setFailed();
                } else if (i == 40) {
                    h2.setOK();
                }
                c.append(getEvent(i));
            }
            Assert.assertEquals(60, h1.getAppendCount());
            Assert.assertEquals(40, h2.getAppendCount());

            // c is known to be non-null here: it has been used for every append above.
            c.close();
            clientClosed = true;

            // Appending on the closed client is the behaviour under test.
            c.append(getEvent(3));
            Assert.fail("Expected EventDeliveryException when appending after close");
        }
        finally {
            if (s1 != null) {
                s1.close();
            }
            if (s2 != null) {
                s2.close();
            }
            // Only needed when the test aborted before the explicit close above.
            if (c != null && !clientClosed) {
                c.close();
            }
        }
    }

    /**
     * Checks that a load-balancing client keeps delivering when one of its two
     * configured hosts is unreachable.
     *
     * <p>Host {@code h1} is configured as {@code 127.0.0.1:0}, for which this
     * test starts no server; host {@code h2} points at the only server started
     * here. A first client sends ten one-event batches and a second client sends
     * ten single events; all of them are expected on the live handler.
     *
     * @throws Exception propagated from the server helpers or the clients
     */
    @Test
    public void testTwoHostsOneDead() throws Exception {
        LOGGER.info("Running testTwoHostsOneDead...");
        final int attempts = 10;
        Server liveServer = null;
        RpcClient batchClient = null;
        RpcClient eventClient = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler liveHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            liveServer = RpcTestUtils.startServer(liveHandler);

            Properties config = new Properties();
            config.setProperty("hosts", "h1 h2");
            config.setProperty("client.type", "default_loadbalance");
            config.setProperty("hosts.h1", "127.0.0.1:0");
            config.setProperty("hosts.h2", "127.0.0.1:" + liveServer.getPort());

            // Batch path.
            batchClient = RpcClientFactory.getInstance(config);
            Assert.assertTrue(batchClient instanceof LoadBalancingRpcClient);
            for (int n = 0; n < attempts; n++) {
                batchClient.appendBatch(getBatchedEvent(n));
            }
            Assert.assertEquals(attempts, liveHandler.getAppendBatchCount());

            // Single-event path, using a fresh client built from the same configuration.
            eventClient = RpcClientFactory.getInstance(config);
            Assert.assertTrue(eventClient instanceof LoadBalancingRpcClient);
            for (int n = 0; n < attempts; n++) {
                eventClient.append(getEvent(n));
            }
            Assert.assertEquals(attempts, liveHandler.getAppendCount());
        } finally {
            // Tear down the server first, then both clients in creation order.
            if (liveServer != null) {
                liveServer.close();
            }
            if (batchClient != null) {
                batchClient.close();
            }
            if (eventClient != null) {
                eventClient.close();
            }
        }
    }

    /**
     * Batch variant of the two-host failover scenario.
     *
     * <p>One hundred one-event batches are sent. The second handler is marked
     * failed right before batch 20 and healthy again right before batch 40. The
     * test expects 60 batches on the first handler and 40 on the second.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testTwoHostFailoverBatch() throws Exception {
        final int failAt = 20;
        final int recoverAt = 40;
        Server serverA = null;
        Server serverB = null;
        RpcClient lbClient = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler handlerA = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler handlerB = new RpcTestUtils.LoadBalancedAvroHandler();
            serverA = RpcTestUtils.startServer(handlerA);
            serverB = RpcTestUtils.startServer(handlerB);

            Properties props = new Properties();
            props.setProperty("hosts", "h1 h2");
            props.setProperty("client.type", "default_loadbalance");
            props.setProperty("hosts.h1", "127.0.0.1:" + serverA.getPort());
            props.setProperty("hosts.h2", "127.0.0.1:" + serverB.getPort());

            lbClient = RpcClientFactory.getInstance(props);
            Assert.assertTrue(lbClient instanceof LoadBalancingRpcClient);

            for (int batchNo = 0; batchNo < 100; batchNo++) {
                // The two trigger points are distinct, so the branch order is irrelevant.
                if (batchNo == recoverAt) {
                    handlerB.setOK();
                } else if (batchNo == failAt) {
                    handlerB.setFailed();
                }
                lbClient.appendBatch(getBatchedEvent(batchNo));
            }

            Assert.assertEquals(60, handlerA.getAppendBatchCount());
            Assert.assertEquals(40, handlerB.getAppendBatchCount());
        } finally {
            // Tear down the servers first, then the client.
            if (serverA != null) {
                serverA.close();
            }
            if (serverB != null) {
                serverB.close();
            }
            if (lbClient != null) {
                lbClient.close();
            }
        }
    }

    /**
     * Checks the distribution of single-event appends over two healthy hosts
     * when no host selector is configured.
     *
     * <p>After 100 appends each handler is expected to have received exactly 50.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbDefaultClientTwoHosts() throws Exception {
        final int eventCount = 100;
        final int expectedPerHost = 50;
        Server left = null;
        Server right = null;
        RpcClient client = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler leftHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler rightHandler = new RpcTestUtils.LoadBalancedAvroHandler();
            left = RpcTestUtils.startServer(leftHandler);
            right = RpcTestUtils.startServer(rightHandler);

            Properties settings = new Properties();
            settings.setProperty("hosts", "h1 h2");
            settings.setProperty("client.type", "default_loadbalance");
            settings.setProperty("hosts.h1", "127.0.0.1:" + left.getPort());
            settings.setProperty("hosts.h2", "127.0.0.1:" + right.getPort());

            client = RpcClientFactory.getInstance(settings);
            Assert.assertTrue(client instanceof LoadBalancingRpcClient);

            int next = 0;
            while (next < eventCount) {
                client.append(getEvent(next));
                next++;
            }

            Assert.assertEquals(expectedPerHost, leftHandler.getAppendCount());
            Assert.assertEquals(expectedPerHost, rightHandler.getAppendCount());
        } finally {
            // Tear down the servers first, then the client.
            if (left != null) {
                left.close();
            }
            if (right != null) {
                right.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * Checks the distribution of batch appends over two healthy hosts when no
     * host selector is configured.
     *
     * <p>After 100 one-event batches each handler is expected to have received
     * exactly 50 batches.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbDefaultClientTwoHostsBatch() throws Exception {
        final int batchCount = 100;
        Server serverOne = null;
        Server serverTwo = null;
        RpcClient rpc = null;
        try {
            RpcTestUtils.LoadBalancedAvroHandler handlerOne = new RpcTestUtils.LoadBalancedAvroHandler();
            RpcTestUtils.LoadBalancedAvroHandler handlerTwo = new RpcTestUtils.LoadBalancedAvroHandler();
            serverOne = RpcTestUtils.startServer(handlerOne);
            serverTwo = RpcTestUtils.startServer(handlerTwo);

            String addressOne = "127.0.0.1:" + serverOne.getPort();
            String addressTwo = "127.0.0.1:" + serverTwo.getPort();

            Properties cfg = new Properties();
            cfg.setProperty("hosts", "h1 h2");
            cfg.setProperty("client.type", "default_loadbalance");
            cfg.setProperty("hosts.h1", addressOne);
            cfg.setProperty("hosts.h2", addressTwo);

            rpc = RpcClientFactory.getInstance(cfg);
            Assert.assertTrue(rpc instanceof LoadBalancingRpcClient);

            for (int b = 0; b != batchCount; b++) {
                rpc.appendBatch(getBatchedEvent(b));
            }

            Assert.assertEquals(50, handlerOne.getAppendBatchCount());
            Assert.assertEquals(50, handlerTwo.getAppendBatchCount());
        } finally {
            // Tear down the servers first, then the client.
            if (serverOne != null) {
                serverOne.close();
            }
            if (serverTwo != null) {
                serverTwo.close();
            }
            if (rpc != null) {
                rpc.close();
            }
        }
    }

    /**
     * Checks that the {@code random} host selector spreads single-event appends
     * across ten hosts without losing any.
     *
     * <p>After 1000 appends the per-host append counts must sum to 1000 and must
     * take more than two distinct values; fewer distinct values is reported as a
     * "Very unusual distribution".
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbClientTenHostRandomDistribution() throws Exception {
        final int numHosts = 10;
        final int numEvents = 1000;
        Server[] s = new Server[numHosts];
        RpcTestUtils.LoadBalancedAvroHandler[] h = new RpcTestUtils.LoadBalancedAvroHandler[numHosts];
        RpcClient c = null;
        try {
            Properties p = new Properties();
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < numHosts; ++i) {
                h[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                s[i] = RpcTestUtils.startServer(h[i]);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + s[i].getPort());
                hostList.append(name).append(" ");
            }
            // trim() drops the trailing separator left by the loop above.
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "random");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < numEvents; ++i) {
                c.append(getEvent(i));
            }
            // Distinct per-host counts: a set collapses hosts that received equal numbers.
            HashSet<Integer> counts = new HashSet<Integer>();
            int total = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : h) {
                total += handler.getAppendCount();
                counts.add(handler.getAppendCount());
            }
            Assert.assertTrue("Very unusual distribution", counts.size() > 2);
            Assert.assertEquals("Missing events", numEvents, total);
        }
        finally {
            for (Server server : s) {
                if (server != null) {
                    server.close();
                }
            }
            // Close the client as well so it does not outlive the test.
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Checks that the {@code random} host selector spreads batch appends across
     * ten hosts without losing any.
     *
     * <p>After 1000 one-event batches the per-host batch counts must sum to 1000
     * and must take more than two distinct values; fewer distinct values is
     * reported as a "Very unusual distribution".
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbClientTenHostRandomDistributionBatch() throws Exception {
        final int numHosts = 10;
        final int numEvents = 1000;
        Server[] s = new Server[numHosts];
        RpcTestUtils.LoadBalancedAvroHandler[] h = new RpcTestUtils.LoadBalancedAvroHandler[numHosts];
        RpcClient c = null;
        try {
            Properties p = new Properties();
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < numHosts; ++i) {
                h[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                s[i] = RpcTestUtils.startServer(h[i]);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + s[i].getPort());
                hostList.append(name).append(" ");
            }
            // trim() drops the trailing separator left by the loop above.
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "random");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < numEvents; ++i) {
                c.appendBatch(getBatchedEvent(i));
            }
            // Distinct per-host counts: a set collapses hosts that received equal numbers.
            HashSet<Integer> counts = new HashSet<Integer>();
            int total = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : h) {
                total += handler.getAppendBatchCount();
                counts.add(handler.getAppendBatchCount());
            }
            Assert.assertTrue("Very unusual distribution", counts.size() > 2);
            Assert.assertEquals("Missing events", numEvents, total);
        }
        finally {
            for (Server server : s) {
                if (server != null) {
                    server.close();
                }
            }
            // Close the client as well so it does not outlive the test.
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Checks that the {@code round_robin} host selector distributes single-event
     * appends evenly across ten hosts.
     *
     * <p>After 1000 appends the per-host append counts must sum to 1000 and must
     * all be equal, i.e. there is exactly one distinct count value.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbClientTenHostRoundRobinDistribution() throws Exception {
        final int numHosts = 10;
        final int numEvents = 1000;
        Server[] s = new Server[numHosts];
        RpcTestUtils.LoadBalancedAvroHandler[] h = new RpcTestUtils.LoadBalancedAvroHandler[numHosts];
        RpcClient c = null;
        try {
            Properties p = new Properties();
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < numHosts; ++i) {
                h[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                s[i] = RpcTestUtils.startServer(h[i]);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + s[i].getPort());
                hostList.append(name).append(" ");
            }
            // trim() drops the trailing separator left by the loop above.
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "round_robin");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < numEvents; ++i) {
                c.append(getEvent(i));
            }
            // Distinct per-host counts: a single value means every host got the same share.
            HashSet<Integer> counts = new HashSet<Integer>();
            int total = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : h) {
                total += handler.getAppendCount();
                counts.add(handler.getAppendCount());
            }
            Assert.assertTrue("Very unusual distribution", counts.size() == 1);
            Assert.assertEquals("Missing events", numEvents, total);
        }
        finally {
            for (Server server : s) {
                if (server != null) {
                    server.close();
                }
            }
            // Close the client as well so it does not outlive the test.
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Checks that the {@code round_robin} host selector distributes batch
     * appends evenly across ten hosts.
     *
     * <p>After 1000 one-event batches the per-host batch counts must sum to 1000
     * and must all be equal, i.e. there is exactly one distinct count value.
     *
     * @throws Exception propagated from the server helpers or the client
     */
    @Test
    public void testLbClientTenHostRoundRobinDistributionBatch() throws Exception {
        final int numHosts = 10;
        final int numEvents = 1000;
        Server[] s = new Server[numHosts];
        RpcTestUtils.LoadBalancedAvroHandler[] h = new RpcTestUtils.LoadBalancedAvroHandler[numHosts];
        RpcClient c = null;
        try {
            Properties p = new Properties();
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < numHosts; ++i) {
                h[i] = new RpcTestUtils.LoadBalancedAvroHandler();
                s[i] = RpcTestUtils.startServer(h[i]);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + s[i].getPort());
                hostList.append(name).append(" ");
            }
            // trim() drops the trailing separator left by the loop above.
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "round_robin");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < numEvents; ++i) {
                c.appendBatch(getBatchedEvent(i));
            }
            // Distinct per-host counts: a single value means every host got the same share.
            HashSet<Integer> counts = new HashSet<Integer>();
            int total = 0;
            for (RpcTestUtils.LoadBalancedAvroHandler handler : h) {
                total += handler.getAppendBatchCount();
                counts.add(handler.getAppendBatchCount());
            }
            Assert.assertTrue("Very unusual distribution", counts.size() == 1);
            Assert.assertEquals("Missing events", numEvents, total);
        }
        finally {
            for (Server server : s) {
                if (server != null) {
                    server.close();
                }
            }
            // Close the client as well so it does not outlive the test.
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Exercises the {@code random} host selector with {@code backoff} enabled on
     * three hosts.
     *
     * <p>Phase 1: hosts 0 and 2 are failed, so all 50 events must land on
     * host 1. Phase 2: host 0 is restored and host 1 is failed; the next append
     * is expected to fail with an {@link EventDeliveryException} (presumably
     * because host 0 is still backed off at that point - the test only asserts
     * the exception). Phase 3: after a 2.5 second pause, 50 more events must all
     * land on host 0, while host 1 stays at 50 and host 2 at 0.
     *
     * <p>All servers and the client are closed during teardown.
     *
     * @throws Exception propagated from the server helpers, the client, or the
     *     sleep
     */
    @Test
    public void testRandomBackoff() throws Exception {
        Properties p = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> hosts = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient c = null;
        try {
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < 3; ++i) {
                RpcTestUtils.LoadBalancedAvroHandler s = new RpcTestUtils.LoadBalancedAvroHandler();
                hosts.add(s);
                Server srv = RpcTestUtils.startServer(s);
                servers.add(srv);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
                hostList.append(name).append(" ");
            }
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "random");
            p.put("backoff", "true");
            hosts.get(0).setFailed();
            hosts.get(2).setFailed();
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 50; ++i) {
                c.append(EventBuilder.withBody(("test" + i).getBytes()));
            }
            Assert.assertEquals(50, hosts.get(1).getAppendCount());
            Assert.assertEquals(0, hosts.get(0).getAppendCount());
            Assert.assertEquals(0, hosts.get(2).getAppendCount());
            hosts.get(0).setOK();
            hosts.get(1).setFailed();
            try {
                c.append(EventBuilder.withBody("shouldfail".getBytes()));
                Assert.fail("Expected EventDeliveryException");
            }
            catch (EventDeliveryException expected) {
                // Intentionally ignored: this failure is what the test is waiting for.
            }
            // Pause before retrying; presumably long enough for host 0's backoff to lapse.
            Thread.sleep(2500L);
            for (int i = 0; i < 50; ++i) {
                c.append(EventBuilder.withBody(("test" + i).getBytes()));
            }
            Assert.assertEquals(50, hosts.get(0).getAppendCount());
            Assert.assertEquals(50, hosts.get(1).getAppendCount());
            Assert.assertEquals(0, hosts.get(2).getAppendCount());
        }
        finally {
            // Servers first, then the client, matching the teardown order used elsewhere in this class.
            for (Server server : servers) {
                server.close();
            }
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Exercises the {@code round_robin} host selector with {@code backoff}
     * enabled when a host fails after an initial healthy round.
     *
     * <p>Nine events are sent in three rounds of three: all hosts healthy, then
     * host 1 failed, then host 1 restored. The expected totals are 4, 1 and 4
     * appends for hosts 0, 1 and 2, i.e. host 1 receives nothing further once
     * it has failed, even in the round right after it is restored.
     *
     * <p>All servers and the client are closed during teardown.
     *
     * @throws EventDeliveryException if an append cannot be delivered
     */
    @Test
    public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException {
        Properties p = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> hosts = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient c = null;
        try {
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < 3; ++i) {
                RpcTestUtils.LoadBalancedAvroHandler s = new RpcTestUtils.LoadBalancedAvroHandler();
                hosts.add(s);
                Server srv = RpcTestUtils.startServer(s);
                servers.add(srv);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
                hostList.append(name).append(" ");
            }
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "round_robin");
            p.put("backoff", "true");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            // Round 1: every host healthy.
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            // Round 2: host 1 rejects appends.
            hosts.get(1).setFailed();
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            // Round 3: host 1 is healthy again, but is still expected to be skipped.
            hosts.get(1).setOK();
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(4, hosts.get(0).getAppendCount());
            Assert.assertEquals(1, hosts.get(1).getAppendCount());
            Assert.assertEquals(4, hosts.get(2).getAppendCount());
        }
        finally {
            // Servers first, then the client, matching the teardown order used elsewhere in this class.
            for (Server server : servers) {
                server.close();
            }
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Exercises the {@code round_robin} host selector with {@code backoff}
     * enabled when a host is failed from the start and stays failed across
     * several retry windows.
     *
     * <p>Host 1 is failed before the client is created. Three rounds of three
     * events, separated by 2.1 second pauses, must all bypass host 1; host 1 is
     * restored just before the second pause, and the third round is still
     * expected to bypass it (presumably because its backoff period grew after
     * the repeated failures - the test only asserts the counts). After a
     * further 2.5 second pause, 60 events are expected to be split evenly, 20
     * per host, on top of the 5 and 4 events hosts 0 and 2 took earlier.
     *
     * <p>All servers and the client are closed during teardown.
     *
     * @throws Exception propagated from the server helpers, the client, or the
     *     sleeps
     */
    @Test
    public void testRoundRobinBackoffIncreasingBackoffs() throws Exception {
        Properties p = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> hosts = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient c = null;
        try {
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < 3; ++i) {
                RpcTestUtils.LoadBalancedAvroHandler s = new RpcTestUtils.LoadBalancedAvroHandler();
                hosts.add(s);
                if (i == 1) {
                    s.setFailed();
                }
                Server srv = RpcTestUtils.startServer(s);
                servers.add(srv);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
                hostList.append(name).append(" ");
            }
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "round_robin");
            p.put("backoff", "true");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, hosts.get(1).getAppendCount());
            Thread.sleep(2100L);
            // Host 1 is still failed here, so it must again receive nothing.
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, hosts.get(1).getAppendCount());
            hosts.get(1).setOK();
            Thread.sleep(2100L);
            // Host 1 is healthy again but is still expected to be skipped in this round.
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(0, hosts.get(1).getAppendCount());
            Thread.sleep(2500L);
            int numEvents = 60;
            for (int i = 0; i < numEvents; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            // The 9 earlier events went 5 to host 0 and 4 to host 2; the last 60 split evenly.
            Assert.assertEquals(5 + numEvents / 3, hosts.get(0).getAppendCount());
            Assert.assertEquals(numEvents / 3, hosts.get(1).getAppendCount());
            Assert.assertEquals(4 + numEvents / 3, hosts.get(2).getAppendCount());
        }
        finally {
            // Servers first, then the client, matching the teardown order used elsewhere in this class.
            for (Server server : servers) {
                server.close();
            }
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Exercises recovery of a backed-off host under the {@code round_robin}
     * selector with {@code backoff} enabled.
     *
     * <p>Host 1 is failed before the client is created, so the first three
     * events are expected to go 2 to host 0 and 1 to host 2. Host 1 is then
     * restored and, after a 3 second pause, 60 further events are expected to be
     * split evenly, 20 per host.
     *
     * <p>All servers and the client are closed during teardown.
     *
     * @throws EventDeliveryException if an append cannot be delivered
     * @throws InterruptedException if the pause is interrupted
     */
    @Test
    public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException {
        Properties p = new Properties();
        List<RpcTestUtils.LoadBalancedAvroHandler> hosts = new ArrayList<RpcTestUtils.LoadBalancedAvroHandler>();
        List<Server> servers = new ArrayList<Server>();
        RpcClient c = null;
        try {
            StringBuilder hostList = new StringBuilder();
            for (int i = 0; i < 3; ++i) {
                RpcTestUtils.LoadBalancedAvroHandler s = new RpcTestUtils.LoadBalancedAvroHandler();
                hosts.add(s);
                if (i == 1) {
                    s.setFailed();
                }
                Server srv = RpcTestUtils.startServer(s);
                servers.add(srv);
                String name = "h" + i;
                p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
                hostList.append(name).append(" ");
            }
            p.put("hosts", hostList.toString().trim());
            p.put("client.type", "default_loadbalance");
            p.put("host-selector", "round_robin");
            p.put("backoff", "true");
            c = RpcClientFactory.getInstance(p);
            Assert.assertTrue(c instanceof LoadBalancingRpcClient);
            for (int i = 0; i < 3; ++i) {
                c.append(EventBuilder.withBody("recovery test".getBytes()));
            }
            hosts.get(1).setOK();
            // Pause before the main run; presumably long enough for host 1's backoff to lapse.
            Thread.sleep(3000L);
            int numEvents = 60;
            for (int i = 0; i < numEvents; ++i) {
                c.append(EventBuilder.withBody("testing".getBytes()));
            }
            Assert.assertEquals(2 + numEvents / 3, hosts.get(0).getAppendCount());
            Assert.assertEquals(0 + numEvents / 3, hosts.get(1).getAppendCount());
            Assert.assertEquals(1 + numEvents / 3, hosts.get(2).getAppendCount());
        }
        finally {
            // Servers first, then the client, matching the teardown order used elsewhere in this class.
            for (Server server : servers) {
                server.close();
            }
            if (c != null) {
                c.close();
            }
        }
    }

    /**
     * Builds a batch containing exactly one event for the given index.
     *
     * @param index number embedded in the event body via {@link #getEvent(int)}
     * @return a new, mutable single-element list
     */
    private List<Event> getBatchedEvent(int index) {
        Event only = getEvent(index);
        List<Event> batch = new ArrayList<Event>(1);
        batch.add(only);
        return batch;
    }

    /**
     * Builds a test event whose body is the text {@code "event: <index>"},
     * encoded with the platform default charset.
     *
     * @param index number appended to the body text
     * @return the event produced by {@code EventBuilder.withBody}
     */
    private Event getEvent(int index) {
        String body = "event: " + index;
        byte[] payload = body.getBytes();
        return EventBuilder.withBody(payload);
    }
}

