Files
CCSDS_study/test/Tianwen-1-frame-analysis-v2.py
2026-05-05 21:54:35 +08:00

868 lines
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Tianwen-1 frame analysis notebook converted to a Python script.
本脚本由 `Tianwen-1 frame analysis.ipynb` 转换而来,保留原 notebook
的执行顺序,并把原 Markdown 标题和交互式观察步骤整理为注释。
脚本主要功能:
1. 读取 2020-07-30 的天问一号 AOS 帧二进制数据。
2. 使用本目录下的 `ccsds.py` 解析 AOS 主头、插入区和 Space Packet。
3. 按 spacecraft ID、virtual channel 和 APID 对数据分组。
4. 绘制帧计数、时间戳、载荷字节和若干遥测通道的变化曲线。
注意:
- 原 notebook 假设当前工作目录就是 `Tianwen/`。本脚本会自动切换到
文件所在目录,以便相对路径 `tianwen1_frames_20200730.u8` 能正常工作。
- notebook 中单独显示表达式结果的单元,在脚本中改为 `print(...)`。
- 绘图默认使用 matplotlib 当前后端;如果在无图形界面环境运行,可在
执行前设置 `MPLBACKEND=Agg`。
"""
# sys 用来修改 Python 的模块搜索路径。脚本在 test/ 目录下,
# 但要导入 Tianwen/ 目录里的模块,所以需要把项目根目录加进去。
import sys
# pathlib.Path 是处理文件路径的标准库工具。这里取别名 PathlibPath
# 是为了避免后面 `from construct import *` 导入的 Path 把它覆盖掉。
from pathlib import Path as PathlibPath
# os 用来切换当前工作目录、设置环境变量。
import os
# re 是正则表达式库,用来把图标题清洗成安全的文件名。
import re
# 获取项目根目录。当前文件在 test/ 下,所以 parent.parent 就是 CCSDS_study/。
PROJECT_ROOT = PathlibPath(__file__).resolve().parent.parent
# 把项目根目录加入 sys.path 后Python 才能通过 `import Tianwen.xxx`
# 找到 /home/zjz/CCSDS_study/Tianwen 这个包目录。
sys.path.append(str(PROJECT_ROOT))
# collections.Counter 可以方便地统计某个字段不同取值出现了多少次。
import collections
# struct 用来按指定字节序把 bytes 解析成整数;这里用于解析 packet 内部时间戳。
import struct
# Matplotlib 会写缓存文件。WSL 或某些 sandbox 环境下用户配置目录可能不可写,
# 因此把缓存目录放到 /tmp/matplotlib避免启动时出现权限警告。
os.environ.setdefault("MPLCONFIGDIR", "/tmp/matplotlib")
# 先导入 matplotlib 主模块,设置后端后再导入 pyplot。
import matplotlib
# Agg 是非交互式绘图后端,只负责把图片画到文件中,不需要 GUI 窗口。
# 这对 WSL 很重要,因为 WSL 下直接 plt.show() 经常会因为没有显示服务而报错。
matplotlib.use("Agg")
# pyplot 是 matplotlib 最常用的画图接口,习惯上命名为 plt。
import matplotlib.pyplot as plt
# 脚本会创建很多张图。这个参数关闭“打开超过 20 个 figure”的提示
# 因为这里的大量 figure 是预期行为,最后会统一保存并关闭。
plt.rcParams["figure.max_open_warning"] = 0
# numpy 用于高效读取二进制文件、重塑数组、计算差分和处理时间序列。
import numpy as np
# scipy.signal 是 notebook 原始导入;当前脚本没有直接使用,保留它是为了尽量
# 与原 notebook 环境一致,后续如果扩展信号处理步骤可直接使用。
import scipy.signal # 保留 notebook 原始导入;当前脚本未显式调用。
# construct 是解析二进制协议结构的库。ccsds.py 中的帧结构定义依赖它。
# 这里使用星号导入是 notebook 原始写法;新项目中通常更建议显式导入需要的名字。
from construct import * # noqa: F401,F403 - ccsds 解析结构依赖 construct 风格对象。
# ccsds.py 定义了 AOSFrame、SpacePacketPrimaryHeader 和 Space Packet 重组逻辑。
import Tianwen.ccsds as ccsds
# tianwen1_tm.py 定义了天问一号时间戳到 numpy.datetime64 的转换函数。
import Tianwen.tianwen1_tm as tianwen1_tm
# `construct import *` 会导入名为 Path 的对象,因此 pathlib.Path 必须使用别名。
# notebook 使用相对路径读取 Tianwen 目录下的数据文件;脚本位于 test/ 目录时,
# 运行前需要切换到真正的数据目录。
DATA_DIR = PROJECT_ROOT / "Tianwen"
# 所有 PNG 图片统一保存到这个目录,避免和源码文件混在一起。
OUTPUT_DIR = DATA_DIR / "frame_analysis_figures"
def safe_filename(text, max_length=120):
"""把图标题转换成适合作为文件名的短字符串。"""
# 去掉标题首尾空白,并统一转成小写,让文件名风格一致。
text = text.strip().lower()
# 把不适合放进文件名的字符替换为下划线。
text = re.sub(r"[^0-9a-zA-Z._() -]+", "_", text)
# 把连续空白压缩成一个下划线。
text = re.sub(r"\s+", "_", text)
# 去掉文件名开头/结尾的点、下划线、短横线和空格。
text = text.strip("._- ")
# 如果标题为空,就用 untitled再截断到最大长度避免文件名过长。
return (text or "untitled")[:max_length]
def save_all_figures(output_dir):
"""保存当前 matplotlib 会话中的所有 figure。
文件名格式为 `figure_序号_标题.png`。标题来自当前坐标轴的 title
如果没有标题,则使用 `untitled`。保存后关闭 figure降低长脚本的内存占用。
"""
output_dir.mkdir(parents=True, exist_ok=True)
# saved_paths 用来记录保存出的每一个 PNG 路径,方便最后返回或调试。
saved_paths = []
# plt.get_fignums() 返回当前还打开着的所有 figure 编号。
for figure_number in plt.get_fignums():
# 根据编号取回 figure 对象。
fig = plt.figure(figure_number)
# 一个 figure 里可以有一个或多个坐标轴 axes。
axes = fig.get_axes()
# 默认标题为空;后面会尝试从坐标轴里找第一个非空标题。
title = ""
# 遍历坐标轴,拿到用于生成文件名的图标题。
for ax in axes:
title = ax.get_title().strip()
if title:
break
# 文件名前缀使用 figure 序号,保证即使标题重复也不会互相覆盖。
filename = f"figure_{figure_number:03d}_{safe_filename(title)}.png"
# 拼出完整保存路径。
path = output_dir / filename
# 真正保存图片。dpi 控制分辨率bbox_inches='tight' 尽量裁掉多余白边。
fig.savefig(path, dpi=150, bbox_inches="tight")
# 记录保存路径。
saved_paths.append(path)
# 保存后关闭 figure释放内存。
plt.close(fig)
# 打印保存数量和目录,方便运行脚本后立刻知道去哪里看图片。
print(f"Saved {len(saved_paths)} figures to {output_dir}")
# 返回路径列表,便于以后如果把 main 拆成函数时继续使用。
return saved_paths
def get_packet(p):
"""返回 Space Packet 原始字节。
`ccsds.extract_space_packets(..., get_timestamps=True)` 返回
`(packet_bytes, timestamp)` 二元组;不带时间戳时只返回 packet bytes。
这个辅助函数屏蔽两种返回格式差异。
"""
# 如果 p 是 tuple说明它是 `(packet_bytes, timestamp)`
# 如果不是 tuple说明它本身就是 packet bytes。
return p[0] if type(p) is tuple else p
def packets_asarray(packets):
"""把 Space Packet 载荷转换为二维 uint8 数组。
每个 Space Packet 的前部是 CCSDS Space Packet Primary Header。
本函数跳过主头,只保留后续用户数据区,便于直接按字节或按数值类型
重解释为遥测通道矩阵。
"""
# 最终返回一个二维数组:行是 packet列是 packet 载荷中的字节位置。
return np.array(
[
np.frombuffer(
# get_packet(p) 先拿到原始 packet bytes
# 再跳过 Space Packet Primary Header只保留用户数据区。
get_packet(p)[ccsds.SpacePacketPrimaryHeader.sizeof() :],
"uint8",
)
for p in packets
]
)
def plot_apids(apids, sc, vc):
"""按 APID 绘制 Space Packet 载荷热力图。
横轴对应载荷字节位置,纵轴对应同一 APID 下的 packet 序号。这个图常用于
观察哪些字段固定、哪些字段随时间变化,以及是否存在明显的分段结构。
"""
for apid in sorted(apids.keys()):
# 每个 APID 单独生成一张图。
plt.figure(figsize=(16, 16), facecolor="w")
# 把这个 APID 下所有 packet 载荷堆叠成二维数组。
ps = packets_asarray(apids[apid])
# imshow 把二维字节数组画成热力图,颜色代表字节值。
plt.imshow(ps, aspect=ps.shape[1] / ps.shape[0])
# 标题写清楚 APID、航天器 ID 和虚拟信道 ID保存成文件后也容易辨认。
plt.title(f"APID {apid} Spacecraft {sc} Virtual channel {vc}")
def get_timestamps(aos):
"""从 AOS 帧插入区提取时间戳,并转换为 numpy.datetime64。"""
# a.insert_zone.timestamp 是 AOS 帧插入区中的原始时间戳字段。
return np.array(
[tianwen1_tm.parse_timestamp_datetime64(a.insert_zone.timestamp) for a in aos]
)
def get_packet_timestamps(packets):
"""从 `(packet, timestamp)` 形式的 Space Packet 列表提取时间戳。"""
# packets 中的 p[1] 是 extract_space_packets(..., get_timestamps=True)
# 附带返回的时间戳。
return np.array([tianwen1_tm.parse_timestamp_datetime64(p[1]) for p in packets])
def main():
"""按原 notebook 顺序执行完整分析流程。"""
# 切换到 Tianwen/ 数据目录后np.fromfile("tianwen1_frames_20200730.u8")
# 这种相对路径才能正确找到数据文件。
os.chdir(DATA_DIR)
# -------------------------------------------------------------------------
# 读取原始 AOS 帧数据
# -------------------------------------------------------------------------
# 数据文件由连续的 220 字节帧组成。若文件末尾存在不足一帧的残留字节,
# 先截断到完整帧边界,再 reshape 成 `帧数 x 220` 的二维数组。
# 每个 AOS 帧固定 220 字节,这是后面 reshape 的依据。
frame_size = 220
# 从二进制文件中一次性读取所有字节dtype='uint8' 表示每个元素是 0-255 的字节。
frames = np.fromfile("tianwen1_frames_20200730.u8", dtype="uint8")
# frames.size // frame_size 得到完整帧数量。
# 乘回 frame_size 后进行切片,可以丢弃文件末尾不满 220 字节的残余数据。
# reshape((-1, frame_size)) 把一维字节流变成二维表:每一行是一帧。
frames = frames[: frames.size // frame_size * frame_size].reshape((-1, frame_size))
# 打印总帧数,方便确认数据是否读取完整。
print("Number of frames:", frames.shape[0])
# -------------------------------------------------------------------------
# AOS headers
# -------------------------------------------------------------------------
# 解析所有 AOS 帧,得到 primary header、insert zone 等结构化字段。
# f 是一行 220 字节的 numpy 数组ccsds.AOSFrame.parse 会按 CCSDS AOS
# 帧格式把它解释为带字段名的 Container。
aos = [ccsds.AOSFrame.parse(f) for f in frames]
# 统计 AOS transfer frame version number。主流值应集中在有效版本上
# 零散异常值通常来自错误同步、误帧或数据噪声。
print(
"Transfer frame versions:",
# Counter 会返回类似 Counter({1: 111790, 2: 147}) 的统计结果。
collections.Counter([a.primary_header.transfer_frame_version_number for a in aos]),
)
# 统计 spacecraft ID确认捕获数据中主要包含哪些航天器标识。
print(
"Spacecraft IDs:",
# spacecraft_id 用来区分不同航天器或不同数据源。
collections.Counter([a.primary_header.spacecraft_id for a in aos]),
)
# 分别观察 spacecraft 245 和 82 下出现的 virtual channel。
print(
"SC 245 virtual channels:",
collections.Counter(
[
# virtual_channel_id 是同一 spacecraft 下的数据通道编号。
a.primary_header.virtual_channel_id
for a in aos
# 这里只统计 spacecraft_id 为 245 的帧。
if a.primary_header.spacecraft_id == 245
]
),
)
print(
"SC 82 virtual channels:",
collections.Counter(
[
a.primary_header.virtual_channel_id
for a in aos
# 这里只统计 spacecraft_id 为 82 的帧。
if a.primary_header.spacecraft_id == 82
]
),
)
# 打印前 20 帧的 `(spacecraft_id, virtual_channel_id)` 序列,观察复用模式。
print(
"First 20 SC/VC pairs:",
[
# 每个 tuple 展示一帧的 spacecraft 和 virtual channel。
(a.primary_header.spacecraft_id, a.primary_header.virtual_channel_id)
for a in aos[:20]
],
)
# -------------------------------------------------------------------------
# Spacecraft 82 Virtual channel 1
# -------------------------------------------------------------------------
# 取出 SC 82 / VC 1 的帧,检查帧计数连续性和插入区字段。
print(
"SC 82 VC 1 first headers:",
# 先打印前 10 个 header人工检查字段是否看起来合理。
[a.primary_header for a in aos if a.primary_header.spacecraft_id == 82][:10],
)
# 过滤出 spacecraft_id=82 且 virtual_channel_id=1 的所有 AOS 帧。
sc82_vc1 = [
a
for a in aos
if a.primary_header.spacecraft_id == 82
and a.primary_header.virtual_channel_id == 1
]
# 提取虚拟信道帧计数。正常情况下相邻帧计数应该连续递增。
fc = np.array([a.primary_header.virtual_channel_frame_count for a in sc82_vc1])
# 新建一张图,用来画帧丢失情况。
plt.figure(figsize=(12, 6), facecolor="w")
# np.diff(fc) 是相邻帧计数差;如果连续递增,差值应为 1。
# 减 1 后0 表示未丢帧,正数表示中间可能缺了多少帧。
plt.plot(np.diff(fc) - 1)
# 图标题和坐标轴标签会显示在图片上,也会参与生成保存文件名。
plt.title("Spacecraft 82 Virtual channel 1 frame loss")
plt.ylabel("Lost frames")
plt.xlabel("Frame number")
print(
"SC 82 VC 1 first insert-zone timestamps:",
# hex(...) 把整数时间戳打印成十六进制,方便和二进制协议字段对照。
[hex(a.insert_zone.timestamp) for a in sc82_vc1][:10],
)
print(
"SC 82 VC 1 insert-zone unknown values:",
# set 推导式会去重,用来查看 unknown 字段是否固定。
{hex(a.insert_zone.unknown) for a in sc82_vc1},
)
# 从该虚拟信道重组 Space Packet并按 APID 分组。
# AOS 帧里承载的是 Space Packet 数据流extract_space_packets 会把跨帧的
# Space Packet 重新拼出来。
sc82_vc1_packets = list(ccsds.extract_space_packets(sc82_vc1, 82, 1))
# 每个 Space Packet 开头都有 Primary Header这里解析这些 header。
sc82_vc1_sp_headers = [
ccsds.SpacePacketPrimaryHeader.parse(p) for p in sc82_vc1_packets
]
# APID 是 Space Packet 的应用进程标识。统计 APID 可以知道有哪些数据类型。
sc82_vc1_apids = collections.Counter([p.APID for p in sc82_vc1_sp_headers])
print("SC 82 VC 1 APIDs:", sc82_vc1_apids)
# 构造字典key 是 APIDvalue 是属于该 APID 的 packet 列表。
sc82_vc1_by_apid = {
apid: [
p
# zip 把 header 和 packet 一一配对,便于按 header.APID 筛选 packet。
for h, p in zip(sc82_vc1_sp_headers, sc82_vc1_packets)
if h.APID == apid
]
for apid in sc82_vc1_apids
}
# 为每个 APID 保存一张载荷热力图。
plot_apids(sc82_vc1_by_apid, 82, 1)
# -------------------------------------------------------------------------
# Spacecraft 245 Virtual channel 3
# -------------------------------------------------------------------------
# SC 245 / VC 3 似乎包含固定或填充类数据。这里检查帧计数和若干字节区间。
# 过滤出 spacecraft_id=245 且 virtual_channel_id=3 的帧。
sc245_vc3 = [
a
for a in aos
if a.primary_header.spacecraft_id == 245
and a.primary_header.virtual_channel_id == 3
]
# 提取 VC 3 的帧计数,用于检查是否连续。
fc = np.array([a.primary_header.virtual_channel_frame_count for a in sc245_vc3])
# 绘制 VC 3 的疑似丢帧情况。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(np.diff(fc) - 1)
plt.title("Spacecraft 245 Virtual channel 3 frame loss")
plt.ylabel("Lost frames")
plt.xlabel("Frame number")
print("SC 245 VC 3 first headers:", [a.primary_header for a in sc245_vc3[:10]])
# 这里需要原始字节帧,而不是解析后的 aos Container所以从 frames 和 aos
# 同时遍历,筛选出 SC 245 / VC 3 对应的原始 220 字节帧。
sc245_vc3_frames = np.array(
[
f
for f, a in zip(frames, aos)
if a.primary_header.spacecraft_id == 245
and a.primary_header.virtual_channel_id == 3
]
)
# np.unique 查看某些字节范围里出现过哪些不同值。
# 如果只出现一个值,说明该字段可能是固定填充。
print("SC 245 VC 3 unique bytes 5-12:", np.unique(sc245_vc3_frames[:, 5:12]))
print("hex(170):", hex(170))
print("SC 245 VC 3 unique byte 12:", np.unique(sc245_vc3_frames[:, 12]))
print("hex(138):", hex(138))
print("SC 245 VC 3 unique bytes 13-end:", np.unique(sc245_vc3_frames[:, 13:]))
print("SC 245 VC 3 insert-zone unknown:", hex(sc245_vc3[0].insert_zone.unknown))
# -------------------------------------------------------------------------
# Spacecraft 245 Virtual channel 1
# -------------------------------------------------------------------------
# SC 245 / VC 1 是后续 APID 分析的主要数据源。
# 过滤出主要遥测通道 SC 245 / VC 1。
sc245_vc1 = [
a
for a in aos
if a.primary_header.spacecraft_id == 245
and a.primary_header.virtual_channel_id == 1
]
# 提取该虚拟信道的帧计数。
fc = np.array([a.primary_header.virtual_channel_frame_count for a in sc245_vc1])
# 绘制该通道的帧丢失情况。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(np.diff(fc) - 1)
plt.title("Spacecraft 245 Virtual channel 1 frame loss")
plt.ylabel("Lost frames")
plt.xlabel("Frame number")
print("SC 245 VC 1 first headers:", [a.primary_header for a in sc245_vc1[:10]])
# 从 AOS 插入区中取出时间戳,后续画“帧计数随时间变化”的散点图。
sc245_vc1_timestamps = get_timestamps(sc245_vc1)
# 画帧计数随时间的变化。横轴是真实时间,纵轴是 virtual channel frame count。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(sc245_vc1_timestamps, fc, ".")
plt.title("Spacecraft 245 Virtual channel 1")
plt.xlabel("Timestamp")
plt.ylabel("Virtual channel frame count")
plt.grid()
print(
"SC 245 VC 1 insert-zone unknown values:",
{hex(a.insert_zone.unknown) for a in sc245_vc1},
)
# 重组 SC 245 / VC 1 中的 Space Packet并保留 AOS 帧插入区时间戳。
# get_timestamps=True 表示每个 packet 会附带其所在 AOS 帧的时间戳。
sc245_vc1_packets = list(
ccsds.extract_space_packets(sc245_vc1, 245, 1, get_timestamps=True)
)
# 因为 packet 现在是 `(packet_bytes, timestamp)`,所以解析 header 时使用 p[0]。
sc245_vc1_sp_headers = [
ccsds.SpacePacketPrimaryHeader.parse(p[0]) for p in sc245_vc1_packets
]
# 统计主遥测通道中每个 APID 出现的 packet 数量。
sc245_vc1_apids = collections.Counter([p.APID for p in sc245_vc1_sp_headers])
print("SC 245 VC 1 APIDs:", sc245_vc1_apids)
# 按 APID 对 packet 分桶,后续每个 APID 单独分析。
sc245_vc1_by_apid = {
apid: [
p
for h, p in zip(sc245_vc1_sp_headers, sc245_vc1_packets)
if h.APID == apid
]
for apid in sc245_vc1_apids
}
# 为 SC 245 / VC 1 的每个 APID 生成一张载荷热力图。
plot_apids(sc245_vc1_by_apid, 245, 1)
# -------------------------------------------------------------------------
# APID 1280
# -------------------------------------------------------------------------
# 将 APID 1280 的载荷按大端 int16 解释,观察前 12 字节的遥测量。
# packets_asarray(...) 得到 uint8 字节矩阵view("int16") 把每两个字节看成
# 一个 16 位整数byteswap() 用来处理大端/小端字节序差异。
channels = packets_asarray(sc245_vc1_by_apid[1280]).view("int16").byteswap()
# 取出每个 packet 对应的 AOS 帧时间戳,作为绘图横轴。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[1280])
# 绘制 APID 1280 前 6 个 int16 通道,也就是原始载荷 bytes 0-12。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, :6])
plt.title("APID 1280 Spacecraft 245 Virtual channel 1 bytes 0-12 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(0, 12, 2)])
# 第二张图只看局部区间,便于放大观察某段时间内的细节变化。
plt.figure(figsize=(14, 6), facecolor="w")
# slice(850, 1200) 表示取第 850 到第 1199 个样本。
sel = slice(850, 1200)
plt.plot(timestamps[sel], channels[sel, :6])
plt.title("APID 1280 Spacecraft 245 Virtual channel 1 bytes 0-12 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(0, 12, 2)])
# -------------------------------------------------------------------------
# APID 1281
# -------------------------------------------------------------------------
# APID 1281 同样按大端 int16 解释,分段查看 18-36 字节。
# channels 的行是 packet 序号,列是 int16 通道序号。
channels = packets_asarray(sc245_vc1_by_apid[1281]).view("int16").byteswap()
# timestamps 和 channels 的行一一对应。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[1281])
# 查看 int16 通道 9 到 14对应字节范围 18-30。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, 9:15])
plt.title("APID 1281 Spacecraft 245 Virtual channel 1 bytes 18-30 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(9 * 2, 15 * 2, 2)])
# 放大 APID 1281 的一小段样本。
plt.figure(figsize=(12, 6), facecolor="w")
sel = slice(400, 600)
plt.plot(timestamps[sel], channels[sel, 9:15])
plt.title("APID 1281 Spacecraft 245 Virtual channel 1 bytes 18-30 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(9 * 2, 15 * 2, 2)])
# 查看 int16 通道 15 到 17对应字节范围 30-36。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, 15:18])
plt.title("APID 1281 Spacecraft 245 Virtual channel 1 bytes 30-36 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(2 * 15, 18 * 2, 2)])
# 同样对 bytes 30-36 的局部区间放大观察。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps[sel], channels[sel, 15:18])
plt.title("APID 1281 Spacecraft 245 Virtual channel 1 bytes 30-36 (int16)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(2 * 15, 18 * 2, 2)])
# -------------------------------------------------------------------------
# APID 1288
# -------------------------------------------------------------------------
# APID 1288 的载荷偏移 80 后,前 24 字节按三个大端 float64 解释。
# notebook 中把最大时间间隔对应行置为 NaN用于清理明显观测间断。
# [:, 80:] 表示丢掉载荷前 80 字节;[:, : 3 * 8] 表示再取 24 字节。
# 24 字节正好可以看成 3 个 float64。
channels = (
np.copy(packets_asarray(sc245_vc1_by_apid[1288])[:, 80:][:, : 3 * 8])
.view("float64")
.byteswap()
)
# APID 1288 的每个 packet 时间戳。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[1288])
# np.diff(timestamps) 找相邻样本的时间间隔argmax 找最大间隔位置。
# 最大间隔可能代表观测中断,因此把该点设成 NaN绘图时会断开。
channels[np.argmax(np.diff(timestamps)), :] = np.nan
# 绘制三个 float64 通道的原始变化。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels)
plt.title("APID 1288 Spacecraft 245 Virtual channel 1 bytes 80-104 (float64)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (float64)")
plt.legend([f"Bytes {j}-{j + 8}" for j in range(80, 104, 8)])
plt.grid()
# 对每个 float64 通道拟合 1 阶和 2 阶多项式,并绘制去趋势残差。
# 去趋势的目的:把总体平滑变化减掉,只看剩余的小幅波动。
for deg in [1, 2]:
# 每个多项式阶数单独画一张图。
plt.figure(figsize=(12, 6), facecolor="w")
# channels.shape[1] 是通道数量,这里通常是 3。
for j in range(channels.shape[1]):
# 把 datetime64 转成“相对起点的秒数”polyfit 需要普通数值横轴。
s = (timestamps - timestamps[0]) / np.timedelta64(1, "s")
# 过滤 NaN 点,否则 polyfit 无法拟合。
sel = ~np.isnan(channels[:, j])
# np.polyfit 拟合 deg 阶多项式,返回多项式系数。
p = np.polyfit(s[sel], channels[sel, j], deg)
# np.polyval 计算拟合曲线;原始值减拟合曲线就是残差。
plt.plot(timestamps, channels[:, j] - np.polyval(p, s))
plt.title("APID 1288 Spacecraft 245 Virtual channel 1 bytes 80-104 (float64)")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (float64)")
plt.legend(
[
f"Bytes {j}-{j + 8} minus degree {deg} polynomial"
for j in range(80, 104, 8)
]
)
plt.grid()
# -------------------------------------------------------------------------
# APID 1287
# -------------------------------------------------------------------------
# APID 1287 的载荷偏移 66 后,前 24 字节按三个大端 float64 解释。
# 另外从 packet 内部字节 18 起取 6 字节时间字段,补两个高位零后按大端
# uint64 解析,与 AOS 插入区时间戳比较。
# 这里的处理方式和 APID 1288 类似,只是 float64 起始偏移变成 66。
channels = (
np.copy(packets_asarray(sc245_vc1_by_apid[1287])[:, 66:][:, : 3 * 8])
.view("float64")
.byteswap()
)
# AOS 帧插入区提供的时间戳。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[1287])
# Space Packet 自身载荷中也有一个 6 字节时间字段。
timestamps_packet = np.array(
[
tianwen1_tm.parse_timestamp_datetime64(
# b"\x00\x00" + 6 字节时间字段 = 8 字节,才能用 >Q 解析为 uint64。
struct.unpack(">Q", b"\x00\x00" + p[0][18:][:6])[0]
)
for p in sc245_vc1_by_apid[1287]
]
)
# 清理最大时间间隔处的样本,避免图中出现不合理连接。
channels[np.argmax(np.diff(timestamps)), :] = np.nan
# 比较 AOS 帧时间戳和 Space Packet 内部时间戳之间的差值。
plt.figure(figsize=(12, 6), facecolor="w")
# 时间戳相减得到 timedelta64再除以 1 秒单位,得到浮点秒数。
t_diff = (timestamps - timestamps_packet) / np.timedelta64(1, "s")
# 同样在最大间隔处断开曲线。
t_diff[np.argmax(np.diff(timestamps))] = np.nan
plt.plot(timestamps, t_diff)
plt.title("APID 1287 Spacecraft 245 Virtual channel 1 timestamp difference")
plt.xlabel("AOS frame timestamp")
plt.ylabel("AOS frame timestamp - Space packet timestamp")
# 绘制 APID 1287 三个 float64 通道,横轴使用 packet 内部时间戳。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps_packet, channels)
plt.title("APID 1287 Spacecraft 245 Virtual channel 1 bytes 66-90 (float64)")
plt.xlabel("Space Packet frame timestamp")
plt.ylabel("Raw value (float64)")
plt.legend([f"Bytes {j}-{j + 8}" for j in range(66, 90, 8)])
plt.grid()
# 分别对全时段和 2020-07-31 03:00 前的数据做 1 阶、2 阶去趋势分析。
# cut 是满足 timestamp <= 2020-07-31T03:00 的最后一个索引。
cut = np.where(timestamps_packet <= np.datetime64("2020-07-31T03:00"))[0][-1]
# 第一个 slice(None, None) 表示全时段;第二个 slice(0, cut) 表示前半段。
for selt in [slice(None, None), slice(0, cut)]:
for deg in [1, 2]:
plt.figure(figsize=(12, 6), facecolor="w")
for j in range(channels.shape[1]):
# 使用当前选中时间段的起点作为 0 秒。
s = (
timestamps_packet[selt] - timestamps_packet[selt][0]
) / np.timedelta64(1, "s")
# 只用非 NaN 数据点参与拟合。
sel = ~np.isnan(channels[selt, j])
# 对当前通道和当前时间段拟合多项式。
p = np.polyfit(s[sel], channels[selt][sel, j], deg)
# 绘制去趋势后的残差。
plt.plot(timestamps_packet[selt], channels[selt, j] - np.polyval(p, s))
plt.title("APID 1287 Spacecraft 245 Virtual channel 1 bytes 66-90 (float64)")
plt.xlabel("Space Packet timestamp")
plt.ylabel("Raw value (float64)")
plt.legend(
[
f"Bytes {j}-{j + 8} minus degree {deg} polynomial"
for j in range(66, 90, 8)
]
)
plt.grid()
# -------------------------------------------------------------------------
# APIDs 2 and 3
# -------------------------------------------------------------------------
# 原 notebook 这里两个变量都从 APID 2 取数,保留原逻辑。若要比较 APID 2
# 与 APID 3可把 `sc245_vc1_by_apid[2]` 的第二处改为 `[3]`。
# channels2 是 APID 2 的 uint8 载荷矩阵。
channels2 = packets_asarray(sc245_vc1_by_apid[2])
# timestamps2 是 APID 2 每个 packet 对应的 AOS 时间戳。
timestamps2 = get_packet_timestamps(sc245_vc1_by_apid[2])
# 注意:这里原 notebook 仍然取 APID 2不是真正的 APID 3。
# 为了忠实保留原分析脚本,暂不改动。
channels3 = packets_asarray(sc245_vc1_by_apid[2])
# 同上,这里的 timestamps3 也来自 APID 2。
timestamps3 = get_packet_timestamps(sc245_vc1_by_apid[2])
# 对比 byte 0 的变化;'.-' 是点加线,'.' 是只画点。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps2, channels2[:, 0], ".-", label="APID 2 byte 0")
plt.plot(timestamps3, channels3[:, 0], ".", label="APID 3 byte 0")
plt.title("APIDs 2 and 3 Spacecraft 245 Virtual channel 1 byte 0")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.legend()
plt.grid()
# 对比 byte 2 的变化。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps2, channels2[:, 2], ".-", label="APID 2 byte 2")
plt.plot(timestamps3, channels3[:, 2], ".", label="APID 3 byte 2")
plt.title("APIDs 2 and 3 Spacecraft 245 Virtual channel 1 byte 2")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.legend()
plt.grid()
# 只取局部样本,放大查看 byte 4 的变化。
plt.figure(figsize=(12, 6), facecolor="w")
# 从第 14350 个样本开始取 400 个样本。
sel = slice(14350, 14350 + 400)
plt.plot(timestamps2[sel], channels2[sel, 4], ".-", label="APID 2 byte 4")
plt.plot(timestamps3[sel], channels3[sel, 4], ".", label="APID 3 byte 4")
plt.title("APIDs 2 and 3 Spacecraft 245 Virtual channel 1 byte 4")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.legend()
plt.grid()
# 使用同一个局部范围,查看 byte 8 的变化。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps2[sel], channels2[sel, 8], ".-", label="APID 2 byte 8")
plt.plot(timestamps3[sel], channels3[sel, 8], ".", label="APID 3 byte 8")
plt.title("APIDs 2 and 3 Spacecraft 245 Virtual channel 1 byte 8")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.legend()
plt.grid()
# -------------------------------------------------------------------------
# APID 454
# -------------------------------------------------------------------------
# APID 454 按 uint8 查看,重点观察 byte 0 以及 byte 4-7、18-20。
# 这里没有 view 成 int16/float64说明按单字节原始值观察。
channels = packets_asarray(sc245_vc1_by_apid[454])
# APID 454 的 packet 时间戳。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[454])
# 放大一小段样本,查看 byte 0。
plt.figure(figsize=(12, 6), facecolor="w")
sel = slice(1780, 1850)
plt.plot(timestamps[sel], channels[sel, 0], ".-")
plt.title("APID 454 Spacecraft 245 Virtual channel 1 byte 0")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.grid()
# 在同一张图中画 byte 4-6 和 byte 18-19比较这些字段是否同步变化。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps[sel], channels[sel, 4:7], ".-")
plt.plot(timestamps[sel], channels[sel, 18:20], ".-")
plt.title("APID 454 Spacecraft 245 Virtual channel 1 bytes 4-7, 18-20")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (uint8)")
plt.legend([f"Byte {j}" for j in list(range(4, 7)) + list(range(18, 20))])
plt.grid()
# -------------------------------------------------------------------------
# APID 462
# -------------------------------------------------------------------------
# APID 462 按大端 int16 解释,观察前 14 字节。
# 前 14 字节对应 7 个 int16 通道。
channels = packets_asarray(sc245_vc1_by_apid[462]).view("int16").byteswap()
# APID 462 的时间戳。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[462])
# 绘制 7 个 int16 通道的全时段变化。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, :7])
plt.title("APID 462 Spacecraft 245 Virtual channel 1 bytes 0-14")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.legend([f"Bytes {j} and {j + 1}" for j in range(0, 14, 2)])
plt.grid()
# -------------------------------------------------------------------------
# APID 1536
# -------------------------------------------------------------------------
# APID 1536 按大端 int16 解释,分别观察 bytes 18-20 和 20-22。
# int16 通道 9 对应 bytes 18-20通道 10 对应 bytes 20-22。
channels = packets_asarray(sc245_vc1_by_apid[1536]).view("int16").byteswap()
# APID 1536 的时间戳。
timestamps = get_packet_timestamps(sc245_vc1_by_apid[1536])
# 绘制 int16 通道 9。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, 9])
plt.title("APID 1536 Spacecraft 245 Virtual channel 1 bytes 18-20")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.grid()
# 绘制 int16 通道 10。
plt.figure(figsize=(12, 6), facecolor="w")
plt.plot(timestamps, channels[:, 10])
plt.title("APID 1536 Spacecraft 245 Virtual channel 1 bytes 20-22")
plt.xlabel("AOS frame timestamp")
plt.ylabel("Raw value (int16)")
plt.grid()
# notebook 里依赖自动显示图像,但脚本在 WSL / 无显示环境下不应强制调用
# `plt.show()`,否则可能因为缺少图形界面后端而报错。这里统一用 savefig
# 保存所有 figure方便在 WSL 中直接查看生成的 PNG 文件。
# OUTPUT_DIR 是 Tianwen/frame_analysis_figures。
save_all_figures(OUTPUT_DIR)
# 只有直接运行这个脚本时才会执行 main()。
# 如果以后从别的 Python 文件 import 这个脚本main() 不会自动运行。
if __name__ == "__main__":
main()