大模型时代,RAG是搭建私有知识库最便捷方式,而 HNSW 是 RAG 知识库搭建和搜索的重大技术之一,今天我们就来聊聊 HNSW。
HNSW(Hierarchical Navigable Small World)听着名字高大上,但把算法过程用语言描述出来,也没那么难懂。

HNSW 结合了跳表和 NSW 算法,把数据点存于多层 NSW 图。对于每个新加入的数据点,以概率方式获取它所在的最高层级,层越高放到该层的概率越小。数据点会放到它所在的最高层到第0层,即层级越小包含数据点越全面,第0层包含所有的数据点。
举例说明,在上面HNSW简化图中,点n1 放到了第2、1、0层,点n2放到了第1、0层,点n3放到了第0层。
NSW 图构建

HNSW 包含多层 NSW 图,每层 NSW 图构建过程如下:
初始化一个空图,每个数据点作为图的一个节点。
对于每个新加入的数据点,通过ANN(近似最近临搜索)算法找到其最近邻节点,并与其建立双向边连接。
给新加入的数据点添加长程边。长程边是小世界网络中极少数却最关键的边,它们跳过了局部邻域,把本来相距很远的区域连接起来。
限制每个节点的出度(一般为一个小常数,如 10-30),以保证图的稀疏性和搜索效率。
连接策略基于小世界网络特性,确保图具有低直径(节点间最短路径较短)和高聚类系数。

从最高层 NSW 图开始,
通过 ANN(近似最近邻搜索)算法搜索与目标点最近节点。把上一层 NSW 图搜索到的节点作为下一层 NSW 图搜索起点。重复上面步骤,直到在第0层 NSW 图中找到与目标点最近的节点。
在上图中,假设目标节点映射到第0层 NSW 是n4,绿线标志了搜索路径。
从算法描述可知,不管在搭建 HNSW 插入新节点时还是在搜索时,都要通过算法查找节点的最近临节点,即ANN(近似最近邻搜索)。懂了此算法,整个 HNSW 也就恍然大悟。算法过程:
从 NSW 图中的一个或多个随机起点开始,
检查当前节点的邻居,选择与目标点距离最近的邻居作为下一步的起点。重复此过程,直到无法找到更接近目标点的节点(即达到局部最优)。
可通过多起点搜索或设置搜索深度来提高召回率。
ANN 算法 python 示例
def _search_candidates_from_entry(self, query: Vector, ef: int,
entry_id: int) -> List[int]:
"""从一个入口节点开始搜索最近邻节点
Args:
query: 目标节点.
ef: 返回的最大候选节点数.
entry_id: 搜索入口节点
Returns:
候选节点ID列表
"""
visited = set([entry_id])
self._search_paths[entry_id] = [entry_id] # Initialize with entry point
def dist_to(node_id: int) -> float:
return self._distance_fn(self._vectors[node_id], query)
# 待检索的节点最小堆
explore_heap: List[Tuple[float, int]] = [(dist_to(entry_id), entry_id)]
# 最佳候选堆记录与目标节点距离最近的节点堆,
# 由于python heap只支持最小堆,所以存储负值, 即堆顶是堆中与目标节点最远的节点
best_heap: List[Tuple[float, int]] = [(-explore_heap[0][0], entry_id)]
# 记录当前最优距离,用于判断是否需要提前停止搜索
best_distance = dist_to(entry_id)
steps_without_improvement = 0
max_steps_without_improvement = 5 # 当检索节点与目标节点距离没有改善时提前停止
max_search_steps = 20 # 最大搜索步骤数
search_step = 0
while explore_heap and search_step < max_search_steps:
d_cur, cur = heapq.heappop(explore_heap)
search_step += 1
# 如果当前节点与目标节点距离大于最佳候选堆中最远节点距离且已经找到足够候选节点时,停止搜索
worst_best = -best_heap[0][0]
if len(best_heap) >= ef and d_cur > worst_best:
break
if d_cur < best_distance:
best_distance = d_cur
steps_without_improvement = 0
else:
steps_without_improvement += 1
# 当发现检索节点与目标节点的距离一直没有缩小时,停止搜索
if steps_without_improvement > max_steps_without_improvement:
break
# 记录当前节点在搜索路径中
if cur not in self._search_paths[entry_id]:
self._search_paths[entry_id].append(cur)
# 检索当前节点的邻居节点
for nbr in self._graph.get(cur, []):
if nbr in visited:
continue
visited.add(nbr)
d = dist_to(nbr)
heapq.heappush(explore_heap, (d, nbr))
if len(best_heap) < ef:
# 更新最佳候选堆
heapq.heappush(best_heap, (-d, nbr))
else:
# 当最佳候选堆已满时,判断当前节点与目标节点的距离是否小于堆顶节点的距离
# 如果是,则替换堆顶节点
if d < -best_heap[0][0]:
heapq.heapreplace(best_heap, (-d, nbr))
return [node_id for _, node_id in best_heap]从上面介绍可知把 HNSW 掰开看,此算法还是比较亲民,较容易看懂的。