Files
2026-04-13 11:34:23 +08:00

166 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Implementation and verification of the chunk synchronization algorithm.
# Algorithm outline:
# 1. Build content_hash -> chunks mappings with defaultdict: O(n).
# 2. For each content_hash, sort the old and new chunks by index and match
#    them with a two-pointer sweep: O(k log k) per group.
# Overall time complexity: O(n log n) because of the per-group sorts,
# where n is the total number of chunks.
# Space complexity: O(n), dominated by the hash maps.
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, TypedDict
@dataclass
class Chunk:
    """A single content chunk of a document.

    Mirrors the dict records consumed by synchronize_chunks.
    """

    index: int          # position of the chunk within the document
    content_hash: str   # hash of chunk_content, used for matching
    chunk_content: str  # the chunk text itself
    # Assigned by the backend; chunks that have not been stored yet have none.
    # Was `uuid: str = None` — an implicit-Optional annotation (rejected by
    # PEP 484); the declared type must include None when the default is None.
    uuid: Optional[str] = None
# Outcome of one synchronization pass: chunk dicts to insert (no uuid yet),
# chunk dicts (with uuid) whose index/content must be rewritten, and the
# uuids of chunks that no longer exist in the new document.
SyncResult = TypedDict(
    'SyncResult',
    {
        'to_create': List[Dict],
        'to_update': List[Dict],
        'to_delete': List[str],
    },
)
# Simulated "old" chunks as stored on the backend (each already has a uuid).
old_chunks = [
    {'uuid': 'uuid_1', 'index': 0, 'content_hash': 'hash_A', 'chunk_content': '这是第一段。'},
    {'uuid': 'uuid_2', 'index': 1, 'content_hash': 'hash_B', 'chunk_content': '这是第二段。'},
    {'uuid': 'uuid_3', 'index': 2, 'content_hash': 'hash_C', 'chunk_content': '这是第三段。'},
    {'uuid': 'uuid_4', 'index': 3, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
    {'uuid': 'uuid_5', 'index': 4, 'content_hash': 'hash_E', 'chunk_content': '这是第五段。'},
]
# Simulated "new" chunks produced by GitHub Actions (no uuid assigned yet).
# Note the deliberate duplicates of hash_D to exercise same-hash matching.
new_chunks = [
    {'index': 0, 'content_hash': 'hash_A', 'chunk_content': '这是第一段。'},
    {'index': 1, 'content_hash': 'hash_C', 'chunk_content': '这是第三段。'},
    {'index': 2, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
    {'index': 3, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
    {'index': 4, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
    {'index': 5, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
    {'index': 6, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
]
def synchronize_chunks(
    old_chunks: List[Dict], new_chunks: List[Dict], threshold: int = 10
) -> "SyncResult":
    """Compute the create/update/delete operations that turn *old_chunks*
    into *new_chunks*.

    Chunks are first grouped by ``content_hash`` to avoid cross-hash
    mismatches.  Within each group the old and new entries are sorted by
    ``index`` and matched with two pointers: a pair whose index distance is
    at most *threshold* is treated as the same logical chunk (update);
    unmatched old chunks are deleted and unmatched new chunks are created.

    Args:
        old_chunks: Existing chunks; each dict must contain 'uuid', 'index',
            'content_hash' and 'chunk_content'.
        new_chunks: Incoming chunks; each dict must contain 'index',
            'content_hash' and 'chunk_content'.
        threshold: Maximum index distance for two same-hash chunks to be
            considered the same chunk.  Previously a hard-coded constant;
            kept at 10 by default for backward compatibility.

    Returns:
        A SyncResult dict with 'to_create', 'to_update' and 'to_delete'.

    Raises:
        TypeError: if either argument is not a list.
        ValueError: if any chunk is missing a required field.
    """
    # ========== 1. input validation ==========
    if not isinstance(old_chunks, list) or not isinstance(new_chunks, list):
        raise TypeError("输入参数必须是列表类型")
    required_fields = {'index', 'content_hash', 'chunk_content'}
    # Old chunks additionally need the backend-assigned uuid; build the
    # combined set once instead of on every loop iteration.
    old_required = required_fields | {'uuid'}
    for chunk in old_chunks:
        if not old_required.issubset(chunk.keys()):
            raise ValueError("旧chunks缺少必要字段")
    for chunk in new_chunks:
        if not required_fields.issubset(chunk.keys()):
            raise ValueError("新chunks缺少必要字段")

    # ========== 2. group by content_hash ==========
    old_chunks_by_hash = defaultdict(list)
    for oc in old_chunks:
        old_chunks_by_hash[oc['content_hash']].append(oc)
    new_chunks_by_hash = defaultdict(list)
    for nc in new_chunks:
        new_chunks_by_hash[nc['content_hash']].append(nc)

    # ========== 3. match within each content_hash group ==========
    to_create = []
    to_update = []
    to_delete = []
    # Iterate hashes in sorted order so the output lists are deterministic;
    # plain set iteration order varies under Python's hash randomization.
    all_hashes = sorted(set(old_chunks_by_hash) | set(new_chunks_by_hash))
    for content_hash in all_hashes:
        old_list = sorted(old_chunks_by_hash[content_hash], key=lambda x: x['index'])
        new_list = sorted(new_chunks_by_hash[content_hash], key=lambda x: x['index'])
        i, j = 0, 0
        while i < len(old_list) and j < len(new_list):
            old_entry = old_list[i]
            new_entry = new_list[j]
            if abs(old_entry['index'] - new_entry['index']) <= threshold:
                # Indexes are close enough: same chunk, update in place.
                to_update.append({
                    'uuid': old_entry['uuid'],
                    'index': new_entry['index'],
                    'content_hash': content_hash,
                    'chunk_content': new_entry['chunk_content'],
                })
                i += 1
                j += 1
            elif old_entry['index'] < new_entry['index']:
                # Old chunk has no nearby new counterpart: delete it.
                to_delete.append(old_entry['uuid'])
                i += 1
            else:
                # New chunk appears before any matching old one: create it.
                to_create.append({
                    'index': new_entry['index'],
                    'content_hash': content_hash,
                    'chunk_content': new_entry['chunk_content'],
                })
                j += 1
        # Leftover old chunks are deletions.
        for leftover in old_list[i:]:
            to_delete.append(leftover['uuid'])
        # Leftover new chunks are creations.
        for leftover in new_list[j:]:
            to_create.append({
                'index': leftover['index'],
                'content_hash': content_hash,
                'chunk_content': leftover['chunk_content'],
            })
    return {
        'to_create': to_create,
        'to_update': to_update,
        'to_delete': to_delete,
    }
if __name__ == '__main__':
    # Run the sync against the module-level fixtures and print each result
    # section; an empty section prints "null" as a placeholder.
    result = synchronize_chunks(old_chunks, new_chunks)
    sections = [
        ("需要创建的 chunks:", result['to_create']),
        ("\n需要更新的 chunks:", result['to_update']),
        ("\n需要删除的 chunks:", result['to_delete']),
    ]
    for header, items in sections:
        print(header)
        if items:
            for item in items:
                print(item)
        else:
            print("null")