Integrate Mem0 with Google Agent Development Kit (ADK), an open-source framework for building multi-agent workflows. This integration enables agents to access persistent memory across conversations, enhancing context retention and personalization.
The following example demonstrates how to create a Google ADK agent with Mem0 memory integration:
```python
import os
import asyncio

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from mem0 import MemoryClient
from dotenv import load_dotenv

load_dotenv()

# Set up environment variables
# os.environ["GOOGLE_API_KEY"] = "your-google-api-key"
# os.environ["MEM0_API_KEY"] = "your-mem0-api-key"

# Initialize Mem0 client
mem0 = MemoryClient()

# Define memory function tools
def search_memory(query: str, user_id: str) -> dict:
    """Search through past conversations and memories"""
    memories = mem0.search(query, user_id=user_id, output_format='v1.1')
    if memories.get('results', []):
        memory_list = memories['results']
        memory_context = "\n".join([f"- {mem['memory']}" for mem in memory_list])
        return {"status": "success", "memories": memory_context}
    return {"status": "no_memories", "message": "No relevant memories found"}

def save_memory(content: str, user_id: str) -> dict:
    """Save important information to memory"""
    try:
        result = mem0.add([{"role": "user", "content": content}], user_id=user_id, output_format='v1.1')
        return {"status": "success", "message": "Information saved to memory", "result": result}
    except Exception as e:
        return {"status": "error", "message": f"Failed to save memory: {str(e)}"}

# Create agent with memory capabilities
personal_assistant = Agent(
    name="personal_assistant",
    model="gemini-2.0-flash",
    instruction="""You are a helpful personal assistant with memory capabilities.
    Use the search_memory function to recall past conversations and user preferences.
    Use the save_memory function to store important information about the user.
    Always personalize your responses based on available memory.""",
    description="A personal assistant that remembers user preferences and past interactions",
    tools=[search_memory, save_memory]
)

async def chat_with_agent(user_input: str, user_id: str) -> str:
    """
    Handle user input with automatic memory integration.

    Args:
        user_input: The user's message
        user_id: Unique identifier for the user

    Returns:
        The agent's response
    """
    # Set up session and runner
    session_service = InMemorySessionService()
    session = await session_service.create_session(
        app_name="memory_assistant",
        user_id=user_id,
        session_id=f"session_{user_id}"
    )
    runner = Runner(agent=personal_assistant, app_name="memory_assistant", session_service=session_service)

    # Create content and run agent
    content = types.Content(role='user', parts=[types.Part(text=user_input)])
    events = runner.run(user_id=user_id, session_id=session.id, new_message=content)

    # Extract final response
    for event in events:
        if event.is_final_response():
            response = event.content.parts[0].text
            return response
    return "No response generated"

# Example usage
if __name__ == "__main__":
    response = asyncio.run(chat_with_agent(
        "I love Italian food and I'm planning a trip to Rome next month",
        user_id="alice"
    ))
    print(response)
```
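To see the memory working across turns, make a second call for the same `user_id`. This is a minimal sketch that reuses `chat_with_agent` from above; the follow-up question is illustrative and the exact response depends on the model and what was stored.

```python
# Second turn for the same user: the agent can call search_memory to recall
# the food preference and trip saved in the previous turn.
follow_up = asyncio.run(chat_with_agent(
    "What kind of food do I like, and where am I travelling next month?",
    user_id="alice"
))
print(follow_up)
```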
You can also create specialized agents in a hierarchy that share the same memory:
```python
from google.adk.tools.agent_tool import AgentTool

# Travel specialist agent
travel_agent = Agent(
    name="travel_specialist",
    model="gemini-2.0-flash",
    instruction="""You are a travel planning specialist.
    Use search_memory to understand the user's travel preferences and history before making recommendations.
    After providing advice, use save_memory to store travel-related information.""",
    description="Specialist in travel planning and recommendations",
    tools=[search_memory, save_memory]
)

# Health advisor agent
health_agent = Agent(
    name="health_advisor",
    model="gemini-2.0-flash",
    instruction="""You are a health and wellness advisor.
    Use search_memory to understand the user's health goals and dietary preferences.
    After providing advice, use save_memory to store health-related information.""",
    description="Specialist in health and wellness advice",
    tools=[search_memory, save_memory]
)

# Coordinator agent that delegates to specialists
coordinator_agent = Agent(
    name="coordinator",
    model="gemini-2.0-flash",
    instruction="""You are a coordinator that delegates requests to specialist agents.
    For travel-related questions (trips, hotels, flights, destinations), delegate to the travel specialist.
    For health-related questions (fitness, diet, wellness, exercise), delegate to the health advisor.
    Use search_memory to understand the user before delegation.""",
    description="Coordinates requests between specialist agents",
    tools=[
        AgentTool(agent=travel_agent, skip_summarization=False),
        AgentTool(agent=health_agent, skip_summarization=False)
    ]
)

async def chat_with_specialists(user_input: str, user_id: str) -> str:
    """
    Handle user input with specialist agent delegation and memory.

    Args:
        user_input: The user's message
        user_id: Unique identifier for the user

    Returns:
        The specialist agent's response
    """
    session_service = InMemorySessionService()
    session = await session_service.create_session(
        app_name="specialist_system",
        user_id=user_id,
        session_id=f"session_{user_id}"
    )
    runner = Runner(agent=coordinator_agent, app_name="specialist_system", session_service=session_service)

    content = types.Content(role='user', parts=[types.Part(text=user_input)])
    events = runner.run(user_id=user_id, session_id=session.id, new_message=content)

    for event in events:
        if event.is_final_response():
            response = event.content.parts[0].text
            # Store the conversation in shared memory
            conversation = [
                {"role": "user", "content": user_input},
                {"role": "assistant", "content": response}
            ]
            mem0.add(conversation, user_id=user_id)
            return response
    return "No response generated"

# Example usage
response = asyncio.run(chat_with_specialists("Plan a healthy meal for my Italy trip", user_id="alice"))
print(response)
```
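Because both specialists use the same Mem0-backed tools and the same `user_id`, information saved while talking to one specialist is visible to the other. A quick check, with illustrative questions (responses depend on the model and on what has already been stored):

```python
# Travel question: the coordinator should delegate to the travel specialist.
print(asyncio.run(chat_with_specialists(
    "Find me a hotel near the Colosseum", user_id="alice"
)))

# Health question: the coordinator should delegate to the health advisor,
# which can still draw on the travel and food preferences stored earlier.
print(asyncio.run(chat_with_specialists(
    "Suggest a workout routine I can keep up while travelling", user_id="alice"
)))
```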
Finally, a simple interactive chat loop on top of the memory-enabled specialist system:
```python
def interactive_chat():
    """Interactive chat interface with memory and ADK"""
    user_id = input("Enter your user ID: ") or "demo_user"
    print(f"Chat started for user: {user_id}")
    print("Type 'quit' to exit")
    print("=" * 50)

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'quit':
            print("Goodbye! Your conversation has been saved to memory.")
            break
        else:
            response = asyncio.run(chat_with_specialists(user_input, user_id))
            print(f"Assistant: {response}")

if __name__ == "__main__":
    interactive_chat()
```
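After a session you can inspect what Mem0 has stored for a user directly from the client. A minimal sketch using the `mem0` client initialized earlier; the response shape is handled defensively because it can differ between output formats and client versions:

```python
# List everything Mem0 currently stores for this user.
stored = mem0.get_all(user_id="alice")
items = stored.get("results", []) if isinstance(stored, dict) else stored
for item in items:
    print("-", item.get("memory"))
```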
For more control, you can tune the memory search, the agent configuration, and the Gemini backend:

```python
# Configure memory search with metadata
memories = mem0.search(
    query="travel preferences",
    user_id="alice",
    limit=5,
    filters={"category": "travel"}  # Filter by category if supported
)

# Configure agent with custom model settings
agent = Agent(
    name="custom_agent",
    model="gemini-2.0-flash",  # or use LiteLLM for other models
    instruction="Custom agent behavior",
    tools=[search_memory, save_memory],
    # Additional ADK configurations
)

# Use Google Cloud Vertex AI instead of AI Studio
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "True"
os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project-id"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"
```
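The "use LiteLLM for other models" comment refers to ADK's LiteLLM model wrapper, which lets the same memory-enabled agent run against non-Gemini models. A sketch, assuming the `LiteLlm` class from `google.adk.models.lite_llm` and an `OPENAI_API_KEY` set in the environment; the model string is just an example:

```python
from google.adk.models.lite_llm import LiteLlm

# Same Mem0 memory tools, different underlying model served through LiteLLM.
openai_assistant = Agent(
    name="personal_assistant_openai",
    model=LiteLlm(model="openai/gpt-4o"),  # any LiteLLM-supported model string
    instruction="You are a helpful personal assistant with memory capabilities.",
    tools=[search_memory, save_memory]
)
```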