package com.anyun.im_lib.netty; import android.text.TextUtils; import android.util.Log; import com.anyun.im_lib.ExecutorServiceFactory; import com.anyun.im_lib.HeartbeatHandler; import com.anyun.im_lib.IMSConfig; import com.anyun.im_lib.MsgDispatcher; import com.anyun.im_lib.MsgTimeOutTimerManager; import com.anyun.im_lib.interf.IMSClientInteface; import com.anyun.im_lib.listener.IMSConnectStatusCallback; import com.anyun.im_lib.listener.OnEventListener; import java.nio.ByteBuffer; import java.util.Vector; import java.util.function.ToDoubleBiFunction; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; /** * MyApplication2 * Created by lzw on 2019/12/2. 13:14:52 * 邮箱:632393724@qq.com * All Rights Saved! Chongqing AnYun Tech co. LTD */ public class NettyTcpClient implements IMSClientInteface { private static final String TAG = NettyTcpClient.class.getSimpleName(); private static volatile NettyTcpClient instance; private Bootstrap bootstrap; private Channel channel; /****标识IMS是否已关闭***/ private boolean isClosed = false; /****ims服务器地址组***/ private Vector serverUrlList; /****与应用层交互的listener***/ private OnEventListener mOnEventListener; /*** ims连接状态回调监听器**/ private IMSConnectStatusCallback imsConnectStatusCallback; /****消息转发器***/ private MsgDispatcher msgDispatcher; /***线程池工厂**/ private ExecutorServiceFactory loopGroup; /****是否正在进行重连***/ private boolean isReconnecting = false; /*** ims连接状态,初始化为连接失败 ***/ private int connectStatus = IMSConfig.CONNECT_STATE_FAILURE; /*** 重连间隔时长 ***/ private int reconnectInterval = IMSConfig.DEFAULT_RECONNECT_BASE_DELAY_TIME; /*** 连接超时时长 ***/ private int connectTimeOut = IMSConfig.DEFAULT_CONNECT_TIMEOUT; private int heartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_FOREGROUND; private int foregroundHeartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_FOREGROUND; private int backgroundHeartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_BACKGROUND; private int appStatus = IMSConfig.APP_STATUS_FOREGROUND; /*** 消息发送超时重发次数***/ private int resendCount = IMSConfig.DEFAULT_RESEND_COUNT; /*** 消息发送失败重发间隔时长***/ private int resendInterval = IMSConfig.DEFAULT_RESEND_INTERVAL; /*** 当前连接host***/ private String currentHost = null; /*** 当前连接port***/ private int currentPort = -1; /*** 消息发送超时定时管理器***/ private MsgTimeOutTimerManager msgTimeOutTimerManager; private NettyTcpClient(){ } public static IMSClientInteface getInstance() { if (null == instance){ synchronized (NettyTcpClient.class){ if (null==instance){ instance = new NettyTcpClient(); } } } return instance; } /** * 初始化 * @param serverUrlList 服务器地址列表 * @param listener 与应用层交互的listener * @param callback ims连接状态回调 */ @Override public void init(Vector serverUrlList, OnEventListener listener, IMSConnectStatusCallback callback) { close(); isClosed = false; this.serverUrlList = serverUrlList; this.mOnEventListener = listener; this.imsConnectStatusCallback = callback; msgDispatcher = new MsgDispatcher(); msgDispatcher.setOnEventListener(listener); loopGroup = new ExecutorServiceFactory(); loopGroup.initBossLoopGroup(); msgTimeOutTimerManager = new MsgTimeOutTimerManager(this); /*** 进行第一次连接***/ resetConnect(true); } @Override public void resetConnect() { this.resetConnect(false); } @Override public void resetConnect(boolean isFirst) { if (!isFirst){ try { Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } //只有第一个调用者才能赋值并调用重连 if (!isClosed && !isReconnecting){ synchronized (this){ if (!isClosed && !isReconnecting){ //标识重连任务进行中... isReconnecting = true; onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING); closeChannel(); loopGroup.execBossTask(new ResetConnectRunnable(isFirst)); } } } } /*** * 初始化Bootstrap */ private void initBootstrap() { EventLoopGroup loopGroup = new NioEventLoopGroup(4); bootstrap = new Bootstrap(); bootstrap.group(loopGroup).channel(NioSocketChannel.class); // 设置该项以后,如果在两小时内没有数据通信时,TCP会自动发送一个活动探测数据报文 bootstrap.option(ChannelOption.SO_KEEPALIVE,true); //设置禁用nagle算法 bootstrap.option(ChannelOption.TCP_NODELAY,true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getConnectTimeout()); bootstrap.handler(new TCPChannelInitializerHandler(this)); } /** * 真正连接服务器的地方 */ private void toServer() { try { channel = bootstrap.connect(currentHost,currentPort).sync().channel(); } catch (Exception e) { Log.i(TAG, String.format("连接Server(ip[%s],port[%d]失败)",currentHost,currentPort)); try { Thread.sleep(500); } catch (InterruptedException ex) { ex.printStackTrace(); } channel = null; } } private boolean isNetworkAvaliable() { if (mOnEventListener != null){ return mOnEventListener.isNetWorkAvailable(); } return false; } /** * 回调ims连接状态 * @param connectStateConnecting */ private void onConnectStatusCallback(int connectStateConnecting) { this.connectStatus = connectStateConnecting; switch (connectStatus){ case IMSConfig.CONNECT_STATE_CONNECTING: Log.i(TAG, "onConnectStatusCallback: ims连接中..."); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnecting(); } break; case IMSConfig.CONNECT_STATE_SUCCESSFUL: Log.i(TAG, String.format("onConnectStatusCallback: ims连接成功,host[%s],port[%s]",currentHost,currentPort)); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnected(); } // TODO: 2019/12/12 连接成功 ,发送握手消息(需要的话——) try { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byteBuf.writeBytes(mOnEventListener.getRegisterMessage()); channel.writeAndFlush(byteBuf); } catch (Exception e) { Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"); } break; case IMSConfig.CONNECT_STATE_FAILURE: default: Log.i(TAG, "onConnectStatusCallback: ims连接失败"); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnectFailed(); } break; } } /** * 关闭连接,同时释放资源 */ @Override public void close() { if (isClosed){ return; } isClosed = true; /*** 关闭channel***/ try { closeChannel(); } catch (Exception e) { e.printStackTrace(); } /*** 关闭bootstrap ***/ try { if (bootstrap != null){ bootstrap.group().shutdownGracefully(); } } catch (Exception e) { e.printStackTrace(); } /*** 释放线程池 ***/ try { if (loopGroup != null){ loopGroup.destroy(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (serverUrlList != null){ serverUrlList.clear(); } } catch (Exception e) { e.printStackTrace(); } isReconnecting = false; channel = null; bootstrap = null; } } private void closeChannel() { if (channel != null){ removeHandler(HeartbeatHandler.class.getSimpleName()); removeHandler(TCPReadHandler.class.getSimpleName()); removeHandler(IdleStateHandler.class.getSimpleName()); } } /** * 移除指定handler * @param handlerName */ private void removeHandler(String handlerName) { try { if (channel.pipeline().get(handlerName) != null){ channel.pipeline().remove(handlerName); } } catch (Exception e) { e.printStackTrace(); Log.i(TAG, "removeHandler fail,handlerName="+handlerName); } } @Override public boolean isClosed() { return false; } @Override public void sendMsg(String msg) { this.sendMsg(msg,true); } /** * * @param msg * @param isJoinTimeoutManager 是否加入超时管理器 */ @Override public void sendMsg(String msg, boolean isJoinTimeoutManager) { if (msg==null ){ return; } // TODO: 2019/12/12 根据MSG_ID 来加入 超时, 暂不写 if (channel == null){ Log.i(TAG, "sendMsg fail,channel为空"+msg); } try { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byteBuf.writeBytes(msg.getBytes()); channel.writeAndFlush(byteBuf); } catch (Exception e) { Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"+msg); } } /** * 获取客户设置的重连时长 * @return */ @Override public int getReconnectInterval() { if (mOnEventListener != null && mOnEventListener.getReConnectInterval()>0){ return reconnectInterval = mOnEventListener.getReConnectInterval(); } return reconnectInterval; } @Override public int getConnectTimeout() { if(mOnEventListener != null && mOnEventListener.getConnectTimeout()>0){ connectTimeOut = mOnEventListener.getConnectTimeout(); } return connectTimeOut; } @Override public int getForegroundHeartbeatInterval() { return 0; } @Override public void setAppStatus(int appStatus) { } @Override public int getBackgroundHeartbeatInterval() { return 0; } @Override public int getResendCount() { return 0; } @Override public int getResendInterval() { return 0; } @Override public MsgDispatcher getMsgDispatcher() { return msgDispatcher; } @Override public MsgTimeOutTimerManager getMsgTimeOutTimerManager() { return msgTimeOutTimerManager; } @Override public byte[] getRegisterMessage() { if (mOnEventListener != null){ return mOnEventListener.getRegisterMessage(); } return null; } /** * 重连任务 */ private class ResetConnectRunnable implements Runnable{ private boolean isFirst; public ResetConnectRunnable(boolean isFirst) { this.isFirst = isFirst; } @Override public void run() { Log.i(TAG, "执行重连任务"); if (!isFirst){ onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE); } try { //重连时,释放工作组线程池,也就是停止心跳 loopGroup.destroyWorkLoopGroup(); while (!isClosed){ if (!isNetworkAvaliable()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //网络可用才进行连接 int status; if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){ onConnectStatusCallback(status); //连接成功,调出循环 break; } if (status == IMSConfig.CONNECT_STATE_FAILURE){ onConnectStatusCallback(status); try { Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } } } finally { //标识重连任务停止 isReconnecting = false; } } /** * 重连,首次连接也认为是第一次重连 * @return */ private int reConnect() { if (!isClosed){ try { //先释放EventLoop线程组 if (bootstrap != null){ bootstrap.group().shutdownGracefully(); } } finally { bootstrap = null; } initBootstrap(); return connectServer(); } return IMSConfig.CONNECT_STATE_FAILURE; } private int connectServer(){ //服务器地址无效,直接回调连接状态,不再进行连接 if (serverUrlList == null || serverUrlList.size() == 0){ return IMSConfig.CONNECT_STATE_FAILURE; } for (int i = 0; i < serverUrlList.size(); i++) { String serverUrl = serverUrlList.get(i); //服务器地址无效,直接回调连接状态,不再进行连接 if (TextUtils.isEmpty(serverUrl)){ return IMSConfig.CONNECT_STATE_FAILURE; } String[] address = serverUrl.split(" "); for (int j = 1; j <= IMSConfig.DEFAULT_RECONNECT_COUNT; j++) { //如果ims已经关闭,或网络不可用,直接回调连接状态,不再进行连接 if (isClosed || !isNetworkAvaliable()){ return IMSConfig.CONNECT_STATE_FAILURE; } //回调连接状态 if(connectStatus != IMSConfig.CONNECT_STATE_CONNECTING){ onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING); } Log.i(TAG, String.format("正在进行connectServer【%s】的第[%d]连接,当前重连延时时长为[%dms]: ",serverUrl,j,j*getReconnectInterval())); try { currentPort = Integer.parseInt(address[1]); currentHost = address[0]; //连接服务器 toServer(); //channel不为空,则认为连接已经成功 if(channel != null){ return IMSConfig.CONNECT_STATE_SUCCESSFUL; }else{ Thread.sleep(j*getReconnectInterval()); } } catch (InterruptedException e) { e.printStackTrace(); close(); break; } } } return IMSConfig.CONNECT_STATE_FAILURE; } } }