"""
Toy Protocol Inference - based on discover_features.rst tutorial

Protocol structure:
    <CMD/RES name> '#' <4-byte little-endian length> [<data>]

Steps covered:
    1. Import messages from PCAP files
    2. Split by '#' delimiter
    3. Cluster by key field (command name)
    4. Sequence alignment on payload field
    5. Find and apply size relations
    6. Generate automata (chained / one-state / PTA)
"""

import os
import sys

# Make the Netzob sources importable before the import below runs.
# NETZOB_SRC overrides the default checkout location so the script is not tied
# to a single machine; when unset, the original hard-coded path is used.
sys.path.insert(0, os.environ.get("NETZOB_SRC", '/home/zjz/CCSDS_study/netzob-030/test/src'))

from netzob.all import *  # noqa: E402,F401,F403 - must follow the sys.path tweak

# Directory containing the target_src_v1_session*.pcap captures read in step 1.
# NETZOB_PCAP_DIR overrides the default; when unset, the original path is used.
PCAP_DIR = os.environ.get("NETZOB_PCAP_DIR", "/home/zjz/CCSDS_study/netzob-030/test/resources/pcaps")

# ---------------------------------------------------------------------------
# 1. Import messages from PCAP files
# ---------------------------------------------------------------------------
# Resolve the three capture paths first, then load each capture into a list.
session1_pcap = f"{PCAP_DIR}/target_src_v1_session1.pcap"
session2_pcap = f"{PCAP_DIR}/target_src_v1_session2.pcap"
session3_pcap = f"{PCAP_DIR}/target_src_v1_session3.pcap"

messages_session1 = list(PCAPImporter.readFile(session1_pcap).values())
messages_session2 = list(PCAPImporter.readFile(session2_pcap).values())
messages_session3 = list(PCAPImporter.readFile(session3_pcap).values())

# Only sessions 1 and 2 are merged for format inference; session 3 is kept
# apart and is consumed later by the PTA automaton step.
messages = [*messages_session1, *messages_session2]

print("=" * 60)
print("Step 1: Messages imported from PCAP")
print("=" * 60)
print(f"Session1: {len(messages_session1)} messages")
print(f"Session2: {len(messages_session2)} messages")
print(f"Session3: {len(messages_session3)} messages")
for message in messages:
    print(message)

# ---------------------------------------------------------------------------
# 2. Split by '#' delimiter
# ---------------------------------------------------------------------------
# Wrap all merged messages in a single symbol, then split its fields on the
# '#' separator that follows the command/response name.
symbol = Symbol(messages=messages)
delimiter = String("#")
Format.splitDelimiter(symbol, delimiter)

print("\n" + "=" * 60)
print("Step 2: After splitDelimiter('#')")
print("=" * 60)
print(symbol)

# ---------------------------------------------------------------------------
# 3. Cluster by key field (first field = command name)
# ---------------------------------------------------------------------------
# The first field produced by the delimiter split is used as clustering key.
key_field = symbol.fields[0]
symbols = Format.clusterByKeyField(symbol, key_field)

print("\n" + "=" * 60)
print("Step 3: Symbols after clusterByKeyField")
print("=" * 60)
print(f"Number of symbols: {len(symbols)}")
# Iterating the mapping yields its keys; sort them for a stable listing.
for symbol_name in sorted(symbols):
    print(f" * {symbol_name}")

# ---------------------------------------------------------------------------
# 4. Sequence alignment on payload field (field[2])
# ---------------------------------------------------------------------------
print("\n" + "=" * 60)
print("Step 4: Sequence alignment on payload field")
print("=" * 60)
# NOTE(review): the original indentation was lost in extraction; the two
# prints are assumed to run for every symbol, not only for aligned ones.
for cluster_name, cluster_symbol in symbols.items():
    # Only symbols that actually have a third (payload) field can be aligned.
    if len(cluster_symbol.fields) >= 3:
        payload_field = cluster_symbol.fields[2]
        Format.splitAligned(payload_field, doInternalSlick=True)
    print(f"\n[{cluster_name}]")
    print(cluster_symbol)

# ---------------------------------------------------------------------------
# 5. Find and apply size relations
# ---------------------------------------------------------------------------
print("\n" + "=" * 60)
print("Step 5: Find and apply size relations")
print("=" * 60)
for name, sym in symbols.items():
    rels = RelationFinder.findOnSymbol(sym)
    # Nothing to report or apply when no relation was found for this symbol.
    if not rels:
        continue
    print(f"\n[{name}] Relations found:")
    for rel in rels:
        print(f" {rel['relation_type']}: '{rel['x_attribute']}' <-> '{rel['y_attribute']}'")
    # Apply first relation: the first x-field's domain becomes a Size over the
    # related y-fields. factor=1/8.0 presumably converts bits to bytes - confirm
    # against the Size documentation.
    first_rel = rels[0]
    first_rel["x_fields"][0].domain = Size(first_rel["y_fields"], factor=1/8.0)

# Print the header only when there is a structure to show, so a missing
# symbol no longer leaves a dangling heading in the output.
if "CMDencrypt" in symbols:
    print("\n[CMDencrypt] structure after applying Size relation:")
    print(symbols["CMDencrypt"]._str_debug())
else:
    print("\n[CMDencrypt] symbol not found; no structure to display.")

# ---------------------------------------------------------------------------
# 6. Generate automata
# ---------------------------------------------------------------------------
def _print_banner(title):
    """Print a blank line, then *title* framed by two 60-character '=' rules."""
    print("\n" + "=" * 60)
    print(title)
    print("=" * 60)


# All three generators take the inferred symbols as a plain list.
sym_list = list(symbols.values())

_print_banner("Step 6a: Chained states automaton (session1)")
session1 = Session(messages_session1)
abstract1 = session1.abstract(sym_list)
automata_chained = Automata.generateChainedStatesAutomata(abstract1, sym_list)
print(automata_chained.generateDotCode())

# The one-state automaton reuses the abstraction of session 1 computed above.
_print_banner("Step 6b: One-state automaton (session1)")
automata_one = Automata.generateOneStateAutomata(abstract1, sym_list)
print(automata_one.generateDotCode())

# The PTA is built from two abstracted sessions: session 1 and session 3.
_print_banner("Step 6c: PTA automaton (session1 + session3)")
session3 = Session(messages_session3)
abstract3 = session3.abstract(sym_list)
automata_pta = Automata.generatePTAAutomata([abstract1, abstract3], sym_list)
print(automata_pta.generateDotCode())

_print_banner("Done. To visualize dot output: pipe to 'dot -Tsvg -o out.svg'")