import asyncio
import os
import numpy as np
import sounddevice as sd
# OpenAI Agents SDK imports
from agents import (
Agent,
function_tool
)
from agents.voice import (
AudioInput,
SingleAgentVoiceWorkflow,
VoicePipeline
)
from agents.extensions.handoff_prompt import prompt_with_handoff_instructions
# Mem0 imports
from mem0 import AsyncMemoryClient
# Set up API keys (replace with your actual keys)
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
os.environ["MEM0_API_KEY"] = "your-mem0-api-key"
# Define a global user ID for simplicity
USER_ID = "voice_user"
# Initialize Mem0 client
mem0_client = AsyncMemoryClient()
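# The client picks up MEM0_API_KEY from the environment variable set above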
# Create tools that utilize Mem0's memory
@function_tool
async def save_memories(
memory: str
) -> str:
"""
Store a user memory in memory.
Args:
memory: The memory to save
"""
print(f"Saving memory: {memory} for user {USER_ID}")
# Store the preference in Mem0
memory_content = f"User memory - {memory}"
await mem0_client.add(
memory_content,
user_id=USER_ID,
)
return f"I've saved your memory: {memory}"
@function_tool
async def search_memories(
query: str
) -> str:
"""
Find memories relevant to the current conversation.
Args:
query: The search query to find relevant memories
"""
print(f"Finding memories related to: {query}")
results = await mem0_client.search(
query,
user_id=USER_ID,
limit=5,
threshold=0.7, # Higher threshold for more relevant results
output_format="v1.1"
)
# Format and return the results
if not results.get('results', []):
return "I don't have any relevant memories about this topic."
memories = [f"• {result['memory']}" for result in results.get('results', [])]
return "Here's what I remember that might be relevant:\n" + "\n".join(memories)
# Create the agent with memory-enabled tools
def create_memory_voice_agent():
agent = Agent(
name="Memory Assistant",
instructions=prompt_with_handoff_instructions(
"""You're speaking to a human, so be polite and concise.
Always respond in clear, natural English.
You have the ability to remember information about the user.
            Use the save_memories tool when the user shares important information worth remembering.
            Use the search_memories tool when you need context from past conversations or the user asks you to recall something.
""",
),
model="gpt-4o",
tools=[save_memories, search_memories],
)
return agent
async def record_from_microphone(duration=5, samplerate=24000):
"""Record audio from the microphone for a specified duration."""
print(f"Recording for {duration} seconds...")
# Create a buffer to store the recorded audio
frames = []
# Callback function to store audio data
def callback(indata, frames_count, time_info, status):
frames.append(indata.copy())
# Start recording
with sd.InputStream(samplerate=samplerate, channels=1, callback=callback, dtype=np.int16):
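        # Keep the stream open for the requested duration; the callback buffers audio in the background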
await asyncio.sleep(duration)
# Combine all frames into a single numpy array
audio_data = np.concatenate(frames)
return audio_data
async def main():
print("Starting Memory Voice Agent")
# Create the agent and context
agent = create_memory_voice_agent()
# Set up the voice pipeline
pipeline = VoicePipeline(
workflow=SingleAgentVoiceWorkflow(agent)
)
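    # The pipeline chains speech-to-text, the agent run, and text-to-speech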
# Configure TTS settings
pipeline.config.tts_settings.voice = "alloy"
pipeline.config.tts_settings.speed = 1.0
try:
while True:
# Get user input
print("\nPress Enter to start recording (or 'q' to quit)...")
user_input = input()
if user_input.lower() == 'q':
break
# Record and process audio
audio_data = await record_from_microphone(duration=5)
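            # Wrap the recorded samples in the format the voice pipeline expects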
audio_input = AudioInput(buffer=audio_data)
print("Processing your request...")
# Process the audio input
result = await pipeline.run(audio_input)
# Create an audio player
player = sd.OutputStream(samplerate=24000, channels=1, dtype=np.int16)
player.start()
# Store the agent's response for adding to memory
agent_response = ""
print("\nAgent response:")
# Play the audio stream as it comes in
async for event in result.stream():
if event.type == "voice_stream_event_audio":
player.write(event.data)
elif event.type == "voice_stream_event_content":
# Accumulate and print the text response
content = event.data
agent_response += content
print(content, end="", flush=True)
print("\n")
# Example of saving the conversation to Mem0 after completion
if agent_response:
try:
await mem0_client.add(
f"Agent response: {agent_response}",
user_id=USER_ID,
metadata={"type": "agent_response"}
)
except Exception as e:
print(f"Failed to store memory: {e}")
except KeyboardInterrupt:
print("\nExiting...")
if __name__ == "__main__":
asyncio.run(main())