erDiagram
    MESSAGES {
        string id PK
        text name
        text role
        text content_json
        text url
        text metadata_json
        real timestamp
    }
    MARKS {
        integer id PK
        string msg_id FK
        string mark
    }
    MESSAGES ||--o{ MARKS : has
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
-- One row per stored message; content and metadata are kept as JSON text.
CREATE TABLE IF NOT EXISTS messages (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    role TEXT NOT NULL,
    content_json TEXT NOT NULL,
    url TEXT DEFAULT '',
    metadata_json TEXT,
    timestamp REAL NOT NULL
);

-- Marks attached to messages; a message may have any number of marks.
CREATE TABLE IF NOT EXISTS marks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    msg_id TEXT NOT NULL,
    mark TEXT NOT NULL,
    FOREIGN KEY (msg_id) REFERENCES messages(id)
);

-- Indexes for looking up marks by message and by mark name.
CREATE INDEX IF NOT EXISTS idx_marks_msg_id ON marks(msg_id);
CREATE INDEX IF NOT EXISTS idx_marks_mark ON marks(mark);
def _init_db(self) -> None:
    """Create the ``messages`` and ``marks`` tables and their indexes.

    Every statement uses ``IF NOT EXISTS``, so running this against a
    database that already has the schema changes nothing.
    """
    conn = sqlite3.connect(self.db_path)
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS messages (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                role TEXT NOT NULL,
                content_json TEXT NOT NULL,
                url TEXT DEFAULT '',
                metadata_json TEXT,
                timestamp REAL NOT NULL
            );
            CREATE TABLE IF NOT EXISTS marks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                msg_id TEXT NOT NULL,
                mark TEXT NOT NULL,
                FOREIGN KEY (msg_id) REFERENCES messages(id)
            );
            CREATE INDEX IF NOT EXISTS idx_marks_msg_id ON marks(msg_id);
            CREATE INDEX IF NOT EXISTS idx_marks_mark ON marks(mark);
        """)
        conn.commit()
    finally:
        # Release the connection even when the script fails part-way.
        conn.close()
# Filter by mark.
conditions = []
if mark is not None:
    if isinstance(mark, str):
        mark = [mark]
    placeholders = ",".join("?" * len(mark))
    # Use a membership subquery instead of joining ``marks``: a join yields
    # one row per matching mark, so a message carrying several of the
    # requested marks would be returned more than once.
    conditions.append(
        f"m.id IN (SELECT msg_id FROM marks WHERE mark IN ({placeholders}))"
    )
    params.extend(mark)

# Drop every message that carries any of the excluded marks.
if exclude_mark is not None:
    if isinstance(exclude_mark, str):
        exclude_mark = [exclude_mark]
    placeholders = ",".join("?" * len(exclude_mark))
    conditions.append(
        f"m.id NOT IN (SELECT msg_id FROM marks WHERE mark IN ({placeholders}))"
    )
    params.extend(exclude_mark)

if conditions:
    query += " WHERE " + " AND ".join(conditions)

# Oldest message first.
query += " ORDER BY m.timestamp ASC"

rows = conn.execute(query, params).fetchall()

# Rebuild Msg objects from the stored rows.
messages = []
for row in rows:
    content = json.loads(row["content_json"])
    msg = Msg(
        name=row["name"],
        content=content,
        role=row["role"],
        # An empty stored URL is passed on as None.
        url=row["url"] or None,
        metadata=json.loads(row["metadata_json"]) if row["metadata_json"] else None,
    )
    messages.append(msg)
async def delete(self, msg_ids: list[str], **kwargs) -> int:
    """Delete messages, and the marks attached to them, by message ID.

    Args:
        msg_ids: IDs of the messages to remove. IDs that match no stored
            message are ignored.
        **kwargs: Accepted for interface compatibility; not used.

    Returns:
        The number of rows removed from ``messages``.
    """
    ids = list(msg_ids)
    if not ids:
        # Nothing to delete, so do not open the database at all.
        return 0

    # SQLite caps the number of bound parameters per statement (999 on
    # older builds), so large ID lists are deleted in batches.
    batch_size = 500
    deleted = 0
    conn = sqlite3.connect(self.db_path)
    try:
        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            placeholders = ",".join("?" * len(batch))
            # Remove the marks first so no mark row outlives its message.
            conn.execute(
                f"DELETE FROM marks WHERE msg_id IN ({placeholders})", batch
            )
            cursor = conn.execute(
                f"DELETE FROM messages WHERE id IN ({placeholders})", batch
            )
            deleted += cursor.rowcount
        # One commit for all batches: closing without it discards them all.
        conn.commit()
        return deleted
    finally:
        conn.close()
async def delete_by_mark(self, mark: str, *args, **kwargs) -> int:
    """Delete every message that carries ``mark``.

    Args:
        mark: The mark whose messages should be removed.
        *args: Accepted for interface compatibility; not used.
        **kwargs: Accepted for interface compatibility; not used.

    Returns:
        The value returned by ``self.delete`` for the matching message IDs,
        or 0 when no message carries ``mark``.
    """
    conn = sqlite3.connect(self.db_path)
    try:
        # DISTINCT: the same (msg_id, mark) pair may be stored more than once.
        rows = conn.execute(
            "SELECT DISTINCT msg_id FROM marks WHERE mark = ?", (mark,)
        ).fetchall()
    finally:
        # Close before delegating: ``delete`` opens its own connection.
        conn.close()

    msg_ids = [row[0] for row in rows]
    if not msg_ids:
        return 0
    return await self.delete(msg_ids)
sequenceDiagram
    participant Agent as ReActAgent
    participant Mem as SQLiteMemory
    participant DB as SQLite 数据库

    Agent->>Mem: add(Msg("user", "你好"), marks=["HINT"])
    Mem->>DB: INSERT INTO messages
    Mem->>DB: INSERT INTO marks

    Agent->>Mem: get_memory(exclude_mark="COMPRESSED")
    Mem->>DB: SELECT ... WHERE id NOT IN (SELECT msg_id FROM marks WHERE mark='COMPRESSED')
    DB-->>Mem: rows
    Mem->>Mem: 反序列化为 Msg 列表
    Mem-->>Agent: [Msg, Msg, ...]