#!/usr/bin/env python3 """Teach unknown-frame analysis with Netzob on Tianwen-1 raw frame data. 这个脚本的目标不是“直接使用已知的 Tianwen-1 / CCSDS 解析器”,而是假设我们 只拿到一段连续的二进制帧数据,不知道具体空间帧协议,然后用 Netzob 的核心 概念做一次可运行的协议探索教学。 重点演示的 Netzob 概念: 1. RawMessage:把每一帧原始字节包装成 Netzob 消息。 2. Symbol:把一组相似消息放进同一个协议符号。 3. Format.splitStatic:根据样本中固定/变化的字节位置自动切字段。 4. Format.clusterByKeyField:选择某个字段作为 key,把消息按字段值聚类。 5. Field / Raw:在已有观察基础上,手工建立一个“候选帧格式”模型。 注意: - 本脚本不会 import Tianwen.ccsds,也不会调用 AOSFrame.parse。 - 为了教学和运行速度,默认只抽样前 96 帧做 Netzob 推断。 - 原始数据较大,完整协议逆向通常需要多轮实验;这里侧重方法和工具用法。 """ from __future__ import annotations import argparse import math import sys from collections import Counter from pathlib import Path # 让 print 尽量按行输出,便于长流程运行时看到进度。 if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(line_buffering=True) # --------------------------------------------------------------------------- # 路径准备 # --------------------------------------------------------------------------- # 当前脚本位于 /home/zjz/CCSDS_study/test/。 # parent.parent 回到项目根目录 /home/zjz/CCSDS_study。 PROJECT_ROOT = Path(__file__).resolve().parent.parent # 本仓库里有一份本地 Netzob 源码/测试目录。优先使用它,保证和仓库现有示例一致。 LOCAL_NETZOB_SRC = PROJECT_ROOT / "netzob-030" / "test" / "src" if LOCAL_NETZOB_SRC.exists(): sys.path.insert(0, str(LOCAL_NETZOB_SRC)) # 导入 Netzob 公共 API。 # noqa 注释是告诉代码检查器:星号导入是教学脚本为了贴近 Netzob 教程而保留。 from netzob.all import * # noqa: F401,F403,E402 # 原始天问一号帧字节文件。这里把它当成未知二进制样本,不使用已知解析器。 DEFAULT_INPUT = PROJECT_ROOT / "Tianwen" / "tianwen1_frames_20200730.u8" def section(title: str) -> None: """打印一个清晰的教学分节标题。""" print("\n" + "=" * 78) print(title) print("=" * 78) def short_hex(data: bytes, max_bytes: int = 48) -> str: """把 bytes 转成短十六进制字符串,避免一帧 220 字节全部刷屏。""" head = data[:max_bytes].hex(" ") return head + (" ..." if len(data) > max_bytes else "") def entropy(values) -> float: """计算一组离散值的 Shannon entropy。 entropy 越低,说明这个字节位置越稳定,越像版本号、固定标识或填充。 entropy 越高,说明这个字节位置变化越丰富,越像计数器、时间戳或载荷。 """ counts = Counter(values) total = len(values) return -sum((count / total) * math.log2(count / total) for count in counts.values()) def load_raw_bytes(path: Path) -> bytes: """读取原始二进制文件。""" if not path.exists(): raise FileNotFoundError(f"input file not found: {path}") return path.read_bytes() def estimate_frame_size( raw: bytes, candidate_min: int, candidate_max: int, sample_frames: int, header_columns: int, ): """在不知道帧长时,用“候选帧长打分”的方式找可能帧长。 思路很简单: - 如果帧长猜对了,那么每一行的开头会对齐到真实帧头。 - 真实帧头通常包含版本号、ID、计数器等结构化字段。 - 这些字段的熵一般比随机载荷低。 - 所以对每个候选帧长,把数据切成多行,计算前若干列的平均熵。 - 平均熵越低,越可能是正确帧长。 这不是严格证明,只是未知协议分析中常用的启发式方法。 """ results = [] for frame_size in range(candidate_min, candidate_max + 1): frame_count = min(len(raw) // frame_size, sample_frames) if frame_count < 8: continue frames = [ raw[i * frame_size : (i + 1) * frame_size] for i in range(frame_count) ] columns = min(header_columns, frame_size) entropies = [] unique_counts = [] for offset in range(columns): values = [frame[offset] for frame in frames] entropies.append(entropy(values)) unique_counts.append(len(set(values))) results.append( { "frame_size": frame_size, "avg_entropy": sum(entropies) / len(entropies), "avg_unique": sum(unique_counts) / len(unique_counts), "frame_count": frame_count, } ) return sorted(results, key=lambda item: (item["avg_entropy"], item["avg_unique"])) def slice_frames(raw: bytes, frame_size: int, limit: int | None = None) -> list[bytes]: """把连续字节流切成固定长度帧。""" total_frames = len(raw) // frame_size if limit is not None: total_frames = min(total_frames, limit) return [raw[i * frame_size : (i + 1) * frame_size] for i in range(total_frames)] def build_symbol(frames: list[bytes], name: str) -> Symbol: """把 bytes 帧列表包装成 Netzob RawMessage,再放入 Symbol。""" messages = [RawMessage(data=frame) for frame in frames] symbol = Symbol(messages=messages, name=name) # HexaString 让 Netzob 打印 Symbol 时用十六进制展示,更适合二进制协议。 symbol.encodingFunctions.add(TypeEncodingFunction(HexaString)) return symbol def byte_statistics(frames: list[bytes]) -> list[dict]: """按字节偏移统计唯一值数量、熵和最常见取值。""" frame_size = len(frames[0]) stats = [] for offset in range(frame_size): values = [frame[offset] for frame in frames] counts = Counter(values) stats.append( { "offset": offset, "unique": len(counts), "entropy": entropy(values), "top": counts.most_common(4), } ) return stats def print_byte_stats(stats: list[dict], first_columns: int = 32) -> None: """打印前若干字节位置的统计表。""" print("offset unique entropy most common byte values") print("------ ------ ------- -----------------------") for item in stats[:first_columns]: top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"]) print( f"{item['offset']:>6} {item['unique']:>6} " f"{item['entropy']:>7.3f} {top}" ) def print_static_dynamic_regions(stats: list[dict]) -> None: """根据 unique 数量粗略标出固定区、低变化区和高变化区。""" labels = [] for item in stats: if item["unique"] == 1: labels.append("static") elif item["unique"] <= 8: labels.append("low-var") else: labels.append("dynamic") regions = [] start = 0 current = labels[0] for index, label in enumerate(labels[1:], start=1): if label != current: regions.append((start, index - 1, current)) start = index current = label regions.append((start, len(labels) - 1, current)) print("candidate byte regions from simple statistics:") for start, end, label in regions[:40]: width = end - start + 1 print(f" bytes {start:03d}-{end:03d} width={width:03d} {label}") if len(regions) > 40: print(f" ... {len(regions) - 40} more regions omitted") def demonstrate_split_static(frames: list[bytes]) -> Symbol: """用 Netzob splitStatic 展示自动字段切分。""" symbol = build_symbol(frames, "unknown_tianwen_frames") print("Before splitStatic, Netzob sees one raw field:") print(f" number of fields: {len(symbol.fields)}") # splitStatic 会比较同一 Symbol 下所有消息的每个位置: # - 所有样本都相同的位置会变成 static field。 # - 样本之间变化的位置会变成 dynamic field。 Format.splitStatic( symbol, unitSize=UnitSize.SIZE_8, mergeAdjacentStaticFields=True, mergeAdjacentDynamicFields=True, ) print("\nAfter splitStatic(unitSize=8, merge adjacent static/dynamic fields):") print(f" number of inferred fields: {len(symbol.fields)}") print( " teaching note: if most byte positions vary at least once, adjacent " "dynamic bytes can merge into one large dynamic field." ) bytewise_symbol = build_symbol(frames, "unknown_tianwen_frames_bytewise") Format.splitStatic( bytewise_symbol, unitSize=UnitSize.SIZE_8, mergeAdjacentStaticFields=False, mergeAdjacentDynamicFields=False, ) print("\nAfter splitStatic(unitSize=8, do not merge adjacent fields):") print(f" number of inferred byte-level fields: {len(bytewise_symbol.fields)}") print(" first inferred field labels:") for index, field in enumerate(bytewise_symbol.fields[:24]): print(f" field[{index:02d}] {field}") if len(bytewise_symbol.fields) > 24: print(f" ... {len(bytewise_symbol.fields) - 24} more fields") return bytewise_symbol def build_bytewise_symbol(frames: list[bytes]) -> Symbol: """把每个字节都切成独立 Field,方便选择某个 offset 做聚类 key。""" symbol = build_symbol(frames, "unknown_tianwen_frames_bytewise") Format.splitStatic( symbol, unitSize=UnitSize.SIZE_8, mergeAdjacentStaticFields=False, mergeAdjacentDynamicFields=False, ) return symbol def demonstrate_cluster_by_key_field( frames: list[bytes], stats: list[dict], cluster_sample_size: int ) -> None: """演示如何用某个候选字段作为 key 进行 Netzob 聚类。""" # 在未知协议中,低变化字段常常适合作为聚类 key,比如版本、航天器 ID、 # 虚拟信道 ID、消息类型等。这里先用统计找出一些候选 offset。 candidates = [ item for item in stats[:32] if 1 < item["unique"] <= 12 ] candidates = sorted(candidates, key=lambda item: (item["unique"], item["entropy"])) if not candidates: print("No low-variation key candidates found in the first 32 bytes.") return print("candidate key byte offsets from the first 32 bytes:") for item in candidates[:8]: top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"]) print( f" offset {item['offset']:02d}: unique={item['unique']}, " f"entropy={item['entropy']:.3f}, top=[{top}]" ) # 选择最靠前且变化种类较少的字段作为演示 key。 key_offset = candidates[0]["offset"] cluster_frames = frames[:cluster_sample_size] print( f" using {len(cluster_frames)} frames for this cluster demo " f"(kept small because Netzob clustering can be expensive)" ) bytewise_symbol = build_bytewise_symbol(cluster_frames) print(f"\nNetzob clusterByKeyField demo on byte offset {key_offset}:") print(f" bytewise fields available: {len(bytewise_symbol.fields)}") # clusterByKeyField 会把拥有相同 key 字段值的消息放进同一个 Symbol。 clusters = Format.clusterByKeyField(bytewise_symbol, bytewise_symbol.fields[key_offset]) print(f" clusters created: {len(clusters)}") for key, cluster_symbol in list(clusters.items())[:12]: key_hex = key.hex() if isinstance(key, (bytes, bytearray)) else str(key) print( f" key=0x{key_hex:<4} messages={len(cluster_symbol.messages):>4} " f"fields={len(cluster_symbol.fields)}" ) def demonstrate_manual_candidate_model(frame_size: int) -> None: """用 Netzob Field/Raw 手工搭建一个候选格式模型。 这一步不是声称字段含义已经确定,而是演示逆向分析常见工作流: 先用统计和 splitStatic 找到疑似字段边界,再用 Netzob 明确描述一个候选模型。 """ section("Step 7 - Manual candidate model with Netzob Field/Raw") # 这里故意使用“candidate_”前缀,表示这些字段只是初步假设。 # 对未知空间帧,通常可以先把开头若干字节当成候选头部, # 中间大段当成候选数据区,末尾若干字节当成候选尾部/校验/填充。 candidate_header = Field(Raw(nbBytes=6), name="candidate_header_0_5") candidate_insert_or_secondary = Field( Raw(nbBytes=8), name="candidate_insert_or_secondary_6_13" ) candidate_payload = Field( Raw(nbBytes=max(frame_size - 18, 0)), name="candidate_payload" ) candidate_tail = Field(Raw(nbBytes=4), name="candidate_tail_4_bytes") symbol = Symbol( name="manual_candidate_tianwen_like_frame", fields=[ candidate_header, candidate_insert_or_secondary, candidate_payload, candidate_tail, ], ) print("This is a teaching model, not a confirmed Tianwen-1 specification:") print(symbol.str_structure()) def parse_args() -> argparse.Namespace: """命令行参数。""" parser = argparse.ArgumentParser( description=( "Use Netzob to teach unknown binary frame analysis on Tianwen-1 raw data." ) ) parser.add_argument( "--input", type=Path, default=DEFAULT_INPUT, help=f"raw binary input file, default: {DEFAULT_INPUT}", ) parser.add_argument( "--frame-size", type=int, default=None, help=( "known or chosen frame size. If omitted, the script estimates it " "from candidate sizes." ), ) parser.add_argument( "--candidate-min", type=int, default=180, help="minimum frame-size candidate used when estimating frame length", ) parser.add_argument( "--candidate-max", type=int, default=260, help="maximum frame-size candidate used when estimating frame length", ) parser.add_argument( "--sample-size", type=int, default=96, help="number of frames used for Netzob analysis", ) parser.add_argument( "--show-samples", type=int, default=4, help="number of raw sample frames to print as short hex", ) parser.add_argument( "--cluster-sample-size", type=int, default=8, help="number of frames used only for the Netzob clusterByKeyField demo", ) return parser.parse_args() def main() -> None: """Run the teaching analysis.""" args = parse_args() section("Step 1 - Read unknown binary data") raw = load_raw_bytes(args.input) print(f"input file: {args.input}") print(f"total bytes: {len(raw):,}") section("Step 2 - Estimate or choose a fixed frame size") if args.frame_size is None: ranked = estimate_frame_size( raw, candidate_min=args.candidate_min, candidate_max=args.candidate_max, sample_frames=args.sample_size, header_columns=16, ) print("Top frame-size candidates by low average header entropy:") for item in ranked[:10]: print( f" size={item['frame_size']:>3} " f"avg_entropy={item['avg_entropy']:.3f} " f"avg_unique={item['avg_unique']:.2f} " f"frames_tested={item['frame_count']}" ) frame_size = ranked[0]["frame_size"] print(f"\nChosen frame size for the rest of this teaching run: {frame_size}") else: frame_size = args.frame_size print(f"Using user-provided frame size: {frame_size}") all_frame_count = len(raw) // frame_size frames = slice_frames(raw, frame_size, limit=args.sample_size) print(f"complete frames in file with this size: {all_frame_count:,}") print(f"frames sampled for Netzob: {len(frames):,}") print("\nFirst sample frames as short hex:") for index, frame in enumerate(frames[: args.show_samples]): print(f" frame[{index:03d}] {short_hex(frame)}") section("Step 3 - Wrap samples as Netzob RawMessage and Symbol") teaching_symbol = build_symbol(frames, "unknown_tianwen_frames") print("Netzob objects created:") print(f" RawMessage count: {len(teaching_symbol.messages)}") print(f" Symbol name: {teaching_symbol.name}") print(f" Initial field count: {len(teaching_symbol.fields)}") print( "Teaching point: at the beginning Netzob only knows each frame is raw bytes; " "it does not know the protocol fields." ) section("Step 4 - Byte-position statistics before protocol knowledge") stats = byte_statistics(frames) print_byte_stats(stats, first_columns=40) print() print_static_dynamic_regions(stats) section("Step 5 - Netzob Format.splitStatic field inference") demonstrate_split_static(frames) section("Step 6 - Netzob clusterByKeyField on candidate key bytes") demonstrate_cluster_by_key_field(frames, stats, args.cluster_sample_size) demonstrate_manual_candidate_model(frame_size) section("Done - What to try next") print( "1. Increase --sample-size to see whether inferred fields remain stable.\n" "2. Try --frame-size with another candidate and compare splitStatic results.\n" "3. Choose another low-variation offset as cluster key and inspect clusters.\n" "4. After a candidate field map is stable, then compare it with known CCSDS/Tianwen parsing.\n" "5. Treat this as protocol-discovery scaffolding, not as a final specification." ) if __name__ == "__main__": main()