Files
CCSDS_study/test/Tianwen-1-parse-netzob.py
2026-05-05 21:54:35 +08:00

496 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Teach unknown-frame analysis with Netzob on Tianwen-1 raw frame data.
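
The goal of this script is not to "directly use a known Tianwen-1 / CCSDS
parser". Instead it assumes we only have a stretch of contiguous binary frame
data, without knowing the specific space-frame protocol, and uses Netzob's
core concepts to walk through a runnable protocol-exploration tutorial.

Netzob concepts demonstrated:
1. RawMessage: wrap the raw bytes of each frame as a Netzob message.
2. Symbol: put a group of similar messages into one protocol symbol.
3. Format.splitStatic: split fields automatically, based on which byte
   positions are fixed or varying across the samples.
4. Format.clusterByKeyField: pick one field as the key and cluster the
   messages by its value.
5. Field / Raw: manually build a "candidate frame format" model on top of
   the observations made so far.

Notes:
- This script does not import Tianwen.ccsds and never calls AOSFrame.parse.
- For teaching purposes and run time, only the first 96 frames are sampled
  for Netzob inference by default.
- The raw data is large, and full protocol reverse engineering usually takes
  several rounds of experiments; the focus here is on method and tool usage.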
"""
from __future__ import annotations
import argparse
import math
import sys
from collections import Counter
from pathlib import Path
# Make print flush line by line where possible, so progress stays visible
# during a long run.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(line_buffering=True)
# ---------------------------------------------------------------------------
# Path setup
# ---------------------------------------------------------------------------
# This script lives in /home/zjz/CCSDS_study/test/.
# parent.parent climbs back to the project root /home/zjz/CCSDS_study.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# The repository carries a local Netzob source/test tree. Prefer it, so the
# behaviour matches the examples already in the repository.
LOCAL_NETZOB_SRC = PROJECT_ROOT / "netzob-030" / "test" / "src"
if LOCAL_NETZOB_SRC.exists():
    # Must happen before the Netzob import below for the local copy to win.
    sys.path.insert(0, str(LOCAL_NETZOB_SRC))
# Import the Netzob public API.
# The noqa marker tells linters that the star import is deliberate: this is a
# teaching script that stays close to the Netzob tutorials.
from netzob.all import * # noqa: F401,F403,E402
# Raw Tianwen-1 frame bytes. They are treated as an unknown binary sample; no
# known parser is used on them.
DEFAULT_INPUT = PROJECT_ROOT / "Tianwen" / "tianwen1_frames_20200730.u8"
def section(title: str) -> None:
    """Print *title* as a section heading.

    The heading is preceded by a blank line and framed above and below by a
    rule made of 78 ``=`` characters.
    """
    rule = "=" * 78
    print(f"\n{rule}\n{title}\n{rule}")
def short_hex(data: bytes, max_bytes: int = 48) -> str:
    """Return a short, space-separated hex preview of *data*.

    Only the first ``max_bytes`` bytes are rendered (two lowercase hex digits
    each), so that a full frame does not flood the terminal. When *data* is
    longer than ``max_bytes``, the marker ``" ..."`` is appended.
    """
    preview = " ".join(f"{byte:02x}" for byte in data[:max_bytes])
    if len(data) > max_bytes:
        preview += " ..."
    return preview
def entropy(values) -> float:
    """Return the Shannon entropy, in bits, of the discrete items in *values*.

    A low entropy means the byte position is stable, which is typical of a
    version number, a fixed marker or padding. A high entropy means the
    position varies a lot, which is typical of a counter, a timestamp or
    payload data.

    *values* may be any iterable of hashable items (a list, ``bytes``, a
    generator, ...). An empty iterable yields ``0.0``.
    """
    counts = Counter(values)
    # Derive the total from the counter so one-shot iterators work too.
    total = sum(counts.values())
    if total == 0:
        return 0.0
    # Summing p * log2(1 / p) keeps every term non-negative. The negated form
    # -sum(p * log2(p)) returns -0.0 for a constant column, which then prints
    # as "-0.000" in the statistics tables.
    return sum(
        (count / total) * math.log2(total / count) for count in counts.values()
    )
def load_raw_bytes(path: Path) -> bytes:
    """Return the full binary content of the file at *path*.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if path.exists():
        return path.read_bytes()
    raise FileNotFoundError(f"input file not found: {path}")
def estimate_frame_size(
    raw: bytes,
    candidate_min: int,
    candidate_max: int,
    sample_frames: int,
    header_columns: int,
) -> list[dict]:
    """Rank candidate frame lengths when the real frame length is unknown.

    The idea is simple:
    - If the guessed frame length is right, the start of every row lines up
      with the real frame header.
    - A real frame header usually holds structured fields such as a version
      number, IDs and counters.
    - Those fields generally have lower entropy than random payload.
    - So for every candidate length the data is cut into rows and the average
      entropy of the first few columns is computed.
    - The lower the average entropy, the more likely the length is correct.

    This is not a strict proof, only a heuristic commonly used when analysing
    unknown protocols.

    Args:
        raw: the contiguous byte stream to analyse.
        candidate_min: smallest frame length to try; values below 1 are
            treated as 1 because a frame needs at least one byte.
        candidate_max: largest frame length to try (inclusive).
        sample_frames: maximum number of leading frames scored per candidate.
        header_columns: number of leading byte columns to score; capped at
            the candidate frame length.

    Returns:
        One dict per scored candidate with the keys ``frame_size``,
        ``avg_entropy``, ``avg_unique`` and ``frame_count``, sorted by
        ascending ``(avg_entropy, avg_unique)``. Candidates that yield fewer
        than 8 frames are skipped, so the list may be empty.

    Raises:
        ValueError: if ``header_columns`` is smaller than 1.
    """
    if header_columns < 1:
        raise ValueError(f"header_columns must be at least 1, got {header_columns}")
    min_frames = 8  # fewer rows than this give no meaningful column statistics
    results = []
    # Start at 1: a size of 0 would divide by zero and negative sizes frame nothing.
    for frame_size in range(max(candidate_min, 1), candidate_max + 1):
        frame_count = min(len(raw) // frame_size, sample_frames)
        if frame_count < min_frames:
            continue
        columns = min(header_columns, frame_size)
        sampled_end = frame_count * frame_size
        entropies = []
        unique_counts = []
        for offset in range(columns):
            # A strided slice reads this column from every sampled frame
            # without first copying each frame into its own bytes object.
            values = raw[offset:sampled_end:frame_size]
            entropies.append(entropy(values))
            unique_counts.append(len(set(values)))
        results.append(
            {
                "frame_size": frame_size,
                "avg_entropy": sum(entropies) / len(entropies),
                "avg_unique": sum(unique_counts) / len(unique_counts),
                "frame_count": frame_count,
            }
        )
    return sorted(results, key=lambda item: (item["avg_entropy"], item["avg_unique"]))
def slice_frames(raw: bytes, frame_size: int, limit: int | None = None) -> list[bytes]:
    """Cut a contiguous byte stream into fixed-length frames.

    Only complete frames are returned; trailing bytes that do not fill a
    whole frame are dropped. When *limit* is given, at most that many leading
    frames are returned.
    """
    frame_total = len(raw) // frame_size
    if limit is not None and limit < frame_total:
        frame_total = limit
    return [
        raw[begin : begin + frame_size]
        for begin in range(0, frame_total * frame_size, frame_size)
    ]
def build_symbol(frames: list[bytes], name: str) -> Symbol:
    """Wrap every frame in a Netzob ``RawMessage`` and group them in a ``Symbol``.

    The returned symbol is named *name* and carries one message per entry of
    *frames*, in the same order.
    """
    symbol = Symbol(
        messages=[RawMessage(data=payload) for payload in frames],
        name=name,
    )
    # Register a HexaString encoding so that Netzob shows the symbol in
    # hexadecimal, which suits a binary protocol better.
    hex_encoding = TypeEncodingFunction(HexaString)
    symbol.encodingFunctions.add(hex_encoding)
    return symbol
def byte_statistics(frames: list[bytes]) -> list[dict]:
    """Collect per-offset statistics over a list of equally sized frames.

    Returns one dict per byte offset with the keys:
    - ``offset``: the byte position inside the frame,
    - ``unique``: the number of distinct values seen at that position,
    - ``entropy``: the Shannon entropy of those values,
    - ``top``: up to four ``(value, count)`` pairs, most common first.

    An empty *frames* list yields an empty result. If the frames differ in
    length, only the offsets present in every frame are reported.
    """
    stats = []
    # zip(*frames) transposes the frames into per-offset columns. It yields
    # nothing for an empty list, so no special case is needed.
    for offset, column in enumerate(zip(*frames)):
        counts = Counter(column)
        stats.append(
            {
                "offset": offset,
                "unique": len(counts),
                "entropy": entropy(column),
                "top": counts.most_common(4),
            }
        )
    return stats
def print_byte_stats(stats: list[dict], first_columns: int = 32) -> None:
    """Print a statistics table for the first *first_columns* byte offsets.

    Each entry of *stats* must provide the keys ``offset``, ``unique``,
    ``entropy`` and ``top`` (a sequence of ``(value, count)`` pairs).
    """
    print("offset unique entropy most common byte values")
    print("------ ------ ------- -----------------------")
    for row in stats[:first_columns]:
        pairs = [f"0x{value:02x}:{count}" for value, count in row["top"]]
        line = "{:>6} {:>6} {:>7.3f} {}".format(
            row["offset"], row["unique"], row["entropy"], ", ".join(pairs)
        )
        print(line)
def print_static_dynamic_regions(stats: list[dict]) -> None:
    """Print a rough map of static, low-variation and dynamic byte regions.

    Every entry of *stats* is labelled from its ``unique`` count: exactly one
    value is ``static``, up to eight values is ``low-var``, anything else is
    ``dynamic``. Consecutive entries with the same label are merged into one
    region. At most 40 regions are printed; the rest are summarised in a
    single line. With empty *stats* only the heading is printed.
    """
    max_shown = 40
    # Each region is (first index, last index, label). Growing the last
    # region in place also covers empty input, with no peeking at element 0.
    regions: list[tuple[int, int, str]] = []
    for index, item in enumerate(stats):
        unique = item["unique"]
        if unique == 1:
            label = "static"
        elif unique <= 8:
            label = "low-var"
        else:
            label = "dynamic"
        if regions and regions[-1][2] == label:
            regions[-1] = (regions[-1][0], index, label)
        else:
            regions.append((index, index, label))
    print("candidate byte regions from simple statistics:")
    for start, end, label in regions[:max_shown]:
        width = end - start + 1
        print(f" bytes {start:03d}-{end:03d} width={width:03d} {label}")
    if len(regions) > max_shown:
        print(f" ... {len(regions) - max_shown} more regions omitted")
def demonstrate_split_static(frames: list[bytes]) -> Symbol:
    """Show Netzob's automatic field splitting with ``Format.splitStatic``.

    The frames are split twice: once with adjacent static/dynamic fields
    merged, and once byte by byte with merging disabled. Field counts and the
    first byte-level fields are printed. The byte-level symbol is returned.
    """
    merged_symbol = build_symbol(frames, "unknown_tianwen_frames")
    print("Before splitStatic, Netzob sees one raw field:")
    print(f" number of fields: {len(merged_symbol.fields)}")

    # splitStatic compares every position across all messages of the symbol:
    # - positions that are identical in all samples become a static field;
    # - positions that vary between samples become a dynamic field.
    Format.splitStatic(
        merged_symbol,
        unitSize=UnitSize.SIZE_8,
        mergeAdjacentStaticFields=True,
        mergeAdjacentDynamicFields=True,
    )
    print("\nAfter splitStatic(unitSize=8, merge adjacent static/dynamic fields):")
    print(f" number of inferred fields: {len(merged_symbol.fields)}")
    print(
        " teaching note: if most byte positions vary at least once, "
        "adjacent dynamic bytes can merge into one large dynamic field."
    )

    # Second pass without merging; the helper performs the same
    # build-and-split sequence with both merge flags turned off.
    per_byte_symbol = build_bytewise_symbol(frames)
    shown = 24
    print("\nAfter splitStatic(unitSize=8, do not merge adjacent fields):")
    print(f" number of inferred byte-level fields: {len(per_byte_symbol.fields)}")
    print(" first inferred field labels:")
    for index, field in enumerate(per_byte_symbol.fields[:shown]):
        print(f" field[{index:02d}] {field}")
    hidden = len(per_byte_symbol.fields) - shown
    if hidden > 0:
        print(f" ... {hidden} more fields")
    return per_byte_symbol
def build_bytewise_symbol(frames: list[bytes]) -> Symbol:
    """Build a symbol that has been split with adjacent-field merging disabled.

    Per the original author's note, this cuts every byte into its own field,
    which makes it easy to pick one offset as a clustering key.
    """
    per_byte_symbol = build_symbol(frames, "unknown_tianwen_frames_bytewise")
    split_options = {
        "unitSize": UnitSize.SIZE_8,
        "mergeAdjacentStaticFields": False,
        "mergeAdjacentDynamicFields": False,
    }
    Format.splitStatic(per_byte_symbol, **split_options)
    return per_byte_symbol
def demonstrate_cluster_by_key_field(
    frames: list[bytes], stats: list[dict], cluster_sample_size: int
) -> None:
    """Demonstrate Netzob clustering with one candidate byte as the key.

    Candidate key offsets are taken from the first 32 entries of *stats*
    that show between 2 and 12 distinct values. The best-ranked candidate is
    used as the key field for ``Format.clusterByKeyField`` on the first
    *cluster_sample_size* frames, and up to 12 resulting clusters are printed.
    If no candidate exists, a notice is printed and nothing else happens.
    """
    # In an unknown protocol, low-variation fields often make good clustering
    # keys (version, spacecraft ID, virtual channel ID, message type, ...).
    # Rank them by fewest distinct values, then by lowest entropy.
    candidates = sorted(
        (item for item in stats[:32] if 1 < item["unique"] <= 12),
        key=lambda item: (item["unique"], item["entropy"]),
    )
    if not candidates:
        print("No low-variation key candidates found in the first 32 bytes.")
        return

    print("candidate key byte offsets from the first 32 bytes:")
    for item in candidates[:8]:
        top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"])
        print(
            f" offset {item['offset']:02d}: unique={item['unique']}, "
            f"entropy={item['entropy']:.3f}, top=[{top}]"
        )

    # The demo key is the best-ranked candidate.
    # NOTE(review): the key is chosen from statistics over all of `frames`,
    # but clustering runs on the smaller `cluster_frames` subset, so the key
    # byte may not vary inside that subset — confirm this is intended.
    key_offset = candidates[0]["offset"]
    cluster_frames = frames[:cluster_sample_size]
    print(
        f" using {len(cluster_frames)} frames for this cluster demo "
        "(kept small because Netzob clustering can be expensive)"
    )
    bytewise_symbol = build_bytewise_symbol(cluster_frames)
    print(f"\nNetzob clusterByKeyField demo on byte offset {key_offset}:")
    print(f" bytewise fields available: {len(bytewise_symbol.fields)}")

    # clusterByKeyField puts messages that share the same key-field value
    # into the same Symbol.
    key_field = bytewise_symbol.fields[key_offset]
    clusters = Format.clusterByKeyField(bytewise_symbol, key_field)
    print(f" clusters created: {len(clusters)}")
    for position, (key, cluster_symbol) in enumerate(list(clusters.items())):
        if position >= 12:
            break
        if isinstance(key, (bytes, bytearray)):
            key_hex = key.hex()
        else:
            key_hex = str(key)
        print(
            f" key=0x{key_hex:<4} messages={len(cluster_symbol.messages):>4} "
            f"fields={len(cluster_symbol.fields)}"
        )
def demonstrate_manual_candidate_model(frame_size: int) -> None:
    """Hand-build a candidate frame layout with Netzob ``Field`` / ``Raw``.

    This step does not claim that the field meanings are settled. It shows a
    common reverse-engineering workflow: use statistics and splitStatic to
    find likely field boundaries, then describe a candidate model explicitly
    in Netzob. The model's structure is printed.
    """
    section("Step 7 - Manual candidate model with Netzob Field/Raw")

    # The "candidate_" prefix is deliberate: these fields are only first
    # hypotheses. For an unknown space frame one can start by treating the
    # leading bytes as a candidate header, the large middle part as candidate
    # data and the last few bytes as a candidate trailer/checksum/padding.
    header_len, secondary_len, tail_len = 6, 8, 4
    # Whatever is left after the fixed parts is payload; never negative.
    payload_len = max(frame_size - (header_len + secondary_len + tail_len), 0)
    layout = [
        ("candidate_header_0_5", header_len),
        ("candidate_insert_or_secondary_6_13", secondary_len),
        ("candidate_payload", payload_len),
        ("candidate_tail_4_bytes", tail_len),
    ]
    candidate_fields = [
        Field(Raw(nbBytes=size), name=label) for label, size in layout
    ]
    symbol = Symbol(
        name="manual_candidate_tianwen_like_frame",
        fields=candidate_fields,
    )
    print("This is a teaching model, not a confirmed Tianwen-1 specification:")
    print(symbol.str_structure())
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the teaching script.

    Options: ``--input`` (path, defaults to ``DEFAULT_INPUT``) plus the
    integer options ``--frame-size`` (default ``None``), ``--candidate-min``
    (180), ``--candidate-max`` (260), ``--sample-size`` (96),
    ``--show-samples`` (4) and ``--cluster-sample-size`` (8).
    """
    parser = argparse.ArgumentParser(
        description="Use Netzob to teach unknown binary frame analysis on Tianwen-1 raw data."
    )
    parser.add_argument(
        "--input",
        type=Path,
        default=DEFAULT_INPUT,
        help=f"raw binary input file, default: {DEFAULT_INPUT}",
    )
    # All remaining options are plain integers: (flag, default, help text).
    # They are registered in this order, which is also the --help order.
    int_options = (
        (
            "--frame-size",
            None,
            "known or chosen frame size. If omitted, the script estimates it "
            "from candidate sizes.",
        ),
        (
            "--candidate-min",
            180,
            "minimum frame-size candidate used when estimating frame length",
        ),
        (
            "--candidate-max",
            260,
            "maximum frame-size candidate used when estimating frame length",
        ),
        ("--sample-size", 96, "number of frames used for Netzob analysis"),
        (
            "--show-samples",
            4,
            "number of raw sample frames to print as short hex",
        ),
        (
            "--cluster-sample-size",
            8,
            "number of frames used only for the Netzob clusterByKeyField demo",
        ),
    )
    for flag, default, help_text in int_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)
    return parser.parse_args()
def main() -> None:
    """Run the teaching analysis from the command line.

    Steps: read the raw bytes, estimate or accept a frame size, slice the
    sample frames, wrap them in Netzob objects, print byte statistics, run
    the splitStatic and clusterByKeyField demos, and print the hand-built
    candidate model.

    Raises:
        SystemExit: with an explanatory message when no frame size can be
            estimated, when the frame size is not positive, or when the
            input yields no complete frame to analyse.
        FileNotFoundError: if the input file does not exist.
    """
    args = parse_args()
    section("Step 1 - Read unknown binary data")
    raw = load_raw_bytes(args.input)
    print(f"input file: {args.input}")
    print(f"total bytes: {len(raw):,}")

    section("Step 2 - Estimate or choose a fixed frame size")
    if args.frame_size is None:
        ranked = estimate_frame_size(
            raw,
            candidate_min=args.candidate_min,
            candidate_max=args.candidate_max,
            sample_frames=args.sample_size,
            header_columns=16,
        )
        # The estimator skips candidates with too few frames, so a short
        # input or an empty candidate range leaves nothing to choose from.
        if not ranked:
            raise SystemExit(
                "could not estimate a frame size: no candidate between "
                f"{args.candidate_min} and {args.candidate_max} yields enough "
                "complete frames; pass --frame-size or adjust --candidate-min/"
                "--candidate-max/--sample-size"
            )
        print("Top frame-size candidates by low average header entropy:")
        for item in ranked[:10]:
            print(
                f" size={item['frame_size']:>3} "
                f"avg_entropy={item['avg_entropy']:.3f} "
                f"avg_unique={item['avg_unique']:.2f} "
                f"frames_tested={item['frame_count']}"
            )
        frame_size = ranked[0]["frame_size"]
        print(f"\nChosen frame size for the rest of this teaching run: {frame_size}")
    else:
        frame_size = args.frame_size
        print(f"Using user-provided frame size: {frame_size}")
    # Reject a non-positive size before it is used as a divisor below.
    if frame_size < 1:
        raise SystemExit(f"frame size must be a positive integer, got {frame_size}")

    all_frame_count = len(raw) // frame_size
    frames = slice_frames(raw, frame_size, limit=args.sample_size)
    print(f"complete frames in file with this size: {all_frame_count:,}")
    print(f"frames sampled for Netzob: {len(frames):,}")
    # Every later step needs at least one frame to inspect.
    if not frames:
        raise SystemExit(
            f"no complete {frame_size}-byte frame available for analysis; "
            "check the input file, --frame-size and --sample-size"
        )
    print("\nFirst sample frames as short hex:")
    for index, frame in enumerate(frames[: args.show_samples]):
        print(f" frame[{index:03d}] {short_hex(frame)}")

    section("Step 3 - Wrap samples as Netzob RawMessage and Symbol")
    teaching_symbol = build_symbol(frames, "unknown_tianwen_frames")
    print("Netzob objects created:")
    print(f" RawMessage count: {len(teaching_symbol.messages)}")
    print(f" Symbol name: {teaching_symbol.name}")
    print(f" Initial field count: {len(teaching_symbol.fields)}")
    print(
        "Teaching point: at the beginning Netzob only knows each frame is raw bytes; "
        "it does not know the protocol fields."
    )

    section("Step 4 - Byte-position statistics before protocol knowledge")
    stats = byte_statistics(frames)
    print_byte_stats(stats, first_columns=40)
    print()
    print_static_dynamic_regions(stats)

    section("Step 5 - Netzob Format.splitStatic field inference")
    demonstrate_split_static(frames)

    section("Step 6 - Netzob clusterByKeyField on candidate key bytes")
    demonstrate_cluster_by_key_field(frames, stats, args.cluster_sample_size)

    # Step 7 prints its own section heading.
    demonstrate_manual_candidate_model(frame_size)

    section("Done - What to try next")
    print(
        "1. Increase --sample-size to see whether inferred fields remain stable.\n"
        "2. Try --frame-size with another candidate and compare splitStatic results.\n"
        "3. Choose another low-variation offset as cluster key and inspect clusters.\n"
        "4. After a candidate field map is stable, then compare it with known CCSDS/Tianwen parsing.\n"
        "5. Treat this as protocol-discovery scaffolding, not as a final specification."
    )
# Run the teaching analysis only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()