package com.anyun.im_lib.netty;
|
|
import android.text.TextUtils;
|
import android.util.Log;
|
|
import com.anyun.im_lib.ExecutorServiceFactory;
|
import com.anyun.im_lib.HeartbeatHandler;
|
import com.anyun.im_lib.IMSConfig;
|
import com.anyun.im_lib.MsgDispatcher;
|
import com.anyun.im_lib.MsgTimeOutTimerManager;
|
import com.anyun.im_lib.interf.IMSClientInteface;
|
import com.anyun.im_lib.listener.IMSConnectStatusCallback;
|
import com.anyun.im_lib.listener.OnEventListener;
|
|
import java.nio.ByteBuffer;
|
import java.util.Vector;
|
import java.util.function.ToDoubleBiFunction;
|
|
import io.netty.bootstrap.Bootstrap;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelOption;
|
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.handler.timeout.IdleStateHandler;
|
|
|
/**
|
* MyApplication2
|
* Created by lzw on 2019/12/2. 13:14:52
|
* 邮箱:632393724@qq.com
|
* All Rights Saved! Chongqing AnYun Tech co. LTD
|
*/
|
public class NettyTcpClient implements IMSClientInteface {
|
|
private static final String TAG = NettyTcpClient.class.getSimpleName();
|
|
private static volatile NettyTcpClient instance;
|
|
private Bootstrap bootstrap;
|
private Channel channel;
|
|
/****标识IMS是否已关闭***/
|
private boolean isClosed = false;
|
|
/****ims服务器地址组***/
|
private Vector<String> serverUrlList;
|
|
/****与应用层交互的listener***/
|
private OnEventListener mOnEventListener;
|
/*** ims连接状态回调监听器**/
|
private IMSConnectStatusCallback imsConnectStatusCallback;
|
|
/****消息转发器***/
|
private MsgDispatcher msgDispatcher;
|
|
/***线程池工厂**/
|
private ExecutorServiceFactory loopGroup;
|
|
/****是否正在进行重连***/
|
private boolean isReconnecting = false;
|
/*** ims连接状态,初始化为连接失败 ***/
|
private int connectStatus = IMSConfig.CONNECT_STATE_FAILURE;
|
|
/*** 重连间隔时长 ***/
|
private int reconnectInterval = IMSConfig.DEFAULT_RECONNECT_BASE_DELAY_TIME;
|
/*** 连接超时时长 ***/
|
private int connectTimeOut = IMSConfig.DEFAULT_CONNECT_TIMEOUT;
|
|
private int heartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_FOREGROUND;
|
private int foregroundHeartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_FOREGROUND;
|
private int backgroundHeartBeatInterval = IMSConfig.DEFAULT_HEARTBEAT_INTERVAL_BACKGROUND;
|
|
private int appStatus = IMSConfig.APP_STATUS_FOREGROUND;
|
/*** 消息发送超时重发次数***/
|
private int resendCount = IMSConfig.DEFAULT_RESEND_COUNT;
|
/*** 消息发送失败重发间隔时长***/
|
private int resendInterval = IMSConfig.DEFAULT_RESEND_INTERVAL;
|
|
/*** 当前连接host***/
|
private String currentHost = null;
|
/*** 当前连接port***/
|
private int currentPort = -1;
|
/*** 消息发送超时定时管理器***/
|
private MsgTimeOutTimerManager msgTimeOutTimerManager;
|
|
|
private NettyTcpClient(){
|
|
}
|
|
public static IMSClientInteface getInstance() {
|
if (null == instance){
|
synchronized (NettyTcpClient.class){
|
if (null==instance){
|
instance = new NettyTcpClient();
|
}
|
}
|
}
|
return instance;
|
}
|
|
/**
|
* 初始化
|
* @param serverUrlList 服务器地址列表
|
* @param listener 与应用层交互的listener
|
* @param callback ims连接状态回调
|
*/
|
@Override
|
public void init(Vector<String> serverUrlList, OnEventListener listener, IMSConnectStatusCallback callback) {
|
close();
|
isClosed = false;
|
this.serverUrlList = serverUrlList;
|
this.mOnEventListener = listener;
|
this.imsConnectStatusCallback = callback;
|
msgDispatcher = new MsgDispatcher();
|
msgDispatcher.setOnEventListener(listener);
|
loopGroup = new ExecutorServiceFactory();
|
loopGroup.initBossLoopGroup();
|
msgTimeOutTimerManager = new MsgTimeOutTimerManager(this);
|
/*** 进行第一次连接***/
|
resetConnect(true);
|
}
|
|
@Override
|
public void resetConnect() {
|
this.resetConnect(false);
|
}
|
|
@Override
|
public void resetConnect(boolean isFirst) {
|
if (!isFirst){
|
try {
|
Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
}
|
//只有第一个调用者才能赋值并调用重连
|
if (!isClosed && !isReconnecting){
|
synchronized (this){
|
if (!isClosed && !isReconnecting){
|
//标识重连任务进行中...
|
isReconnecting = true;
|
|
onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING);
|
closeChannel();
|
loopGroup.execBossTask(new ResetConnectRunnable(isFirst));
|
}
|
}
|
}
|
}
|
|
|
|
|
/***
|
* 初始化Bootstrap
|
*/
|
private void initBootstrap() {
|
EventLoopGroup loopGroup = new NioEventLoopGroup(4);
|
bootstrap = new Bootstrap();
|
bootstrap.group(loopGroup).channel(NioSocketChannel.class);
|
// 设置该项以后,如果在两小时内没有数据通信时,TCP会自动发送一个活动探测数据报文
|
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
|
//设置禁用nagle算法
|
bootstrap.option(ChannelOption.TCP_NODELAY,true);
|
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,getConnectTimeout());
|
bootstrap.handler(new TCPChannelInitializerHandler(this));
|
}
|
|
|
/**
|
* 真正连接服务器的地方
|
*/
|
private void toServer() {
|
try {
|
channel = bootstrap.connect(currentHost,currentPort).sync().channel();
|
} catch (Exception e) {
|
Log.i(TAG, String.format("连接Server(ip[%s],port[%d]失败)",currentHost,currentPort));
|
try {
|
Thread.sleep(500);
|
} catch (InterruptedException ex) {
|
ex.printStackTrace();
|
}
|
|
|
channel = null;
|
}
|
}
|
|
private boolean isNetworkAvaliable() {
|
if (mOnEventListener != null){
|
return mOnEventListener.isNetWorkAvailable();
|
}
|
return false;
|
}
|
|
/**
|
* 回调ims连接状态
|
* @param connectStateConnecting
|
*/
|
private void onConnectStatusCallback(int connectStateConnecting) {
|
this.connectStatus = connectStateConnecting;
|
switch (connectStatus){
|
case IMSConfig.CONNECT_STATE_CONNECTING:
|
Log.i(TAG, "onConnectStatusCallback: ims连接中...");
|
if (imsConnectStatusCallback != null){
|
imsConnectStatusCallback.onConnecting();
|
}
|
break;
|
case IMSConfig.CONNECT_STATE_SUCCESSFUL:
|
Log.i(TAG, String.format("onConnectStatusCallback: ims连接成功,host[%s],port[%s]",currentHost,currentPort));
|
if (imsConnectStatusCallback != null){
|
imsConnectStatusCallback.onConnected();
|
}
|
// TODO: 2019/12/12 连接成功 ,发送握手消息(需要的话——)
|
try {
|
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
|
byteBuf.writeBytes(mOnEventListener.getRegisterMessage());
|
channel.writeAndFlush(byteBuf);
|
} catch (Exception e) {
|
Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t");
|
}
|
break;
|
case IMSConfig.CONNECT_STATE_FAILURE:
|
default:
|
Log.i(TAG, "onConnectStatusCallback: ims连接失败");
|
if (imsConnectStatusCallback != null){
|
imsConnectStatusCallback.onConnectFailed();
|
}
|
break;
|
}
|
}
|
|
/**
|
* 关闭连接,同时释放资源
|
*/
|
@Override
|
public void close() {
|
if (isClosed){
|
return;
|
}
|
isClosed = true;
|
/*** 关闭channel***/
|
try {
|
closeChannel();
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
/*** 关闭bootstrap ***/
|
try {
|
if (bootstrap != null){
|
bootstrap.group().shutdownGracefully();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
/*** 释放线程池 ***/
|
try {
|
if (loopGroup != null){
|
loopGroup.destroy();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
} finally {
|
try {
|
if (serverUrlList != null){
|
serverUrlList.clear();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
isReconnecting = false;
|
channel = null;
|
bootstrap = null;
|
}
|
}
|
|
private void closeChannel() {
|
if (channel != null){
|
removeHandler(HeartbeatHandler.class.getSimpleName());
|
removeHandler(TCPReadHandler.class.getSimpleName());
|
removeHandler(IdleStateHandler.class.getSimpleName());
|
}
|
}
|
|
/**
|
* 移除指定handler
|
* @param handlerName
|
*/
|
private void removeHandler(String handlerName) {
|
try {
|
if (channel.pipeline().get(handlerName) != null){
|
channel.pipeline().remove(handlerName);
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
Log.i(TAG, "removeHandler fail,handlerName="+handlerName);
|
}
|
}
|
|
@Override
|
public boolean isClosed() {
|
return false;
|
}
|
|
@Override
|
public void sendMsg(String msg) {
|
this.sendMsg(msg,true);
|
}
|
|
/**
|
*
|
* @param msg
|
* @param isJoinTimeoutManager 是否加入超时管理器
|
*/
|
@Override
|
public void sendMsg(String msg, boolean isJoinTimeoutManager) {
|
if (msg==null ){
|
return;
|
}
|
// TODO: 2019/12/12 根据MSG_ID 来加入 超时, 暂不写
|
|
if (channel == null){
|
Log.i(TAG, "sendMsg fail,channel为空"+msg);
|
}
|
try {
|
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
|
byteBuf.writeBytes(msg.getBytes());
|
channel.writeAndFlush(byteBuf);
|
} catch (Exception e) {
|
Log.i(TAG, "发送消息失败,reason="+e.getMessage()+"\t"+msg);
|
}
|
}
|
|
/**
|
* 获取客户设置的重连时长
|
* @return
|
*/
|
@Override
|
public int getReconnectInterval() {
|
if (mOnEventListener != null && mOnEventListener.getReConnectInterval()>0){
|
return reconnectInterval = mOnEventListener.getReConnectInterval();
|
}
|
return reconnectInterval;
|
}
|
|
@Override
|
public int getConnectTimeout() {
|
if(mOnEventListener != null && mOnEventListener.getConnectTimeout()>0){
|
connectTimeOut = mOnEventListener.getConnectTimeout();
|
}
|
return connectTimeOut;
|
}
|
|
@Override
|
public int getForegroundHeartbeatInterval() {
|
return 0;
|
}
|
|
@Override
|
public void setAppStatus(int appStatus) {
|
|
}
|
|
@Override
|
public int getBackgroundHeartbeatInterval() {
|
return 0;
|
}
|
|
@Override
|
public int getResendCount() {
|
return 0;
|
}
|
|
@Override
|
public int getResendInterval() {
|
return 0;
|
}
|
|
@Override
|
public MsgDispatcher getMsgDispatcher() {
|
return msgDispatcher;
|
}
|
|
@Override
|
public MsgTimeOutTimerManager getMsgTimeOutTimerManager() {
|
return msgTimeOutTimerManager;
|
}
|
|
@Override
|
public byte[] getRegisterMessage() {
|
if (mOnEventListener != null){
|
return mOnEventListener.getRegisterMessage();
|
}
|
return null;
|
}
|
|
|
/**
|
* 重连任务
|
*/
|
private class ResetConnectRunnable implements Runnable{
|
private boolean isFirst;
|
public ResetConnectRunnable(boolean isFirst) {
|
this.isFirst = isFirst;
|
}
|
|
@Override
|
public void run() {
|
Log.i(TAG, "执行重连任务");
|
if (!isFirst){
|
onConnectStatusCallback(IMSConfig.CONNECT_STATE_FAILURE);
|
}
|
try {
|
//重连时,释放工作组线程池,也就是停止心跳
|
loopGroup.destroyWorkLoopGroup();
|
while (!isClosed){
|
if (!isNetworkAvaliable()){
|
try {
|
Thread.sleep(2000);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
continue;
|
}
|
//网络可用才进行连接
|
int status;
|
if ((status=reConnect()) == IMSConfig.CONNECT_STATE_SUCCESSFUL){
|
onConnectStatusCallback(status);
|
//连接成功,调出循环
|
break;
|
}
|
|
if (status == IMSConfig.CONNECT_STATE_FAILURE){
|
onConnectStatusCallback(status);
|
try {
|
Thread.sleep(IMSConfig.DEFAULT_RECONNECT_INTERVAL);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
} finally {
|
//标识重连任务停止
|
isReconnecting = false;
|
}
|
}
|
|
/**
|
* 重连,首次连接也认为是第一次重连
|
* @return
|
*/
|
private int reConnect() {
|
if (!isClosed){
|
try {
|
//先释放EventLoop线程组
|
if (bootstrap != null){
|
bootstrap.group().shutdownGracefully();
|
}
|
} finally {
|
bootstrap = null;
|
}
|
initBootstrap();
|
return connectServer();
|
}
|
return IMSConfig.CONNECT_STATE_FAILURE;
|
}
|
|
private int connectServer(){
|
//服务器地址无效,直接回调连接状态,不再进行连接
|
if (serverUrlList == null || serverUrlList.size() == 0){
|
return IMSConfig.CONNECT_STATE_FAILURE;
|
}
|
for (int i = 0; i < serverUrlList.size(); i++) {
|
String serverUrl = serverUrlList.get(i);
|
//服务器地址无效,直接回调连接状态,不再进行连接
|
if (TextUtils.isEmpty(serverUrl)){
|
return IMSConfig.CONNECT_STATE_FAILURE;
|
}
|
String[] address = serverUrl.split(" ");
|
for (int j = 1; j <= IMSConfig.DEFAULT_RECONNECT_COUNT; j++) {
|
//如果ims已经关闭,或网络不可用,直接回调连接状态,不再进行连接
|
if (isClosed || !isNetworkAvaliable()){
|
return IMSConfig.CONNECT_STATE_FAILURE;
|
}
|
//回调连接状态
|
if(connectStatus != IMSConfig.CONNECT_STATE_CONNECTING){
|
onConnectStatusCallback(IMSConfig.CONNECT_STATE_CONNECTING);
|
}
|
Log.i(TAG, String.format("正在进行connectServer【%s】的第[%d]连接,当前重连延时时长为[%dms]: ",serverUrl,j,j*getReconnectInterval()));
|
try {
|
currentPort = Integer.parseInt(address[1]);
|
currentHost = address[0];
|
//连接服务器
|
toServer();
|
//channel不为空,则认为连接已经成功
|
if(channel != null){
|
return IMSConfig.CONNECT_STATE_SUCCESSFUL;
|
}else{
|
Thread.sleep(j*getReconnectInterval());
|
}
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
close();
|
break;
|
}
|
}
|
}
|
return IMSConfig.CONNECT_STATE_FAILURE;
|
}
|
|
}
|
|
|
|
|
}
|