自然进化

1.加入仿生记忆功能。
This commit is contained in:
guo zebin
2025-11-11 14:45:49 +08:00
parent f37680a7d0
commit da05cd73e6
30 changed files with 5663 additions and 128 deletions

View File

@@ -0,0 +1,7 @@
"""
算法模块
包含仿生记忆系统的核心算法:
- 牛顿冷却遗忘算法
- 聚类抑制机制
"""

View File

@@ -0,0 +1,147 @@
"""
基于聚类的记忆抑制机制
实现从短期记忆中加载数倍目标条数进行k-means聚类每簇取最相似的代表
主要思路:
1. 从短期记忆中加载数倍(t:聚类平均条数)目标所需条数(k*n从n倍的检索结果中取topk)的相关记录含embedding总检索条数=t*k*n
2. 对结果根据embedding进行k-means聚类簇数为k*n同条数/t
3. 每簇取与检索最相似的代表当前簇返回k个簇代表作为最终结果
"""
import numpy as np
from sklearn.cluster import KMeans
from typing import List, Dict, Tuple
import logging
# Shared project-wide logging configuration (see bionicmemory.utils.logging_config).
from bionicmemory.utils.logging_config import get_logger
logger = get_logger(__name__)
class ClusteringSuppression:
    """Clustering-based memory suppression.

    Groups similar retrieved memories with k-means and keeps one
    representative per cluster, so near-duplicate records do not crowd
    out diverse results in the final top-k.
    """

    def __init__(self,
                 cluster_multiplier: int = 3,
                 retrieval_multiplier: int = 2):
        """Initialize the suppression mechanism.

        Args:
            cluster_multiplier: expected number of records per cluster (default 3).
            retrieval_multiplier: over-retrieval factor relative to the
                target result count (default 2).
        """
        self.cluster_multiplier = cluster_multiplier
        self.retrieval_multiplier = retrieval_multiplier
        # Lazy %-args: no string formatting when the log level is disabled.
        logger.info("聚类抑制机制初始化: 每簇期望记录数=%s, 检索倍数=%s",
                    cluster_multiplier, retrieval_multiplier)

    def calculate_retrieval_parameters(self, target_k: int) -> Tuple[int, int]:
        """Compute how many records to fetch and how many clusters to form.

        Args:
            target_k: number of results ultimately returned to the caller.
        Returns:
            (total_retrieval, cluster_count) where
            cluster_count = target_k * retrieval_multiplier and
            total_retrieval = cluster_count * cluster_multiplier.
        """
        cluster_count = target_k * self.retrieval_multiplier
        total_retrieval = cluster_count * self.cluster_multiplier
        return total_retrieval, cluster_count

    @staticmethod
    def _merge_topk_dedup(candidates: List[Dict], target_k: int) -> List[Dict]:
        """Merge top-k by relevance with top-k by valid_access_count.

        Relevance winners come first; duplicates are removed on doc_id, so
        the result holds between target_k and 2 * target_k records.
        """
        by_relevance = sorted(
            candidates,
            key=lambda r: float(r.get("distance", float("inf"))))[:target_k]
        by_count = sorted(
            candidates,
            key=lambda r: float(r.get("valid_access_count", 0.0)),
            reverse=True)[:target_k]
        seen_ids = set()
        merged = []
        for record in by_relevance + by_count:
            doc_id = record.get("doc_id")
            if doc_id not in seen_ids:
                seen_ids.add(doc_id)
                merged.append(record)
        return merged

    def cluster_by_query_similarity_and_aggregate(self,
                                                  records: List[Dict],
                                                  embeddings_array: np.ndarray,
                                                  distances: List[float],
                                                  cluster_count: int,
                                                  target_k: int) -> List[Dict]:
        """Cluster retrieved records by embedding and aggregate per cluster.

        Per cluster, the record with the smallest query distance becomes the
        representative, and its valid_access_count is replaced by the sum over
        all cluster members. The final result merges the top target_k
        representatives by relevance with the top target_k by
        valid_access_count, de-duplicated on doc_id.

        Args:
            records: record dicts aligned 1:1 with embeddings_array and
                distances; each may carry doc_id, distance, valid_access_count.
            embeddings_array: (N, D) array of record embeddings.
            distances: length-N query distances (smaller = more similar).
            cluster_count: number of k-means clusters; coerced to 1 if invalid.
            target_k: number of representatives taken per ranking before merge.
        Returns:
            Merged, de-duplicated representative records (at most 2 * target_k).
        """
        # Guard against a bad cluster count (e.g. 0 or a non-int from config).
        if not isinstance(cluster_count, int) or cluster_count < 1:
            cluster_count = 1
        n = len(records)
        if n == 0:
            return []
        # Too few samples to cluster meaningfully: every record is its own
        # singleton cluster; go straight to the two-way top-k merge.
        if n <= cluster_count:
            singletons = []
            for record in records:
                rep = dict(record)
                rep["cluster_size"] = 1
                singletons.append(rep)
            return self._merge_topk_dedup(singletons, target_k)
        # K-means over the embeddings (uses the module-level np / KMeans
        # imports; the original re-imported both inside this method).
        kmeans = KMeans(n_clusters=cluster_count, random_state=42, n_init=10)
        labels = kmeans.fit_predict(embeddings_array)
        representatives = []
        for cid in np.unique(labels):
            member_idx = np.where(labels == cid)[0]
            # Representative = cluster member closest to the query; a missing
            # distance sorts last.
            local_dist = [
                (i, float(distances[i]) if distances[i] is not None else float("inf"))
                for i in member_idx
            ]
            rep_idx, _ = min(local_dist, key=lambda pair: pair[1])
            rep = dict(records[rep_idx])
            # The representative inherits the cluster's accumulated access count.
            rep["valid_access_count"] = float(
                sum(float(records[i].get("valid_access_count", 0.0)) for i in member_idx))
            rep["cluster_size"] = len(member_idx)
            representatives.append(rep)
        return self._merge_topk_dedup(representatives, target_k)

View File

@@ -0,0 +1,48 @@
import math
from datetime import datetime
from enum import Enum
from typing import Optional, Union
class CoolingRate(Enum):
    # Each member is (final_temperature_ratio, time_interval_seconds): the
    # fraction of the initial "temperature" (memory strength) that should
    # remain after the given interval. NewtonCoolingHelper derives the decay
    # coefficient alpha from these pairs.
    # NOTE(review): the ratios resemble Ebbinghaus forgetting-curve retention
    # values — confirm against the source the author used.
    MINUTES_20 = (0.582, 20 * 60)
    HOURS_1 = (0.442, 1 * 60 * 60)
    HOURS_9 = (0.358, 9 * 60 * 60)
    DAYS_1 = (0.337, 1 * 24 * 60 * 60)
    DAYS_2 = (0.278, 2 * 24 * 60 * 60)
    DAYS_6 = (0.254, 6 * 24 * 60 * 60)
    DAYS_31 = (0.211, 31 * 24 * 60 * 60)
class NewtonCoolingHelper:
    """Newton's-law-of-cooling helpers for the forgetting algorithm.

    Memory strength is modeled as a temperature T(t) = T0 * exp(-alpha * t);
    each CoolingRate member fixes alpha via its (retention_ratio, seconds)
    pair.
    """

    @staticmethod
    def calculate_cooling_rate(enum_value: CoolingRate) -> float:
        """Derive the decay coefficient alpha from a CoolingRate member.

        Solving ratio = exp(-alpha * interval) for alpha gives
        alpha = -ln(ratio) / interval.
        """
        final_temperature_ratio, time_interval = enum_value.value
        return -math.log(final_temperature_ratio) / time_interval

    @staticmethod
    def calculate_newton_cooling_effect(initial_temperature: float,
                                        time_interval: float,
                                        cooling_rate: Optional[float] = None) -> float:
        """Return the temperature after time_interval seconds of decay.

        Args:
            initial_temperature: strength recorded at the last update.
            time_interval: elapsed seconds since the last update.
            cooling_rate: decay coefficient alpha; defaults to the 31-day
                curve when omitted.
        """
        if cooling_rate is None:
            cooling_rate = NewtonCoolingHelper.calculate_cooling_rate(CoolingRate.DAYS_31)
        return initial_temperature * math.exp(-cooling_rate * time_interval)

    @staticmethod
    def calculate_time_difference(update_time: Union[str, datetime],
                                  current_time: Union[str, datetime]) -> float:
        """Return the seconds between update_time and current_time.

        Accepts datetime objects or ISO-8601 strings (the original annotation
        claimed datetime-only while the body handled strings too); the result
        is negative if current_time precedes update_time.
        """
        if isinstance(update_time, str):
            update_time = datetime.fromisoformat(update_time)
        if isinstance(current_time, str):
            current_time = datetime.fromisoformat(current_time)
        return (current_time - update_time).total_seconds()

    @staticmethod
    def get_threshold(cooling_rate: Optional[CoolingRate] = None) -> float:
        """Return the retention-ratio threshold for the rate (31-day default)."""
        if cooling_rate is None:
            cooling_rate = CoolingRate.DAYS_31
        return cooling_rate.value[0]