#Import the required libraries for local memory management with Mem0
from mem0 import Memory, AsyncMemory
import os
import asyncio
import logging
from dotenv import load_dotenv
import agentops
import openai
load_dotenv()
#Set up environment variables for API keys
os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY")
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
#Set up the configuration for local memory storage and define sample user data.
local_config = {
"llm": {
"provider": "openai",
"config": {
"model": "gpt-4o-mini",
"temperature": 0.1,
"max_tokens": 2000,
},
}
}
user_id = "alice_demo"
agent_id = "assistant_demo"
run_id = "session_001"
sample_messages = [
{"role": "user", "content": "I'm planning to watch a movie tonight. Any recommendations?"},
{"role": "assistant", "content": "How about a thriller? They can be quite engaging."},
{"role": "user", "content": "I'm not a big fan of thriller movies but I love sci-fi movies."},
{
"role": "assistant",
"content": "Got it! I'll avoid thriller recommendations and suggest sci-fi movies in the future.",
},
]
sample_preferences = [
"I prefer dark roast coffee over light roast",
"I exercise every morning at 6 AM",
"I'm vegetarian and avoid all meat products",
"I love reading science fiction novels",
"I work in software engineering",
]
#This function demonstrates sequential memory operations using the synchronous Memory class
def demonstrate_sync_memory(local_config, sample_messages, sample_preferences, user_id):
"""
Demonstrate synchronous Memory class operations.
"""
agentops.start_trace("mem0_memory_example", tags=["mem0_memory_example"])
try:
memory = Memory.from_config(local_config)
result = memory.add(
sample_messages, user_id=user_id, metadata={"category": "movie_preferences", "session": "demo"}
)
for i, preference in enumerate(sample_preferences):
result = memory.add(preference, user_id=user_id, metadata={"type": "preference", "index": i})
search_queries = [
"What movies does the user like?",
"What are the user's food preferences?",
"When does the user exercise?",
]
for query in search_queries:
results = memory.search(query, user_id=user_id)
if results and "results" in results:
for j, result in enumerate(results['results']):
print(f"Result {j+1}: {result.get('memory', 'N/A')}")
else:
print("No results found")
all_memories = memory.get_all(user_id=user_id)
if all_memories and "results" in all_memories:
print(f"Total memories: {len(all_memories['results'])}")
delete_all_result = memory.delete_all(user_id=user_id)
print(f"Delete all result: {delete_all_result}")
agentops.end_trace(end_state="success")
except Exception as e:
agentops.end_trace(end_state="error")
# Execute sync demonstrations
demonstrate_sync_memory(local_config, sample_messages, sample_preferences, user_id)