package cn.com.duiba.cat.message.io;

import cn.com.duiba.cat.Cat;
import cn.com.duiba.cat.exception.ResourceNotFoundException;
import cn.com.duiba.cat.message.internal.MessageIdFactory;
import cn.com.duiba.cat.model.configuration.ClientConfigService;
import cn.com.duiba.cat.model.configuration.DefaultClientConfigService;
import cn.com.duiba.cat.model.configuration.client.entity.Server;
import cn.com.duiba.cat.util.Pair;
import cn.com.duiba.cat.util.Threads;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/com/duiba/cat/message/io/ChannelManager.class */
public class ChannelManager implements Threads.Task {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
    private static ChannelManager instance = new ChannelManager();
    private Bootstrap bootstrap;
    private ChannelsHolder activeChannelHolder;
    private int reconnectCount;
    private ClientConfigService configService = DefaultClientConfigService.getInstance();
    private boolean active = true;
    private MessageIdFactory idFactory = MessageIdFactory.getInstance();
    private final Random random = new Random();

    /* loaded from: input_file:cn/com/duiba/cat/message/io/ChannelManager$ChannelEntity.class */
    public static class ChannelEntity {
        private final String ip;
        private final Server server;
        private final InetSocketAddress address;
        private ChannelFuture channelFuture;
        private final AtomicInteger stalledTimesCounter = new AtomicInteger(0);
        private final AtomicInteger attempts = new AtomicInteger(0);
        private boolean closeable = true;

        public ChannelEntity(String str, Server server, InetSocketAddress inetSocketAddress) {
            this.ip = str;
            this.server = server;
            this.address = inetSocketAddress;
        }

        public String getIp() {
            return this.ip;
        }

        public Server getServer() {
            return this.server;
        }

        public InetSocketAddress getAddress() {
            return this.address;
        }

        public AtomicInteger getStalledTimesCounter() {
            return this.stalledTimesCounter;
        }

        public AtomicInteger getAttempts() {
            return this.attempts;
        }

        public ChannelFuture getChannelFuture() {
            return this.channelFuture;
        }

        public void setChannelFuture(ChannelFuture channelFuture) {
            this.channelFuture = channelFuture;
        }

        public boolean isCloseable() {
            return this.closeable;
        }

        public void makeUnCloseable() {
            this.closeable = false;
        }
    }

    /* loaded from: input_file:cn/com/duiba/cat/message/io/ChannelManager$ChannelsHolder.class */
    public static class ChannelsHolder {
        private final Map<String, ChannelEntity> channelMap = new HashMap();
        private final List<String> activeIps = new ArrayList();
        private boolean connectChanged;

        public boolean isConnectChanged() {
            return this.connectChanged;
        }

        public void setConnectChanged(boolean z) {
            this.connectChanged = z;
        }

        public Server getServer(String str) {
            ChannelEntity channelEntity = this.channelMap.get(str);
            if (channelEntity == null) {
                throw new ResourceNotFoundException("Channel not found, ip={}", str);
            }
            return channelEntity.getServer();
        }

        public ChannelFuture getActiveChannelFuture(String str) {
            ChannelEntity channelEntity = this.channelMap.get(str);
            if (channelEntity == null) {
                return null;
            }
            return channelEntity.getChannelFuture();
        }

        public ChannelEntity getActiveChannelEntity(String str) {
            return this.channelMap.get(str);
        }

        public List<String> getActiveIps() {
            return this.activeIps;
        }

        public Collection<ChannelEntity> getChannels() {
            return this.channelMap.values();
        }

        public void addChannelEntity(ChannelEntity channelEntity) {
            this.channelMap.put(channelEntity.getIp(), channelEntity);
            this.activeIps.add(channelEntity.getIp());
        }

        public ChannelEntity getChannelEntity(String str) {
            ChannelEntity channelEntity = this.channelMap.get(str);
            if (channelEntity == null) {
                throw new ResourceNotFoundException("Channel not found, ip={}", str);
            }
            return channelEntity;
        }

        public boolean isEqualServers(Set<Server> set) {
            if (CollectionUtils.isEmpty(set)) {
                return this.channelMap.isEmpty();
            }
            if (set.size() != this.channelMap.size()) {
                return false;
            }
            Iterator<Server> it = set.iterator();
            while (it.hasNext()) {
                if (this.channelMap.get(it.next().getIp()) == null) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return this.activeIps.toString();
        }
    }

    private ChannelManager() {
        ChannelsHolder initChannel;
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.handler(new ChannelInitializer<Channel>() { // from class: cn.com.duiba.cat.message.io.ChannelManager.1
            protected void initChannel(Channel channel) {
            }
        });
        this.bootstrap = bootstrap;
        Set<Server> servers = this.configService.getServers();
        if (!CollectionUtils.isNotEmpty(servers) || (initChannel = initChannel(parseSocketAddress(servers), servers)) == null) {
            return;
        }
        this.activeChannelHolder = initChannel;
    }

    public static ChannelManager getInstance() {
        return instance;
    }

    private ChannelEntity determineChannelEntity() {
        List<String> activeIps = this.activeChannelHolder.getActiveIps();
        int size = activeIps.size();
        int i = 0;
        boolean z = true;
        int[] iArr = new int[size];
        for (int i2 = 0; i2 < size; i2++) {
            int weight = this.activeChannelHolder.getServer(activeIps.get(i2)).getWeight();
            iArr[i2] = weight;
            i += weight;
            if (z && i2 > 0 && weight != iArr[i2 - 1]) {
                z = false;
            }
        }
        if (i > 0 && !z) {
            int nextInt = this.random.nextInt(i);
            for (int i3 = 0; i3 < size; i3++) {
                nextInt -= iArr[i3];
                if (nextInt < 0) {
                    return this.activeChannelHolder.getChannelEntity(activeIps.get(i3));
                }
            }
        }
        return this.activeChannelHolder.getChannelEntity(activeIps.get(this.random.nextInt(size)));
    }

    public ChannelFuture channelFuture() {
        if (this.activeChannelHolder == null) {
            return null;
        }
        ChannelEntity determineChannelEntity = determineChannelEntity();
        if (checkWritable(determineChannelEntity)) {
            return determineChannelEntity.getChannelFuture();
        }
        return null;
    }

    private boolean checkActive(ChannelFuture channelFuture) {
        boolean z = false;
        if (channelFuture != null) {
            Channel channel = channelFuture.channel();
            if (channel.isActive() && channel.isOpen()) {
                z = true;
            } else {
                LOGGER.error("channel buf is not active ,current channel " + channelFuture.channel().remoteAddress());
            }
        }
        return z;
    }

    private void checkServerChanged() {
        Pair<Boolean, Set<Server>> serversChanged = serversChanged();
        if (serversChanged.getKey().booleanValue()) {
            LOGGER.info("server config changed :" + serversChanged.getValue());
            Set<Server> value = serversChanged.getValue();
            ChannelsHolder initChannel = initChannel(parseSocketAddress(value), value);
            if (initChannel == null) {
                ChannelsHolder channelsHolder = this.activeChannelHolder;
                this.activeChannelHolder = null;
                closeChannelHolder(channelsHolder);
                LOGGER.info("switch active channel to [none]");
                return;
            }
            if (!initChannel.isConnectChanged()) {
                this.activeChannelHolder = initChannel;
                return;
            }
            ChannelsHolder channelsHolder2 = this.activeChannelHolder;
            this.activeChannelHolder = initChannel;
            closeChannelHolder(channelsHolder2);
            LOGGER.info("switch active channel to " + this.activeChannelHolder);
        }
    }

    private boolean checkWritable(ChannelEntity channelEntity) {
        boolean z = false;
        if (channelEntity != null) {
            Channel channel = channelEntity.getChannelFuture().channel();
            if (!channel.isActive() || !channel.isOpen()) {
                int incrementAndGet = channelEntity.getAttempts().incrementAndGet();
                if (incrementAndGet % 1000 == 0 || incrementAndGet == 1) {
                    LOGGER.error("channel buf is is close when send msg! Attempts: " + incrementAndGet);
                }
            } else if (channel.isWritable()) {
                z = true;
            } else {
                channel.flush();
            }
        }
        return z;
    }

    private void closeChannelFuture(ChannelFuture channelFuture) {
        if (channelFuture != null) {
            SocketAddress remoteAddress = channelFuture.channel().remoteAddress();
            if (remoteAddress != null) {
                LOGGER.info("close channel " + remoteAddress);
            }
            channelFuture.channel().close();
        }
    }

    private void closeChannelHolder(ChannelsHolder channelsHolder) {
        if (channelsHolder == null) {
            return;
        }
        for (ChannelEntity channelEntity : channelsHolder.getChannels()) {
            if (channelEntity.isCloseable()) {
                closeChannelFuture(channelEntity.getChannelFuture());
            }
        }
    }

    private ChannelFuture createChannelFuture(InetSocketAddress inetSocketAddress) {
        LOGGER.info("start connect server" + inetSocketAddress.toString());
        try {
            ChannelFuture connect = this.bootstrap.connect(inetSocketAddress);
            connect.awaitUninterruptibly(this.configService.getClientConnectTimeout(), TimeUnit.MILLISECONDS);
            if (connect.isSuccess()) {
                LOGGER.info("Connected to CAT server at " + inetSocketAddress);
                return connect;
            }
            LOGGER.error("Error when try connecting to " + inetSocketAddress);
            closeChannelFuture(connect);
            return null;
        } catch (Throwable th) {
            LOGGER.error("Error when connect server " + inetSocketAddress.getAddress(), th);
            if (0 == 0) {
                return null;
            }
            closeChannelFuture(null);
            return null;
        }
    }

    @Override // cn.com.duiba.cat.util.Threads.Task
    public String getName() {
        return "netty-channel-health-check";
    }

    private ChannelsHolder initChannel(List<InetSocketAddress> list, Set<Server> set) {
        ChannelEntity activeChannelEntity;
        if (CollectionUtils.isEmpty(list)) {
            return null;
        }
        ChannelsHolder channelsHolder = new ChannelsHolder();
        boolean z = false;
        Map map = (Map) set.stream().collect(Collectors.toMap((v0) -> {
            return v0.getIp();
        }, server -> {
            return server;
        }));
        for (InetSocketAddress inetSocketAddress : list) {
            String hostAddress = inetSocketAddress.getAddress().getHostAddress();
            ChannelEntity channelEntity = new ChannelEntity(hostAddress, (Server) map.get(hostAddress), inetSocketAddress);
            ChannelFuture channelFuture = null;
            if (this.activeChannelHolder != null && (activeChannelEntity = this.activeChannelHolder.getActiveChannelEntity(hostAddress)) != null) {
                channelFuture = activeChannelEntity.getChannelFuture();
                activeChannelEntity.makeUnCloseable();
            }
            if (channelFuture != null) {
                channelEntity.setChannelFuture(channelFuture);
            } else {
                ChannelFuture createChannelFuture = createChannelFuture(inetSocketAddress);
                if (createChannelFuture == null) {
                    LOGGER.error("createChannelFuture error, ip={}", hostAddress);
                } else {
                    channelEntity.setChannelFuture(createChannelFuture);
                    z = true;
                }
            }
            channelsHolder.addChannelEntity(channelEntity);
        }
        if (this.activeChannelHolder == null || (!z && this.activeChannelHolder.getActiveIps().size() != list.size())) {
            z = true;
        }
        channelsHolder.setConnectChanged(z);
        return channelsHolder;
    }

    private boolean isChannelStalled(ChannelEntity channelEntity) {
        boolean checkActive = checkActive(channelEntity.getChannelFuture());
        AtomicInteger stalledTimesCounter = channelEntity.getStalledTimesCounter();
        if (!checkActive) {
            return stalledTimesCounter.incrementAndGet() % 3 == 0;
        }
        if (stalledTimesCounter.get() <= 0) {
            return false;
        }
        stalledTimesCounter.decrementAndGet();
        return false;
    }

    private List<InetSocketAddress> parseSocketAddress(Set<Server> set) {
        if (CollectionUtils.isEmpty(set)) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        for (Server server : set) {
            arrayList.add(new InetSocketAddress(server.getIp(), server.getPort()));
        }
        return arrayList;
    }

    private Pair<Boolean, Set<Server>> serversChanged() {
        Set<Server> servers = this.configService.getServers();
        return this.activeChannelHolder == null ? CollectionUtils.isEmpty(servers) ? new Pair<>(false, servers) : new Pair<>(true, servers) : this.activeChannelHolder.isEqualServers(servers) ? new Pair<>(false, servers) : new Pair<>(true, servers);
    }

    private void doubleCheckAndReconnectActiveServer(ChannelsHolder channelsHolder) {
        if (channelsHolder == null) {
            return;
        }
        for (ChannelEntity channelEntity : channelsHolder.getChannels()) {
            String ip = channelEntity.getIp();
            ChannelFuture channelFuture = channelEntity.getChannelFuture();
            if (isChannelStalled(channelEntity)) {
                closeChannelFuture(channelFuture);
                ChannelFuture createChannelFuture = createChannelFuture(channelEntity.getAddress());
                if (createChannelFuture == null) {
                    LOGGER.error("createChannelFuture error, ip={}", ip);
                } else {
                    channelEntity.setChannelFuture(createChannelFuture);
                }
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.active) {
            try {
                if (Cat.isEnabled()) {
                    this.reconnectCount++;
                    if (this.reconnectCount % 10 == 0) {
                        this.idFactory.saveMark();
                        checkAndReconnectServer();
                    }
                }
            } catch (Exception e) {
                LOGGER.error("ChannelManager reconnect error", e);
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
            }
        }
    }

    public void checkAndReconnectServer() {
        checkServerChanged();
        doubleCheckAndReconnectActiveServer(this.activeChannelHolder);
    }

    @Override // cn.com.duiba.cat.util.Threads.Task
    public void shutdown() {
        this.active = false;
    }
}
