Multi-Agent Example

Working with Multiple Agents

This example demonstrates how to set up and manage multiple agents in a collaborative environment using AgentConnect’s decentralized communication architecture.

Setting Up Multiple Agents

import os
import asyncio
from dotenv import load_dotenv

from agentconnect.agents.ai_agent import AIAgent
from agentconnect.core.registry import AgentRegistry
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.types import (
    ModelProvider,
    ModelName,
    AgentIdentity,
    InteractionMode,
    Capability,
    MessageType
)
from agentconnect.core.message import Message

# Load environment variables
load_dotenv()

# Create a registry and communication hub
registry = AgentRegistry()
hub = CommunicationHub(registry)

# Create specialized agents with different capabilities
researcher = AIAgent(
    agent_id="researcher",
    name="Research Specialist",
    provider_type=ModelProvider.GOOGLE,
    model_name=ModelName.GEMINI2_FLASH,
    api_key=os.getenv("GOOGLE_API_KEY"),
    identity=AgentIdentity.create_key_based(),
    capabilities=[
        Capability(
            name="research",
            description="Can perform in-depth research on any topic",
            input_schema={"query": "string"},
            output_schema={"findings": "string"}
        )
    ],
    personality="thorough and analytical researcher",
    organization_id="example_org",
    interaction_modes=[InteractionMode.AGENT_TO_AGENT],
)

analyst = AIAgent(
    agent_id="analyst",
    name="Data Analyst",
    provider_type=ModelProvider.OPENAI,
    model_name=ModelName.GPT4O,
    api_key=os.getenv("OPENAI_API_KEY"),
    identity=AgentIdentity.create_key_based(),
    capabilities=[
        Capability(
            name="data_analysis",
            description="Can analyze data and extract insights",
            input_schema={"data": "string"},
            output_schema={"insights": "string"}
        )
    ],
    personality="precise and detail-oriented analyst",
    organization_id="example_org",
    interaction_modes=[InteractionMode.AGENT_TO_AGENT],
)

writer = AIAgent(
    agent_id="writer",
    name="Content Writer",
    provider_type=ModelProvider.GOOGLE,
    model_name=ModelName.GEMINI2_FLASH_LITE,
    api_key=os.getenv("GOOGLE_API_KEY"),
    identity=AgentIdentity.create_key_based(),
    capabilities=[
        Capability(
            name="content_creation",
            description="Can write engaging content on any topic",
            input_schema={"topic": "string", "style": "string"},
            output_schema={"content": "string"}
        )
    ],
    personality="creative and articulate writer",
    organization_id="example_org",
    interaction_modes=[InteractionMode.AGENT_TO_AGENT],
)

# Register all agents with the hub
async def setup():
    await hub.register_agent(researcher)
    await hub.register_agent(analyst)
    await hub.register_agent(writer)

    # Add message handlers to track communication
    hub.add_message_handler("researcher", lambda msg: print(f"Research message: {msg.content[:50]}..."))
    hub.add_message_handler("analyst", lambda msg: print(f"Analysis message: {msg.content[:50]}..."))
    hub.add_message_handler("writer", lambda msg: print(f"Writer message: {msg.content[:50]}..."))

# Run the setup
asyncio.run(setup())

Capability-Based Collaboration

With AgentConnect, agents can discover and collaborate with each other based on capabilities rather than pre-defined connections:

async def collaborative_task():
    # Human requests a comprehensive report on quantum computing
    initial_request = "I need a comprehensive report on the latest advancements in quantum computing"

    # Instead of hardcoding the sequence, we use capability discovery
    # The researcher discovers other agents based on their capabilities

    # Step 1: Send to researcher to gather information
    research_msg = Message.create(
        sender_id="human_user",
        receiver_id="researcher",
        content=initial_request,
        sender_identity=AgentIdentity.create_key_based(),
        message_type=MessageType.TEXT
    )

    research_response = await hub.route_message(research_msg)
    print(f"Research complete: {research_response.content[:100]}...")

    # Step 2: The analyst processes the research findings
    # The researcher uses the hub to find an agent with analysis capabilities
    analysis_result = await hub.send_collaboration_request(
        sender_id="researcher",
        receiver_id="analyst",  # In a real scenario, this could be discovered via capability search
        task_description=f"Analyze these quantum computing research findings and extract key insights: {research_response.content}",
        timeout=60
    )

    print(f"Analysis complete: {analysis_result[:100]}...")

    # Step 3: The writer creates the final report
    # Again, in a real scenario, this agent would be discovered via capabilities
    final_report = await hub.send_collaboration_request(
        sender_id="analyst",
        receiver_id="writer",
        task_description=f"Create a comprehensive report based on this analysis of quantum computing: {analysis_result}",
        timeout=60
    )

    print(f"Final report complete: {final_report[:100]}...")

    # Return the final report to the human
    final_msg = Message.create(
        sender_id="writer",
        receiver_id="human_user",
        content=final_report,
        sender_identity=AgentIdentity.create_key_based(),
        message_type=MessageType.TEXT
    )

    await hub.route_message(final_msg)

    return final_report

# Run the collaborative task
final_report = asyncio.run(collaborative_task())

Automatic Capability Discovery

AgentConnect’s true power comes from decentralized capability discovery. Here’s how to search for agents by capability:

async def capability_discovery_example():
    # Create a human agent as the requester
    human = AIAgent(
        agent_id="human_assistant",
        name="Human Assistant",
        provider_type=ModelProvider.GOOGLE,
        model_name=ModelName.GEMINI2_FLASH_LITE,
        api_key=os.getenv("GOOGLE_API_KEY"),
        identity=AgentIdentity.create_key_based(),
        interaction_modes=[InteractionMode.HUMAN_TO_AGENT, InteractionMode.AGENT_TO_AGENT],
        organization_id="example_org",
    )

    # Register this agent
    await hub.register_agent(human)

    # Search for agents with research capabilities
    research_agents = await registry.find_agents_by_capability("research")
    print(f"Found {len(research_agents)} agents with research capabilities")

    if research_agents:
        # Get the first research agent's ID
        research_agent_id = research_agents[0]

        # Send a collaboration request
        result = await hub.send_collaboration_request(
            sender_id="human_assistant",
            receiver_id=research_agent_id,
            task_description="Research the applications of quantum computing in healthcare",
            timeout=60
        )

        print(f"Research result: {result[:100]}...")

        # Now find an agent that can write content
        content_agents = await registry.find_agents_by_capability("content_creation")

        if content_agents:
            content_agent_id = content_agents[0]

            # Send the research to the content creator
            final_content = await hub.send_collaboration_request(
                sender_id="human_assistant",
                receiver_id=content_agent_id,
                task_description=f"Create an easy-to-understand blog post about this research: {result}",
                timeout=60
            )

            print(f"Final content: {final_content[:100]}...")
            return final_content

    return "No appropriate agents found"

# Run the capability discovery example
asyncio.run(capability_discovery_example())

Using Message Handlers for Coordination

Message handlers allow you to track and orchestrate communication between agents:

async def message_handler_example():
    # Create a global message handler to track all communications
    def global_message_tracker(message):
        print(f"[GLOBAL] {message.sender_id}{message.receiver_id}: {message.content[:50]}...")

    # Add global message handler
    hub.add_global_message_handler(global_message_tracker)

    # Create agent-specific message handlers for customized logic
    async def researcher_handler(message):
        print(f"[RESEARCH] Received: {message.content[:50]}...")
        # You could add specialized processing here

        # For example, logging research queries to a database
        # store_in_research_db(message.content)

        # Or triggering additional actions when certain keywords are detected
        if "quantum" in message.content.lower():
            print("[RESEARCH] Quantum-related request detected!")

    # Add the researcher-specific handler
    hub.add_message_handler("researcher", researcher_handler)

    # Send a test message to the researcher
    test_msg = Message.create(
        sender_id="human_user",
        receiver_id="researcher",
        content="Research the relationship between quantum computing and machine learning",
        sender_identity=AgentIdentity.create_key_based(),
        message_type=MessageType.TEXT
    )

    # Route the message and see the handlers in action
    await hub.route_message(test_msg)

# Run the message handler example
asyncio.run(message_handler_example())

Complete Multi-Agent System

Here’s a complete example that ties everything together:

import os
import asyncio
import json
from dotenv import load_dotenv

from agentconnect.agents.ai_agent import AIAgent
from agentconnect.agents.human_agent import HumanAgent
from agentconnect.core.registry import AgentRegistry
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.types import (
    ModelProvider,
    ModelName,
    AgentIdentity,
    InteractionMode,
    Capability,
    MessageType
)

async def run_multi_agent_system():
    # Load environment variables
    load_dotenv()

    # Create registry and hub
    registry = AgentRegistry()
    hub = CommunicationHub(registry)

    # Create specialized agents
    agents = {
        "researcher": AIAgent(
            agent_id="researcher",
            name="Research Specialist",
            provider_type=ModelProvider.GOOGLE,
            model_name=ModelName.GEMINI2_FLASH,
            api_key=os.getenv("GOOGLE_API_KEY"),
            identity=AgentIdentity.create_key_based(),
            capabilities=[
                Capability(
                    name="research",
                    description="In-depth research on any topic",
                    input_schema={"query": "string"},
                    output_schema={"findings": "string"}
                )
            ],
            personality="thorough researcher",
            organization_id="example_org",
            interaction_modes=[InteractionMode.AGENT_TO_AGENT, InteractionMode.HUMAN_TO_AGENT],
        ),
        "analyst": AIAgent(
            agent_id="analyst",
            name="Data Analyst",
            provider_type=ModelProvider.OPENAI,
            model_name=ModelName.GPT4O,
            api_key=os.getenv("OPENAI_API_KEY"),
            identity=AgentIdentity.create_key_based(),
            capabilities=[
                Capability(
                    name="analysis",
                    description="Data analysis and insights extraction",
                    input_schema={"data": "string"},
                    output_schema={"insights": "string"}
                )
            ],
            personality="precise analyst",
            organization_id="example_org",
            interaction_modes=[InteractionMode.AGENT_TO_AGENT, InteractionMode.HUMAN_TO_AGENT],
        ),
        "writer": AIAgent(
            agent_id="writer",
            name="Content Writer",
            provider_type=ModelProvider.GOOGLE,
            model_name=ModelName.GEMINI2_FLASH_LITE,
            api_key=os.getenv("GOOGLE_API_KEY"),
            identity=AgentIdentity.create_key_based(),
            capabilities=[
                Capability(
                    name="writing",
                    description="Content creation and summarization",
                    input_schema={"topic": "string", "style": "string"},
                    output_schema={"content": "string"}
                )
            ],
            personality="creative writer",
            organization_id="example_org",
            interaction_modes=[InteractionMode.AGENT_TO_AGENT, InteractionMode.HUMAN_TO_AGENT],
        ),
    }

    # Create a human agent
    human = HumanAgent(
        agent_id="human_user",
        name="Example User",
        identity=AgentIdentity.create_key_based(),
        organization_id="example_org",
    )

    # Register all agents
    for agent_id, agent in agents.items():
        await hub.register_agent(agent)
        print(f"Registered agent: {agent_id}")

        # Start agent background processing
        agent_task = asyncio.create_task(agent.run())

    # Register human
    await hub.register_agent(human)

    # Set up message tracking
    results = {}

    async def message_tracker(message):
        print(f"Message: {message.sender_id}{message.receiver_id}: {message.content[:50]}...")
        # Store the latest message from each agent
        if message.sender_id in agents:
            results[message.sender_id] = message.content

    # Add global message handler
    hub.add_global_message_handler(message_tracker)

    # Human initiates the process
    initial_request = "Create a comprehensive report on the potential of quantum computing in medicine"

    # Human sends message to researcher
    await human.send_message(
        "researcher",
        initial_request
    )

    # Wait for researcher to respond
    await asyncio.sleep(10)

    # Researcher automatically discovers and collaborates with the analyst based on capabilities
    # This happens through the agent's internal workflow

    # Wait for the full process to complete
    await asyncio.sleep(60)

    # Save all results
    with open("multi_agent_results.json", "w") as f:
        json.dump(results, f, indent=2)

    # Clean up
    for agent in agents.values():
        agent.is_running = False
        await hub.unregister_agent(agent.agent_id)

    await hub.unregister_agent(human.agent_id)

    return results

if __name__ == "__main__":
    results = asyncio.run(run_multi_agent_system())
    print(f"Final results from all agents: {results.keys()}")