Files
CCSDS_study/test/run_toy_protocol.md
2026-05-05 21:54:35 +08:00

284 lines
10 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# run_toy_protocol.py 功能讲解
`run_toy_protocol.py` 是一个基于 Netzob 的玩具协议推断示例脚本。它参考 `discover_features.rst` 教程,把 PCAP 中捕获到的消息导入后,逐步完成字段切分、按命令聚类、字段对齐、长度关系推断,以及协议状态机生成。
脚本建模的协议格式如下:
```text
<CMD/RES name> '#' <4-byte little-endian length> [<data>]
```
也就是说,每条消息由三类内容组成:
- `CMD/RES name`:命令或响应名称,例如 `CMDidentify``RESencrypt`
- `#`:固定分隔符,用来区分命令名和后续载荷部分。
- `4-byte little-endian length`4 字节小端长度字段。
- `[data]`:可选数据字段,长度通常由前面的长度字段描述。
## 依赖与输入数据
脚本开头通过 `sys.path.insert` 把本地 Netzob 测试源码目录加入 Python 模块搜索路径:
```python
sys.path.insert(0, '/home/zjz/CCSDS_study/netzob-030/test/src')
```
随后用:
```python
from netzob.all import *
```
导入 Netzob 的主要 API。该脚本依赖本地目录 `/home/zjz/CCSDS_study/netzob-030/`,而不是项目 `pyproject.toml` 中声明的普通包依赖。
PCAP 文件目录固定为:
```python
PCAP_DIR = "/home/zjz/CCSDS_study/netzob-030/test/resources/pcaps"
```
脚本读取三个会话文件:
- `target_src_v1_session1.pcap`
- `target_src_v1_session2.pcap`
- `target_src_v1_session3.pcap`
其中 `session1``session2` 被合并为训练消息集合:
```python
messages = messages_session1 + messages_session2
```
`session3` 没有参与字段推断和聚类,主要用于最后生成 PTA 自动机时展示另一条会话路径。
## Step 1从 PCAP 导入消息
脚本使用 Netzob 的 `PCAPImporter.readFile` 读取 PCAP
```python
messages_session1 = list(PCAPImporter.readFile(...).values())
```
`PCAPImporter.readFile` 返回的是一个类似字典的对象,脚本只取其中的 `.values()`,并转成列表。三个列表分别代表三个会话中的原始消息。
这一阶段会打印:
- 每个 session 中的消息数量。
- `session1 + session2` 中所有消息的内容。
这一步的作用是把网络抓包中的实际报文转成 Netzob 可以分析的消息对象。
## Step 2按 `#` 分隔符切分字段
脚本先把所有训练消息放入一个通用 `Symbol`
```python
symbol = Symbol(messages=messages)
```
在 Netzob 中,`Symbol` 可以理解为一组具有相似格式的消息抽象。初始时,所有消息都被放在同一个符号中,字段结构还没有细分。
随后调用:
```python
Format.splitDelimiter(symbol, String("#"))
```
这会根据字符 `#` 把消息切成多个字段。对于本协议,`#` 前面是命令或响应名称,`#` 后面是长度字段和可选数据。
切分后,典型字段大致可以理解为:
```text
field[0] = CMD/RES name
field[1] = '#'
field[2] = length + data
```
脚本会打印切分后的 `symbol`,用于观察 Netzob 推断出的字段结构。
## Step 3按命令名字段聚类
切分出第一个字段后,脚本使用它作为 key field
```python
symbols = Format.clusterByKeyField(symbol, symbol.fields[0])
```
这一步会根据 `field[0]` 的取值把消息拆成多个更具体的 `Symbol`。例如:
- `CMDidentify` 消息会进入一个单独符号。
- `RESidentify` 消息会进入另一个符号。
- `CMDencrypt``RESencrypt``CMDdecrypt` 等也会分别形成自己的符号。
这样做的原因是,不同命令或响应虽然共享大体格式,但其载荷语义可能不同。按命令名聚类后,后续字段对齐和长度关系推断会更准确。
这一阶段会打印:
- 聚类后符号数量。
- 每个符号的名称。
## Step 4对载荷字段做序列对齐
脚本遍历每个聚类后的符号:
```python
for name, sym in symbols.items():
if len(sym.fields) >= 3:
Format.splitAligned(sym.fields[2], doInternalSlick=True)
```
这里重点处理 `sym.fields[2]`,也就是 `#` 后面的部分。根据协议格式,这个字段内部通常包含:
```text
4 字节长度字段 + 可选数据字段
```
`Format.splitAligned` 会对同一符号下的多条消息做序列对齐,尝试把稳定部分、变化部分、长度字段、数据字段进一步拆开。
例如对于不同长度的 `CMDencrypt` 消息:
```text
CMDencrypt#\x06\x00\x00\x00abcdef
CMDencrypt#\x0a\x00\x00\x00123456test
```
序列对齐有机会把 `\x06\x00\x00\x00` / `\x0a\x00\x00\x00` 识别为一个长度相关字段,把后面的 `abcdef` / `123456test` 识别为数据字段。
`doInternalSlick=True` 表示启用内部清理或简化逻辑,让字段切分结果更规整。
这一阶段会打印每个符号对齐后的结构。
## Step 5查找并应用长度关系
脚本使用:
```python
rels = RelationFinder.findOnSymbol(sym)
```
对每个符号查找字段间关系。这里最重要的是长度关系,也就是某个字段的数值描述另一个字段的长度。
当找到关系时,脚本会打印类似信息:
```text
<relation_type>: '<x_attribute>' <-> '<y_attribute>'
```
随后脚本应用找到的第一个关系:
```python
rels[0]["x_fields"][0].domain = Size(rels[0]["y_fields"], factor=1/8.0)
```
这行代码的含义是:把关系中的 `x_fields[0]` 设置为一个 `Size` 域,使它表示 `y_fields` 的大小。
`factor=1/8.0` 的作用是把位数换算成字节数。Netzob 内部很多大小计算以 bit 为单位,而协议里的长度字段表示的是 byte 数,因此需要除以 8。
应用关系后,脚本专门打印 `CMDencrypt` 的调试结构:
```python
print(symbols["CMDencrypt"]._str_debug())
```
这是为了展示长度关系被写回符号模型之后,字段域发生了什么变化。
## Step 6生成协议自动机
完成消息格式推断后,脚本把所有符号转成列表:
```python
sym_list = list(symbols.values())
```
然后基于会话序列生成三类自动机。
### Step 6a链式状态自动机
```python
session1 = Session(messages_session1)
abstract1 = session1.abstract(sym_list)
automata_chained = Automata.generateChainedStatesAutomata(abstract1, sym_list)
```
`Session(messages_session1)` 表示一条实际通信会话。`session1.abstract(sym_list)` 会把原始消息序列映射为符号序列,例如把某条具体消息抽象成 `CMDidentify``RESidentify`
`generateChainedStatesAutomata` 会按 `session1` 的顺序生成链式状态机。它更接近“这次会话实际发生了什么”,每一步消息转换通常对应一段状态迁移。
脚本用:
```python
automata_chained.generateDotCode()
```
输出 Graphviz DOT 格式文本。
### Step 6b单状态自动机
```python
automata_one = Automata.generateOneStateAutomata(abstract1, sym_list)
```
单状态自动机把消息符号都挂在一个状态周围,结构更粗略。它强调“这些消息都可能出现”,但弱化了严格顺序关系。
这种形式适合做简单协议建模或快速查看符号集合,但表达会话流程的能力不如链式状态机。
### Step 6cPTA 自动机
```python
session3 = Session(messages_session3)
abstract3 = session3.abstract(sym_list)
automata_pta = Automata.generatePTAAutomata([abstract1, abstract3], sym_list)
```
PTA 是 Prefix Tree Acceptor即前缀树接收器。这里它基于 `session1``session3` 两条抽象会话生成。
与只使用一条 session 的链式自动机相比PTA 可以表达多条会话之间的共同前缀和分支路径。脚本中特意用 `session3` 参与 PTA 生成,是为了展示不同会话路径如何合并进同一个状态模型。
最后同样输出 DOT 代码,可用 Graphviz 转成图片:
```bash
python test/run_toy_protocol.py | dot -Tsvg -o out.svg
```
不过脚本的标准输出不只包含 DOT也包含大量说明文字和中间结构。因此如果要直接交给 `dot`,通常需要先单独提取某一段 DOT 输出,或者修改脚本只输出目标自动机的 DOT 内容。
## 整体执行流程
完整流程可以概括为:
1. 从三个 PCAP 文件导入原始消息。
2.`session1 + session2` 作为训练数据。
3. 把所有训练消息先放进一个通用 `Symbol`
4.`#` 分隔符切出命令名和载荷部分。
5. 根据命令名字段把消息聚类成多个协议符号。
6. 对每类消息的载荷字段做序列对齐,进一步拆分长度和数据。
7. 自动查找长度字段与数据字段之间的关系。
8. 把找到的长度关系写回符号模型。
9. 把 session 中的具体消息抽象为符号序列。
10. 生成链式、单状态和 PTA 三种协议自动机。
## 脚本输出内容
运行脚本时,输出主要分为几类:
- PCAP 导入结果:每个 session 的消息数量和训练消息列表。
- 字段切分结果:`splitDelimiter('#')` 后的 `Symbol`
- 聚类结果:按命令名拆出来的符号列表。
- 序列对齐结果:每个符号内部的字段结构。
- 关系推断结果:字段间的长度关系。
- `CMDencrypt` 调试结构:展示应用 `Size` 关系后的模型。
- 三段 DOT 代码:分别对应链式状态自动机、单状态自动机和 PTA 自动机。
## 注意事项
- 路径是硬编码的,脚本只能在当前仓库布局下直接运行。
- 依赖的 `netzob` 来自本地 `netzob-030/test/src`,不是标准安装路径。
- `messages` 只包含 `session1``session2`,因此字段模型主要由这两个会话推断出来。
- `session3` 只在 PTA 自动机阶段使用,用于引入另一条会话路径。
- `rels[0]` 只应用每个符号找到的第一个关系,如果一个符号存在多个候选关系,脚本不会逐一验证。
- DOT 输出和普通日志混在一起,直接管道到 `dot` 可能不能得到有效图像,需要提取 DOT 段或调整脚本输出。
## 这个文件的定位
这个脚本不是面向生产环境的协议解析器,而是一个学习和演示脚本。它展示了 Netzob 如何从示例流量中推断协议结构,并进一步生成协议状态机。对理解协议逆向、字段抽象、长度关系建模和会话自动机生成很有帮助。