package com.anyun.h264; import com.anyun.h264.util.BytesUtils; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import timber.log.Timber; import java.util.concurrent.TimeUnit; /** * JT/T 1076-2016 TCP客户端工具类 * 使用Netty实现TCP连接和RTP包发送 */ public class JT1076TcpClient { private static final String TAG = "JT1076TcpClient"; private String serverIp; private int serverPort; private EventLoopGroup workerGroup; private Channel channel; private boolean isConnected = false; // 连接状态回调接口 public interface ConnectionListener { void onConnected(); void onDisconnected(); void onError(Throwable cause); } private ConnectionListener connectionListener; /** * 设置服务器地址 */ public void setServerAddress(String ip, int port) { this.serverIp = ip; this.serverPort = port; } /** * 设置连接状态监听器 */ public void setConnectionListener(ConnectionListener listener) { this.connectionListener = listener; } /** * 初始化TCP连接(异步) */ public void connect() { if (serverIp == null || serverIp.isEmpty()) { Timber.e( "Server IP not set"); if (connectionListener != null) { connectionListener.onError(new IllegalArgumentException("Server IP not set")); } return; } if (workerGroup != null) { Timber.w("TCP client already initialized, disconnecting first"); disconnect(); } // 创建EventLoopGroup(使用单线程组即可) workerGroup = new NioEventLoopGroup(1); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) // 禁用Nagle算法,降低延迟 .option(ChannelOption.SO_KEEPALIVE, true) // 启用TCP keepalive .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // 连接超时5秒 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TcpClientHandler()); } }); Timber.d("Connecting to TCP server: " + serverIp + ":" + serverPort); // 异步连接 ChannelFuture future = bootstrap.connect(serverIp, serverPort); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { channel = future.channel(); isConnected = true; Timber.d("TCP connection established: " + serverIp + ":" + serverPort); if (connectionListener != null) { connectionListener.onConnected(); } } else { isConnected = false; Throwable cause = future.cause(); Timber.e(cause, "TCP connection failed: " + serverIp + ":" + serverPort); if (connectionListener != null) { connectionListener.onError(cause); } // 连接失败时清理资源 shutdown(); } } }); } catch (Exception e) { Timber.e(e, "Initialize TCP client failed"); isConnected = false; if (connectionListener != null) { connectionListener.onError(e); } shutdown(); } } /** * 发送RTP包(TCP方式) */ public void sendPacket(byte[] packet) { if (!isConnected || channel == null || !channel.isActive()) { Timber.w( "TCP channel not connected, packet dropped"); return; } try { // 将字节数组包装为ByteBuf ByteBuf buffer = Unpooled.wrappedBuffer(packet); String str = BytesUtils.bytesToHexString( BytesUtils.subArray(buffer.array(),0,30)); Timber.i( "Send TCP packet:"+ str); // 异步写入 ChannelFuture future = channel.writeAndFlush(buffer); // 可选:监听发送结果 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { Timber.e(future.cause(), "Send TCP packet failed"); } } }); } catch (Exception e) { Timber.e(e, "Send TCP packet error"); } } /** * 断开TCP连接 */ public void disconnect() { isConnected = false; if (channel != null) { try { ChannelFuture future = channel.close(); future.await(2, TimeUnit.SECONDS); } catch (InterruptedException e) { Timber.w(e,"Close channel interrupted"); Thread.currentThread().interrupt(); } catch (Exception e) { Timber.e(e, "Close channel error"); } channel = null; } shutdown(); if (connectionListener != null) { connectionListener.onDisconnected(); } Timber.d("TCP connection closed"); } /** * 关闭EventLoopGroup(清理资源) */ private void shutdown() { if (workerGroup != null) { try { workerGroup.shutdownGracefully().await(3, TimeUnit.SECONDS); } catch (InterruptedException e) { Timber.w(e, "Shutdown worker group interrupted"); Thread.currentThread().interrupt(); } catch (Exception e) { Timber.e(e, "Shutdown worker group error"); } workerGroup = null; } } /** * 检查是否已连接 */ public boolean isConnected() { return isConnected && channel != null && channel.isActive(); } /** * TCP客户端通道处理器 */ private class TcpClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); Timber.d("TCP channel active: " + ctx.channel().remoteAddress()); isConnected = true; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); Timber.d("TCP channel inactive: " + ctx.channel().remoteAddress()); isConnected = false; channel = null; if (connectionListener != null) { connectionListener.onDisconnected(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Timber.e(cause, "TCP channel exception"); isConnected = false; if (connectionListener != null) { connectionListener.onError(cause); } ctx.close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 如果服务器有响应,可以在这里处理 // 对于JT/T 1076-2016 RTP发送,通常不需要处理响应 ByteBuf buf = (ByteBuf) msg; Timber.d("Received data from server: " + buf.readableBytes() + " bytes"); buf.release(); // 释放ByteBuf } } }