Integrate Mem0 with AgentOps, a comprehensive monitoring and analytics platform for AI agents. This integration enables automatic tracking and analysis of memory operations, providing insights into agent performance and memory usage patterns.
The following example demonstrates how to integrate Mem0 with AgentOps monitoring for comprehensive memory operation tracking:
Copy
Ask AI
# Import the required libraries for local memory management with Mem0
import asyncio
import logging
import os

from mem0 import Memory, AsyncMemory
from dotenv import load_dotenv
import agentops

# Load variables from a local .env file (if one exists) so the API keys below
# can be supplied without exporting them in the shell. Variables that are
# already set in the environment are left untouched.
load_dotenv()

# Set up environment variables for API keys.
# Fail early with a readable message instead of the opaque
# "TypeError: str expected, not NoneType" that assigning a missing value to
# os.environ would produce.
for _required_key in ("AGENTOPS_API_KEY", "OPENAI_API_KEY"):
    if os.getenv(_required_key) is None:
        raise RuntimeError(
            f"Environment variable {_required_key} is not set; "
            "export it or add it to a .env file before running this example."
        )

# Set up the configuration for local memory storage and define sample user data.
local_config = {
    "llm": {
        "provider": "openai",
        "config": {
            "model": "gpt-4o-mini",
            "temperature": 0.1,
            "max_tokens": 2000,
        },
    }
}

user_id = "alice_demo"
agent_id = "assistant_demo"
run_id = "session_001"

sample_messages = [
    {"role": "user", "content": "I'm planning to watch a movie tonight. Any recommendations?"},
    {"role": "assistant", "content": "How about a thriller? They can be quite engaging."},
    {"role": "user", "content": "I'm not a big fan of thriller movies but I love sci-fi movies."},
    {
        "role": "assistant",
        "content": "Got it! I'll avoid thriller recommendations and suggest sci-fi movies in the future.",
    },
]

sample_preferences = [
    "I prefer dark roast coffee over light roast",
    "I exercise every morning at 6 AM",
    "I'm vegetarian and avoid all meat products",
    "I love reading science fiction novels",
    "I work in software engineering",
]


# This function demonstrates sequential memory operations using the synchronous Memory class
def demonstrate_sync_memory(local_config, sample_messages, sample_preferences, user_id):
    """
    Demonstrate synchronous Memory class operations.

    The whole run is wrapped in an AgentOps trace named
    "mem0_memory_example". In order, the function:

    1. builds a ``Memory`` from ``local_config``;
    2. adds ``sample_messages`` as one conversation, then each entry of
       ``sample_preferences`` individually;
    3. runs a few fixed search queries and prints the matches;
    4. prints how many memories are stored for the user;
    5. deletes all memories for the user.

    Args:
        local_config: Configuration dict passed to ``Memory.from_config``.
        sample_messages: Conversation (list of role/content dicts) to store.
        sample_preferences: Iterable of preference strings to store.
        user_id: Identifier under which memories are added, searched and
            deleted.

    Returns:
        None. Any exception raised by the memory operations is reported on
        stdout and the trace is ended with state "error"; the exception is
        not re-raised, so the demonstration never aborts the caller.
    """
    agentops.start_trace("mem0_memory_example", tags=["mem0_memory_example"])
    try:
        memory = Memory.from_config(local_config)

        # Store the conversation as a whole.
        memory.add(
            sample_messages,
            user_id=user_id,
            metadata={"category": "movie_preferences", "session": "demo"},
        )

        # Store each preference as its own memory.
        for i, preference in enumerate(sample_preferences):
            memory.add(preference, user_id=user_id, metadata={"type": "preference", "index": i})

        search_queries = [
            "What movies does the user like?",
            "What are the user's food preferences?",
            "When does the user exercise?",
        ]

        for query in search_queries:
            results = memory.search(query, user_id=user_id)
            # The matches live under the "results" key of the response;
            # iterating the response itself would only yield its keys.
            # NOTE(review): assumes each match is a dict with a "memory"
            # field - confirm against the installed mem0 version.
            hits = results["results"] if results and "results" in results else []
            if not hits:
                print("No results found")
                continue
            for j, hit in enumerate(hits, start=1):
                print(f"Result {j}: {hit.get('memory', 'N/A')}")

        all_memories = memory.get_all(user_id=user_id)
        if all_memories and "results" in all_memories:
            print(f"Total memories: {len(all_memories['results'])}")

        delete_all_result = memory.delete_all(user_id=user_id)
        print(f"Delete all result: {delete_all_result}")

        agentops.end_trace(end_state="success")
    except Exception as e:
        # Best-effort demo: report the failure instead of silently dropping
        # it, and mark the trace as failed.
        print(f"Sync memory demonstration failed: {e!r}")
        agentops.end_trace(end_state="error")


# Execute sync demonstrations
demonstrate_sync_memory(local_config, sample_messages, sample_preferences, user_id)