im_lib/src/main/java/com/anyun/im_lib/ExecutorServiceFactory.java
@@ -1,5 +1,7 @@ package com.anyun.im_lib; import android.util.Log; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -10,6 +12,8 @@ * All Rights Saved! Chongqing AnYun Tech co. LTD */ public class ExecutorServiceFactory { private static final String TAG = ExecutorServiceFactory.class.getSimpleName(); /*** 管理线程组,负责重连***/ private ExecutorService bossPool; @@ -38,6 +42,7 @@ if (bossPool == null){ initBossLoopGroup(); } Log.i(TAG, "execBossTask"); bossPool.execute(runnable); } /** im_lib/src/main/java/com/anyun/im_lib/IMSConfig.java
@@ -25,4 +25,5 @@ public static final int DEFAULT_RECONNECT_INTERVAL = 3 * 1000; public static final int CONNECT_STATE_CONNECTING = 0; public static final int CONNECT_STATE_SUCCESSFUL = 1; public static final int DEFAULT_RECONNECT_COUNT = 3; } im_lib/src/main/java/com/anyun/im_lib/LoginAuthRespHandler.java
@@ -4,6 +4,7 @@ import com.anyun.im_lib.interf.IMSClientInteface; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -14,10 +15,17 @@ * 邮箱:632393724@qq.com * All Rights Saved! Chongqing AnYun Tech co. LTD */ public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter { public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter { private static final String TAG = LoginAuthRespHandler.class.getSimpleName(); private IMSClientInteface imsClient; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); Log.i(TAG, "channelActive: "); } /** * 构造函数 @@ -36,7 +44,7 @@ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); Log.i(TAG, "channelRead"); // Log.i(TAG, "channelRead"+(String)msg); // TODO: 2019/12/4 } } im_lib/src/main/java/com/anyun/im_lib/MsgDispatcher.java
@@ -9,7 +9,17 @@ * All Rights Saved! Chongqing AnYun Tech co. LTD */ public class MsgDispatcher { public void setOnEventListener(OnEventListener listener) { private OnEventListener onEventListener; public void setOnEventListener(OnEventListener listener) { this.onEventListener = listener; } public void receivedMsg(String msg){ // TODO: 2019/12/12 if (onEventListener != null){ onEventListener.dispatchMsg(msg); } } } im_lib/src/main/java/com/anyun/im_lib/interf/IMSClientInteface.java
@@ -110,5 +110,6 @@ MsgTimeOutTimerManager getMsgTimeOutTimerManager(); //获取注册消息 byte[] getRegisterMessage(); } im_lib/src/main/java/com/anyun/im_lib/listener/IMSConnectStatusCallback.java
@@ -8,7 +8,7 @@ */ public interface IMSConnectStatusCallback { /***ims连接中*/ void connecting(); void onConnecting(); /***ims连接成功**/ void onConnected(); im_lib/src/main/java/com/anyun/im_lib/listener/OnEventListener.java
@@ -37,6 +37,13 @@ */ int getBackgroundHeartbeatInterval(); /** * 获取应用层消息发送状态报告消息类型 * @return */ int getServerSentReportMsgType(); /** * 获取应用层消息发送超时重发次数 * @return @@ -48,4 +55,8 @@ * @return */ int getResendInterval(); int getReConnectInterval(); //注册消息 byte[] getRegisterMessage(); } im_lib/src/main/java/com/anyun/im_lib/netty/NettyTcpClient.java
@@ -12,9 +12,13 @@ import com.anyun.im_lib.listener.IMSConnectStatusCallback; import com.anyun.im_lib.listener.OnEventListener; import java.nio.ByteBuffer; import java.util.Vector; import java.util.function.ToDoubleBiFunction; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -149,72 +153,8 @@ } } private class ResetConnectRunnable implements Runnable{ private boolean isFirst; public ResetConnectRunnable(boolean isFirst) { this.isFirst = isFirst; } @Override public void run() { if (!isFirst){ onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE); } try { //重连时,释放工作组线程池,也就是停止心跳 loopGroup.destroyWorkLoopGroup(); while (!isClosed){ if (!isNetworkAvaliable()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //网络可用才进行连接 int status; if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){ onConnectStatusCallback(status); //连接成功,调出循环 break; } if (status == IMSConfig.CONNECT_STATE_FAILURE){ onConnectStatusCallback(status); try { Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } } } finally { //标识重连任务停止 isReconnecting = false; } } } /** * 重连,首次连接也认为是第一次重连 * @return */ private int reConnect() { if (!isClosed){ try { //先释放EventLoop线程组 if (bootstrap != null){ bootstrap.group().shutdownGracefully(); } } finally { bootstrap = null; } initBootstrap(); return connectServer(); } return IMSConfig.CONNECT_STATE_FAILURE; } /*** * 初始化Bootstrap @@ -225,19 +165,29 @@ bootstrap.group(loopGroup).channel(NioSocketChannel.class); // 设置该项以后,如果在两小时内没有数据通信时,TCP会自动发送一个活动探测数据报文 bootstrap.option(ChannelOption.SO_KEEPALIVE,true); //设置禁用nagle算法 bootstrap.option(ChannelOption.TCP_NODELAY,true); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getConnectTimeout()); bootstrap.handler(new 
TCPChannelInitializerHandler(this)); } private int connectServer(){ if (serverUrlList == null || serverUrlList.size() == 0){ return IMSConfig.CONNECT_STATE_FAILURE; } for (int i = 0; i < serverUrlList.size(); i++) { String serverUrl = serverUrlList.get(i); if (TextUtils.isEmpty(serverUrl)){ return IMSConfig.CONNECT_STATE_FAILURE; /** * 真正连接服务器的地方 */ private void toServer() { try { channel = bootstrap.connect(currentHost,currentPort).sync().channel(); } catch (Exception e) { Log.i(TAG, String.format("连接Server(ip[%s],port[%d]失败)",currentHost,currentPort)); try { Thread.sleep(500); } catch (InterruptedException ex) { ex.printStackTrace(); } channel = null; } } @@ -248,9 +198,46 @@ return false; } /** * 回调ims连接状态 * @param connectStateConnecting */ private void onConnectStatusCallback(int connectStateConnecting) { this.connectStatus = connectStateConnecting; switch (connectStatus){ case IMSConfig.CONNECT_STATE_CONNECTING: Log.i(TAG, "onConnectStatusCallback: ims连接中..."); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnecting(); } break; case IMSConfig.CONNECT_STATE_SUCCESSFUL: Log.i(TAG, String.format("onConnectStatusCallback: ims连接成功,host[%s],port[%s]",currentHost,currentPort)); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnected(); } // TODO: 2019/12/12 连接成功 ,发送握手消息(需要的话——) try { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byteBuf.writeBytes(mOnEventListener.getRegisterMessage()); channel.writeAndFlush(byteBuf); } catch (Exception e) { Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"); } break; case IMSConfig.CONNECT_STATE_FAILURE: default: Log.i(TAG, "onConnectStatusCallback: ims连接失败"); if (imsConnectStatusCallback != null){ imsConnectStatusCallback.onConnectFailed(); } break; } } /** * 关闭连接,同时释放资源 */ @Override public void close() { if (isClosed){ @@ -323,22 +310,51 @@ @Override public void sendMsg(String msg) { this.sendMsg(msg,true); } /** * * @param msg * @param isJoinTimeoutManager 是否加入超时管理器 */ @Override 
public void sendMsg(String msg, boolean isJoinTimeoutManager) { if (msg==null ){ return; } // TODO: 2019/12/12 根据MSG_ID 来加入 超时, 暂不写 if (channel == null){ Log.i(TAG, "sendMsg fail,channel为空"+msg); } try { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer(); byteBuf.writeBytes(msg.getBytes()); channel.writeAndFlush(byteBuf); } catch (Exception e) { Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"+msg); } } /** * 获取客户设置的重连时长 * @return */ @Override public int getReconnectInterval() { return 0; if (mOnEventListener != null && mOnEventListener.getReConnectInterval()>0){ return reconnectInterval = mOnEventListener.getReConnectInterval(); } return reconnectInterval; } @Override public int getConnectTimeout() { return 0; if(mOnEventListener != null && mOnEventListener.getConnectTimeout()>0){ connectTimeOut = mOnEventListener.getConnectTimeout(); } return connectTimeOut; } @Override @@ -368,11 +384,139 @@ @Override public MsgDispatcher getMsgDispatcher() { return null; return msgDispatcher; } @Override public MsgTimeOutTimerManager getMsgTimeOutTimerManager() { return msgTimeOutTimerManager; } @Override public byte[] getRegisterMessage() { if (mOnEventListener != null){ return mOnEventListener.getRegisterMessage(); } return null; } /** * 重连任务 */ private class ResetConnectRunnable implements Runnable{ private boolean isFirst; public ResetConnectRunnable(boolean isFirst) { this.isFirst = isFirst; } @Override public void run() { Log.i(TAG, "执行重连任务"); if (!isFirst){ onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE); } try { //重连时,释放工作组线程池,也就是停止心跳 loopGroup.destroyWorkLoopGroup(); while (!isClosed){ if (!isNetworkAvaliable()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //网络可用才进行连接 int status; if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){ onConnectStatusCallback(status); //连接成功,调出循环 break; } if (status == IMSConfig.CONNECT_STATE_FAILURE){ onConnectStatusCallback(status); try { 
Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } } } finally { //标识重连任务停止 isReconnecting = false; } } /** * 重连,首次连接也认为是第一次重连 * @return */ private int reConnect() { if (!isClosed){ try { //先释放EventLoop线程组 if (bootstrap != null){ bootstrap.group().shutdownGracefully(); } } finally { bootstrap = null; } initBootstrap(); return connectServer(); } return IMSConfig.CONNECT_STATE_FAILURE; } private int connectServer(){ //服务器地址无效,直接回调连接状态,不再进行连接 if (serverUrlList == null || serverUrlList.size() == 0){ return IMSConfig.CONNECT_STATE_FAILURE; } for (int i = 0; i < serverUrlList.size(); i++) { String serverUrl = serverUrlList.get(i); //服务器地址无效,直接回调连接状态,不再进行连接 if (TextUtils.isEmpty(serverUrl)){ return IMSConfig.CONNECT_STATE_FAILURE; } String[] address = serverUrl.split(" "); for (int j = 1; j <= IMSConfig.DEFAULT_RECONNECT_COUNT; j++) { //如果ims已经关闭,或网络不可用,直接回调连接状态,不再进行连接 if (isClosed || !isNetworkAvaliable()){ return IMSConfig.CONNECT_STATE_FAILURE; } //回调连接状态 if(connectStatus != IMSConfig.CONNECT_STATE_CONNECTING){ onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING); } Log.i(TAG, String.format("正在进行connectServer【%s】的第[%d]连接,当前重连延时时长为[%dms]: ",serverUrl,j,j*getReconnectInterval())); try { currentPort = Integer.parseInt(address[1]); currentHost = address[0]; //连接服务器 toServer(); //channel不为空,则认为连接已经成功 if(channel != null){ return IMSConfig.CONNECT_STATE_SUCCESSFUL; }else{ Thread.sleep(j*getReconnectInterval()); } } catch (InterruptedException e) { e.printStackTrace(); close(); break; } } } return IMSConfig.CONNECT_STATE_FAILURE; } } } im_lib/src/main/java/com/anyun/im_lib/netty/TCPChannelInitializerHandler.java
@@ -1,12 +1,18 @@ package com.anyun.im_lib.netty; import android.util.Log; import com.anyun.im_lib.HeartbeatRespHandler; import com.anyun.im_lib.LoginAuthRespHandler; import com.anyun.im_lib.interf.IMSClientInteface; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; /** * MyApplication2 @@ -17,6 +23,8 @@ */ public class TCPChannelInitializerHandler extends ChannelInitializer<Channel> { private static final String TAG = TCPChannelInitializerHandler.class.getSimpleName(); private IMSClientInteface imsClient; public TCPChannelInitializerHandler(NettyTcpClient nettyTcpClient) { @@ -26,11 +34,40 @@ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); Log.i(TAG, "initChannel: "); //netty提供的自定义长度解码器,解决TP拆包/粘包问题 // TODO: 2019/12/4 // (1) maxFrameLength - 发送的数据包最大长度; // //(2) lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标; // //(3) lengthFieldLength - 长度域的自己的字节数长度。 // //(4) lengthAdjustment – 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。 // //(5) initialBytesToStrip – 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有4个节点的长度域,则它的值为4。 // // 在上面的例子中,自定义长度解码器的构造参数值如下: // // LengthFieldBasedFrameDecoder spliter=new LengthFieldBasedFrameDecoder(1024,0,4,0,4); // 第一个参数为1024,表示数据包的最大长度为1024; // 第二个参数0,表示长度域的偏移量为0,也就是长度域放在了最前面,处于包的起始位置; // 第三个参数为4,表示长度域占用4个字节; // 第四个参数为0,表示长度域保存的值,仅仅为有效数据长度,不包含其他域(如长度域)的长度; // 第五个参数为4,表示最终的取到的目标数据包,抛弃最前面的4个字节数据,长度域的值被抛弃。 // // 为了更加清楚的说明一下上面的规则,调整一下例子中的代码。在写入通道前,在数据 // pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,3 ,2,1,0)); // pipeline.addLast(new FixedLengthFrameDecoder(10)); // 测试用 固定长度消息 // pipeline.addLast(new LineBasedFrameDecoder(1024)); byte[] bytes = new byte[]{0x7e}; ByteBuf byteBuf = Unpooled.copiedBuffer(bytes); pipeline.addLast(new 
DelimiterBasedFrameDecoder(1024,byteBuf)); //握手认证消息相应处理handler pipeline.addLast(LoginAuthRespHandler.class.getSimpleName(), new LoginAuthRespHandler(imsClient)); im_lib/src/main/java/com/anyun/im_lib/netty/TCPReadHandler.java
@@ -3,7 +3,10 @@ import android.util.Log; import com.anyun.im_lib.interf.IMSClientInteface; import com.anyun.im_lib.util.ByteUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -45,6 +48,7 @@ Channel channel = ctx.channel(); if (channel != null){ channel.close(); ctx.close(); } //触发重连 imsClient.resetConnect(false); @@ -52,14 +56,21 @@ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); // TODO: 2019/12/4 //服务端返回消息后 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); Log.i(TAG, "channelRead hex str: "+ ByteUtil.byte2HexStr(req)); String str = new String(req, "UTF-8"); imsClient.getMsgDispatcher().receivedMsg( str ); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); Log.i(TAG, "channelReadComplete"); } } im_lib/src/main/java/com/anyun/im_lib/util/ByteUtil.java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Static helpers for converting between raw bytes, hexadecimal text, BCD digits and strings.
 *
 * <p>MyApplication2<br>
 * Created by lzw on 2019/12/13. 12:01:18<br>
 * Email: 632393724@qq.com<br>
 * All Rights Saved! Chongqing AnYun Tech co. LTD
 *
 * <p>The class only uses the Java standard library, so it can be exercised off-device.
 */
public class ByteUtil {

    /** Upper-case hexadecimal digits, indexed by nibble value. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Converts BCD-encoded bytes to a decimal digit string.
     *
     * <p>Each byte contributes its high nibble followed by its low nibble, both rendered as
     * decimal numbers. If the resulting text starts with {@code '0'}, exactly one leading zero
     * is removed (this undoes the padding added by {@link #str2Bcd(String)}).
     *
     * @param bytes BCD-encoded bytes; {@code null} or empty yields an empty string
     * @return the decoded digit string
     */
    public static String bcd2Str(byte[] bytes) {
        // The original indexed substring(0, 1) on an empty buffer and crashed for empty input.
        if (bytes == null || bytes.length == 0) {
            return "";
        }
        StringBuilder digits = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            digits.append((b & 0xf0) >>> 4);
            digits.append(b & 0x0f);
        }
        return digits.charAt(0) == '0' ? digits.substring(1) : digits.toString();
    }

    /**
     * Converts a digit string to BCD bytes, two characters per byte (high nibble first).
     *
     * <p>An odd-length input is left-padded with {@code '0'}. Characters {@code '0'..'9'} map to
     * their value; {@code 'a'..'z'} map to {@code 10 + (c - 'a')}; every other character is
     * treated as an upper-case letter and maps to {@code 10 + (c - 'A')}.
     *
     * @param asc the digit string; must not be {@code null}
     * @return the packed bytes, {@code ceil(asc.length() / 2)} long
     */
    public static byte[] str2Bcd(String asc) {
        String padded = (asc.length() % 2 != 0) ? "0" + asc : asc;
        byte[] packed = new byte[padded.length() / 2];
        for (int p = 0; p < packed.length; p++) {
            int high = bcdNibble(padded.charAt(2 * p));
            int low = bcdNibble(padded.charAt(2 * p + 1));
            packed[p] = (byte) ((high << 4) + low);
        }
        return packed;
    }

    /** Maps one character of a BCD digit string to its nibble value (see {@link #str2Bcd}). */
    private static int bcdNibble(char c) {
        if (c >= '0' && c <= '9') {
            return c - '0';
        }
        if (c >= 'a' && c <= 'z') {
            return c - 'a' + 0x0a;
        }
        return c - 'A' + 0x0a;
    }

    /**
     * Decodes bytes into a string using the named charset.
     *
     * @param bytes       the encoded bytes
     * @param charsetName a charset name accepted by {@link Charset#forName(String)}
     * @return the decoded string
     */
    public static String getString(byte[] bytes, String charsetName) {
        return new String(bytes, Charset.forName(charsetName));
    }

    /**
     * Decodes bytes into a string using the GBK charset.
     *
     * @param bytes the GBK-encoded bytes
     * @return the decoded string
     */
    public static String getString(byte[] bytes) {
        return getString(bytes, "GBK");
    }

    /**
     * Formats bytes as upper-case hexadecimal text.
     *
     * @param b the bytes to format
     * @return two hex digits per byte, bytes separated by a single space, no trailing space;
     *         empty string for an empty array
     */
    public static String byte2HexStr(byte[] b) {
        StringBuilder sb = new StringBuilder(b.length * 3);
        for (int n = 0; n < b.length; n++) {
            if (n > 0) {
                sb.append(' ');
            }
            int value = b[n] & 0xFF;
            sb.append(HEX_DIGITS[value >>> 4]).append(HEX_DIGITS[value & 0x0F]);
        }
        return sb.toString();
    }

    /**
     * Parses hexadecimal text without separators into bytes.
     *
     * <p>Two characters form one byte (high nibble first); a trailing unpaired character is
     * ignored. Unlike the previous {@code Byte.decode}-based implementation, values above
     * {@code 0x7F} (for example {@code "FF"}) are accepted.
     *
     * @param src hex text such as {@code "7E01FF"}; must not be {@code null}
     * @return the parsed bytes
     * @throws NumberFormatException if a character is not a hexadecimal digit
     */
    public static byte[] hexStr2Bytes(String src) {
        byte[] ret = new byte[src.length() / 2];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = (byte) ((hexNibble(src, 2 * i) << 4) | hexNibble(src, 2 * i + 1));
        }
        return ret;
    }

    /**
     * Converts the hexadecimal representation of a byte sequence into a byte array.
     *
     * <p>Two characters form one byte (high nibble first); a trailing unpaired character is
     * ignored.
     *
     * @param hexString hexadecimal text, upper or lower case
     * @return the converted bytes
     * @throws IllegalArgumentException if {@code hexString} is {@code null} or empty, or
     *         contains a character that is not a hexadecimal digit
     */
    public static byte[] toByteArray(String hexString) {
        if (hexString == null || hexString.length() == 0) {
            throw new IllegalArgumentException("this hexString must not be empty");
        }
        final byte[] byteArray = new byte[hexString.length() / 2];
        for (int i = 0; i < byteArray.length; i++) {
            int high = hexNibble(hexString, 2 * i);
            int low = hexNibble(hexString, 2 * i + 1);
            byteArray[i] = (byte) (high << 4 | low);
        }
        return byteArray;
    }

    /**
     * Returns the value (0-15) of the hex digit at {@code index}.
     *
     * @throws NumberFormatException (an {@link IllegalArgumentException}) if the character is
     *         not a hexadecimal digit
     */
    private static int hexNibble(String text, int index) {
        char c = text.charAt(index);
        int value = Character.digit(c, 16);
        if (value < 0) {
            throw new NumberFormatException(
                    "Invalid hex digit '" + c + "' at index " + index + " in \"" + text + "\"");
        }
        return value;
    }

    /**
     * Interprets a byte as an unsigned value.
     *
     * @param i the byte
     * @return a value in the range 0..255
     */
    public static int getInt(byte i) {
        int low = i & 0x0f;
        int high = (i >> 4) & 0x0f;
        return low + high * 16;
    }

    /**
     * Interprets a byte as two packed BCD digits.
     *
     * @param i the byte, high nibble = tens, low nibble = ones
     * @return {@code high * 10 + low}
     */
    public static int getBCDInt(byte i) {
        int low = i & 0x0f;
        int high = (i >> 4) & 0x0f;
        return low + high * 10;
    }

    /**
     * Decodes the remaining content of a {@link ByteBuffer} as UTF-8.
     *
     * <p>Decoding consumes the buffer (its position is advanced), so calling this method a
     * second time on the same buffer without rewinding returns an empty string.
     *
     * @param buffer the buffer to decode
     * @return the decoded text, or an empty string if decoding fails for any reason
     */
    public static String getString(ByteBuffer buffer) {
        try {
            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            CharBuffer decoded = decoder.decode(buffer);
            return decoded.toString();
        } catch (Exception ex) {
            // Best effort by design: report the failure and fall back to an empty string.
            ex.printStackTrace();
            return "";
        }
    }

    /**
     * Converts a {@code short} to two bytes, least-significant byte first.
     *
     * @param data the value to convert
     * @return a 2-byte array: {@code [low byte, high byte]}
     */
    public static byte[] shortGetBytes(short data) {
        byte[] bytes = new byte[2];
        bytes[0] = (byte) (data & 0xff);
        bytes[1] = (byte) ((data & 0xff00) >> 8);
        return bytes;
    }

    /**
     * Returns the least-significant byte of a {@code short}.
     *
     * @param data the value to convert
     * @return a 1-byte array holding the low byte
     */
    public static byte[] shortGetByte(short data) {
        byte[] bytes = new byte[1];
        bytes[0] = (byte) (data & 0xff);
        return bytes;
    }

    /** Manual smoke check that prints two sample conversions. */
    public static void main(String[] args) {
        System.out.println(byte2HexStr(shortGetBytes((short) 65535)));
        short b = (short) 32768;
        b++;
        System.out.println(byte2HexStr(shortGetBytes((short) b)));
    }
}
@@ -1 +1 @@ include ':app', ':lib' include ':app', ':lib', ':im_lib'