""" Toy Protocol Inference - based on discover_features.rst tutorial Protocol structure: '#' <4-byte little-endian length> [] Steps covered: 1. Import messages from PCAP files 2. Split by '#' delimiter 3. Cluster by key field (command name) 4. Sequence alignment on payload field 5. Find and apply size relations 6. Generate automata (chained / one-state / PTA) """ import sys sys.path.insert(0, '/home/zjz/CCSDS_study/netzob-030/test/src') from netzob.all import * PCAP_DIR = "/home/zjz/CCSDS_study/netzob-030/test/resources/pcaps" # --------------------------------------------------------------------------- # 1. Import messages from PCAP files # --------------------------------------------------------------------------- messages_session1 = list(PCAPImporter.readFile(f"{PCAP_DIR}/target_src_v1_session1.pcap").values()) messages_session2 = list(PCAPImporter.readFile(f"{PCAP_DIR}/target_src_v1_session2.pcap").values()) messages_session3 = list(PCAPImporter.readFile(f"{PCAP_DIR}/target_src_v1_session3.pcap").values()) messages = messages_session1 + messages_session2 print("=" * 60) print("Step 1: Messages imported from PCAP") print("=" * 60) print(f"Session1: {len(messages_session1)} messages") print(f"Session2: {len(messages_session2)} messages") print(f"Session3: {len(messages_session3)} messages") for m in messages: print(m) # --------------------------------------------------------------------------- # 2. Split by '#' delimiter # --------------------------------------------------------------------------- symbol = Symbol(messages=messages) Format.splitDelimiter(symbol, String("#")) print("\n" + "=" * 60) print("Step 2: After splitDelimiter('#')") print("=" * 60) print(symbol) # --------------------------------------------------------------------------- # 3. Cluster by key field (first field = command name) # --------------------------------------------------------------------------- symbols = Format.clusterByKeyField(symbol, symbol.fields[0]) print("\n" + "=" * 60) print("Step 3: Symbols after clusterByKeyField") print("=" * 60) print(f"Number of symbols: {len(symbols)}") for name in sorted(symbols.keys()): print(f" * {name}") # --------------------------------------------------------------------------- # 4. Sequence alignment on payload field (field[2]) # --------------------------------------------------------------------------- print("\n" + "=" * 60) print("Step 4: Sequence alignment on payload field") print("=" * 60) for name, sym in symbols.items(): if len(sym.fields) >= 3: Format.splitAligned(sym.fields[2], doInternalSlick=True) print(f"\n[{name}]") print(sym) # --------------------------------------------------------------------------- # 5. Find and apply size relations # --------------------------------------------------------------------------- print("\n" + "=" * 60) print("Step 5: Find and apply size relations") print("=" * 60) for name, sym in symbols.items(): rels = RelationFinder.findOnSymbol(sym) if rels: print(f"\n[{name}] Relations found:") for rel in rels: print(f" {rel['relation_type']}: '{rel['x_attribute']}' <-> '{rel['y_attribute']}'") # Apply first relation rels[0]["x_fields"][0].domain = Size(rels[0]["y_fields"], factor=1/8.0) print("\n[CMDencrypt] structure after applying Size relation:") if "CMDencrypt" in symbols: print(symbols["CMDencrypt"]._str_debug()) # --------------------------------------------------------------------------- # 6. Generate automata # --------------------------------------------------------------------------- sym_list = list(symbols.values()) print("\n" + "=" * 60) print("Step 6a: Chained states automaton (session1)") print("=" * 60) session1 = Session(messages_session1) abstract1 = session1.abstract(sym_list) automata_chained = Automata.generateChainedStatesAutomata(abstract1, sym_list) print(automata_chained.generateDotCode()) print("\n" + "=" * 60) print("Step 6b: One-state automaton (session1)") print("=" * 60) automata_one = Automata.generateOneStateAutomata(abstract1, sym_list) print(automata_one.generateDotCode()) print("\n" + "=" * 60) print("Step 6c: PTA automaton (session1 + session3)") print("=" * 60) session3 = Session(messages_session3) abstract3 = session3.abstract(sym_list) automata_pta = Automata.generatePTAAutomata([abstract1, abstract3], sym_list) print(automata_pta.generateDotCode()) print("\n" + "=" * 60) print("Done. To visualize dot output: pipe to 'dot -Tsvg -o out.svg'") print("=" * 60)