🔐 Mem0 is now SOC 2 and HIPAA compliant! We're committed to the highest standards of data security and privacy, enabling secure memory for enterprises, healthcare, and beyond. Learn more
Integrate Mem0 with Google Agent Development Kit (ADK), an open-source framework for building multi-agent workflows. This integration enables agents to access persistent memory across conversations, enhancing context retention and personalization.
The following example demonstrates how to create a Google ADK agent with Mem0 memory integration:
Copy
Ask AI
import asyncio
import inspect
import os

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from mem0 import MemoryClient

# Set up environment variables
os.environ["GOOGLE_API_KEY"] = "your-google-api-key"
os.environ["MEM0_API_KEY"] = "your-mem0-api-key"

# Initialize Mem0 client
mem0 = MemoryClient()


# Define memory function tools
def search_memory(query: str, user_id: str) -> dict:
    """Search through past conversations and memories

    Args:
        query: Text to search the stored memories for
        user_id: Unique identifier of the user whose memories are searched

    Returns:
        A dict with "status" set to "success" and a "memories" bullet list,
        or "status" set to "no_memories" and a "message" when nothing matched.
    """
    memories = mem0.search(query, user_id=user_id)
    # Accept both response shapes: a bare list of memory dicts, or a dict
    # that wraps that list under the "results" key.
    if isinstance(memories, dict):
        results = memories.get("results", [])
    else:
        results = memories
    if results:
        memory_context = "\n".join(f"- {mem['memory']}" for mem in results)
        return {"status": "success", "memories": memory_context}
    return {"status": "no_memories", "message": "No relevant memories found"}


def save_memory(content: str, user_id: str) -> dict:
    """Save important information to memory

    Args:
        content: The information to store
        user_id: Unique identifier of the user the memory belongs to

    Returns:
        A dict with "status" of "success" or "error" and a "message".
    """
    try:
        mem0.add([{"role": "user", "content": content}], user_id=user_id)
    except Exception as e:
        # Tool boundary: report the failure back to the agent as data
        # instead of letting the exception abort the run.
        return {"status": "error", "message": f"Failed to save memory: {str(e)}"}
    return {"status": "success", "message": "Information saved to memory"}


# Create agent with memory capabilities
personal_assistant = Agent(
    name="personal_assistant",
    model="gemini-2.0-flash",
    instruction="""You are a helpful personal assistant with memory capabilities.
    Use the search_memory function to recall past conversations and user preferences.
    Use the save_memory function to store important information about the user.
    Always personalize your responses based on available memory.""",
    description="A personal assistant that remembers user preferences and past interactions",
    tools=[search_memory, save_memory],
)


def chat_with_agent(user_input: str, user_id: str) -> str:
    """
    Handle user input with automatic memory integration.

    Args:
        user_input: The user's message
        user_id: Unique identifier for the user

    Returns:
        The agent's response, or "No response generated" when the run
        produced no final response with content.
    """
    # Set up session and runner
    session_service = InMemorySessionService()
    session = session_service.create_session(
        app_name="memory_assistant",
        user_id=user_id,
        session_id=f"session_{user_id}",
    )
    # Some ADK releases make create_session a coroutine function; resolve it
    # here so this function stays synchronous. asyncio.run() cannot be used
    # from inside an already running event loop.
    if inspect.iscoroutine(session):
        session = asyncio.run(session)

    runner = Runner(
        agent=personal_assistant,
        app_name="memory_assistant",
        session_service=session_service,
    )

    # Create content and run agent
    content = types.Content(role='user', parts=[types.Part(text=user_input)])
    events = runner.run(user_id=user_id, session_id=session.id, new_message=content)

    # Extract final response
    for event in events:
        if not event.is_final_response():
            continue
        # A final event may carry no content or no parts; skip it rather
        # than failing on the attribute/index access.
        if event.content and event.content.parts:
            return event.content.parts[0].text

    return "No response generated"


# Example usage
if __name__ == "__main__":
    response = chat_with_agent(
        "I love Italian food and I'm planning a trip to Rome next month",
        user_id="alice",
    )
    print(response)
Create specialized agents in a hierarchy that share memory:
Copy
Ask AI
import asyncio
import inspect

from google.adk.tools.agent_tool import AgentTool

# Travel specialist agent
# NOTE: the instructions name the tools exactly as they are registered in
# `tools` (search_memory / save_memory) so the model can actually call them.
travel_agent = Agent(
    name="travel_specialist",
    model="gemini-2.0-flash",
    instruction="""You are a travel planning specialist. Use search_memory to
    understand the user's travel preferences and history before making recommendations.
    After providing advice, use save_memory to save travel-related information.""",
    description="Specialist in travel planning and recommendations",
    tools=[search_memory, save_memory],
)

# Health advisor agent
health_agent = Agent(
    name="health_advisor",
    model="gemini-2.0-flash",
    instruction="""You are a health and wellness advisor. Use search_memory to
    understand the user's health goals and dietary preferences.
    After providing advice, use save_memory to save health-related information.""",
    description="Specialist in health and wellness advice",
    tools=[search_memory, save_memory],
)

# Coordinator agent that delegates to specialists
coordinator_agent = Agent(
    name="coordinator",
    model="gemini-2.0-flash",
    instruction="""You are a coordinator that delegates requests to specialist agents.
    For travel-related questions (trips, hotels, flights, destinations), delegate to the travel specialist.
    For health-related questions (fitness, diet, wellness, exercise), delegate to the health advisor.
    Use search_memory to understand the user before delegation.""",
    description="Coordinates requests between specialist agents",
    tools=[
        # Registered so the memory lookup the instruction asks for is available.
        search_memory,
        AgentTool(agent=travel_agent, skip_summarization=False),
        AgentTool(agent=health_agent, skip_summarization=False),
    ],
)


def chat_with_specialists(user_input: str, user_id: str) -> str:
    """
    Handle user input with specialist agent delegation and memory.

    Args:
        user_input: The user's message
        user_id: Unique identifier for the user

    Returns:
        The specialist agent's response, or "No response generated" when the
        run produced no final response with content.
    """
    session_service = InMemorySessionService()
    session = session_service.create_session(
        app_name="specialist_system",
        user_id=user_id,
        session_id=f"session_{user_id}",
    )
    # Some ADK releases make create_session a coroutine function; resolve it
    # here so this function stays synchronous. asyncio.run() cannot be used
    # from inside an already running event loop.
    if inspect.iscoroutine(session):
        session = asyncio.run(session)

    runner = Runner(
        agent=coordinator_agent,
        app_name="specialist_system",
        session_service=session_service,
    )

    content = types.Content(role='user', parts=[types.Part(text=user_input)])
    events = runner.run(user_id=user_id, session_id=session.id, new_message=content)

    for event in events:
        if not event.is_final_response():
            continue
        # A final event may carry no content or no parts; skip it rather
        # than failing on the attribute/index access.
        if not (event.content and event.content.parts):
            continue

        response = event.content.parts[0].text

        # Store the conversation in shared memory (only when there is
        # assistant text to record).
        if response:
            conversation = [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": response},
            ]
            mem0.add(conversation, user_id=user_id)

        return response

    return "No response generated"


# Example usage
# Guarded so importing this module does not trigger a model call.
if __name__ == "__main__":
    response = chat_with_specialists("Plan a healthy meal for my Italy trip", user_id="alice")
    print(response)
Simple interactive chat with memory and Google ADK:
Copy
Ask AI
def interactive_chat():
    """Interactive chat interface with memory and ADK

    Prompts for a user ID (defaulting to "demo_user"), then forwards each
    line typed by the user to chat_with_specialists and prints the reply.
    The loop ends when the user types 'quit' or closes/interrupts the input
    stream (Ctrl-D / Ctrl-C).
    """
    try:
        user_id = input("Enter your user ID: ").strip() or "demo_user"
    except (EOFError, KeyboardInterrupt):
        # No user ID could be read; there is no session to start.
        print()
        return

    print(f"Chat started for user: {user_id}")
    print("Type 'quit' to exit")
    print("=" * 50)

    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Treat end-of-input or Ctrl-C the same as typing 'quit'.
            print()
            user_input = "quit"

        if user_input.lower() == 'quit':
            print("Goodbye! Your conversation has been saved to memory.")
            break

        if not user_input:
            # Nothing to send; prompt again instead of calling the agent
            # with an empty message.
            continue

        response = chat_with_specialists(user_input, user_id)
        print(f"Assistant: {response}")


if __name__ == "__main__":
    interactive_chat()
# Configure memory search with metadata
memories = mem0.search(
    query="travel preferences",
    user_id="alice",
    limit=5,
    filters={"category": "travel"},  # Filter by category if supported
)

# Configure agent with custom model settings
agent = Agent(
    name="custom_agent",
    model="gemini-2.0-flash",  # or use LiteLLM for other models
    instruction="Custom agent behavior",
    # Pass the tool functions themselves; `tools` takes a flat list of
    # callables/tools, not a nested list.
    tools=[search_memory, save_memory],
    # Additional ADK configurations
)

# Use Google Cloud Vertex AI instead of AI Studio
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project-id"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"