496 lines
18 KiB
Python
496 lines
18 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""Teach unknown-frame analysis with Netzob on Tianwen-1 raw frame data.
|
|||
|
|
|
|||
|
|
这个脚本的目标不是“直接使用已知的 Tianwen-1 / CCSDS 解析器”,而是假设我们
|
|||
|
|
只拿到一段连续的二进制帧数据,不知道具体空间帧协议,然后用 Netzob 的核心
|
|||
|
|
概念做一次可运行的协议探索教学。
|
|||
|
|
|
|||
|
|
重点演示的 Netzob 概念:
|
|||
|
|
1. RawMessage:把每一帧原始字节包装成 Netzob 消息。
|
|||
|
|
2. Symbol:把一组相似消息放进同一个协议符号。
|
|||
|
|
3. Format.splitStatic:根据样本中固定/变化的字节位置自动切字段。
|
|||
|
|
4. Format.clusterByKeyField:选择某个字段作为 key,把消息按字段值聚类。
|
|||
|
|
5. Field / Raw:在已有观察基础上,手工建立一个“候选帧格式”模型。
|
|||
|
|
|
|||
|
|
注意:
|
|||
|
|
- 本脚本不会 import Tianwen.ccsds,也不会调用 AOSFrame.parse。
|
|||
|
|
- 为了教学和运行速度,默认只抽样前 96 帧做 Netzob 推断。
|
|||
|
|
- 原始数据较大,完整协议逆向通常需要多轮实验;这里侧重方法和工具用法。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import math
|
|||
|
|
import sys
|
|||
|
|
from collections import Counter
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
# Ask for line-buffered stdout so each printed line shows up promptly, which
# makes progress visible during a long run. The call is skipped when the
# current stdout object does not provide reconfigure().
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(line_buffering=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ---------------------------------------------------------------------------
# Path setup
# ---------------------------------------------------------------------------
# This script is expected to sit one directory below the project root
# (the author's layout was <project>/test/), so parent.parent of the resolved
# script path is the project root.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# The repository carries a local Netzob source/test tree. When it exists it is
# placed first on sys.path so it is preferred, keeping this script consistent
# with the repository's other examples.
LOCAL_NETZOB_SRC = PROJECT_ROOT / "netzob-030" / "test" / "src"
if LOCAL_NETZOB_SRC.exists():
    sys.path.insert(0, str(LOCAL_NETZOB_SRC))
|
|||
|
|
|
|||
|
|
# 导入 Netzob 公共 API。
|
|||
|
|
# noqa 注释是告诉代码检查器:星号导入是教学脚本为了贴近 Netzob 教程而保留。
|
|||
|
|
from netzob.all import * # noqa: F401,F403,E402
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Raw Tianwen-1 frame bytes. The script treats this file as an unknown binary
# sample and deliberately does not use any known parser on it.
DEFAULT_INPUT = PROJECT_ROOT / "Tianwen" / "tianwen1_frames_20200730.u8"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def section(title: str) -> None:
    """Print ``title`` framed by two 78-character ``=`` rules.

    A blank line is emitted before the top rule so that consecutive teaching
    steps are visually separated in the console output.
    """
    rule = "=" * 78
    for line in ("\n" + rule, title, rule):
        print(line)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def short_hex(data: bytes, max_bytes: int = 48) -> str:
    """Render the leading bytes of ``data`` as space-separated hex pairs.

    Only the first ``max_bytes`` bytes are shown so that a whole frame does
    not flood the terminal; when ``data`` is longer than that, `` ...`` is
    appended to signal the truncation.
    """
    rendered = data[:max_bytes].hex(" ")
    if len(data) > max_bytes:
        rendered += " ..."
    return rendered
|
|||
|
|
|
|||
|
|
|
|||
|
|
def entropy(values) -> float:
    """Return the Shannon entropy, in bits, of a collection of discrete values.

    A low entropy means the byte position is stable across frames, which is
    typical of version numbers, fixed identifiers or padding. A high entropy
    means the position varies a lot, which is typical of counters, timestamps
    or payload.

    Args:
        values: Any iterable of hashable items (list, tuple, ``bytes``,
            generator, ...).

    Returns:
        The entropy as a non-negative float; ``0.0`` for empty input or for
        input that contains a single distinct value.
    """
    counts = Counter(values)
    # Derive the total from the counter rather than len(values) so that
    # one-shot iterables (generators, iterators) are supported as well.
    total = sum(counts.values())
    if total == 0:
        return 0.0
    bits = -sum(
        (count / total) * math.log2(count / total) for count in counts.values()
    )
    # A single distinct value yields -(1 * log2(1)) == -0.0, which would be
    # formatted as "-0.000" in the statistics tables; adding 0.0 turns the
    # negative zero into a plain 0.0 and leaves every other result untouched.
    return bits + 0.0
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_raw_bytes(path: Path) -> bytes:
    """Return the full contents of the binary file at ``path``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if path.exists():
        return path.read_bytes()
    raise FileNotFoundError(f"input file not found: {path}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def estimate_frame_size(
    raw: bytes,
    candidate_min: int,
    candidate_max: int,
    sample_frames: int,
    header_columns: int,
) -> list[dict]:
    """Rank candidate frame lengths for a stream whose frame size is unknown.

    The heuristic:

    - If the guessed frame length is right, every row starts on a real frame
      header.
    - A real header usually holds structured fields (version, IDs, counters).
    - Such fields generally have a lower entropy than random payload.
    - So for each candidate length the data is cut into rows and the average
      entropy of the first few columns is computed.
    - The lower the average entropy, the more plausible the frame length.

    This is not a proof, only a heuristic commonly used when exploring an
    unknown protocol.

    Args:
        raw: The continuous byte stream.
        candidate_min: Smallest frame length to try. Values below 1 are
            clamped to 1 because a frame cannot be empty.
        candidate_max: Largest frame length to try (inclusive).
        sample_frames: Maximum number of leading frames scored per candidate.
        header_columns: Number of leading byte columns to score.

    Returns:
        One dict per scored candidate with the keys ``frame_size``,
        ``avg_entropy``, ``avg_unique`` and ``frame_count``, sorted by
        ascending ``avg_entropy`` and then ``avg_unique``. Candidates that
        yield fewer than 8 complete frames are skipped, so the list can be
        empty.

    Raises:
        ValueError: if ``header_columns`` is smaller than 1.
    """
    if header_columns < 1:
        raise ValueError(f"header_columns must be at least 1, got {header_columns}")

    results = []
    # Start at 1 at the lowest: a frame size of 0 would divide by zero below.
    for frame_size in range(max(candidate_min, 1), candidate_max + 1):
        frame_count = min(len(raw) // frame_size, sample_frames)
        if frame_count < 8:
            continue

        columns = min(header_columns, frame_size)
        sampled_bytes = frame_count * frame_size
        entropies = []
        unique_counts = []
        for offset in range(columns):
            # A strided slice picks byte `offset` of each sampled frame
            # without materialising the frames themselves.
            values = raw[offset:sampled_bytes:frame_size]
            entropies.append(entropy(values))
            unique_counts.append(len(set(values)))

        results.append(
            {
                "frame_size": frame_size,
                "avg_entropy": sum(entropies) / len(entropies),
                "avg_unique": sum(unique_counts) / len(unique_counts),
                "frame_count": frame_count,
            }
        )

    return sorted(results, key=lambda item: (item["avg_entropy"], item["avg_unique"]))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def slice_frames(raw: bytes, frame_size: int, limit: int | None = None) -> list[bytes]:
    """Cut a continuous byte stream into consecutive fixed-length frames.

    Only complete frames are returned; trailing bytes that do not fill a whole
    frame are dropped. When ``limit`` is given, at most that many leading
    frames are returned.
    """
    count = len(raw) // frame_size
    if limit is not None and limit < count:
        count = limit
    starts = range(0, count * frame_size, frame_size)
    return [raw[start : start + frame_size] for start in starts]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_symbol(frames: list[bytes], name: str) -> Symbol:
    """Wrap each frame in a Netzob ``RawMessage`` and group them in a ``Symbol``.

    The returned symbol is named ``name`` and has a ``HexaString`` encoding
    function attached, so Netzob displays its content as hexadecimal, which
    suits a binary protocol better than the default rendering.
    """
    symbol = Symbol(
        messages=[RawMessage(data=payload) for payload in frames],
        name=name,
    )
    hex_view = TypeEncodingFunction(HexaString)
    symbol.encodingFunctions.add(hex_view)
    return symbol
|
|||
|
|
|
|||
|
|
|
|||
|
|
def byte_statistics(frames: list[bytes]) -> list[dict]:
    """Compute per-offset statistics over a list of frames.

    Args:
        frames: Frames to analyse, normally all of the same length. If the
            lengths differ, only the offsets present in every frame are
            analysed.

    Returns:
        One dict per byte offset with the keys ``offset``, ``unique`` (number
        of distinct byte values), ``entropy`` (Shannon entropy of the column)
        and ``top`` (up to four ``(value, count)`` pairs, most common first).
        An empty list when ``frames`` is empty.
    """
    if not frames:
        # Nothing to index into; the previous frames[0] lookup raised here.
        return []

    stats = []
    # zip(*frames) yields, for each offset, the tuple of that byte across all
    # frames, and stops at the shortest frame instead of raising IndexError.
    for offset, values in enumerate(zip(*frames)):
        counts = Counter(values)
        stats.append(
            {
                "offset": offset,
                "unique": len(counts),
                "entropy": entropy(values),
                "top": counts.most_common(4),
            }
        )
    return stats
|
|||
|
|
|
|||
|
|
|
|||
|
|
def print_byte_stats(stats: list[dict], first_columns: int = 32) -> None:
    """Print a table with the statistics of the first ``first_columns`` offsets.

    Each row shows the offset, the number of distinct values, the entropy
    (three decimals) and the most common byte values as ``0xVV:count`` pairs.
    """
    print("offset unique entropy most common byte values")
    print("------ ------ ------- -----------------------")
    row_template = "{offset:>6} {unique:>6} {entropy:>7.3f} {top}"
    for row in stats[:first_columns]:
        common = ", ".join(f"0x{byte:02x}:{hits}" for byte, hits in row["top"])
        print(
            row_template.format(
                offset=row["offset"],
                unique=row["unique"],
                entropy=row["entropy"],
                top=common,
            )
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def print_static_dynamic_regions(stats: list[dict], max_regions: int = 40) -> None:
    """Print contiguous byte ranges labelled by how much they vary.

    Every offset is classified from its ``unique`` count: exactly one value is
    ``static``, two to eight values is ``low-var`` and anything above is
    ``dynamic``. Neighbouring offsets with the same label are merged into one
    region. Positions in ``stats`` are used as the byte offsets.

    Args:
        stats: Per-offset statistics; only the ``unique`` key is read.
        max_regions: Maximum number of regions to print before the remainder
            is summarised in a single line.
    """
    print("candidate byte regions from simple statistics:")
    if not stats:
        # The previous implementation raised IndexError on labels[0] here.
        print(" (no byte statistics available)")
        return

    regions = []  # (first offset, last offset, label), in offset order
    for offset, item in enumerate(stats):
        if item["unique"] == 1:
            label = "static"
        elif item["unique"] <= 8:
            label = "low-var"
        else:
            label = "dynamic"

        if regions and regions[-1][2] == label:
            # Same label as the running region: extend it by one byte.
            regions[-1] = (regions[-1][0], offset, label)
        else:
            regions.append((offset, offset, label))

    for start, end, label in regions[:max_regions]:
        width = end - start + 1
        print(f" bytes {start:03d}-{end:03d} width={width:03d} {label}")
    if len(regions) > max_regions:
        print(f" ... {len(regions) - max_regions} more regions omitted")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def demonstrate_split_static(frames: list[bytes]) -> Symbol:
    """Show automatic field splitting with Netzob's ``Format.splitStatic``.

    Two runs are printed: one that merges adjacent static/dynamic fields and
    one that keeps every byte as its own field.

    Args:
        frames: The sampled frames to analyse.

    Returns:
        The byte-level symbol produced by the second (non-merging) run.
    """
    preview_fields = 24  # number of byte-level fields listed in the output

    symbol = build_symbol(frames, "unknown_tianwen_frames")
    print("Before splitStatic, Netzob sees one raw field:")
    print(f" number of fields: {len(symbol.fields)}")

    # splitStatic compares every position across all messages of the symbol:
    # - positions identical in all samples become a static field;
    # - positions that differ between samples become a dynamic field.
    Format.splitStatic(
        symbol,
        unitSize=UnitSize.SIZE_8,
        mergeAdjacentStaticFields=True,
        mergeAdjacentDynamicFields=True,
    )

    print("\nAfter splitStatic(unitSize=8, merge adjacent static/dynamic fields):")
    print(f" number of inferred fields: {len(symbol.fields)}")
    print(
        " teaching note: if most byte positions vary at least once, adjacent "
        "dynamic bytes can merge into one large dynamic field."
    )

    # Reuse the shared helper instead of repeating the same non-merging
    # splitStatic call (same symbol name, same options) inline.
    bytewise_symbol = build_bytewise_symbol(frames)

    print("\nAfter splitStatic(unitSize=8, do not merge adjacent fields):")
    print(f" number of inferred byte-level fields: {len(bytewise_symbol.fields)}")
    print(" first inferred field labels:")
    for index, field in enumerate(bytewise_symbol.fields[:preview_fields]):
        print(f" field[{index:02d}] {field}")
    if len(bytewise_symbol.fields) > preview_fields:
        print(f" ... {len(bytewise_symbol.fields) - preview_fields} more fields")
    return bytewise_symbol
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_bytewise_symbol(frames: list[bytes]) -> Symbol:
    """Build a symbol and split it with ``Format.splitStatic`` without merging.

    Adjacent static and dynamic fields are both left unmerged, the intent
    being one field per byte so that a byte offset can be picked as a
    clustering key.
    """
    bytewise = build_symbol(frames, "unknown_tianwen_frames_bytewise")
    split_options = {
        "unitSize": UnitSize.SIZE_8,
        "mergeAdjacentStaticFields": False,
        "mergeAdjacentDynamicFields": False,
    }
    Format.splitStatic(bytewise, **split_options)
    return bytewise
|
|||
|
|
|
|||
|
|
|
|||
|
|
def demonstrate_cluster_by_key_field(
    frames: list[bytes], stats: list[dict], cluster_sample_size: int
) -> None:
    """Demonstrate Netzob clustering keyed on one candidate header byte.

    Args:
        frames: The sampled frames.
        stats: Per-offset statistics for ``frames``; the keys ``offset``,
            ``unique``, ``entropy`` and ``top`` are read.
        cluster_sample_size: Number of leading frames passed to Netzob for
            the clustering step.
    """
    # In an unknown protocol, low-variation fields (version, spacecraft ID,
    # virtual channel ID, message type, ...) often make good clustering keys.
    # Candidates are the offsets among the first 32 bytes that take between
    # 2 and 12 distinct values.
    candidates = [item for item in stats[:32] if 1 < item["unique"] <= 12]
    candidates = sorted(candidates, key=lambda item: (item["unique"], item["entropy"]))

    if not candidates:
        print("No low-variation key candidates found in the first 32 bytes.")
        return

    print("candidate key byte offsets from the first 32 bytes:")
    for item in candidates[:8]:
        top = ", ".join(f"0x{value:02x}:{count}" for value, count in item["top"])
        print(
            f" offset {item['offset']:02d}: unique={item['unique']}, "
            f"entropy={item['entropy']:.3f}, top=[{top}]"
        )

    # The demo key is the candidate with the fewest distinct values (ties are
    # broken by lower entropy, then by lower offset because the sort is stable).
    key_offset = candidates[0]["offset"]

    # Clamp at 0: a negative size would otherwise slice from the end.
    cluster_frames = frames[: max(cluster_sample_size, 0)]
    if not cluster_frames:
        print(
            f" no frames selected for clustering "
            f"(cluster_sample_size={cluster_sample_size}); skipping the cluster demo"
        )
        return

    print(
        f" using {len(cluster_frames)} frames for this cluster demo "
        "(kept small because Netzob clustering can be expensive)"
    )

    bytewise_symbol = build_bytewise_symbol(cluster_frames)

    print(f"\nNetzob clusterByKeyField demo on byte offset {key_offset}:")
    print(f" bytewise fields available: {len(bytewise_symbol.fields)}")

    # The byte offset is used as a field index, which assumes the byte-level
    # split produced one field per byte. Bail out cleanly if it did not.
    if key_offset >= len(bytewise_symbol.fields):
        print(
            f" byte offset {key_offset} has no matching field; "
            "skipping the cluster demo"
        )
        return

    # clusterByKeyField puts messages sharing the same key-field value into
    # the same Symbol.
    clusters = Format.clusterByKeyField(bytewise_symbol, bytewise_symbol.fields[key_offset])

    print(f" clusters created: {len(clusters)}")
    for key, cluster_symbol in list(clusters.items())[:12]:
        key_hex = key.hex() if isinstance(key, (bytes, bytearray)) else str(key)
        print(
            f" key=0x{key_hex:<4} messages={len(cluster_symbol.messages):>4} "
            f"fields={len(cluster_symbol.fields)}"
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def demonstrate_manual_candidate_model(frame_size: int) -> None:
    """Build and print a hand-written candidate frame model with Field/Raw.

    This does not claim that the field meanings are known. It illustrates the
    usual reverse-engineering workflow: find plausible field boundaries with
    statistics and splitStatic first, then describe a candidate model
    explicitly in Netzob.
    """
    section("Step 7 - Manual candidate model with Netzob Field/Raw")

    # The "candidate_" prefix is deliberate: these fields are first guesses.
    # For an unknown space frame a common starting point is a leading header,
    # a large middle data area, and a trailer (tail / checksum / padding).
    # 18 is the combined width of the three fixed-size fields (6 + 8 + 4).
    payload_width = max(frame_size - 18, 0)
    layout = [
        Field(Raw(nbBytes=6), name="candidate_header_0_5"),
        Field(Raw(nbBytes=8), name="candidate_insert_or_secondary_6_13"),
        Field(Raw(nbBytes=payload_width), name="candidate_payload"),
        Field(Raw(nbBytes=4), name="candidate_tail_4_bytes"),
    ]
    symbol = Symbol(name="manual_candidate_tianwen_like_frame", fields=layout)

    print("This is a teaching model, not a confirmed Tianwen-1 specification:")
    print(symbol.str_structure())
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _positive_int(text: str) -> int:
    """argparse ``type=`` callable that accepts integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {value}")
    return value


def _non_negative_int(text: str) -> int:
    """argparse ``type=`` callable that accepts integers >= 0."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"expected a non-negative integer, got {value}")
    return value


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the teaching script.

    Sizes and counts are validated here so that a value such as
    ``--frame-size 0`` is rejected with a usage message instead of failing
    later with a ZeroDivisionError or IndexError.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The namespace with ``input``, ``frame_size``, ``candidate_min``,
        ``candidate_max``, ``sample_size``, ``show_samples`` and
        ``cluster_sample_size``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Use Netzob to teach unknown binary frame analysis on Tianwen-1 raw data."
        )
    )
    parser.add_argument(
        "--input",
        type=Path,
        default=DEFAULT_INPUT,
        # %(default)s lets argparse insert the path itself, so a "%" in the
        # path cannot break help formatting.
        help="raw binary input file, default: %(default)s",
    )
    parser.add_argument(
        "--frame-size",
        type=_positive_int,
        default=None,
        help=(
            "known or chosen frame size. If omitted, the script estimates it "
            "from candidate sizes."
        ),
    )
    parser.add_argument(
        "--candidate-min",
        type=_positive_int,
        default=180,
        help="minimum frame-size candidate used when estimating frame length",
    )
    parser.add_argument(
        "--candidate-max",
        type=_positive_int,
        default=260,
        help="maximum frame-size candidate used when estimating frame length",
    )
    parser.add_argument(
        "--sample-size",
        type=_positive_int,
        default=96,
        help="number of frames used for Netzob analysis",
    )
    parser.add_argument(
        "--show-samples",
        type=_non_negative_int,
        default=4,
        help="number of raw sample frames to print as short hex",
    )
    parser.add_argument(
        "--cluster-sample-size",
        type=_positive_int,
        default=8,
        help="number of frames used only for the Netzob clusterByKeyField demo",
    )

    args = parser.parse_args(argv)
    if args.candidate_min > args.candidate_max:
        # An inverted range would leave no candidate to score.
        parser.error("--candidate-min must not be greater than --candidate-max")
    return args
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main() -> None:
    """Run the teaching analysis end to end.

    Steps: read the raw bytes, estimate or accept a frame size, slice a sample
    of frames, wrap them in Netzob objects, print byte statistics, run the
    splitStatic and clusterByKeyField demos, and print a manual candidate
    model.

    Raises:
        SystemExit: with an explanatory message when the frame size is not
            positive, when no frame-size candidate can be scored, or when no
            complete frame can be sampled from the input.
    """
    args = parse_args()

    section("Step 1 - Read unknown binary data")
    raw = load_raw_bytes(args.input)
    print(f"input file: {args.input}")
    print(f"total bytes: {len(raw):,}")

    section("Step 2 - Estimate or choose a fixed frame size")
    if args.frame_size is None:
        ranked = estimate_frame_size(
            raw,
            candidate_min=args.candidate_min,
            candidate_max=args.candidate_max,
            sample_frames=args.sample_size,
            header_columns=16,
        )
        if not ranked:
            # estimate_frame_size drops candidates with fewer than 8 complete
            # frames, so a small input or a small --sample-size leaves nothing.
            raise SystemExit(
                "no frame-size candidate could be scored: each candidate needs "
                "at least 8 complete frames; check the input size, --sample-size "
                "and --candidate-min/--candidate-max"
            )
        print("Top frame-size candidates by low average header entropy:")
        for item in ranked[:10]:
            print(
                f" size={item['frame_size']:>3} "
                f"avg_entropy={item['avg_entropy']:.3f} "
                f"avg_unique={item['avg_unique']:.2f} "
                f"frames_tested={item['frame_count']}"
            )
        frame_size = ranked[0]["frame_size"]
        print(f"\nChosen frame size for the rest of this teaching run: {frame_size}")
    else:
        frame_size = args.frame_size
        if frame_size < 1:
            # Guard the integer division below against zero/negative sizes.
            raise SystemExit(f"--frame-size must be a positive integer, got {frame_size}")
        print(f"Using user-provided frame size: {frame_size}")

    all_frame_count = len(raw) // frame_size
    frames = slice_frames(raw, frame_size, limit=args.sample_size)
    print(f"complete frames in file with this size: {all_frame_count:,}")
    print(f"frames sampled for Netzob: {len(frames):,}")
    if not frames:
        # Every later step indexes into the sample, so stop with a clear
        # message instead of an IndexError further down.
        raise SystemExit(
            "no complete frame could be sampled: the input is shorter than one "
            "frame or --sample-size is not positive"
        )

    print("\nFirst sample frames as short hex:")
    for index, frame in enumerate(frames[: args.show_samples]):
        print(f" frame[{index:03d}] {short_hex(frame)}")

    section("Step 3 - Wrap samples as Netzob RawMessage and Symbol")
    teaching_symbol = build_symbol(frames, "unknown_tianwen_frames")
    print("Netzob objects created:")
    print(f" RawMessage count: {len(teaching_symbol.messages)}")
    print(f" Symbol name: {teaching_symbol.name}")
    print(f" Initial field count: {len(teaching_symbol.fields)}")
    print(
        "Teaching point: at the beginning Netzob only knows each frame is raw bytes; "
        "it does not know the protocol fields."
    )

    section("Step 4 - Byte-position statistics before protocol knowledge")
    stats = byte_statistics(frames)
    print_byte_stats(stats, first_columns=40)
    print()
    print_static_dynamic_regions(stats)

    section("Step 5 - Netzob Format.splitStatic field inference")
    demonstrate_split_static(frames)

    section("Step 6 - Netzob clusterByKeyField on candidate key bytes")
    demonstrate_cluster_by_key_field(frames, stats, args.cluster_sample_size)

    # Step 7 prints its own section header.
    demonstrate_manual_candidate_model(frame_size)

    section("Done - What to try next")
    print(
        "1. Increase --sample-size to see whether inferred fields remain stable.\n"
        "2. Try --frame-size with another candidate and compare splitStatic results.\n"
        "3. Choose another low-variation offset as cluster key and inspect clusters.\n"
        "4. After a candidate field map is stable, then compare it with known CCSDS/Tianwen parsing.\n"
        "5. Treat this as protocol-discovery scaffolding, not as a final specification."
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run the teaching walkthrough only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|