/*
 * Decompiled with CFR 0.152.
 */
package cn.com.duiba.cat.message.io;

import cn.com.duiba.cat.Cat;
import cn.com.duiba.cat.exception.ResourceNotFoundException;
import cn.com.duiba.cat.message.internal.MessageIdFactory;
import cn.com.duiba.cat.model.configuration.ClientConfigService;
import cn.com.duiba.cat.model.configuration.DefaultClientConfigService;
import cn.com.duiba.cat.model.configuration.client.entity.Server;
import cn.com.duiba.cat.util.Pair;
import cn.com.duiba.cat.util.Threads;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChannelManager
implements Threads.Task {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
    private static ChannelManager instance = new ChannelManager();
    private ClientConfigService configService = DefaultClientConfigService.getInstance();
    private Bootstrap bootstrap;
    private boolean active = true;
    private ChannelsHolder activeChannelHolder;
    private MessageIdFactory idFactory = MessageIdFactory.getInstance();
    private int reconnectCount;
    private final Random random = new Random();

    private ChannelManager() {
        List<InetSocketAddress> configuredAddresses;
        ChannelsHolder holder;
        NioEventLoopGroup group = new NioEventLoopGroup(1, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        Bootstrap bootstrap = new Bootstrap();
        ((Bootstrap)bootstrap.group((EventLoopGroup)group)).channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        bootstrap.handler((ChannelHandler)new ChannelInitializer<Channel>(){

            protected void initChannel(Channel ch) {
            }
        });
        this.bootstrap = bootstrap;
        Set<Server> servers = this.configService.getServers();
        if (CollectionUtils.isNotEmpty(servers) && (holder = this.initChannel(configuredAddresses = this.parseSocketAddress(servers), servers)) != null) {
            this.activeChannelHolder = holder;
        }
    }

    public static ChannelManager getInstance() {
        return instance;
    }

    private ChannelEntity determineChannelEntity() {
        List<String> activeIps = this.activeChannelHolder.getActiveIps();
        int length = activeIps.size();
        int totalWeight = 0;
        boolean sameWeight = true;
        int[] weights = new int[length];
        for (int i = 0; i < length; ++i) {
            int weight;
            String ip = activeIps.get(i);
            Server server = this.activeChannelHolder.getServer(ip);
            weights[i] = weight = server.getWeight();
            totalWeight += weight;
            if (!sameWeight || i <= 0 || weight == weights[i - 1]) continue;
            sameWeight = false;
        }
        if (totalWeight > 0 && !sameWeight) {
            int offset = this.random.nextInt(totalWeight);
            for (int i = 0; i < length; ++i) {
                if ((offset -= weights[i]) >= 0) continue;
                String ip = activeIps.get(i);
                return this.activeChannelHolder.getChannelEntity(ip);
            }
        }
        String ip = activeIps.get(this.random.nextInt(length));
        return this.activeChannelHolder.getChannelEntity(ip);
    }

    public ChannelFuture channelFuture() {
        ChannelEntity channelEntity;
        if (this.activeChannelHolder != null && this.checkWritable(channelEntity = this.determineChannelEntity())) {
            return channelEntity.getChannelFuture();
        }
        return null;
    }

    private boolean checkActive(ChannelFuture future) {
        boolean isActive = false;
        if (future != null) {
            Channel channel = future.channel();
            if (channel.isActive() && channel.isOpen()) {
                isActive = true;
            } else {
                LOGGER.error("channel buf is not active ,current channel " + future.channel().remoteAddress());
            }
        }
        return isActive;
    }

    private void checkServerChanged() {
        Pair<Boolean, Set<Server>> pair = this.serversChanged();
        if (pair.getKey().booleanValue()) {
            LOGGER.info("server config changed :" + pair.getValue());
            Set<Server> servers = pair.getValue();
            List<InetSocketAddress> serverAddresses = this.parseSocketAddress(servers);
            ChannelsHolder newHolder = this.initChannel(serverAddresses, servers);
            if (newHolder != null) {
                if (newHolder.isConnectChanged()) {
                    ChannelsHolder last = this.activeChannelHolder;
                    this.activeChannelHolder = newHolder;
                    this.closeChannelHolder(last);
                    LOGGER.info("switch active channel to " + this.activeChannelHolder);
                } else {
                    this.activeChannelHolder = newHolder;
                }
            } else {
                ChannelsHolder last = this.activeChannelHolder;
                this.activeChannelHolder = null;
                this.closeChannelHolder(last);
                LOGGER.info("switch active channel to [none]");
            }
        }
    }

    private boolean checkWritable(ChannelEntity channelEntity) {
        boolean isWritable = false;
        if (channelEntity != null) {
            Channel channel = channelEntity.getChannelFuture().channel();
            if (channel.isActive() && channel.isOpen()) {
                if (channel.isWritable()) {
                    isWritable = true;
                } else {
                    channel.flush();
                }
            } else {
                int count = channelEntity.getAttempts().incrementAndGet();
                if (count % 1000 == 0 || count == 1) {
                    LOGGER.error("channel buf is is close when send msg! Attempts: " + count);
                }
            }
        }
        return isWritable;
    }

    private void closeChannelFuture(ChannelFuture channelFuture) {
        if (channelFuture != null) {
            SocketAddress address = channelFuture.channel().remoteAddress();
            if (address != null) {
                LOGGER.info("close channel " + address);
            }
            channelFuture.channel().close();
        }
    }

    private void closeChannelHolder(ChannelsHolder channelHolder) {
        if (channelHolder == null) {
            return;
        }
        Collection<ChannelEntity> channels = channelHolder.getChannels();
        for (ChannelEntity cf : channels) {
            if (!cf.isCloseable()) continue;
            this.closeChannelFuture(cf.getChannelFuture());
        }
    }

    private ChannelFuture createChannelFuture(InetSocketAddress address) {
        block3: {
            LOGGER.info("start connect server" + address.toString());
            ChannelFuture future = null;
            try {
                future = this.bootstrap.connect((SocketAddress)address);
                future.awaitUninterruptibly((long)this.configService.getClientConnectTimeout(), TimeUnit.MILLISECONDS);
                if (future.isSuccess()) {
                    LOGGER.info("Connected to CAT server at " + address);
                    return future;
                }
                LOGGER.error("Error when try connecting to " + address);
                this.closeChannelFuture(future);
            }
            catch (Throwable e) {
                LOGGER.error("Error when connect server " + address.getAddress(), e);
                if (future == null) break block3;
                this.closeChannelFuture(future);
            }
        }
        return null;
    }

    @Override
    public String getName() {
        return "netty-channel-health-check";
    }

    private ChannelsHolder initChannel(List<InetSocketAddress> addresses, Set<Server> servers) {
        if (CollectionUtils.isEmpty(addresses)) {
            return null;
        }
        ChannelsHolder newHolder = new ChannelsHolder();
        boolean connectChanged = false;
        Map<String, Server> ipServerMap = servers.stream().collect(Collectors.toMap(Server::getIp, x -> x));
        for (InetSocketAddress address : addresses) {
            ChannelEntity existChannelEntity;
            String ip = address.getAddress().getHostAddress();
            Server server = ipServerMap.get(ip);
            ChannelEntity newChannelEntity = new ChannelEntity(ip, server, address);
            ChannelFuture existChannelFuture = null;
            if (this.activeChannelHolder != null && (existChannelEntity = this.activeChannelHolder.getActiveChannelEntity(ip)) != null) {
                existChannelFuture = existChannelEntity.getChannelFuture();
                existChannelEntity.makeUnCloseable();
            }
            if (existChannelFuture != null) {
                newChannelEntity.setChannelFuture(existChannelFuture);
            } else {
                ChannelFuture newChannelFuture = this.createChannelFuture(address);
                if (newChannelFuture == null) {
                    LOGGER.error("createChannelFuture error, ip={}", (Object)ip);
                    continue;
                }
                newChannelEntity.setChannelFuture(newChannelFuture);
                connectChanged = true;
            }
            newHolder.addChannelEntity(newChannelEntity);
        }
        if (this.activeChannelHolder == null || !connectChanged && this.activeChannelHolder.getActiveIps().size() != addresses.size()) {
            connectChanged = true;
        }
        newHolder.setConnectChanged(connectChanged);
        return newHolder;
    }

    private boolean isChannelStalled(ChannelEntity channel) {
        boolean active = this.checkActive(channel.getChannelFuture());
        AtomicInteger channelStalledTimesCounter = channel.getStalledTimesCounter();
        if (!active) {
            return channelStalledTimesCounter.incrementAndGet() % 3 == 0;
        }
        if (channelStalledTimesCounter.get() > 0) {
            channelStalledTimesCounter.decrementAndGet();
        }
        return false;
    }

    private List<InetSocketAddress> parseSocketAddress(Set<Server> servers) {
        if (CollectionUtils.isEmpty(servers)) {
            return Collections.emptyList();
        }
        ArrayList<InetSocketAddress> address = new ArrayList<InetSocketAddress>();
        for (Server server : servers) {
            address.add(new InetSocketAddress(server.getIp(), server.getPort()));
        }
        return address;
    }

    private Pair<Boolean, Set<Server>> serversChanged() {
        Set<Server> servers = this.configService.getServers();
        if (this.activeChannelHolder == null) {
            if (CollectionUtils.isEmpty(servers)) {
                return new Pair<Boolean, Set<Server>>(false, servers);
            }
            return new Pair<Boolean, Set<Server>>(true, servers);
        }
        if (this.activeChannelHolder.isEqualServers(servers)) {
            return new Pair<Boolean, Set<Server>>(false, servers);
        }
        return new Pair<Boolean, Set<Server>>(true, servers);
    }

    private void doubleCheckAndReconnectActiveServer(ChannelsHolder channelHolder) {
        if (channelHolder == null) {
            return;
        }
        Collection<ChannelEntity> activeChannels = channelHolder.getChannels();
        for (ChannelEntity channelEntity : activeChannels) {
            String ip = channelEntity.getIp();
            ChannelFuture currentChannelFuture = channelEntity.getChannelFuture();
            if (!this.isChannelStalled(channelEntity)) continue;
            this.closeChannelFuture(currentChannelFuture);
            InetSocketAddress address = channelEntity.getAddress();
            ChannelFuture newFuture = this.createChannelFuture(address);
            if (newFuture == null) {
                LOGGER.error("createChannelFuture error, ip={}", (Object)ip);
                continue;
            }
            channelEntity.setChannelFuture(newFuture);
        }
    }

    @Override
    public void run() {
        while (this.active) {
            try {
                if (Cat.isEnabled()) {
                    ++this.reconnectCount;
                    if (this.reconnectCount % 10 == 0) {
                        this.idFactory.saveMark();
                        this.checkAndReconnectServer();
                    }
                }
            }
            catch (Exception e) {
                LOGGER.error("ChannelManager reconnect error", (Throwable)e);
            }
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public void checkAndReconnectServer() {
        this.checkServerChanged();
        this.doubleCheckAndReconnectActiveServer(this.activeChannelHolder);
    }

    @Override
    public void shutdown() {
        this.active = false;
    }

    public static class ChannelsHolder {
        private final Map<String, ChannelEntity> channelMap = new HashMap<String, ChannelEntity>();
        private final List<String> activeIps = new ArrayList<String>();
        private boolean connectChanged;

        public boolean isConnectChanged() {
            return this.connectChanged;
        }

        public void setConnectChanged(boolean connectChanged) {
            this.connectChanged = connectChanged;
        }

        public Server getServer(String ip) {
            ChannelEntity channel = this.channelMap.get(ip);
            if (channel == null) {
                throw new ResourceNotFoundException("Channel not found, ip={}", ip);
            }
            return channel.getServer();
        }

        public ChannelFuture getActiveChannelFuture(String ip) {
            ChannelEntity channelEntity = this.channelMap.get(ip);
            if (channelEntity == null) {
                return null;
            }
            return channelEntity.getChannelFuture();
        }

        public ChannelEntity getActiveChannelEntity(String ip) {
            return this.channelMap.get(ip);
        }

        public List<String> getActiveIps() {
            return this.activeIps;
        }

        public Collection<ChannelEntity> getChannels() {
            return this.channelMap.values();
        }

        public void addChannelEntity(ChannelEntity newChannel) {
            this.channelMap.put(newChannel.getIp(), newChannel);
            this.activeIps.add(newChannel.getIp());
        }

        public ChannelEntity getChannelEntity(String ip) {
            ChannelEntity channelEntity = this.channelMap.get(ip);
            if (channelEntity == null) {
                throw new ResourceNotFoundException("Channel not found, ip={}", ip);
            }
            return channelEntity;
        }

        public boolean isEqualServers(Set<Server> servers) {
            if (CollectionUtils.isEmpty(servers)) {
                return this.channelMap.isEmpty();
            }
            if (servers.size() != this.channelMap.size()) {
                return false;
            }
            for (Server s : servers) {
                ChannelEntity channel = this.channelMap.get(s.getIp());
                if (channel != null) continue;
                return false;
            }
            return true;
        }

        public String toString() {
            return this.activeIps.toString();
        }
    }

    public static class ChannelEntity {
        private final String ip;
        private final Server server;
        private final InetSocketAddress address;
        private final AtomicInteger stalledTimesCounter = new AtomicInteger(0);
        private final AtomicInteger attempts = new AtomicInteger(0);
        private ChannelFuture channelFuture;
        private boolean closeable = true;

        public ChannelEntity(String ip, Server server, InetSocketAddress address) {
            this.ip = ip;
            this.server = server;
            this.address = address;
        }

        public String getIp() {
            return this.ip;
        }

        public Server getServer() {
            return this.server;
        }

        public InetSocketAddress getAddress() {
            return this.address;
        }

        public AtomicInteger getStalledTimesCounter() {
            return this.stalledTimesCounter;
        }

        public AtomicInteger getAttempts() {
            return this.attempts;
        }

        public ChannelFuture getChannelFuture() {
            return this.channelFuture;
        }

        public void setChannelFuture(ChannelFuture channelFuture) {
            this.channelFuture = channelFuture;
        }

        public boolean isCloseable() {
            return this.closeable;
        }

        public void makeUnCloseable() {
            this.closeable = false;
        }
    }
}

