CCSDS_study project

This commit is contained in:
2026-05-05 21:54:35 +08:00
commit 9be41f9270
585 changed files with 91275 additions and 0 deletions

View File

@@ -0,0 +1,495 @@
#!/usr/bin/env python3
"""Teach unknown-frame analysis with Netzob on Tianwen-1 raw frame data.
这个脚本的目标不是“直接使用已知的 Tianwen-1 / CCSDS 解析器”,而是假设我们
只拿到一段连续的二进制帧数据,不知道具体空间帧协议,然后用 Netzob 的核心
概念做一次可运行的协议探索教学。
重点演示的 Netzob 概念:
1. RawMessage把每一帧原始字节包装成 Netzob 消息。
2. Symbol把一组相似消息放进同一个协议符号。
3. Format.splitStatic根据样本中固定/变化的字节位置自动切字段。
4. Format.clusterByKeyField选择某个字段作为 key把消息按字段值聚类。
5. Field / Raw在已有观察基础上手工建立一个“候选帧格式”模型。
注意:
- 本脚本不会 import Tianwen.ccsds也不会调用 AOSFrame.parse。
- 为了教学和运行速度,默认只抽样前 96 帧做 Netzob 推断。
- 原始数据较大,完整协议逆向通常需要多轮实验;这里侧重方法和工具用法。
"""
from __future__ import annotations
import argparse
import math
import sys
from collections import Counter
from pathlib import Path
# 让 print 尽量按行输出,便于长流程运行时看到进度。
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(line_buffering=True)
# ---------------------------------------------------------------------------
# 路径准备
# ---------------------------------------------------------------------------
# 当前脚本位于 /home/zjz/CCSDS_study/test/。
# parent.parent 回到项目根目录 /home/zjz/CCSDS_study。
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# 本仓库里有一份本地 Netzob 源码/测试目录。优先使用它,保证和仓库现有示例一致。
LOCAL_NETZOB_SRC = PROJECT_ROOT / "netzob-030" / "test" / "src"
if LOCAL_NETZOB_SRC.exists():
sys.path.insert(0, str(LOCAL_NETZOB_SRC))
# 导入 Netzob 公共 API。
# noqa 注释是告诉代码检查器:星号导入是教学脚本为了贴近 Netzob 教程而保留。
from netzob.all import * # noqa: F401,F403,E402
# 原始天问一号帧字节文件。这里把它当成未知二进制样本,不使用已知解析器。
DEFAULT_INPUT = PROJECT_ROOT / "Tianwen" / "tianwen1_frames_20200730.u8"
def section(title: str) -> None:
"""打印一个清晰的教学分节标题。"""
print("\n" + "=" * 78)
print(title)
print("=" * 78)
def short_hex(data: bytes, max_bytes: int = 48) -> str:
"""把 bytes 转成短十六进制字符串,避免一帧 220 字节全部刷屏。"""
head = data[:max_bytes].hex(" ")
return head + (" ..." if len(data) > max_bytes else "")
def entropy(values) -> float:
"""计算一组离散值的 Shannon entropy。
entropy 越低,说明这个字节位置越稳定,越像版本号、固定标识或填充。
entropy 越高,说明这个字节位置变化越丰富,越像计数器、时间戳或载荷。
"""
counts = Counter(values)
total = len(values)
return -sum((count / total) * math.log2(count / total) for count in counts.values())
def load_raw_bytes(path: Path) -> bytes:
"""读取原始二进制文件。"""
if not path.exists():
raise FileNotFoundError(f"input file not found: {path}")
return path.read_bytes()
def estimate_frame_size(
raw: bytes,
candidate_min: int,
candidate_max: int,
sample_frames: int,
header_columns: int,
):
"""在不知道帧长时,用“候选帧长打分”的方式找可能帧长。
思路很简单:
- 如果帧长猜对了,那么每一行的开头会对齐到真实帧头。
- 真实帧头通常包含版本号、ID、计数器等结构化字段。
- 这些字段的熵一般比随机载荷低。
- 所以对每个候选帧长,把数据切成多行,计算前若干列的平均熵。
- 平均熵越低,越可能是正确帧长。
这不是严格证明,只是未知协议分析中常用的启发式方法。
"""
results = []
for frame_size in range(candidate_min, candidate_max + 1):
frame_count = min(len(raw) // frame_size, sample_frames)
if frame_count < 8:
continue
frames = [
raw[i * frame_size : (i + 1) * frame_size] for i in range(frame_count)
]
columns = min(header_columns, frame_size)
entropies = []
unique_counts = []
for offset in range(columns):
values = [frame[offset] for frame in frames]
entropies.append(entropy(values))
unique_counts.append(len(set(values)))
results.append(
{
"frame_size": frame_size,
"avg_entropy": sum(entropies) / len(entropies),
"avg_unique": sum(unique_counts) / len(unique_counts),
"frame_count": frame_count,
}
)
return sorted(results, key=lambda item: (item["avg_entropy"], item["avg_unique"]))
def slice_frames(raw: bytes, frame_size: int, limit: int | None = None) -> list[bytes]:
"""把连续字节流切成固定长度帧。"""
total_frames = len(raw) // frame_size
if limit is not None:
total_frames = min(total_frames, limit)
return [raw[i * frame_size : (i + 1) * frame_size] for i in range(total_frames)]
def build_symbol(frames: list[bytes], name: str) -> Symbol:
"""把 bytes 帧列表包装成 Netzob RawMessage再放入 Symbol。"""
messages = [RawMessage(data=frame) for frame in frames]
symbol = Symbol(messages=messages, name=name)
# HexaString 让 Netzob 打印 Symbol 时用十六进制展示,更适合二进制协议。
symbol.encodingFunctions.add(TypeEncodingFunction(HexaString))
return symbol
def byte_statistics(frames: list[bytes]) -> list[dict]:
"""按字节偏移统计唯一值数量、熵和最常见取值。"""
frame_size = len(frames[0])
stats = []
for offset in range(frame_size):
values = [frame[offset] for frame in frames]
counts = Counter(values)
stats.append(
{
"offset": offset,
"unique": len(counts),
"entropy": entropy(values),
"top": counts.most_common(4),
}
)
return stats
def print_byte_stats(stats: list[dict], first_columns: int = 32) -> None:
"""打印前若干字节位置的统计表。"""
print("offset unique entropy most common byte values")
print("------ ------ ------- -----------------------")
for item in stats[:first_columns]:
top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"])
print(
f"{item['offset']:>6} {item['unique']:>6} "
f"{item['entropy']:>7.3f} {top}"
)
def print_static_dynamic_regions(stats: list[dict]) -> None:
"""根据 unique 数量粗略标出固定区、低变化区和高变化区。"""
labels = []
for item in stats:
if item["unique"] == 1:
labels.append("static")
elif item["unique"] <= 8:
labels.append("low-var")
else:
labels.append("dynamic")
regions = []
start = 0
current = labels[0]
for index, label in enumerate(labels[1:], start=1):
if label != current:
regions.append((start, index - 1, current))
start = index
current = label
regions.append((start, len(labels) - 1, current))
print("candidate byte regions from simple statistics:")
for start, end, label in regions[:40]:
width = end - start + 1
print(f" bytes {start:03d}-{end:03d} width={width:03d} {label}")
if len(regions) > 40:
print(f" ... {len(regions) - 40} more regions omitted")
def demonstrate_split_static(frames: list[bytes]) -> Symbol:
"""用 Netzob splitStatic 展示自动字段切分。"""
symbol = build_symbol(frames, "unknown_tianwen_frames")
print("Before splitStatic, Netzob sees one raw field:")
print(f" number of fields: {len(symbol.fields)}")
# splitStatic 会比较同一 Symbol 下所有消息的每个位置:
# - 所有样本都相同的位置会变成 static field。
# - 样本之间变化的位置会变成 dynamic field。
Format.splitStatic(
symbol,
unitSize=UnitSize.SIZE_8,
mergeAdjacentStaticFields=True,
mergeAdjacentDynamicFields=True,
)
print("\nAfter splitStatic(unitSize=8, merge adjacent static/dynamic fields):")
print(f" number of inferred fields: {len(symbol.fields)}")
print(
" teaching note: if most byte positions vary at least once, adjacent "
"dynamic bytes can merge into one large dynamic field."
)
bytewise_symbol = build_symbol(frames, "unknown_tianwen_frames_bytewise")
Format.splitStatic(
bytewise_symbol,
unitSize=UnitSize.SIZE_8,
mergeAdjacentStaticFields=False,
mergeAdjacentDynamicFields=False,
)
print("\nAfter splitStatic(unitSize=8, do not merge adjacent fields):")
print(f" number of inferred byte-level fields: {len(bytewise_symbol.fields)}")
print(" first inferred field labels:")
for index, field in enumerate(bytewise_symbol.fields[:24]):
print(f" field[{index:02d}] {field}")
if len(bytewise_symbol.fields) > 24:
print(f" ... {len(bytewise_symbol.fields) - 24} more fields")
return bytewise_symbol
def build_bytewise_symbol(frames: list[bytes]) -> Symbol:
"""把每个字节都切成独立 Field方便选择某个 offset 做聚类 key。"""
symbol = build_symbol(frames, "unknown_tianwen_frames_bytewise")
Format.splitStatic(
symbol,
unitSize=UnitSize.SIZE_8,
mergeAdjacentStaticFields=False,
mergeAdjacentDynamicFields=False,
)
return symbol
def demonstrate_cluster_by_key_field(
frames: list[bytes], stats: list[dict], cluster_sample_size: int
) -> None:
"""演示如何用某个候选字段作为 key 进行 Netzob 聚类。"""
# 在未知协议中,低变化字段常常适合作为聚类 key比如版本、航天器 ID、
# 虚拟信道 ID、消息类型等。这里先用统计找出一些候选 offset。
candidates = [
item
for item in stats[:32]
if 1 < item["unique"] <= 12
]
candidates = sorted(candidates, key=lambda item: (item["unique"], item["entropy"]))
if not candidates:
print("No low-variation key candidates found in the first 32 bytes.")
return
print("candidate key byte offsets from the first 32 bytes:")
for item in candidates[:8]:
top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"])
print(
f" offset {item['offset']:02d}: unique={item['unique']}, "
f"entropy={item['entropy']:.3f}, top=[{top}]"
)
# 选择最靠前且变化种类较少的字段作为演示 key。
key_offset = candidates[0]["offset"]
cluster_frames = frames[:cluster_sample_size]
print(
f" using {len(cluster_frames)} frames for this cluster demo "
f"(kept small because Netzob clustering can be expensive)"
)
bytewise_symbol = build_bytewise_symbol(cluster_frames)
print(f"\nNetzob clusterByKeyField demo on byte offset {key_offset}:")
print(f" bytewise fields available: {len(bytewise_symbol.fields)}")
# clusterByKeyField 会把拥有相同 key 字段值的消息放进同一个 Symbol。
clusters = Format.clusterByKeyField(bytewise_symbol, bytewise_symbol.fields[key_offset])
print(f" clusters created: {len(clusters)}")
for key, cluster_symbol in list(clusters.items())[:12]:
key_hex = key.hex() if isinstance(key, (bytes, bytearray)) else str(key)
print(
f" key=0x{key_hex:<4} messages={len(cluster_symbol.messages):>4} "
f"fields={len(cluster_symbol.fields)}"
)
def demonstrate_manual_candidate_model(frame_size: int) -> None:
"""用 Netzob Field/Raw 手工搭建一个候选格式模型。
这一步不是声称字段含义已经确定,而是演示逆向分析常见工作流:
先用统计和 splitStatic 找到疑似字段边界,再用 Netzob 明确描述一个候选模型。
"""
section("Step 7 - Manual candidate model with Netzob Field/Raw")
# 这里故意使用“candidate_”前缀表示这些字段只是初步假设。
# 对未知空间帧,通常可以先把开头若干字节当成候选头部,
# 中间大段当成候选数据区,末尾若干字节当成候选尾部/校验/填充。
candidate_header = Field(Raw(nbBytes=6), name="candidate_header_0_5")
candidate_insert_or_secondary = Field(
Raw(nbBytes=8), name="candidate_insert_or_secondary_6_13"
)
candidate_payload = Field(
Raw(nbBytes=max(frame_size - 18, 0)), name="candidate_payload"
)
candidate_tail = Field(Raw(nbBytes=4), name="candidate_tail_4_bytes")
symbol = Symbol(
name="manual_candidate_tianwen_like_frame",
fields=[
candidate_header,
candidate_insert_or_secondary,
candidate_payload,
candidate_tail,
],
)
print("This is a teaching model, not a confirmed Tianwen-1 specification:")
print(symbol.str_structure())
def parse_args() -> argparse.Namespace:
"""命令行参数。"""
parser = argparse.ArgumentParser(
description=(
"Use Netzob to teach unknown binary frame analysis on Tianwen-1 raw data."
)
)
parser.add_argument(
"--input",
type=Path,
default=DEFAULT_INPUT,
help=f"raw binary input file, default: {DEFAULT_INPUT}",
)
parser.add_argument(
"--frame-size",
type=int,
default=None,
help=(
"known or chosen frame size. If omitted, the script estimates it "
"from candidate sizes."
),
)
parser.add_argument(
"--candidate-min",
type=int,
default=180,
help="minimum frame-size candidate used when estimating frame length",
)
parser.add_argument(
"--candidate-max",
type=int,
default=260,
help="maximum frame-size candidate used when estimating frame length",
)
parser.add_argument(
"--sample-size",
type=int,
default=96,
help="number of frames used for Netzob analysis",
)
parser.add_argument(
"--show-samples",
type=int,
default=4,
help="number of raw sample frames to print as short hex",
)
parser.add_argument(
"--cluster-sample-size",
type=int,
default=8,
help="number of frames used only for the Netzob clusterByKeyField demo",
)
return parser.parse_args()
def main() -> None:
"""Run the teaching analysis."""
args = parse_args()
section("Step 1 - Read unknown binary data")
raw = load_raw_bytes(args.input)
print(f"input file: {args.input}")
print(f"total bytes: {len(raw):,}")
section("Step 2 - Estimate or choose a fixed frame size")
if args.frame_size is None:
ranked = estimate_frame_size(
raw,
candidate_min=args.candidate_min,
candidate_max=args.candidate_max,
sample_frames=args.sample_size,
header_columns=16,
)
print("Top frame-size candidates by low average header entropy:")
for item in ranked[:10]:
print(
f" size={item['frame_size']:>3} "
f"avg_entropy={item['avg_entropy']:.3f} "
f"avg_unique={item['avg_unique']:.2f} "
f"frames_tested={item['frame_count']}"
)
frame_size = ranked[0]["frame_size"]
print(f"\nChosen frame size for the rest of this teaching run: {frame_size}")
else:
frame_size = args.frame_size
print(f"Using user-provided frame size: {frame_size}")
all_frame_count = len(raw) // frame_size
frames = slice_frames(raw, frame_size, limit=args.sample_size)
print(f"complete frames in file with this size: {all_frame_count:,}")
print(f"frames sampled for Netzob: {len(frames):,}")
print("\nFirst sample frames as short hex:")
for index, frame in enumerate(frames[: args.show_samples]):
print(f" frame[{index:03d}] {short_hex(frame)}")
section("Step 3 - Wrap samples as Netzob RawMessage and Symbol")
teaching_symbol = build_symbol(frames, "unknown_tianwen_frames")
print("Netzob objects created:")
print(f" RawMessage count: {len(teaching_symbol.messages)}")
print(f" Symbol name: {teaching_symbol.name}")
print(f" Initial field count: {len(teaching_symbol.fields)}")
print(
"Teaching point: at the beginning Netzob only knows each frame is raw bytes; "
"it does not know the protocol fields."
)
section("Step 4 - Byte-position statistics before protocol knowledge")
stats = byte_statistics(frames)
print_byte_stats(stats, first_columns=40)
print()
print_static_dynamic_regions(stats)
section("Step 5 - Netzob Format.splitStatic field inference")
demonstrate_split_static(frames)
section("Step 6 - Netzob clusterByKeyField on candidate key bytes")
demonstrate_cluster_by_key_field(frames, stats, args.cluster_sample_size)
demonstrate_manual_candidate_model(frame_size)
section("Done - What to try next")
print(
"1. Increase --sample-size to see whether inferred fields remain stable.\n"
"2. Try --frame-size with another candidate and compare splitStatic results.\n"
"3. Choose another low-variation offset as cluster key and inspect clusters.\n"
"4. After a candidate field map is stable, then compare it with known CCSDS/Tianwen parsing.\n"
"5. Treat this as protocol-discovery scaffolding, not as a final specification."
)
if __name__ == "__main__":
main()