AsyncMemory
The AsyncMemory
class is a direct asynchronous interface to Mem0’s in-process memory operations. Unlike the memory, which interacts with an API, AsyncMemory
works directly with the underlying storage systems. This makes it ideal for applications where you want to embed Mem0 directly into your codebase.
Initialization
To use AsyncMemory
, import it from the mem0.memory
module:
import asyncio
from mem0 import AsyncMemory
# Initialize with default configuration
memory = AsyncMemory()
# Or initialize with custom configuration
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
# Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
Key Features
- Non-blocking Operations - All memory operations use
asyncio
to avoid blocking the event loop
- Concurrent Processing - Parallel execution of vector store and graph operations
- Efficient Resource Utilization - Better handling of I/O bound operations
- Compatible with Async Frameworks - Seamless integration with FastAPI, aiohttp, and other async frameworks
Methods
All methods in AsyncMemory
have the same parameters as the synchronous Memory
class but are designed to be used with async/await
.
Create memories
Add a new memory asynchronously:
try:
result = await memory.add(
messages=[
{"role": "user", "content": "I'm travelling to SF"},
{"role": "assistant", "content": "That's great to hear!"}
],
user_id="alice"
)
print("Memory added successfully:", result)
except Exception as e:
print(f"Error adding memory: {e}")
Retrieve memories
Retrieve memories related to a query:
try:
results = await memory.search(
query="Where am I travelling?",
user_id="alice"
)
print("Found memories:", results)
except Exception as e:
print(f"Error searching memories: {e}")
List memories
List all memories for a user_id
, agent_id
, and/or run_id
:
try:
all_memories = await memory.get_all(user_id="alice")
print(f"Retrieved {len(all_memories)} memories")
except Exception as e:
print(f"Error retrieving memories: {e}")
Get specific memory
Retrieve a specific memory by its ID:
try:
specific_memory = await memory.get(memory_id="memory-id-here")
print("Retrieved memory:", specific_memory)
except Exception as e:
print(f"Error retrieving memory: {e}")
Update memory
Update an existing memory by ID:
try:
updated_memory = await memory.update(
memory_id="memory-id-here",
data="I'm travelling to Seattle"
)
print("Memory updated successfully:", updated_memory)
except Exception as e:
print(f"Error updating memory: {e}")
Delete memory
Delete a specific memory by ID:
try:
result = await memory.delete(memory_id="memory-id-here")
print("Memory deleted successfully")
except Exception as e:
print(f"Error deleting memory: {e}")
Delete all memories
Delete all memories for a specific user, agent, or run:
try:
result = await memory.delete_all(user_id="alice")
print("All memories deleted successfully")
except Exception as e:
print(f"Error deleting memories: {e}")
At least one filter (user_id, agent_id, or run_id) is required when using delete_all.
Advanced Memory Organization
AsyncMemory supports the same three-parameter organization system as the synchronous Memory class:
# Store memories with full context
await memory.add(
messages=[{"role": "user", "content": "I prefer vegetarian food"}],
user_id="alice",
agent_id="diet-assistant",
run_id="consultation-001"
)
# Retrieve memories with different scopes
all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
user_id="alice",
agent_id="diet-assistant",
run_id="consultation-001"
)
# Search with context
general_search = await memory.search("What do you know about me?", user_id="alice")
agent_search = await memory.search("What do you know about me?", user_id="alice", agent_id="diet-assistant")
session_search = await memory.search("What do you know about me?", user_id="alice", run_id="consultation-001")
Memory History
Get the history of changes for a specific memory:
try:
history = await memory.history(memory_id="memory-id-here")
print("Memory history:", history)
except Exception as e:
print(f"Error retrieving history: {e}")
Example: Concurrent Usage with Other APIs
AsyncMemory
can be effectively combined with other async operations. Here’s an example showing how to use it alongside OpenAI API calls in separate threads:
import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory
async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()
async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
try:
# Retrieve relevant memories
search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
relevant_memories = search_result["results"]
memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)
# Generate Assistant response
system_prompt = f"You are a helpful AI. Answer the question based on query and memories.\nUser Memories:\n{memories_str}"
messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": message}]
response = await async_openai_client.chat.completions.create(model="gpt-4o-mini", messages=messages)
assistant_response = response.choices[0].message.content
# Create new memories from the conversation
messages.append({"role": "assistant", "content": assistant_response})
await async_memory.add(messages, user_id=user_id)
return assistant_response
except Exception as e:
print(f"Error in chat_with_memories: {e}")
return "I apologize, but I encountered an error processing your request."
async def async_main():
print("Chat with AI (type 'exit' to quit)")
while True:
user_input = input("You: ").strip()
if user_input.lower() == 'exit':
print("Goodbye!")
break
response = await chat_with_memories(user_input)
print(f"AI: {response}")
def main():
asyncio.run(async_main())
if __name__ == "__main__":
main()
Error Handling and Best Practices
Common Error Types
When working with AsyncMemory
, you may encounter these common errors:
Connection and Configuration Errors
import asyncio
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig
async def handle_initialization_errors():
try:
# Initialize with custom config
config = MemoryConfig(
vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
llm={"provider": "openai", "config": {"model": "gpt-4o-mini"}}
)
memory = AsyncMemory(config=config)
print("AsyncMemory initialized successfully")
except ValueError as e:
print(f"Configuration error: {e}")
except ConnectionError as e:
print(f"Connection error: {e}")
except Exception as e:
print(f"Unexpected initialization error: {e}")
asyncio.run(handle_initialization_errors())
Memory Operation Errors
async def handle_memory_operation_errors():
memory = AsyncMemory()
try:
# Memory not found error
result = await memory.get(memory_id="non-existent-id")
except ValueError as e:
print(f"Invalid memory ID: {e}")
except Exception as e:
print(f"Memory retrieval error: {e}")
try:
# Invalid search parameters
results = await memory.search(query="", user_id="alice")
except ValueError as e:
print(f"Invalid search query: {e}")
except Exception as e:
print(f"Search error: {e}")
Concurrent Operations
Take advantage of AsyncMemory’s concurrent capabilities:
async def batch_operations():
memory = AsyncMemory()
# Process multiple operations concurrently
tasks = []
for i in range(5):
task = memory.add(
messages=[{"role": "user", "content": f"Message {i}"}],
user_id=f"user_{i}"
)
tasks.append(task)
try:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} completed successfully")
except Exception as e:
print(f"Batch operation error: {e}")
Resource Management
Properly manage AsyncMemory lifecycle:
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_memory():
memory = AsyncMemory()
try:
yield memory
finally:
# Clean up resources if needed
pass
async def safe_memory_usage():
async with get_memory() as memory:
try:
result = await memory.search("test query", user_id="alice")
return result
except Exception as e:
print(f"Memory operation failed: {e}")
return None
Timeout and Retry Strategies
Implement timeout and retry logic for robustness:
async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
for attempt in range(max_retries):
try:
result = await asyncio.wait_for(operation(), timeout=timeout)
return result
except asyncio.TimeoutError:
print(f"Timeout on attempt {attempt + 1}")
except Exception as e:
print(f"Error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
raise Exception(f"Operation failed after {max_retries} attempts")
# Usage example
async def robust_memory_search():
memory = AsyncMemory()
async def search_operation():
return await memory.search("test query", user_id="alice")
try:
result = await with_timeout_and_retry(search_operation)
print("Search successful:", result)
except Exception as e:
print(f"Search failed permanently: {e}")
Integration with Async Frameworks
FastAPI Integration
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory
import asyncio
app = FastAPI()
memory = AsyncMemory()
@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
try:
result = await memory.add(messages=messages, user_id=user_id)
return {"status": "success", "data": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
try:
result = await memory.search(query=query, user_id=user_id, limit=limit)
return {"status": "success", "data": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Troubleshooting Guide
Issue | Possible Causes | Solutions |
---|
Initialization fails | Missing dependencies, invalid config | Check dependencies, validate configuration |
Slow operations | Large datasets, network latency | Implement caching, optimize queries |
Memory not found | Invalid memory ID, deleted memory | Validate IDs, implement existence checks |
Connection timeouts | Network issues, server overload | Implement retry logic, check network |
Out of memory errors | Large batch operations | Process in smaller batches |
Monitoring and Logging
Add comprehensive logging to your async memory operations:
import logging
import time
from functools import wraps
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def log_async_operation(operation_name):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
logger.info(f"Starting {operation_name}")
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
logger.info(f"{operation_name} completed in {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
logger.error(f"{operation_name} failed after {duration:.2f}s: {e}")
raise
return wrapper
return decorator
@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
return await memory.add(messages=messages, user_id=user_id)
If you have any questions or need further assistance, please don’t hesitate to reach out: