/*
 * Decompiled with CFR 0.152.
 */
package com.navercorp.pinpoint.rpc.stream;

import com.alibaba.csp.ahas.ext.arms.shaded.org.jboss.netty.channel.Channel;
import com.alibaba.csp.ahas.ext.arms.shaded.org.jboss.netty.channel.ChannelFuture;
import com.navercorp.pinpoint.common.arms.logging.PLogger;
import com.navercorp.pinpoint.common.arms.logging.PLoggerFactory;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.packet.stream.StreamClosePacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCreateFailPacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCreatePacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamCreateSuccessPacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamPacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamPingPacket;
import com.navercorp.pinpoint.rpc.packet.stream.StreamResponsePacket;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannel;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannelMessageListener;
import com.navercorp.pinpoint.rpc.stream.DisabledServerStreamChannelMessageListener;
import com.navercorp.pinpoint.rpc.stream.LoggingStreamChannelStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannel;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.ServerStreamChannelMessageListener;
import com.navercorp.pinpoint.rpc.stream.StreamChannel;
import com.navercorp.pinpoint.rpc.stream.StreamChannelContext;
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.stream.StreamChannelStateCode;
import com.navercorp.pinpoint.rpc.util.IDGenerator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Tracks the stream channels multiplexed over a single {@link Channel} and routes incoming
 * {@link StreamPacket}s to the matching client-side or server-side stream context.
 *
 * <p>Stream contexts are kept in a {@link ConcurrentMap} keyed by stream channel id. Streams opened
 * locally via {@link #openStream} are stored as {@link ClientStreamChannelContext}; streams created
 * by an incoming create packet are stored as {@link ServerStreamChannelContext}.
 */
public class StreamChannelManager {
    private static final LoggingStreamChannelStateChangeEventHandler LOGGING_STATE_CHANGE_HANDLER = new LoggingStreamChannelStateChangeEventHandler();

    // Packet type codes compared against StreamPacket.getPacketType(). Each name follows the packet
    // class the value is cast to in this file.
    // NOTE(review): these look like inlined constants from a shared packet-type class - confirm and
    // reference that class directly if it is available.
    private static final short PACKET_TYPE_STREAM_CREATE = 10;
    private static final short PACKET_TYPE_STREAM_CREATE_SUCCESS = 12;
    private static final short PACKET_TYPE_STREAM_CREATE_FAIL = 14;
    private static final short PACKET_TYPE_STREAM_CLOSE = 15;
    private static final short PACKET_TYPE_STREAM_PING = 17;
    // Presumably the pong reply to a ping; no packet class for it is visible in this file - confirm.
    private static final short PACKET_TYPE_STREAM_PONG = 18;
    private static final short PACKET_TYPE_STREAM_RESPONSE = 20;

    // Value handed to ClientStreamChannel.awaitOpen(); presumably milliseconds - confirm.
    private static final long OPEN_AWAIT_TIMEOUT = 3000L;

    private final PLogger logger = PLoggerFactory.getLogger(this.getClass());
    private final Channel channel;
    private final IDGenerator idGenerator;
    private final ServerStreamChannelMessageListener streamChannelMessageListener;
    private final ConcurrentMap<Integer, StreamChannelContext> channelMap = new ConcurrentHashMap<Integer, StreamChannelContext>();

    /**
     * Creates a manager that uses {@link DisabledServerStreamChannelMessageListener#INSTANCE} as
     * its server-side listener.
     *
     * @param channel the underlying channel; must not be null
     * @param idGenerator source of ids for locally opened streams; must not be null
     */
    public StreamChannelManager(Channel channel, IDGenerator idGenerator) {
        this(channel, idGenerator, DisabledServerStreamChannelMessageListener.INSTANCE);
    }

    /**
     * Creates a manager with an explicit server-side listener.
     *
     * @param channel the underlying channel; must not be null
     * @param idGenerator source of ids for locally opened streams; must not be null
     * @param serverStreamChannelMessageListener listener consulted for incoming stream create and
     *        close packets; must not be null
     */
    public StreamChannelManager(Channel channel, IDGenerator idGenerator, ServerStreamChannelMessageListener serverStreamChannelMessageListener) {
        Assert.requireNonNull(channel, "Channel must not be null.");
        Assert.requireNonNull(idGenerator, "IDGenerator must not be null.");
        Assert.requireNonNull(serverStreamChannelMessageListener, "ServerStreamChannelMessageListener must not be null.");
        this.channel = channel;
        this.idGenerator = idGenerator;
        this.streamChannelMessageListener = serverStreamChannelMessageListener;
    }

    /**
     * Removes every registered stream, moves it to the closed state and sends a close packet with
     * {@link StreamCode#STATE_CLOSED} for each one (the packet is only written while the channel is
     * connected).
     */
    public void close() {
        // Typed key set: iterating a raw Set yields Object and cannot be bound to Integer.
        Set<Integer> streamChannelIds = this.channelMap.keySet();
        for (Integer streamChannelId : streamChannelIds) {
            this.clearResourceAndSendClose(streamChannelId, StreamCode.STATE_CLOSED);
        }
    }

    /**
     * Opens a client stream using the shared logging state-change handler.
     *
     * @param payload payload passed to the create request
     * @param messageListener listener stored in the returned context
     * @return the context of the new stream; see
     *         {@link #openStream(byte[], ClientStreamChannelMessageListener, StreamChannelStateChangeEventHandler)}
     */
    public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener messageListener) {
        return this.openStream(payload, messageListener, LOGGING_STATE_CHANGE_HANDLER);
    }

    /**
     * Opens a client stream: generates an id, registers a new context, sends the create request and
     * waits for the stream to open.
     *
     * <p>If the stream is not in the {@code CONNECTED} state after the wait, it is closed and
     * unregistered, and a {@link StreamCreateFailPacket} with {@link StreamCode#CONNECTION_TIMEOUT}
     * is stored on the returned context.
     *
     * @param payload payload passed to the create request
     * @param messageListener listener stored in the returned context
     * @param stateChangeListener state-change handler for the new stream; when null the shared
     *        logging handler is used instead
     * @return the context of the new stream, whether or not it ended up connected
     * @throws PinpointSocketException if the generated stream id is already registered
     */
    public ClientStreamChannelContext openStream(byte[] payload, ClientStreamChannelMessageListener messageListener, StreamChannelStateChangeEventHandler<ClientStreamChannel> stateChangeListener) {
        this.logger.info("Open streamChannel initialization started. Channel:{} ", (Object)this.channel);
        int streamChannelId = this.idGenerator.generate();
        ClientStreamChannel newStreamChannel = new ClientStreamChannel(this.channel, streamChannelId, this);
        if (stateChangeListener != null) {
            newStreamChannel.addStateChangeEventHandler(stateChangeListener);
        } else {
            newStreamChannel.addStateChangeEventHandler(LOGGING_STATE_CHANGE_HANDLER);
        }
        newStreamChannel.changeStateOpen();
        ClientStreamChannelContext newStreamChannelContext = new ClientStreamChannelContext(newStreamChannel, messageListener);

        // putIfAbsent rather than put: on an id collision the already-registered stream must stay
        // in the map, and the rejected channel must not be left registered in its place.
        StreamChannelContext existing = this.channelMap.putIfAbsent(streamChannelId, newStreamChannelContext);
        if (existing != null) {
            newStreamChannel.changeStateClose();
            throw new PinpointSocketException("already streamChannelId exist:" + streamChannelId + " streamChannel:" + existing);
        }

        newStreamChannel.changeStateConnectAwait();
        newStreamChannel.sendCreate(payload);
        newStreamChannel.awaitOpen(OPEN_AWAIT_TIMEOUT);
        if (newStreamChannel.checkState(StreamChannelStateCode.CONNECTED)) {
            this.logger.info("Open streamChannel initialization completed. Channel:{}, StreamChannelContext:{} ", (Object)this.channel, (Object)newStreamChannelContext);
        } else {
            newStreamChannel.changeStateClose();
            this.channelMap.remove(streamChannelId);
            // NOTE(review): this replaces any fail packet already stored by handleCreateFail with
            // CONNECTION_TIMEOUT - confirm that losing the peer's failure code is intended.
            newStreamChannelContext.setCreateFailPacket(new StreamCreateFailPacket(streamChannelId, StreamCode.CONNECTION_TIMEOUT));
        }
        return newStreamChannelContext;
    }

    /**
     * Entry point for incoming stream packets.
     *
     * <p>Create packets are handled directly. Any other packet is routed to the registered context
     * for its stream id; if none is registered, a close with {@link StreamCode#ID_NOT_FOUND} is sent
     * unless the packet is itself a close packet.
     *
     * @param packet the received packet
     */
    public void messageReceived(StreamPacket packet) {
        int streamChannelId = packet.getStreamChannelId();
        short packetType = packet.getPacketType();
        this.logger.debug("StreamChannel message received. (Channel:{}, StreamId:{}, Packet:{}).", this.channel, streamChannelId, packet);
        if (PACKET_TYPE_STREAM_CREATE == packetType) {
            this.handleCreate((StreamCreatePacket)packet);
            return;
        }
        StreamChannelContext context = this.findStreamChannel(streamChannelId);
        if (context == null) {
            // A close for an unknown stream gets no reply (presumably to avoid answering a close
            // with another close).
            if (PACKET_TYPE_STREAM_CLOSE != packetType) {
                this.clearResourceAndSendClose(streamChannelId, StreamCode.ID_NOT_FOUND);
            }
        } else if (this.isServerStreamChannelContext(context)) {
            this.messageReceived((ServerStreamChannelContext)context, packet);
        } else if (this.isClientStreamChannelContext(context)) {
            this.messageReceived((ClientStreamChannelContext)context, packet);
        } else {
            this.clearResourceAndSendClose(streamChannelId, StreamCode.UNKNWON_ERROR);
        }
    }

    /**
     * Dispatches a packet addressed to a server-side stream. Close and ping packets are handled,
     * type 18 is ignored, and any other type closes the stream with
     * {@link StreamCode#PACKET_UNKNOWN}.
     */
    private void messageReceived(ServerStreamChannelContext context, StreamPacket packet) {
        short packetType = packet.getPacketType();
        int streamChannelId = packet.getStreamChannelId();
        switch (packetType) {
            case PACKET_TYPE_STREAM_CLOSE: {
                this.handleStreamClose(context, (StreamClosePacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_PING: {
                this.handlePing(context, (StreamPingPacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_PONG: {
                // Intentionally ignored.
                break;
            }
            default: {
                this.clearResourceAndSendClose(streamChannelId, StreamCode.PACKET_UNKNOWN);
                this.logger.info("Unknown StreamPacket received Channel:{}, StreamId:{}, Packet;{}.", this.channel, streamChannelId, packet);
            }
        }
    }

    /**
     * Dispatches a packet addressed to a client-side stream. Create-success, create-fail, response,
     * close and ping packets are handled, type 18 is ignored, and any other type closes the stream
     * with {@link StreamCode#PACKET_UNKNOWN}.
     */
    private void messageReceived(ClientStreamChannelContext context, StreamPacket packet) {
        short packetType = packet.getPacketType();
        int streamChannelId = packet.getStreamChannelId();
        switch (packetType) {
            case PACKET_TYPE_STREAM_CREATE_SUCCESS: {
                this.handleCreateSuccess(context, (StreamCreateSuccessPacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_CREATE_FAIL: {
                this.handleCreateFail(context, (StreamCreateFailPacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_RESPONSE: {
                this.handleStreamResponse(context, (StreamResponsePacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_CLOSE: {
                this.handleStreamClose(context, (StreamClosePacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_PING: {
                this.handlePing(context, (StreamPingPacket)packet);
                break;
            }
            case PACKET_TYPE_STREAM_PONG: {
                // Intentionally ignored.
                break;
            }
            default: {
                this.clearResourceAndSendClose(streamChannelId, StreamCode.PACKET_UNKNOWN);
                this.logger.info("Unknown StreamPacket received Channel:{}, StreamId:{}, Packet;{}.", this.channel, streamChannelId, packet);
            }
        }
    }

    /**
     * Handles an incoming create request: registers a server-side context, asks the server
     * listener whether to accept it, and replies with create-success or create-fail.
     */
    private void handleCreate(StreamCreatePacket packet) {
        int streamChannelId = packet.getStreamChannelId();
        ServerStreamChannel streamChannel = new ServerStreamChannel(this.channel, streamChannelId, this);
        ServerStreamChannelContext streamChannelContext = new ServerStreamChannelContext(streamChannel);

        StreamCode code = this.registerStreamChannel(streamChannelContext);
        if (code == StreamCode.OK) {
            // The listener is only consulted once registration succeeded.
            code = this.streamChannelMessageListener.handleStreamCreate(streamChannelContext, packet);
            if (code == StreamCode.OK) {
                streamChannel.changeStateConnected();
                streamChannel.sendCreateSuccess();
            }
        }
        if (code != StreamCode.OK) {
            // NOTE(review): for ID_DUPLICATED this removes and closes whatever context is currently
            // registered under the id, i.e. the pre-existing stream - confirm this is intended.
            this.clearResourceAndSendCreateFail(streamChannelId, code);
        }
    }

    /**
     * Registers a server-side context under its stream id and advances its state.
     *
     * @return {@link StreamCode#OK} on success, {@link StreamCode#ID_DUPLICATED} if the id is
     *         already registered, or {@link StreamCode#STATE_ERROR} if the state transition is
     *         rejected (the context is unregistered again in that case)
     */
    private StreamCode registerStreamChannel(ServerStreamChannelContext streamChannelContext) {
        int streamChannelId = streamChannelContext.getStreamId();
        ServerStreamChannel streamChannel = streamChannelContext.getStreamChannel();
        streamChannel.changeStateOpen();
        if (this.channelMap.putIfAbsent(streamChannelId, streamChannelContext) != null) {
            streamChannel.changeStateClose();
            return StreamCode.ID_DUPLICATED;
        }
        if (!streamChannel.changeStateConnectArrived()) {
            streamChannel.changeStateClose();
            this.channelMap.remove(streamChannelId);
            return StreamCode.STATE_ERROR;
        }
        return StreamCode.OK;
    }

    /** Moves the client stream to the connected state; the packet itself is not inspected. */
    private void handleCreateSuccess(ClientStreamChannelContext streamChannelContext, StreamCreateSuccessPacket packet) {
        ClientStreamChannel streamChannel = streamChannelContext.getStreamChannel();
        streamChannel.changeStateConnected();
    }

    /** Stores the fail packet on the context, then unregisters and closes the stream. */
    private void handleCreateFail(ClientStreamChannelContext streamChannelContext, StreamCreateFailPacket packet) {
        streamChannelContext.setCreateFailPacket(packet);
        this.clearStreamChannelResource(streamChannelContext.getStreamId());
    }

    /**
     * Delivers response data to the client listener when the stream is connected. While the stream
     * is in {@code CONNECT_AWAIT} the packet is dropped without any reply; in any other state the
     * stream is closed with {@link StreamCode#STATE_NOT_CONNECTED}.
     */
    private void handleStreamResponse(ClientStreamChannelContext context, StreamResponsePacket packet) {
        int streamChannelId = packet.getStreamChannelId();
        ClientStreamChannel streamChannel = context.getStreamChannel();
        StreamChannelStateCode currentCode = streamChannel.getCurrentState();
        if (StreamChannelStateCode.CONNECTED == currentCode) {
            context.getClientStreamChannelMessageListener().handleStreamData(context, packet);
        } else if (StreamChannelStateCode.CONNECT_AWAIT != currentCode) {
            this.clearResourceAndSendClose(streamChannelId, StreamCode.STATE_NOT_CONNECTED);
        }
    }

    /** Notifies the client listener of the close, then unregisters and closes the stream. */
    private void handleStreamClose(ClientStreamChannelContext context, StreamClosePacket packet) {
        context.getClientStreamChannelMessageListener().handleStreamClose(context, packet);
        this.clearStreamChannelResource(context.getStreamId());
    }

    /** Notifies the server listener of the close, then unregisters and closes the stream. */
    private void handleStreamClose(ServerStreamChannelContext context, StreamClosePacket packet) {
        this.streamChannelMessageListener.handleStreamClose(context, packet);
        this.clearStreamChannelResource(context.getStreamId());
    }

    /**
     * Answers a ping with a pong carrying the same request id. If the stream is not connected it is
     * closed with {@link StreamCode#STATE_NOT_CONNECTED} instead.
     */
    private void handlePing(StreamChannelContext streamChannelContext, StreamPingPacket packet) {
        int streamChannelId = packet.getStreamChannelId();
        StreamChannel streamChannel = streamChannelContext.getStreamChannel();
        if (!streamChannel.checkState(StreamChannelStateCode.CONNECTED)) {
            this.clearResourceAndSendClose(streamChannelId, StreamCode.STATE_NOT_CONNECTED);
            return;
        }
        streamChannel.sendPong(packet.getRequestId());
    }

    /**
     * Looks up the context registered for a stream id.
     *
     * @param channelId the stream channel id
     * @return the registered context, or null if none is registered under that id
     */
    public StreamChannelContext findStreamChannel(int channelId) {
        return this.channelMap.get(channelId);
    }

    /**
     * Unregisters and closes the stream, then sends a create-fail packet.
     *
     * @return the write future, or null if the channel was not writable
     */
    private ChannelFuture clearResourceAndSendCreateFail(int streamChannelId, StreamCode code) {
        this.clearStreamChannelResource(streamChannelId);
        return this.sendCreateFail(streamChannelId, code);
    }

    /**
     * Unregisters and closes the stream, then sends a close packet.
     *
     * @param streamChannelId id of the stream to clear
     * @param code code carried by the close packet
     * @return the write future, or null if the channel was not connected
     */
    protected ChannelFuture clearResourceAndSendClose(int streamChannelId, StreamCode code) {
        this.clearStreamChannelResource(streamChannelId);
        return this.sendClose(streamChannelId, code);
    }

    /** Removes the context for the id, if any, and moves its stream channel to the closed state. */
    private void clearStreamChannelResource(int streamId) {
        StreamChannelContext streamChannelContext = this.channelMap.remove(streamId);
        if (streamChannelContext != null) {
            streamChannelContext.getStreamChannel().changeStateClose();
        }
    }

    /**
     * Writes a create-fail packet if the channel is writable; otherwise logs an error.
     *
     * @return the write future, or null if nothing was written
     */
    private ChannelFuture sendCreateFail(int streamChannelId, StreamCode code) {
        StreamCreateFailPacket packet = new StreamCreateFailPacket(streamChannelId, code);
        if (this.channel.isWritable()) {
            return this.channel.write(packet);
        }
        this.logger.error("[SteamChannelManager] channel isWritable is false! channel: " + this.channel);
        return null;
    }

    /**
     * Writes a close packet if the channel is connected.
     *
     * @return the write future, or null if nothing was written
     */
    private ChannelFuture sendClose(int streamChannelId, StreamCode code) {
        if (this.channel.isConnected()) {
            StreamClosePacket packet = new StreamClosePacket(streamChannelId, code);
            return this.channel.write(packet);
        }
        return null;
    }

    /** Returns true if the context is a server-side context (instanceof is false for null). */
    private boolean isServerStreamChannelContext(StreamChannelContext context) {
        return context instanceof ServerStreamChannelContext;
    }

    /** Returns true if the context is a client-side context (instanceof is false for null). */
    private boolean isClientStreamChannelContext(StreamChannelContext context) {
        return context instanceof ClientStreamChannelContext;
    }

    /**
     * Reports whether a server-side listener other than the disabled placeholder was configured.
     *
     * @return true unless the listener is {@link DisabledServerStreamChannelMessageListener#INSTANCE}
     */
    public boolean isSupportServerMode() {
        return this.streamChannelMessageListener != DisabledServerStreamChannelMessageListener.INSTANCE;
    }
}

