166 lines
6.2 KiB
Python
166 lines
6.2 KiB
Python
# 同步算法的实现和验证
|
||
# 算法说明:
|
||
# 1. 使用哈希表(defaultdict)建立content_hash到chunks的映射,时间复杂度O(n)
|
||
# 2. 使用集合操作找到相同位置的chunks,时间复杂度O(n)
|
||
# 3. 使用双指针法进行剩余chunks的匹配,时间复杂度O(n)
|
||
# 总体时间复杂度: O(n),其中n为chunks的总数
|
||
# 空间复杂度: O(n),主要用于存储哈希表
|
||
|
||
from collections import defaultdict
|
||
from typing import TypedDict, List, Dict, Set
|
||
from dataclasses import dataclass
|
||
|
||
@dataclass
|
||
class Chunk:
|
||
index: int
|
||
content_hash: str
|
||
chunk_content: str
|
||
uuid: str = None
|
||
|
||
class SyncResult(TypedDict):
|
||
to_create: List[Dict]
|
||
to_update: List[Dict]
|
||
to_delete: List[str]
|
||
|
||
# 模拟后端的旧 chunks 数据
|
||
old_chunks = [
|
||
{'uuid': 'uuid_1', 'index': 0, 'content_hash': 'hash_A', 'chunk_content': '这是第一段。'},
|
||
{'uuid': 'uuid_2', 'index': 1, 'content_hash': 'hash_B', 'chunk_content': '这是第二段。'},
|
||
{'uuid': 'uuid_3', 'index': 2, 'content_hash': 'hash_C', 'chunk_content': '这是第三段。'},
|
||
{'uuid': 'uuid_4', 'index': 3, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
{'uuid': 'uuid_5', 'index': 4, 'content_hash': 'hash_E', 'chunk_content': '这是第五段。'},
|
||
]
|
||
|
||
# 模拟 GitHub Actions 生成的新 chunks 数据
|
||
new_chunks = [
|
||
{'index': 0, 'content_hash': 'hash_A', 'chunk_content': '这是第一段。'},
|
||
{'index': 1, 'content_hash': 'hash_C', 'chunk_content': '这是第三段。'},
|
||
{'index': 2, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
{'index': 3, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
{'index': 4, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
{'index': 5, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
{'index': 6, 'content_hash': 'hash_D', 'chunk_content': '这是第四段。'},
|
||
]
|
||
|
||
def synchronize_chunks(old_chunks: List[Dict], new_chunks: List[Dict]) -> SyncResult:
|
||
"""
|
||
基于 content_hash + index 的双指针匹配算法,查找需要新增、更新和删除的 chunks。
|
||
主要改进:
|
||
1. 对同一 content_hash 的旧、新 chunks,分别按 index 排序,再逐个匹配,避免原先直接根据
|
||
“两两相同位置”导致重复 content_hash 时的混淆。
|
||
2. 保留了原先的距离阈值(distance <= threshold)判断,但逻辑更直观,减少漏匹或误判。
|
||
"""
|
||
|
||
# ========== 1. 输入验证 ==========
|
||
if not isinstance(old_chunks, list) or not isinstance(new_chunks, list):
|
||
raise TypeError("输入参数必须是列表类型")
|
||
|
||
required_fields = {'index', 'content_hash', 'chunk_content'}
|
||
for chunk in old_chunks:
|
||
if not required_fields.union({'uuid'}).issubset(chunk.keys()):
|
||
raise ValueError("旧chunks缺少必要字段")
|
||
for chunk in new_chunks:
|
||
if not required_fields.issubset(chunk.keys()):
|
||
raise ValueError("新chunks缺少必要字段")
|
||
|
||
# ========== 2. 构建 content_hash => chunks 的映射表,减少跨 content_hash 的错误匹配 ==========
|
||
old_chunks_by_hash = defaultdict(list)
|
||
for oc in old_chunks:
|
||
old_chunks_by_hash[oc['content_hash']].append(oc)
|
||
|
||
new_chunks_by_hash = defaultdict(list)
|
||
for nc in new_chunks:
|
||
new_chunks_by_hash[nc['content_hash']].append(nc)
|
||
|
||
# ========== 3. 遍历所有的 content_hash,逐个匹配 ==========
|
||
|
||
to_create = []
|
||
to_update = []
|
||
to_delete = []
|
||
|
||
# “并”集获取所有出现过的 content_hash
|
||
all_hashes = set(old_chunks_by_hash.keys()) | set(new_chunks_by_hash.keys())
|
||
|
||
# 允许的更新距离阈值,可根据需要调大或调小
|
||
threshold = 10
|
||
|
||
for content_hash in all_hashes:
|
||
old_list = sorted(old_chunks_by_hash[content_hash], key=lambda x: x['index'])
|
||
new_list = sorted(new_chunks_by_hash[content_hash], key=lambda x: x['index'])
|
||
|
||
i, j = 0, 0
|
||
len_old, len_new = len(old_list), len(new_list)
|
||
|
||
while i < len_old and j < len_new:
|
||
old_entry = old_list[i]
|
||
new_entry = new_list[j]
|
||
distance = abs(old_entry['index'] - new_entry['index'])
|
||
|
||
# 如果索引相近,则判定为同一块内容,执行更新操作
|
||
if distance <= threshold:
|
||
to_update.append({
|
||
'uuid': old_entry['uuid'],
|
||
'index': new_entry['index'],
|
||
'content_hash': content_hash,
|
||
'chunk_content': new_entry['chunk_content']
|
||
})
|
||
i += 1
|
||
j += 1
|
||
|
||
# 如果旧 chunk.index 更小,说明它在新列表里没有合适的配对,需要删除
|
||
elif old_entry['index'] < new_entry['index']:
|
||
to_delete.append(old_entry['uuid'])
|
||
i += 1
|
||
|
||
# 否则,新 chunk.index 更小,说明这是新增加的块
|
||
else:
|
||
to_create.append({
|
||
'index': new_entry['index'],
|
||
'content_hash': content_hash,
|
||
'chunk_content': new_entry['chunk_content']
|
||
})
|
||
j += 1
|
||
|
||
# 把剩余的旧 chunks 视为需要删除
|
||
while i < len_old:
|
||
to_delete.append(old_list[i]['uuid'])
|
||
i += 1
|
||
|
||
# 把剩余的新 chunks 视为需要新增
|
||
while j < len_new:
|
||
to_create.append({
|
||
'index': new_list[j]['index'],
|
||
'content_hash': content_hash,
|
||
'chunk_content': new_list[j]['chunk_content']
|
||
})
|
||
j += 1
|
||
|
||
return {
|
||
'to_create': to_create,
|
||
'to_update': to_update,
|
||
'to_delete': to_delete
|
||
}
|
||
|
||
if __name__ == '__main__':
|
||
result = synchronize_chunks(old_chunks, new_chunks)
|
||
|
||
print("需要创建的 chunks:")
|
||
if result['to_create']:
|
||
for chunk in result['to_create']:
|
||
print(chunk)
|
||
else:
|
||
print("null")
|
||
|
||
print("\n需要更新的 chunks:")
|
||
if result['to_update']:
|
||
for chunk in result['to_update']:
|
||
print(chunk)
|
||
else:
|
||
print("null")
|
||
|
||
print("\n需要删除的 chunks:")
|
||
if result['to_delete']:
|
||
for uuid in result['to_delete']:
|
||
print(uuid)
|
||
else:
|
||
print("null") |