package com.anyun.h264; import timber.log.Timber; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; /** * H264文件传输器 * 从H264文件读取数据,按照JT/T 1076-2016协议通过TCP/UDP传输 * * 使用示例: *
 * // 创建传输器
 * H264FileTransmitter transmitter = new H264FileTransmitter();
 * 
 * // 设置服务器地址和协议类型
 * transmitter.setServerAddress("192.168.1.100", 8888);
 * transmitter.setProtocolType(JT1076ProtocolHelper.PROTOCOL_TYPE_TCP); // 或 PROTOCOL_TYPE_UDP
 * 
 * // 设置协议参数
 * transmitter.setProtocolParams("013120122580", (byte)1);
 * 
 * // 设置帧率(用于计算时间戳间隔)
 * transmitter.setFrameRate(25);
 * 
 * // 初始化Socket
 * if (transmitter.initialize()) {
 *     // 开始传输文件
 *     transmitter.transmitFile("/path/to/video.h264");
 * }
 * 
 * // 停止传输
 * transmitter.stop();
 * 
*/ public class H264FileTransmitter { private static final String TAG = "H264FileTransmitter"; // H264 NAL起始码 private static final byte[] START_CODE_3 = {0x00, 0x00, 0x01}; private static final byte[] START_CODE_4 = {0x00, 0x00, 0x00, 0x01}; // JT/T 1076-2016 协议工具类 private JT1076ProtocolHelper protocolHelper; // 传输控制 private AtomicBoolean isRunning = new AtomicBoolean(false); private Thread transmitThread; // 参数配置 private int frameRate = 25; // 帧率,用于计算时间戳间隔 private long frameInterval = 1000 / 25; // 帧间隔(毫秒) // SPS/PPS缓存 private byte[] spsBuffer; private byte[] ppsBuffer; // 时间戳管理 private long lastIFrameTime = 0; // 上一个I帧时间 private long lastFrameTime = 0; // 上一帧时间 private long baseTimestamp = 0; // 基准时间戳 /** * 传输进度回调接口 */ public interface OnTransmitProgressCallback { /** * 传输进度回调 * @param currentFrame 当前帧序号(从1开始) * @param totalFrames 总帧数(如果未知则为-1) */ void onProgress(int currentFrame, int totalFrames); /** * 传输完成回调 */ void onComplete(); /** * 传输错误回调 * @param error 错误信息 */ void onError(String error); } private OnTransmitProgressCallback progressCallback; public H264FileTransmitter() { this.protocolHelper = new JT1076ProtocolHelper(); // 默认使用TCP协议 protocolHelper.setProtocolType(JT1076ProtocolHelper.PROTOCOL_TYPE_TCP); } /** * 设置服务器地址 */ public void setServerAddress(String ip, int port) { protocolHelper.setServerAddress(ip, port); } /** * 设置传输协议类型(UDP或TCP) * @param protocolType PROTOCOL_TYPE_UDP 或 PROTOCOL_TYPE_TCP */ public void setProtocolType(int protocolType) { protocolHelper.setProtocolType(JT1076ProtocolHelper.PROTOCOL_TYPE_TCP); } /** * 设置SIM卡号和逻辑通道号 */ public void setProtocolParams(String simCardNumber, byte logicalChannelNumber) { protocolHelper.setProtocolParams(simCardNumber, logicalChannelNumber); } /** * 设置帧率(用于计算时间戳间隔) * @param frameRate 帧率(fps) */ public void setFrameRate(int frameRate) { this.frameRate = frameRate > 0 ? frameRate : 25; this.frameInterval = 1000 / this.frameRate; Timber.d("Frame rate set to: %d fps, interval: %d ms", this.frameRate, this.frameInterval); } /** * 设置传输进度回调 */ public void setOnTransmitProgressCallback(OnTransmitProgressCallback callback) { this.progressCallback = callback; } /** * 初始化Socket连接 * @return 是否成功 */ public boolean initialize() { if (isRunning.get()) { Timber.w("Transmitter is already running"); return false; } if (!protocolHelper.initializeSocket()) { Timber.e("Failed to initialize socket"); return false; } // 重置序号 protocolHelper.resetSequenceNumber(); // 初始化时间戳 baseTimestamp = System.currentTimeMillis(); lastIFrameTime = 0; lastFrameTime = 0; spsBuffer = null; ppsBuffer = null; Timber.d("Socket initialized successfully"); return true; } /** * 开始传输H264文件 * @param filePath H264文件路径 */ public void transmitFile(String filePath) { if (isRunning.get()) { Timber.w("Transmitter is already running"); return; } File file = new File(filePath); if (!file.exists() || !file.isFile()) { Timber.e("File does not exist: %s", filePath); if (progressCallback != null) { progressCallback.onError("File does not exist: " + filePath); } return; } isRunning.set(true); // 启动传输线程 transmitThread = new Thread(new Runnable() { @Override public void run() { transmitFileInternal(file); } }); transmitThread.start(); Timber.d("Started transmitting file: %s", filePath); } /** * 传输文件的内部实现 */ private void transmitFileInternal(File file) { FileInputStream fis = null; int frameCount = 0; try { fis = new FileInputStream(file); // 读取整个文件到内存 // 注意:对于大文件,可以改为流式读取,但为简化实现,这里使用一次性读取 long fileSize = file.length(); if (fileSize > Integer.MAX_VALUE) { throw new IOException("File too large: " + fileSize + " bytes"); } byte[] fileData = new byte[(int) fileSize]; int bytesRead = 0; int totalRead = 0; while (totalRead < fileData.length && (bytesRead = fis.read(fileData, totalRead, fileData.length - totalRead)) > 0) { totalRead += bytesRead; } if (totalRead != fileData.length) { Timber.w("File read incomplete, expected: %d, actual: %d", fileData.length, totalRead); } Timber.d("File read complete, size: %d bytes", fileData.length); // 按帧解析并传输(一个帧包含从一个起始码到下一个起始码之间的所有数据,包括起始码) int offset = 0; while (offset < fileData.length && isRunning.get()) { // 查找下一个帧的起始位置 int nextFrameStart = findNextFrameStart(fileData, offset); if (nextFrameStart < 0) { // 没有找到下一个起始码,当前offset到文件末尾是一个完整的帧 if (offset < fileData.length) { byte[] frameData = Arrays.copyOfRange(fileData, offset, fileData.length); transmitFrame(frameData, frameCount); frameCount++; // 通知进度 if (progressCallback != null) { progressCallback.onProgress(frameCount, -1); } } break; } // 提取当前帧数据(包含起始码) if (nextFrameStart > offset) { byte[] frameData = Arrays.copyOfRange(fileData, offset, nextFrameStart); transmitFrame(frameData, frameCount); frameCount++; // 通知进度 if (progressCallback != null) { progressCallback.onProgress(frameCount, -1); } } // 移动到下一个帧的起始位置 offset = nextFrameStart; } Timber.d("Transmission complete, total frames: %d", frameCount); if (progressCallback != null) { progressCallback.onComplete(); } } catch (IOException e) { Timber.e(e, "Error transmitting file"); if (progressCallback != null) { progressCallback.onError("IO Error: " + e.getMessage()); } } catch (Exception e) { Timber.e(e, "Unexpected error during transmission"); if (progressCallback != null) { progressCallback.onError("Error: " + e.getMessage()); } } finally { if (fis != null) { try { fis.close(); } catch (IOException e) { Timber.e(e, "Error closing file"); } } isRunning.set(false); } } /** * 查找下一个帧的起始位置(下一个起始码的位置) * @param data 文件数据 * @param currentFrameStart 当前帧的起始位置(起始码位置) * @return 下一个帧的起始码位置,如果未找到返回-1 */ private int findNextFrameStart(byte[] data, int currentFrameStart) { if (currentFrameStart >= data.length) { return -1; } // 跳过当前帧的起始码 int offset = currentFrameStart; if (isStartCodeAt(data, offset)) { offset += getStartCodeLength(data, offset); } // 查找下一个起始码 for (int i = offset; i < data.length - 3; i++) { if (isStartCodeAt(data, i)) { return i; } } return -1; } /** * 检查指定位置是否为起始码 */ private boolean isStartCodeAt(byte[] data, int offset) { if (offset + 3 > data.length) { return false; } // 检查4字节起始码 if (offset + 4 <= data.length) { if (data[offset] == 0x00 && data[offset + 1] == 0x00 && data[offset + 2] == 0x00 && data[offset + 3] == 0x01) { return true; } } // 检查3字节起始码(确保前面不是0x00) if (offset == 0 || data[offset - 1] != 0x00) { if (data[offset] == 0x00 && data[offset + 1] == 0x00 && data[offset + 2] == 0x01) { return true; } } return false; } /** * 获取起始码长度 */ private int getStartCodeLength(byte[] data, int offset) { if (offset + 4 <= data.length && data[offset] == 0x00 && data[offset + 1] == 0x00 && data[offset + 2] == 0x00 && data[offset + 3] == 0x01) { return 4; } return 3; } /** * 判断帧数据是否为I帧(IDR) * @param frameData 帧数据(包含起始码) * @return 是否为I帧 */ private boolean isIFrame(byte[] frameData) { if (frameData == null || frameData.length < 5) { return false; } // 查找帧中所有的NAL单元,检查是否包含IDR帧 int offset = 0; boolean hasSpsPps = false; while (offset < frameData.length - 3) { // 检查当前位置是否为起始码 if (isStartCodeAt(frameData, offset)) { // 跳过起始码 int startCodeLen = getStartCodeLength(frameData, offset); int nalStart = offset + startCodeLen; if (nalStart < frameData.length) { // 获取NAL类型(第一个字节的低5位) int nalType = frameData[nalStart] & 0x1F; // NAL类型5 = IDR (Instantaneous Decoder Refresh) 关键帧 if (nalType == 5) { return true; // 找到IDR帧,确定是I帧 } // NAL类型7 = SPS, 类型8 = PPS if (nalType == 7 || nalType == 8) { hasSpsPps = true; } } // 移动到下一个可能的位置继续查找 offset = nalStart + 1; } else { offset++; } } // 如果没有找到IDR,但包含SPS/PPS,也认为是关键帧相关的数据 // (有些编码器可能将SPS/PPS单独作为一个"帧"发送) return hasSpsPps; } /** * 传输一个完整的帧数据(类似H264Encoder的方式) * @param frameData 帧数据(包含起始码,与H264Encoder输出的格式一致) * @param frameIndex 帧序号 */ private void transmitFrame(byte[] frameData, int frameIndex) { if (frameData == null || frameData.length == 0) { return; } try { // 判断帧类型 boolean isKeyFrame = isIFrame(frameData); // 计算时间戳 long timestamp = baseTimestamp + (frameIndex * frameInterval); // 计算时间间隔 long lastIFrameInterval = (lastIFrameTime > 0) ? (timestamp - lastIFrameTime) : 0; long lastFrameInterval = (lastFrameTime > 0) ? (timestamp - lastFrameTime) : frameInterval; if (isKeyFrame) { lastIFrameTime = timestamp; } lastFrameTime = timestamp; // 判断帧类型(用于协议) int dataType = isKeyFrame ? JT1076ProtocolHelper.DATA_TYPE_I_FRAME : JT1076ProtocolHelper.DATA_TYPE_P_FRAME; processNalUnits(frameData, timestamp, lastIFrameInterval, lastFrameInterval); // 控制发送速率(模拟帧率) if (frameInterval > 0) { try { Thread.sleep(frameInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Timber.d("Transmission interrupted"); return; } } } catch (Exception e) { Timber.e(e, "Error transmitting frame"); } } /** * 停止传输 */ public void stop() { if (!isRunning.get()) { return; } isRunning.set(false); // 等待传输线程结束 if (transmitThread != null) { try { transmitThread.join(2000); } catch (InterruptedException e) { Timber.e(e, "Wait transmit thread error"); } } // 关闭Socket if (protocolHelper != null) { protocolHelper.closeSocket(); } spsBuffer = null; ppsBuffer = null; Timber.d("H264 file transmitter stopped"); } /** * 解析帧中的NAL单元并根据类型处理 */ private void processNalUnits(byte[] frameData, long timestamp, long lastIFrameInterval, long lastFrameInterval) { if (frameData == null || frameData.length == 0) { return; } boolean nalProcessed = false; int offset = 0; while (offset < frameData.length) { int startCodePos = findStartCode(frameData, offset); if (startCodePos < 0) { break; } int startCodeLen = getStartCodeLength(frameData, startCodePos); int nalDataStart = startCodePos + startCodeLen; if (nalDataStart >= frameData.length) { break; } int nextStart = findStartCode(frameData, nalDataStart); int nalEnd = (nextStart == -1) ? frameData.length : nextStart; int nalLength = nalEnd - startCodePos; if (nalLength <= startCodeLen) { break; } byte[] nalUnit = Arrays.copyOfRange(frameData, startCodePos, nalEnd); int nalType = nalUnit[startCodeLen] & 0x1F; handleNalUnit(nalUnit, nalType, timestamp, lastIFrameInterval, lastFrameInterval); nalProcessed = true; offset = nalEnd; } if (!nalProcessed) { // 没有解析出NAL单元时,直接按P帧发送 sendFramePayload(frameData, timestamp, false, lastIFrameInterval, lastFrameInterval); } } private void handleNalUnit(byte[] nalUnit, int nalType, long timestamp, long lastIFrameInterval, long lastFrameInterval) { switch (nalType) { case 7: // SPS spsBuffer = nalUnit.clone(); Timber.d("Cached SPS, size: %d", spsBuffer.length); break; case 8: // PPS ppsBuffer = nalUnit.clone(); Timber.d("Cached PPS, size: %d", ppsBuffer.length); break; case 5: // IDR if (spsBuffer != null && ppsBuffer != null) { byte[] combined = combineFrameData(spsBuffer, ppsBuffer, nalUnit); sendFramePayload(combined, timestamp, true, lastIFrameInterval, lastFrameInterval); } else { Timber.w("IDR frame without SPS/PPS cache, sending raw IDR"); sendFramePayload(nalUnit, timestamp, true, lastIFrameInterval, lastFrameInterval); } break; case 1: // P frame sendFramePayload(nalUnit, timestamp, false, lastIFrameInterval, lastFrameInterval); break; default: Timber.d("Forwarding NAL type %d, size: %d", nalType, nalUnit.length); sendFramePayload(nalUnit, timestamp, false, lastIFrameInterval, lastFrameInterval); break; } } private void sendFramePayload(byte[] payload, long timestamp, boolean isKeyFrame, long lastIFrameInterval, long lastFrameInterval) { if (payload == null || payload.length == 0) { return; } int dataType = isKeyFrame ? JT1076ProtocolHelper.DATA_TYPE_I_FRAME : JT1076ProtocolHelper.DATA_TYPE_P_FRAME; int offset = 0; int totalPackets = (int) Math.ceil((double) payload.length / JT1076ProtocolHelper.MAX_PACKET_SIZE); for (int i = 0; i < totalPackets; i++) { int packetDataSize = Math.min(JT1076ProtocolHelper.MAX_PACKET_SIZE, payload.length - offset); byte[] packetData = Arrays.copyOfRange(payload, offset, offset + packetDataSize); int packetMark; if (totalPackets == 1) { packetMark = JT1076ProtocolHelper.PACKET_MARK_ATOMIC; } else if (i == 0) { packetMark = JT1076ProtocolHelper.PACKET_MARK_FIRST; } else if (i == totalPackets - 1) { packetMark = JT1076ProtocolHelper.PACKET_MARK_LAST; } else { packetMark = JT1076ProtocolHelper.PACKET_MARK_MIDDLE; } byte[] rtpPacket = protocolHelper.createVideoRtpPacket( packetData, timestamp, dataType, packetMark, lastIFrameInterval, lastFrameInterval); protocolHelper.sendPacket(rtpPacket); offset += packetDataSize; } } private int findStartCode(byte[] data, int startPos) { if (data == null || startPos >= data.length - 3) { return -1; } for (int i = startPos; i < data.length - 3; i++) { if (isStartCodeAt(data, i)) { return i; } } // 处理剩余不足4字节但可能是3字节起始码的情况 for (int i = Math.max(startPos, data.length - 3); i < data.length - 2; i++) { if (isStartCodeAt(data, i)) { return i; } } return -1; } private byte[] combineFrameData(byte[] sps, byte[] pps, byte[] idr) { int totalLength = (sps != null ? sps.length : 0) + (pps != null ? pps.length : 0) + (idr != null ? idr.length : 0); byte[] combined = new byte[totalLength]; int offset = 0; if (sps != null) { System.arraycopy(sps, 0, combined, offset, sps.length); offset += sps.length; } if (pps != null) { System.arraycopy(pps, 0, combined, offset, pps.length); offset += pps.length; } if (idr != null) { System.arraycopy(idr, 0, combined, offset, idr.length); } Timber.d("Combined SPS/PPS/IDR payload, total: %d", totalLength); return combined; } /** * 释放资源 */ public void release() { stop(); } }