TCP 消息封帧:从字节流中重组长度前缀消息
TCP 是一个流协议——它没有消息边界的概念。一次 send() 调用可能以三次 recv() 到达,三次 send() 调用可能合并到一个缓冲区。如果你在 TCP 上构建交易协议,你需要消息封帧(message framing):一种让接收方知道一条消息在哪里结束、下一条从哪里开始的方法。
本文介绍最简单的封帧方案——长度前缀(length-prefixing)——以及一个处理所有边界情况的完整 C++ 实现。
问题
TCP 的抽象是一个双向字节流:
发送方: send(msg1) send(msg2) send(msg3)
│ │ │
└─────────────┼─────────────┘
▼
TCP 字节流
│
┌────────────────┼────────────────┐
▼ ▼ ▼
接收方: recv() → 4字节 recv() → 11字节 recv() → -1 (EOF)
接收方不知道消息边界在哪里。两种常见的封帧策略:
| 策略 | 描述 | 场景 |
|---|---|---|
| 分隔符 | 消息以 \n 或特殊字节序列结尾 | 文本协议、HTTP |
| 长度前缀 | 每条消息以 N 字节编码其总长度开头 | 二进制协议、FIX、交易所行情 |
对交易系统,长度前缀是标准做法。它是确定性的、无需转义、接收方可以精确知道要等多少字节。
消息格式
我们的格式是最简单的:2 字节小端头编码消息总长度(含自身),后跟载荷。
| 字节 | 描述 |
|---|---|
| 0–1 | 消息长度(uint16_t,小端序)——包含这 2 字节的消息总大小 |
| 2…N | 载荷 —— (length - 2) 字节 |
示例:[0x05, 0x00, 'A', 'B', 'C'] → length = 0x0005 = 5 → 总共 5 字节 → 载荷 = “ABC”。
长度包含头字节——这很重要,因为最小有效消息是 2 字节(仅长度字段,空载荷)。通常长度为 0 或 1 应视为无效。
接口
给定两个抽象:
struct IDataProvider {
virtual int GetData(std::byte* data, int maxLength) { return 0; }
virtual ~IDataProvider() = default;
};
struct ITcpSocket {
virtual void OnMessage(std::byte* bytes, int length) { };
virtual ~ITcpSocket() = default;
};
GetData 模拟 recv()——最多用 maxLength 字节填充 data,返回实际读取的字节数,-1 表示 EOF。OnMessage 是每个完整帧的回调。
任务:实现 TcpSocket::Process(),循环直到 EOF,对每条完整消息调用 OnMessage,处理所有边界情况。
实现
状态机
一个运行状态就够了:totalSize。为 0 时等待头部。非零时知道当前消息需要多少字节,正在累积。
class TcpSocket : public ITcpSocket {
public:
TcpSocket(IDataProvider* provider) : provider_{provider} {}
void Process() {
const auto AllocationSize = 65655;
auto bytes = std::make_unique<std::byte[]>(AllocationSize);
int totalReceived = 0;
uint16_t totalSize = 0;
while (true) {
// 确定要请求多少字节
int remaining = totalReceived < 2
? AllocationSize - totalReceived // 还不知道消息长度
: totalSize - totalReceived; // 精确知道还要等多少
int received = provider_->GetData(
bytes.get() + totalReceived,
remaining
);
if (received == -1)
break;
totalReceived += received;
// 至少有 2 字节时解析头部
if (totalReceived > 1 && totalSize == 0) {
totalSize = static_cast<uint16_t>(bytes[0])
| (static_cast<uint16_t>(bytes[1]) << 8);
}
// 处理完整消息
while (totalSize > 0 && totalReceived >= totalSize) {
OnMessage(bytes.get(), totalSize);
// 将剩余数据平移到前端
if (totalReceived > totalSize) {
std::memmove(
bytes.get(),
bytes.get() + totalSize,
totalReceived - totalSize
);
}
totalReceived -= totalSize;
totalSize = 0;
// 如果有足够的尾部数据,重新解析头部
if (totalReceived > 1) {
totalSize = static_cast<uint16_t>(bytes[0])
| (static_cast<uint16_t>(bytes[1]) << 8);
}
}
}
}
private:
IDataProvider* provider_;
};
关键操作拆解
1. 读取正确的数量
int remaining = totalReceived < 2
? AllocationSize - totalReceived // 还不知道消息长度
: totalSize - totalReceived; // 知道精确字节数
解析头部前,尽量多地请求。解析后,只请求剩余的精确字节。这防止过度读取——如果下一条消息在同一 GetData 调用中开始,本来也需要 AllocationSize 才能装下。
2. 解析小端 uint16_t
totalSize = static_cast<uint16_t>(bytes[0])
| (static_cast<uint16_t>(bytes[1]) << 8);
字节 0 是 LSB,字节 1 是 MSB。按位 OR 组合。这是与主机字节序无关的——在小端(x86)和大端机器上都正确,因为我们从已知字节位置显式重建。
同样有效的替代方案:
uint16_t totalSize;
std::memcpy(&totalSize, bytes, sizeof(totalSize));
在小端主机上是零操作;大端主机则需要 __builtin_bswap16()。手动移位更显式,速度一样(编译器在 x86 上优化为单条 mov)。
3. 用 memmove 平移剩余数据
if (totalReceived > totalSize) {
std::memmove(
bytes.get(),
bytes.get() + totalSize,
totalReceived - totalSize
);
}
这是初学者常犯错误的地方。处理完一条消息后,如果缓冲区还有额外字节,这些属于下一条消息。必须将它们平移到前端,不能丢弃。
为什么是 memmove 而不是 memcpy? 源和目标区域重叠——bytes.get() + totalSize 在 bytes.get() 前面。memcpy 在重叠内存上有未定义行为。memmove 正确处理重叠。
4. 平移后重新解析
totalReceived -= totalSize;
totalSize = 0;
if (totalReceived > 1) {
totalSize = static_cast<uint16_t>(bytes[0])
| (static_cast<uint16_t>(bytes[1]) << 8);
}
平移后,剩余字节在缓冲区前端。如果至少有 2 字节,立即解析下一条消息的头部。这处理了单次 GetData 包含多条完整消息的情况。
逐个边界情况
头部被分割
Read 1: [0x05] → totalReceived=1, totalSize=0 (需要 2 字节才够)
Read 2: [0x00, 'A','B','C'] → totalReceived=5, 解析 length=0x0005, 完整消息!
消息体被分割
Read 1: [0x05, 0x00, 'A'] → totalReceived=3, totalSize=5, 不够
Read 2: ['B', 'C'] → totalReceived=5, 完整消息!
一次读取多条消息
GetData 返回: [0x03,0x00,'X', 0x04,0x00,'Y','Z']
↑ msg1 (3B) ↑ msg2 (4B)
处理过程:
- 解析头 → length=3
- totalReceived=7 >= 3 → 投递 msg1(字节 0-2)
- 平移字节 [3,6] 到前端 → buffer = [0x04,0x00,‘Y’,‘Z’, …]
- totalReceived=4,重新解析头 → length=4
- totalReceived=4 >= 4 → 投递 msg2(字节 0-3)
- totalReceived=0,循环在 EOF 退出
无数据 / 立即 EOF
GetData 返回: -1 → 退出循环,无消息投递
追问:防止内存耗尽
常见面试追问:如果恶意客户端发送巨大的长度值怎么办?
修复:在分配或等待数据前验证长度:
constexpr uint16_t MAX_MESSAGE_SIZE = 4096;
if (totalReceived > 1 && totalSize == 0) {
totalSize = static_cast<uint16_t>(bytes[0])
| (static_cast<uint16_t>(bytes[1]) << 8);
if (totalSize == 0 || totalSize > MAX_MESSAGE_SIZE) {
// 协议违规——关闭连接
return;
}
}
没有这个检查,totalSize 为 0xFFFF(65535)会让你分配并等待 65KB 数据。在有数千连接的生产交易网关上,这成为拒绝服务攻击向量。
追问:4 字节头部
如果头是 4 字节而不是 2 字节呢?
if (totalReceived > 3 && totalSize == 0) {
totalSize = static_cast<uint32_t>(bytes[0])
| (static_cast<uint32_t>(bytes[1]) << 8)
| (static_cast<uint32_t>(bytes[2]) << 16)
| (static_cast<uint32_t>(bytes[3]) << 24);
}
逻辑完全相同——只是解析前需要累积更多字节。4 字节头支持最大约 4GB 的消息,对大多数交易协议来说太大了(FIX 消息通常 < 8KB)。
核心要点
- TCP 是流 —— 消息边界是你的责任
- 长度前缀 是二进制协议最简单的封帧策略
- 维护运行状态 —— 在读取间追踪
totalReceived和totalSize - 用
memmove而不是memcpy—— 缓冲区平移是重叠拷贝 - 平移后重新解析 —— 处理单次读取中的多条消息
- 验证长度 —— 防止恶意或损坏头导致内存耗尽
- 主机无关的字节序 —— 手动字节重建在任何地方都有效
消息封帧是那种看起来简单、直到你凌晨三点被漏掉的 memmove 咬一口才会意识到复杂的问题。正确的实现处理部分读取、多条消息和尾部数据,一个字节都不丢——现在你有一个了。