AsyncMemory

The AsyncMemory class is a direct asynchronous interface to Mem0’s in-process memory operations. Unlike the API client, which sends requests to a remote API, AsyncMemory works directly with the underlying storage systems. This makes it a good fit for applications where you want to embed Mem0 directly into your codebase.

Initialization

To use AsyncMemory, import it from the mem0 package:
Python
import asyncio
from mem0 import AsyncMemory

# Initialize with default configuration
memory = AsyncMemory()

# Or initialize with custom configuration
from mem0.configs.base import MemoryConfig
custom_config = MemoryConfig(
    # Your custom configuration here
)
memory = AsyncMemory(config=custom_config)

Key Features

  1. Non-blocking Operations - All memory operations use asyncio to avoid blocking the event loop
  2. Concurrent Processing - Parallel execution of vector store and graph operations
  3. Efficient Resource Utilization - Better handling of I/O bound operations
  4. Compatible with Async Frameworks - Seamless integration with FastAPI, aiohttp, and other async frameworks
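
As a rough illustration of the first two points, the sketch below starts two searches together and records when each one starts and finishes. StubMemory is a hypothetical stand-in that simulates I/O with asyncio.sleep, so the snippet runs without a vector store or API key; it is not part of Mem0.

```python
import asyncio


class StubMemory:
    """Hypothetical stand-in for AsyncMemory so the sketch runs without any backend."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def search(self, query: str, user_id: str) -> dict:
        self.events.append(f"start:{user_id}")
        await asyncio.sleep(0.01)  # simulated I/O; hands control back to the event loop
        self.events.append(f"end:{user_id}")
        return {"results": [f"memory for {user_id}"]}


async def demo() -> list[str]:
    memory = StubMemory()
    # Both searches are in flight at once: the second starts while the first waits on I/O.
    await asyncio.gather(
        memory.search("Where am I travelling?", user_id="alice"),
        memory.search("Where am I travelling?", user_id="bob"),
    )
    return memory.events


events = asyncio.run(demo())
print(events)
```

Both start events are recorded before either end event, which is what overlapping (rather than back-to-back) execution looks like on a single event loop.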

Methods

All methods in AsyncMemory have the same parameters as the synchronous Memory class but are designed to be used with async/await.
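
A common slip when moving from the synchronous class is to drop the await. The sketch below shows what happens in that case, using a hypothetical StubMemory in place of AsyncMemory: calling the method only creates a coroutine object, and nothing runs until it is awaited.

```python
import asyncio
import inspect


class StubMemory:
    """Hypothetical stand-in for AsyncMemory; only the calling convention matters here."""

    async def get_all(self, user_id: str) -> dict:
        return {"results": [{"memory": "Travelling to SF", "user_id": user_id}]}


async def demo() -> tuple[bool, dict]:
    memory = StubMemory()

    pending = memory.get_all(user_id="alice")  # no await: the method body has not run yet
    was_coroutine = inspect.iscoroutine(pending)

    result = await pending  # awaiting runs the call to completion
    return was_coroutine, result


was_coroutine, result = asyncio.run(demo())
print(was_coroutine, result)
```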

Create memories

Add a new memory asynchronously:
Python
try:
    result = await memory.add(
        messages=[
            {"role": "user", "content": "I'm travelling to SF"},
            {"role": "assistant", "content": "That's great to hear!"}
        ],
        user_id="alice"
    )
    print("Memory added successfully:", result)
except Exception as e:
    print(f"Error adding memory: {e}")

Retrieve memories

Retrieve memories related to a query:
Python
try:
    results = await memory.search(
        query="Where am I travelling?",
        user_id="alice"
    )
    print("Found memories:", results)
except Exception as e:
    print(f"Error searching memories: {e}")

List memories

List all memories for a user_id, agent_id, and/or run_id:
Python
try:
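    all_memories = await memory.get_all(user_id="alice")
    # Assumption: results arrive as {"results": [...]}, like the search result used later on
    memories = all_memories.get("results", []) if isinstance(all_memories, dict) else all_memories
    print(f"Retrieved {len(memories)} memories")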
except Exception as e:
    print(f"Error retrieving memories: {e}")

Get specific memory

Retrieve a specific memory by its ID:
Python
try:
    specific_memory = await memory.get(memory_id="memory-id-here")
    print("Retrieved memory:", specific_memory)
except Exception as e:
    print(f"Error retrieving memory: {e}")

Update memory

Update an existing memory by ID:
Python
try:
    updated_memory = await memory.update(
        memory_id="memory-id-here",
        data="I'm travelling to Seattle"
    )
    print("Memory updated successfully:", updated_memory)
except Exception as e:
    print(f"Error updating memory: {e}")

Delete memory

Delete a specific memory by ID:
Python
try:
    result = await memory.delete(memory_id="memory-id-here")
    print("Memory deleted successfully")
except Exception as e:
    print(f"Error deleting memory: {e}")

Delete all memories

Delete all memories for a specific user, agent, or run:
Python
try:
    result = await memory.delete_all(user_id="alice")
    print("All memories deleted successfully")
except Exception as e:
    print(f"Error deleting memories: {e}")

Note: At least one filter (user_id, agent_id, or run_id) is required when using delete_all.
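
If you build the filters dynamically, it can help to check them before calling delete_all. The helper below is a minimal sketch of such a check; require_scope is a hypothetical name, not part of Mem0, and it treats empty strings as missing.

```python
from typing import Optional


def require_scope(
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
) -> dict:
    """Return only the filters that are set (non-empty); raise if none were provided."""
    candidates = (("user_id", user_id), ("agent_id", agent_id), ("run_id", run_id))
    scope = {key: value for key, value in candidates if value}
    if not scope:
        raise ValueError("Provide at least one of user_id, agent_id, or run_id")
    return scope


print(require_scope(user_id="alice", run_id="consultation-001"))
```

The returned dictionary can be unpacked into the call, for example await memory.delete_all(**require_scope(user_id="alice")).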

Advanced Memory Organization

AsyncMemory supports the same three-parameter organization system (user_id, agent_id, and run_id) as the synchronous Memory class:
Python
# Store memories with full context
await memory.add(
    messages=[{"role": "user", "content": "I prefer vegetarian food"}],
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001"
)

# Retrieve memories with different scopes
all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
    user_id="alice", 
    agent_id="diet-assistant", 
    run_id="consultation-001"
)

# Search with context
general_search = await memory.search("What do you know about me?", user_id="alice")
agent_search = await memory.search("What do you know about me?", user_id="alice", agent_id="diet-assistant")
session_search = await memory.search("What do you know about me?", user_id="alice", run_id="consultation-001")
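
The scoped lookups above are independent of each other, so they can also be issued together. The sketch below does this with asyncio.gather. StubMemory is a hypothetical in-memory stand-in, and its behaviour of returning only records that match every supplied filter is an assumption made for the example, not a statement about how Mem0 combines filters.

```python
import asyncio


class StubMemory:
    """Hypothetical in-memory stand-in for AsyncMemory, used only to make the sketch runnable."""

    def __init__(self, records: list[dict]) -> None:
        self._records = records

    async def get_all(self, **scope: str) -> dict:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        matches = [
            record for record in self._records
            if all(record.get(key) == value for key, value in scope.items())
        ]
        return {"results": matches}


async def fetch_scopes(memory, scopes: dict[str, dict]) -> dict[str, list]:
    """Run one get_all call per named scope concurrently and key the results by name."""
    responses = await asyncio.gather(*(memory.get_all(**s) for s in scopes.values()))
    return {name: response["results"] for name, response in zip(scopes, responses)}


records = [
    {"memory": "Prefers vegetarian food", "user_id": "alice",
     "agent_id": "diet-assistant", "run_id": "consultation-001"},
    {"memory": "Travelling to SF", "user_id": "alice",
     "agent_id": "travel-assistant", "run_id": "trip-042"},
]
scopes = {
    "user": {"user_id": "alice"},
    "agent": {"user_id": "alice", "agent_id": "diet-assistant"},
    "session": {"user_id": "alice", "run_id": "consultation-001"},
}
by_scope = asyncio.run(fetch_scopes(StubMemory(records), scopes))
print({name: len(found) for name, found in by_scope.items()})
```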

Memory History

Get the history of changes for a specific memory:
Python
try:
    history = await memory.history(memory_id="memory-id-here")
    print("Memory history:", history)
except Exception as e:
    print(f"Error retrieving history: {e}")

Example: Concurrent Usage with Other APIs

AsyncMemory can be effectively combined with other async operations. Here’s an example showing how to use it alongside asynchronous OpenAI API calls on the same event loop:
Python
import asyncio
from openai import AsyncOpenAI
from mem0 import AsyncMemory

async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()

async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    try:
        # Retrieve relevant memories
        search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
        relevant_memories = search_result["results"]
        memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)
        
        # Generate Assistant response
        system_prompt = f"You are a helpful AI. Answer the question based on query and memories.\nUser Memories:\n{memories_str}"
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": message}]
        response = await async_openai_client.chat.completions.create(model="gpt-4o-mini", messages=messages)
        assistant_response = response.choices[0].message.content

        # Create new memories from the conversation
        messages.append({"role": "assistant", "content": assistant_response})
        await async_memory.add(messages, user_id=user_id)

        return assistant_response
    except Exception as e:
        print(f"Error in chat_with_memories: {e}")
        return "I apologize, but I encountered an error processing your request."

async def async_main():
    print("Chat with AI (type 'exit' to quit)")
    while True:
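        # input() blocks, so run it in a worker thread to keep the event loop free (Python 3.9+)
        user_input = (await asyncio.to_thread(input, "You: ")).strip()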
        if user_input.lower() == 'exit':
            print("Goodbye!")
            break
        response = await chat_with_memories(user_input)
        print(f"AI: {response}")

def main():
    asyncio.run(async_main())

if __name__ == "__main__":
    main()
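
The prompt-building step in chat_with_memories indexes search_result["results"] directly. If you want that step to tolerate an empty or differently shaped response, it can be pulled into a small pure function. The version below is a sketch: format_memories is a hypothetical helper, and the two accepted shapes (a dict with a "results" list, or a bare list) are assumptions rather than a documented contract.

```python
def format_memories(search_result) -> str:
    """Turn a search response into a bulleted string suitable for a system prompt."""
    # Assumption: responses are either {"results": [...]} or a bare list of entries;
    # anything else is treated as "no memories".
    if isinstance(search_result, dict):
        entries = search_result.get("results", [])
    elif isinstance(search_result, list):
        entries = search_result
    else:
        entries = []
    lines = [f"- {entry['memory']}" for entry in entries if entry.get("memory")]
    return "\n".join(lines) if lines else "(no relevant memories)"


print(format_memories({"results": [{"memory": "Travelling to SF"}]}))
```

Keeping this logic outside the coroutine also makes it easy to unit test without an event loop.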

Error Handling and Best Practices

Common Error Types

When working with AsyncMemory, you may encounter these common errors:

Connection and Configuration Errors

Python
import asyncio
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

async def handle_initialization_errors():
    try:
        # Initialize with custom config
        config = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-4o-mini"}}
        )
        memory = AsyncMemory(config=config)
        print("AsyncMemory initialized successfully")
    except ValueError as e:
        print(f"Configuration error: {e}")
    except ConnectionError as e:
        print(f"Connection error: {e}")
    except Exception as e:
        print(f"Unexpected initialization error: {e}")

asyncio.run(handle_initialization_errors())

Memory Operation Errors

Python
async def handle_memory_operation_errors():
    memory = AsyncMemory()
    
    try:
        # Memory not found error
        result = await memory.get(memory_id="non-existent-id")
    except ValueError as e:
        print(f"Invalid memory ID: {e}")
    except Exception as e:
        print(f"Memory retrieval error: {e}")
    
    try:
        # Invalid search parameters
        results = await memory.search(query="", user_id="alice")
    except ValueError as e:
        print(f"Invalid search query: {e}")
    except Exception as e:
        print(f"Search error: {e}")

Performance Optimization

Concurrent Operations

Take advantage of AsyncMemory’s concurrent capabilities:
Python
async def batch_operations():
    memory = AsyncMemory()
    
    # Process multiple operations concurrently
    tasks = []
    for i in range(5):
        task = memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}"
        )
        tasks.append(task)
    
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} completed successfully")
    except Exception as e:
        print(f"Batch operation error: {e}")
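
asyncio.gather starts every task at once, which can overwhelm a rate-limited LLM or vector store when the batch is large. One common way to cap the number of calls in flight is an asyncio.Semaphore, sketched below. StubMemory and add_bounded are hypothetical names used for illustration; the stub only counts how many calls overlap.

```python
import asyncio


class StubMemory:
    """Hypothetical stand-in for AsyncMemory that records how many calls overlap."""

    def __init__(self) -> None:
        self.in_flight = 0
        self.peak = 0

    async def add(self, messages: list[dict], user_id: str) -> dict:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0.01)  # simulated I/O
        self.in_flight -= 1
        return {"results": [{"memory": messages[0]["content"], "user_id": user_id}]}


async def add_bounded(memory, jobs: list[tuple[list[dict], str]], limit: int = 2) -> list:
    """Add every (messages, user_id) job, with at most `limit` calls running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(messages: list[dict], user_id: str):
        async with semaphore:  # waits here while `limit` calls are already running
            return await memory.add(messages=messages, user_id=user_id)

    return await asyncio.gather(
        *(run_one(messages, user_id) for messages, user_id in jobs),
        return_exceptions=True,
    )


memory = StubMemory()
jobs = [([{"role": "user", "content": f"Message {i}"}], f"user_{i}") for i in range(5)]
results = asyncio.run(add_bounded(memory, jobs, limit=2))
print(f"{len(results)} jobs finished, peak concurrency {memory.peak}")
```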

Resource Management

Properly manage AsyncMemory lifecycle:
Python
from contextlib import asynccontextmanager

from mem0 import AsyncMemory

@asynccontextmanager
async def get_memory():
    memory = AsyncMemory()
    try:
        yield memory
    finally:
        # Clean up resources if needed
        pass

async def safe_memory_usage():
    async with get_memory() as memory:
        try:
            result = await memory.search("test query", user_id="alice")
            return result
        except Exception as e:
            print(f"Memory operation failed: {e}")
            return None

Timeout and Retry Strategies

Implement timeout and retry logic for robustness:
Python
import asyncio

async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    last_error = None
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError as e:
            last_error = e
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as e:
            last_error = e
            print(f"Error on attempt {attempt + 1}: {e}")

        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...

    # Chain the last underlying error so the root cause is not lost
    raise RuntimeError(f"Operation failed after {max_retries} attempts") from last_error

# Usage example
async def robust_memory_search():
    memory = AsyncMemory()
    
    async def search_operation():
        return await memory.search("test query", user_id="alice")
    
    try:
        result = await with_timeout_and_retry(search_operation)
        print("Search successful:", result)
    except Exception as e:
        print(f"Search failed permanently: {e}")
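
Retrying every exception can hide programming errors such as an invalid argument, which will fail the same way on each attempt. A possible refinement, sketched below, is to retry only errors that are plausibly transient and let everything else propagate immediately. The names retry_transient and TRANSIENT_ERRORS are hypothetical, and which exception types count as transient is an assumption you would adjust for your own backend.

```python
import asyncio

# Assumption: timeouts and dropped connections are worth retrying; adjust per backend.
TRANSIENT_ERRORS = (asyncio.TimeoutError, ConnectionError)


async def retry_transient(operation, max_retries: int = 3, timeout: float = 10.0,
                          base_delay: float = 1.0):
    """Retry `operation` on transient errors only; any other error propagates at once."""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except TRANSIENT_ERRORS as exc:
            last_error = exc
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
    raise RuntimeError(f"Operation failed after {max_retries} attempts") from last_error


calls = {"flaky": 0, "bad_input": 0}


async def flaky_search():
    # Simulated operation that fails twice, then succeeds.
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("simulated dropped connection")
    return {"results": []}


async def invalid_search():
    # Simulated operation that fails in a way retries cannot fix.
    calls["bad_input"] += 1
    raise ValueError("simulated invalid query")


async def demo():
    ok = await retry_transient(flaky_search, base_delay=0.001)
    try:
        await retry_transient(invalid_search, base_delay=0.001)
    except ValueError:
        rejected = True
    else:
        rejected = False
    return ok, rejected


ok, rejected = asyncio.run(demo())
print(ok, rejected, calls)
```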

Integration with Async Frameworks

FastAPI Integration

Python
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory
import asyncio

app = FastAPI()
memory = AsyncMemory()

@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    try:
        result = await memory.add(messages=messages, user_id=user_id)
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    try:
        result = await memory.search(query=query, user_id=user_id, limit=limit)
        return {"status": "success", "data": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Troubleshooting Guide

Issue                | Possible Causes                      | Solutions
---------------------|--------------------------------------|-------------------------------------------
Initialization fails | Missing dependencies, invalid config | Check dependencies, validate configuration
Slow operations      | Large datasets, network latency      | Implement caching, optimize queries
Memory not found     | Invalid memory ID, deleted memory    | Validate IDs, implement existence checks
Connection timeouts  | Network issues, server overload      | Implement retry logic, check network
Out of memory errors | Large batch operations               | Process in smaller batches
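
For the last row, "process in smaller batches" can be as simple as slicing the input and awaiting one slice at a time. The sketch below shows one way to do that; chunked, process_in_batches, and fake_add are hypothetical names, with fake_add standing in for a call such as memory.add.

```python
import asyncio
from typing import Iterator, Sequence


def chunked(items: Sequence, size: int) -> Iterator[Sequence]:
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def process_in_batches(items: Sequence, worker, batch_size: int = 10) -> list:
    """Await `worker(item)` for every item, one batch at a time, preserving input order."""
    results: list = []
    for batch in chunked(items, batch_size):
        # Only one batch's worth of coroutines is in flight at any moment.
        results.extend(await asyncio.gather(*(worker(item) for item in batch)))
    return results


async def fake_add(item: int) -> int:
    # Hypothetical stand-in for `await memory.add(...)`.
    await asyncio.sleep(0)
    return item * 2


batches = [list(batch) for batch in chunked(list(range(7)), 3)]
doubled = asyncio.run(process_in_batches(list(range(7)), fake_add, batch_size=3))
print(batches, doubled)
```

Compared with a semaphore, batching waits for the slowest call in each batch before starting the next one, but it keeps the number of pending coroutines and buffered results small.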

Monitoring and Logging

Add comprehensive logging to your async memory operations:
Python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Starting {operation_name}")
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"{operation_name} completed in {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"{operation_name} failed after {duration:.2f}s: {e}")
                raise
        return wrapper
    return decorator

@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    return await memory.add(messages=messages, user_id=user_id)