yy1717
2019-12-30 704a7434b0494e84a578d719daacb72f995b2ad2
im_lib/src/main/java/com/anyun/im_lib/netty/NettyTcpClient.java
@@ -12,9 +12,13 @@
import com.anyun.im_lib.listener.IMSConnectStatusCallback;
import com.anyun.im_lib.listener.OnEventListener;
import java.nio.ByteBuffer;
import java.util.Vector;
import java.util.function.ToDoubleBiFunction;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
@@ -149,72 +153,8 @@
        }
    }
    private class ResetConnectRunnable implements Runnable{
        private boolean isFirst;
        public ResetConnectRunnable(boolean isFirst) {
            this.isFirst = isFirst;
        }
        @Override
        public void run() {
            if (!isFirst){
                onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE);
            }
            try {
                //重连时,释放工作组线程池,也就是停止心跳
                loopGroup.destroyWorkLoopGroup();
                while (!isClosed){
                    if (!isNetworkAvaliable()){
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    //网络可用才进行连接
                    int status;
                    if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){
                        onConnectStatusCallback(status);
                        //连接成功,调出循环
                        break;
                    }
                    if (status == IMSConfig.CONNECT_STATE_FAILURE){
                        onConnectStatusCallback(status);
                        try {
                            Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } finally {
                //标识重连任务停止
                isReconnecting = false;
            }
        }
    }
    /**
     * 重连,首次连接也认为是第一次重连
     * @return
     */
    private int reConnect() {
        if (!isClosed){
            try {
                //先释放EventLoop线程组
                if (bootstrap != null){
                    bootstrap.group().shutdownGracefully();
                }
            } finally {
                    bootstrap = null;
            }
            initBootstrap();
            return connectServer();
        }
        return IMSConfig.CONNECT_STATE_FAILURE;
    }
    /***
     * 初始化Bootstrap
@@ -225,19 +165,29 @@
        bootstrap.group(loopGroup).channel(NioSocketChannel.class);
        // 设置该项以后,如果在两小时内没有数据通信时,TCP会自动发送一个活动探测数据报文
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        //设置禁用nagle算法
        bootstrap.option(ChannelOption.TCP_NODELAY,true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getConnectTimeout());
        bootstrap.handler(new TCPChannelInitializerHandler(this));
    }
    private int connectServer(){
        if (serverUrlList == null || serverUrlList.size() == 0){
            return IMSConfig.CONNECT_STATE_FAILURE;
        }
        for (int i = 0; i < serverUrlList.size(); i++) {
            String serverUrl = serverUrlList.get(i);
            if (TextUtils.isEmpty(serverUrl)){
                return IMSConfig.CONNECT_STATE_FAILURE;
    /**
     * 真正连接服务器的地方
     */
    private void toServer() {
        try {
            channel = bootstrap.connect(currentHost,currentPort).sync().channel();
        } catch (Exception e) {
            Log.i(TAG, String.format("连接Server(ip[%s],port[%d]失败)",currentHost,currentPort));
            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            channel = null;
        }
    }
@@ -248,9 +198,46 @@
        return false;
    }
    /**
     * 回调ims连接状态
     * @param connectStateConnecting
     */
    private void onConnectStatusCallback(int connectStateConnecting) {
        this.connectStatus = connectStateConnecting;
        switch (connectStatus){
            case IMSConfig.CONNECT_STATE_CONNECTING:
                Log.i(TAG, "onConnectStatusCallback: ims连接中...");
                if (imsConnectStatusCallback != null){
                    imsConnectStatusCallback.onConnecting();
                }
                break;
            case IMSConfig.CONNECT_STATE_SUCCESSFUL:
                Log.i(TAG, String.format("onConnectStatusCallback: ims连接成功,host[%s],port[%s]",currentHost,currentPort));
                if (imsConnectStatusCallback != null){
                    imsConnectStatusCallback.onConnected();
                }
                // TODO: 2019/12/12  连接成功 ,发送握手消息(需要的话——)
                try {
                    ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
                    byteBuf.writeBytes(mOnEventListener.getRegisterMessage());
                    channel.writeAndFlush(byteBuf);
                } catch (Exception e) {
                    Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t");
                }
                break;
            case IMSConfig.CONNECT_STATE_FAILURE:
                default:
                    Log.i(TAG, "onConnectStatusCallback: ims连接失败");
                    if (imsConnectStatusCallback != null){
                        imsConnectStatusCallback.onConnectFailed();
                    }
                    break;
        }
    }
    /**
     * 关闭连接,同时释放资源
     */
    @Override
    public void close() {
        if (isClosed){
@@ -323,22 +310,51 @@
    @Override
    public void sendMsg(String msg) {
        this.sendMsg(msg,true);
    }
    /**
     *
     * @param msg
     * @param isJoinTimeoutManager 是否加入超时管理器
     */
    @Override
    public void sendMsg(String msg, boolean isJoinTimeoutManager) {
        if (msg==null ){
            return;
        }
        // TODO: 2019/12/12  根据MSG_ID 来加入 超时,  暂不写
        if (channel == null){
            Log.i(TAG, "sendMsg fail,channel为空"+msg);
        }
        try {
            ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
            byteBuf.writeBytes(msg.getBytes());
            channel.writeAndFlush(byteBuf);
        } catch (Exception e) {
            Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"+msg);
        }
    }
    /**
     * 获取客户设置的重连时长
     * @return
     */
    @Override
    public int getReconnectInterval() {
        return 0;
        if (mOnEventListener != null && mOnEventListener.getReConnectInterval()>0){
            return reconnectInterval = mOnEventListener.getReConnectInterval();
        }
        return reconnectInterval;
    }
    @Override
    public int getConnectTimeout() {
        return 0;
        if(mOnEventListener != null && mOnEventListener.getConnectTimeout()>0){
            connectTimeOut = mOnEventListener.getConnectTimeout();
        }
        return connectTimeOut;
    }
    @Override
@@ -368,11 +384,139 @@
    @Override
    public MsgDispatcher getMsgDispatcher() {
        return null;
        return msgDispatcher;
    }
    @Override
    public MsgTimeOutTimerManager getMsgTimeOutTimerManager() {
        return msgTimeOutTimerManager;
    }
    @Override
    public byte[] getRegisterMessage() {
        if (mOnEventListener != null){
            return mOnEventListener.getRegisterMessage();
        }
        return null;
    }
    /**
     * 重连任务
     */
    private class ResetConnectRunnable implements Runnable{
        private boolean isFirst;
        public ResetConnectRunnable(boolean isFirst) {
            this.isFirst = isFirst;
        }
        @Override
        public void run() {
            Log.i(TAG, "执行重连任务");
            if (!isFirst){
                onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE);
            }
            try {
                //重连时,释放工作组线程池,也就是停止心跳
                loopGroup.destroyWorkLoopGroup();
                while (!isClosed){
                    if (!isNetworkAvaliable()){
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        continue;
                    }
                    //网络可用才进行连接
                    int status;
                    if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){
                        onConnectStatusCallback(status);
                        //连接成功,调出循环
                        break;
                    }
                    if (status == IMSConfig.CONNECT_STATE_FAILURE){
                        onConnectStatusCallback(status);
                        try {
                            Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } finally {
                //标识重连任务停止
                isReconnecting = false;
            }
        }
        /**
         * 重连,首次连接也认为是第一次重连
         * @return
         */
        private int reConnect() {
            if (!isClosed){
                try {
                    //先释放EventLoop线程组
                    if (bootstrap != null){
                        bootstrap.group().shutdownGracefully();
                    }
                } finally {
                    bootstrap = null;
                }
                initBootstrap();
                return connectServer();
            }
            return IMSConfig.CONNECT_STATE_FAILURE;
        }
        private int connectServer(){
            //服务器地址无效,直接回调连接状态,不再进行连接
            if (serverUrlList == null || serverUrlList.size() == 0){
                return IMSConfig.CONNECT_STATE_FAILURE;
            }
            for (int i = 0; i < serverUrlList.size(); i++) {
                String serverUrl = serverUrlList.get(i);
                //服务器地址无效,直接回调连接状态,不再进行连接
                if (TextUtils.isEmpty(serverUrl)){
                    return IMSConfig.CONNECT_STATE_FAILURE;
                }
                String[] address = serverUrl.split(" ");
                for (int j = 1; j <= IMSConfig.DEFAULT_RECONNECT_COUNT; j++) {
                    //如果ims已经关闭,或网络不可用,直接回调连接状态,不再进行连接
                    if (isClosed || !isNetworkAvaliable()){
                        return IMSConfig.CONNECT_STATE_FAILURE;
                    }
                    //回调连接状态
                    if(connectStatus != IMSConfig.CONNECT_STATE_CONNECTING){
                        onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING);
                    }
                    Log.i(TAG, String.format("正在进行connectServer【%s】的第[%d]连接,当前重连延时时长为[%dms]: ",serverUrl,j,j*getReconnectInterval()));
                    try {
                        currentPort = Integer.parseInt(address[1]);
                        currentHost = address[0];
                        //连接服务器
                        toServer();
                        //channel不为空,则认为连接已经成功
                        if(channel != null){
                            return IMSConfig.CONNECT_STATE_SUCCESSFUL;
                        }else{
                            Thread.sleep(j*getReconnectInterval());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        close();
                        break;
                    }
                }
            }
            return IMSConfig.CONNECT_STATE_FAILURE;
        }
    }
}