Files
CGA-bench/frontend/log_capture.py
2026-05-22 10:02:42 +08:00

75 lines
2.1 KiB
Python

import queue
import threading
from typing import Optional
class LogCapture:
    """Queue-backed log capture used for real-time log transport.

    Messages are pushed with ``write`` and pulled in bulk with ``read_all``.
    An optional ``threading.Event`` can be attached so that consumers can ask
    whether a stop has been requested via ``is_stopped``.
    """

    def __init__(self):
        # Pending log messages, in the order they were written.
        self.queue: queue.Queue = queue.Queue()
        # Stop flag supplied later through ``set_stop_event``; None until then.
        self._stop_event: Optional[threading.Event] = None

    def set_stop_event(self, event: threading.Event):
        """Attach the event that ``is_stopped`` reports on."""
        self._stop_event = event

    def write(self, msg: str):
        """Enqueue a single log message."""
        self.queue.put(msg)

    def read_all(self) -> str:
        """Drain every pending message and return them concatenated.

        Returns:
            The queued messages joined with no separator, or ``""`` when the
            queue is empty.
        """
        chunks = []
        try:
            # Keep pulling until the queue reports it has nothing left.
            while True:
                chunks.append(self.queue.get_nowait())
        except queue.Empty:
            pass
        return "".join(chunks)

    def is_stopped(self) -> bool:
        """Return True only if a stop event is attached and has been set."""
        event = self._stop_event
        return event is not None and event.is_set()

    def clear(self):
        """Discard every message currently waiting in the queue."""
        pending = self.queue
        while True:
            if pending.empty():
                return
            try:
                pending.get_nowait()
            except queue.Empty:
                # Another consumer emptied the queue between the check and
                # the get; nothing left to discard.
                return
class LogReader:
    """Log reader that forwards a process's stdout to a queue from a thread.

    Args:
        process: Object exposing a ``stdout`` attribute with a ``readline``
            method (presumably a ``subprocess.Popen`` -- confirm with caller).
            ``stdout`` may be ``None``, in which case nothing is read.
        output_queue: Queue that receives every non-empty line read.
        stop_event: Event checked after each line; once set, reading stops.
    """

    def __init__(self, process, output_queue: queue.Queue, stop_event: threading.Event):
        self.process = process
        self.output_queue = output_queue
        self.stop_event = stop_event
        # Created by ``start``; None until the reader has been started.
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the daemon thread that runs the read loop."""
        self._thread = threading.Thread(target=self._read_loop, daemon=True)
        self._thread.start()

    def _read_loop(self):
        """Read lines from the process output until EOF or a stop request.

        An empty read marks end-of-stream. Testing truthiness instead of
        comparing against ``''`` makes this work for both text-mode streams
        (EOF is ``''``) and binary streams (EOF is ``b''``); the former
        ``iter(readline, '')`` sentinel never matched ``b''`` and would loop
        forever on a binary pipe.
        """
        stream = self.process.stdout
        if stream is None:
            return
        while True:
            line = stream.readline()
            if not line:
                # End of stream.
                break
            if self.stop_event.is_set():
                # Stop requested: the line just read is intentionally dropped,
                # matching the original behaviour.
                break
            self.output_queue.put(line)

    def join(self, timeout: Optional[float] = None):
        """Wait for the reader thread to finish.

        Args:
            timeout: Maximum seconds to wait, or ``None`` to wait
                indefinitely. Does nothing if ``start`` was never called.
        """
        if self._thread is not None:
            self._thread.join(timeout=timeout)