"""Toy protocol inference walkthrough, after the discover_features.rst tutorial.

The captures are rebuilt from the message samples printed in the tutorial,
so no PCAP files are needed.

Message layout, as seen in the samples below:
    command:  CMD<name> '#' <4-byte little-endian length> [payload]
    response: RES<name> '#' <4 zero bytes> <4-byte little-endian length> [payload]

Steps covered:
    1. Build raw messages from known session data
    2. Split by '#' delimiter
    3. Cluster by key field (command name)
    4. Sequence alignment on payload field
    5. Find and apply size relations
    6. Generate automata (chained / one-state / PTA)
"""
import sys

# The local Netzob checkout must be on sys.path before the star import below.
sys.path.insert(0, '/home/zjz/CCSDS_study/netzob-030/test/src')

from netzob.all import *

# Horizontal rule used to frame every step heading.
_RULE = "=" * 60


def _banner(title: str, *, leading_blank: bool = True) -> None:
    """Print *title* between two rules, optionally preceded by a blank line."""
    print(("\n" + _RULE) if leading_blank else _RULE)
    print(title)
    print(_RULE)


# ---------------------------------------------------------------------------
# 1. Raw session messages (reconstructed from tutorial output)
# ---------------------------------------------------------------------------

def make_msg(raw: bytes) -> RawMessage:
    """Wrap one raw byte string in a Netzob ``RawMessage``.

    NOTE(review): no source/destination is set on the message; the automata
    generators in step 6 may rely on those to tell client from server --
    confirm against the Netzob version in use.
    """
    wrapped = RawMessage(data=raw)
    return wrapped


session1_raw = [
    b"CMDidentify#\x07\x00\x00\x00Roberto",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    b"CMDauthentify#\n\x00\x00\x00aStrongPwd",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDencrypt#\x06\x00\x00\x00abcdef",
    b"RESencrypt#\x00\x00\x00\x00\x06\x00\x00\x00$ !&'$",
    b"CMDdecrypt#\x06\x00\x00\x00$ !&'$",
    b"RESdecrypt#\x00\x00\x00\x00\x06\x00\x00\x00abcdef",
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]

session2_raw = [
    b"CMDidentify#\x04\x00\x00\x00fred",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    b"CMDauthentify#\t\x00\x00\x00myPasswd!",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDencrypt#\n\x00\x00\x00123456test",
    b"RESencrypt#\x00\x00\x00\x00\n\x00\x00\x00spqvwt6'16",
    b"CMDdecrypt#\n\x00\x00\x00spqvwt6'16",
    b"RESdecrypt#\x00\x00\x00\x00\n\x00\x00\x00123456test",
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]

# session3 follows a different path from sessions 1 and 2: it has no
# encrypt exchange and goes straight from authentify to decrypt, which
# gives the PTA automaton in step 6c a second branch.
session3_raw = [
    b"CMDidentify#\n\x00\x00\x00123456test",
    b"RESidentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDinfo#\x00\x00\x00\x00",
    b"RESinfo#\x00\x00\x00\x00\x04\x00\x00\x00info",
    b"CMDstats#\x00\x00\x00\x00",
    b"RESstats#\x00\x00\x00\x00\x05\x00\x00\x00stats",
    b"CMDauthentify#\n\x00\x00\x00123456test",
    b"RESauthentify#\x00\x00\x00\x00\x00\x00\x00\x00",
    b"CMDdecrypt#\x06\x00\x00\x00abcdef",
    b"RESdecrypt#\x00\x00\x00\x00\x06\x00\x00\x00abcdef",
    b"CMDbye#\x00\x00\x00\x00",
    b"RESbye#\x00\x00\x00\x00\x00\x00\x00\x00",
]

messages_session1 = list(map(make_msg, session1_raw))
messages_session2 = list(map(make_msg, session2_raw))
messages_session3 = list(map(make_msg, session3_raw))

# Only sessions 1 and 2 feed the format inference; session 3 is used
# solely for the PTA automaton in step 6c.
messages = [*messages_session1, *messages_session2]

_banner("Step 1: Raw messages", leading_blank=False)
for msg in messages:
    print(repr(msg.data))

# ---------------------------------------------------------------------------
# 2. Split by '#' delimiter
# ---------------------------------------------------------------------------
symbol = Symbol(messages=messages)
Format.splitDelimiter(symbol, ASCII("#"))

_banner("Step 2: After splitDelimiter('#')")
print(symbol)

# ---------------------------------------------------------------------------
# 3. Cluster by key field (first field = command name)
# ---------------------------------------------------------------------------
symbols = Format.clusterByKeyField(symbol, symbol.fields[0])

_banner("Step 3: Symbols after clusterByKeyField")
print(f"Number of symbols: {len(symbols)}")
for key in sorted(symbols):
    print(f" * {key}")

# ---------------------------------------------------------------------------
# 4. Sequence alignment on payload field (field[2])
# ---------------------------------------------------------------------------
_banner("Step 4: Sequence alignment on payload field")
for key, clustered in symbols.items():
    # Align only when the payload field (index 2, after the command name
    # and the '#' delimiter) is present.
    if len(clustered.fields) >= 3:
        Format.splitAligned(clustered.fields[2], doInternalSlick=True)
    print(f"\n[{key}]")
    print(clustered)

# ---------------------------------------------------------------------------
# 5. Find and apply size relations
# ---------------------------------------------------------------------------
_banner("Step 5: Find and apply size relations")
for key, clustered in symbols.items():
    relations = RelationFinder.findOnSymbol(clustered)
    if not relations:
        continue
    print(f"\n[{key}] Relations found:")
    for relation in relations:
        print(f" {relation['relation_type']}: '{relation['x_attribute']}' <-> '{relation['y_attribute']}'")
    # Apply first relation.
    # NOTE(review): factor=1/8 presumably converts a bit count into bytes --
    # confirm against the Netzob Size documentation.
    first = relations[0]
    first["x_fields"][0].domain = Size(first["y_fields"], factor=1 / 8.0)

print("\n[CMDencrypt] structure after applying Size relation:")
# NOTE(review): this lookup assumes clusterByKeyField keys are str; if they
# are bytes the debug dump is silently skipped -- confirm the key type.
if "CMDencrypt" in symbols:
    print(symbols["CMDencrypt"]._str_debug())

# ---------------------------------------------------------------------------
# 6. Generate automata
# ---------------------------------------------------------------------------
sym_list = list(symbols.values())

_banner("Step 6a: Chained states automaton (session1)")
session1 = Session(messages_session1)
abstract1 = session1.abstract(sym_list)
automata_chained = Automata.generateChainedStatesAutomata(abstract1, sym_list)
print(automata_chained.generateDotCode())

_banner("Step 6b: One-state automaton (session1)")
automata_one = Automata.generateOneStateAutomata(abstract1, sym_list)
print(automata_one.generateDotCode())

_banner("Step 6c: PTA automaton (session1 + session3)")
session3 = Session(messages_session3)
abstract3 = session3.abstract(sym_list)
automata_pta = Automata.generatePTAAutomata([abstract1, abstract3], sym_list)
print(automata_pta.generateDotCode())

_banner("Done. To visualize dot output: pipe to 'dot -Tsvg -o out.svg'")