Files
CCSDS_study/test/run_toy_protocol_old.py

175 lines
6.2 KiB
Python
Raw Permalink Normal View History

2026-05-05 21:54:35 +08:00
"""
Toy Protocol Inference - based on discover_features.rst tutorial
Reconstructed from the documented message samples (no PCAP files needed).
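Protocol structure (as seen in the samples below):
CMD: <CMD name> '#' <4-byte little-endian length> [<data>]
RES: <RES name> '#' <4-byte field, all zeros in these samples> <4-byte little-endian length> [<data>]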
Steps covered:
1. Build raw messages from known session data
2. Split by '#' delimiter
3. Cluster by key field (command name)
4. Sequence alignment on payload field
5. Find and apply size relations
6. Generate automata (chained / one-state / PTA)
"""
import sys
sys.path.insert(0, '/home/zjz/CCSDS_study/netzob-030/test/src')
from netzob.all import *
# ---------------------------------------------------------------------------
# 1. Raw session messages (reconstructed from tutorial output)
# ---------------------------------------------------------------------------
def make_msg(raw: bytes) -> RawMessage:
    """Wrap the bytes of one protocol frame in a ``RawMessage``.

    Args:
        raw: The complete bytes of a single message.

    Returns:
        A ``RawMessage`` constructed with ``data=raw``.
    """
    wrapped = RawMessage(data=raw)
    return wrapped
# Session 1: every command frame is immediately followed by its response.
session1_raw = [
    # identify
    b"CMDidentify#\x07\x00\x00\x00Roberto",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # info
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    # stats
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    # authentify (b"\n" is the length byte 0x0a = 10)
    b"CMDauthentify#\n\x00\x00\x00aStrongPwd",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # encrypt
    b"CMDencrypt#\x06\x00\x00\x00abcdef",
    b"RESencrypt#\x00\x00\x00\x00\x06\x00\x00\x00$ !&'$",
    # decrypt
    b"CMDdecrypt#\x06\x00\x00\x00$ !&'$",
    b"RESdecrypt#\x00\x00\x00\x00\x06\x00\x00\x00abcdef",
    # bye
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]
# Session 2: same command order as session 1, with different payloads.
session2_raw = [
    # identify
    b"CMDidentify#\x04\x00\x00\x00fred",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # info
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    # stats
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    # authentify (b"\t" is the length byte 0x09 = 9)
    b"CMDauthentify#\t\x00\x00\x00myPasswd!",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # encrypt (b"\n" is the length byte 0x0a = 10)
    b"CMDencrypt#\n\x00\x00\x00123456test",
    b"RESencrypt#\x00\x00\x00\x00\n\x00\x00\x00spqvwt6'16",
    # decrypt
    b"CMDdecrypt#\n\x00\x00\x00spqvwt6'16",
    b"RESdecrypt#\x00\x00\x00\x00\n\x00\x00\x00123456test",
    # bye
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]
# session3: shorter walk that has no encrypt exchange (authentify is followed
# directly by decrypt), giving the PTA automaton a second, different path.
# NOTE(review): the decrypt response here echoes the request payload
# unchanged, unlike the decrypt exchanges in sessions 1 and 2 - confirm this
# is intended.
session3_raw = [
    # identify (b"\n" is the length byte 0x0a = 10)
    b"CMDidentify#\n\x00\x00\x00123456test",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # info
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    # stats
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    # authentify
    b"CMDauthentify#\n\x00\x00\x00123456test",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    # decrypt
    b"CMDdecrypt#\x06\x00\x00\x00abcdef",
    b"RESdecrypt#\x00\x00\x00\x00\x06\x00\x00\x00abcdef",
    # bye
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]
# Wrap every raw frame of each session in a message object.
messages_session1 = list(map(make_msg, session1_raw))
messages_session2 = list(map(make_msg, session2_raw))
messages_session3 = list(map(make_msg, session3_raw))

# Only sessions 1 and 2 are pooled here; session 3 stays separate.
messages = [*messages_session1, *messages_session2]

print("=" * 60, "Step 1: Raw messages", "=" * 60, sep="\n")
for raw_message in messages:
    print(repr(raw_message.data))
# ---------------------------------------------------------------------------
# 2. Split by '#' delimiter
# ---------------------------------------------------------------------------
# Put all pooled messages into one symbol, then split it on the '#' delimiter.
symbol = Symbol(messages=messages)
hash_delimiter = ASCII("#")
Format.splitDelimiter(symbol, hash_delimiter)

print(
    "\n" + "=" * 60,
    "Step 2: After splitDelimiter('#')",
    "=" * 60,
    sep="\n",
)
print(symbol)
# ---------------------------------------------------------------------------
# 3. Cluster by key field (first field = command name)
# ---------------------------------------------------------------------------
# Use the first field of the split symbol as the clustering key.
key_field = symbol.fields[0]
symbols = Format.clusterByKeyField(symbol, key_field)

print(
    "\n" + "=" * 60,
    "Step 3: Symbols after clusterByKeyField",
    "=" * 60,
    sep="\n",
)
print(f"Number of symbols: {len(symbols)}")
# List the cluster keys in sorted order so the output is stable.
for symbol_name in sorted(symbols):
    print(f" * {symbol_name}")
# ---------------------------------------------------------------------------
# 4. Sequence alignment on payload field (field[2])
# ---------------------------------------------------------------------------
print(
    "\n" + "=" * 60,
    "Step 4: Sequence alignment on payload field",
    "=" * 60,
    sep="\n",
)
for cluster_name, cluster_symbol in symbols.items():
    cluster_fields = cluster_symbol.fields
    # Only symbols that actually have a third field can be aligned on it.
    has_payload_field = len(cluster_fields) >= 3
    if has_payload_field:
        Format.splitAligned(cluster_fields[2], doInternalSlick=True)
    # Show every symbol, whether or not it was aligned.
    print(f"\n[{cluster_name}]", cluster_symbol, sep="\n")
# ---------------------------------------------------------------------------
# 5. Find and apply size relations
# ---------------------------------------------------------------------------
print(
    "\n" + "=" * 60,
    "Step 5: Find and apply size relations",
    "=" * 60,
    sep="\n",
)
for cluster_name, cluster_symbol in symbols.items():
    found = RelationFinder.findOnSymbol(cluster_symbol)
    # Nothing to report or apply for symbols without any relation.
    if not found:
        continue

    print(f"\n[{cluster_name}] Relations found:")
    for relation in found:
        kind = relation["relation_type"]
        x_attr = relation["x_attribute"]
        y_attr = relation["y_attribute"]
        print(f" {kind}: '{x_attr}' <-> '{y_attr}'")

    # Apply first relation: the first x field becomes a Size over the y fields.
    # NOTE(review): factor=1/8 presumably converts a bit count to bytes -
    # confirm against the Size documentation.
    first = found[0]
    first["x_fields"][0].domain = Size(first["y_fields"], factor=1 / 8.0)

# The heading is printed unconditionally; the dump only if the symbol exists.
print("\n[CMDencrypt] structure after applying Size relation:")
if "CMDencrypt" in symbols:
    encrypt_symbol = symbols["CMDencrypt"]
    print(encrypt_symbol._str_debug())
# ---------------------------------------------------------------------------
# 6. Generate automata
# ---------------------------------------------------------------------------
# All inferred symbols, used to abstract sessions and build the automata.
sym_list = [*symbols.values()]

print(
    "\n" + "=" * 60,
    "Step 6a: Chained states automaton (session1)",
    "=" * 60,
    sep="\n",
)
# Abstract session 1 against the inferred symbols, then build the automaton.
session1 = Session(messages_session1)
abstract1 = session1.abstract(sym_list)
automata_chained = Automata.generateChainedStatesAutomata(abstract1, sym_list)
chained_dot = automata_chained.generateDotCode()
print(chained_dot)
print(
    "\n" + "=" * 60,
    "Step 6b: One-state automaton (session1)",
    "=" * 60,
    sep="\n",
)
# Built from the same abstracted session 1 as the chained automaton.
automata_one = Automata.generateOneStateAutomata(abstract1, sym_list)
one_state_dot = automata_one.generateDotCode()
print(one_state_dot)
print(
    "\n" + "=" * 60,
    "Step 6c: PTA automaton (session1 + session3)",
    "=" * 60,
    sep="\n",
)
# Abstract session 3 as well, then build the PTA from both abstracted sessions.
session3 = Session(messages_session3)
abstract3 = session3.abstract(sym_list)
pta_sessions = [abstract1, abstract3]
automata_pta = Automata.generatePTAAutomata(pta_sessions, sym_list)
pta_dot = automata_pta.generateDotCode()
print(pta_dot)
# Closing banner with a hint on rendering the dot output printed above.
print(
    "\n" + "=" * 60,
    "Done. To visualize dot output: pipe to 'dot -Tsvg -o out.svg'",
    "=" * 60,
    sep="\n",
)