agentconnect.communication.hub module

Message routing system for the AgentConnect framework.

This module provides a communication hub that handles message routing and agent registration without dictating agent behavior. It enables agent discovery and communication in a peer-to-peer network where agents make independent decisions.

class CommunicationHub(registry)

Bases: object

Message routing system that facilitates peer-to-peer agent communication.

The CommunicationHub is NOT a central controller of agent behavior. Instead, it:

  1. Routes messages between independent agents

  2. Facilitates agent discovery through registration

  3. Ensures secure message delivery without dictating responses

  4. Manages communication protocols for consistent messaging

  5. Tracks message history for auditability

Each agent connected to the hub maintains its autonomy and decision-making capability. The hub simply enables discovery and communication without controlling behavior.

Parameters:

registry (AgentRegistry)

add_message_handler(agent_id, handler)

Add a message handler for a specific agent.

Parameters:
  • agent_id (str) – The ID of the agent to handle messages for

  • handler (Callable[[Message], Awaitable[None]]) – Async function that takes a Message and returns None

Raises:

ValueError – If agent_id is None or empty, or if handler is None

Return type:

None
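
A minimal sketch of a handler with the documented shape, Callable[[Message], Awaitable[None]]. The Message dataclass, the ToyHub class, and its dispatch method are hypothetical stand-ins so the snippet runs without AgentConnect installed. Only the validation rule (ValueError for a missing agent_id or handler) is taken from the description above.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class Message:
    """Hypothetical stand-in for the framework's Message type."""
    sender_id: str
    receiver_id: str
    content: str


Handler = Callable[[Message], Awaitable[None]]


class ToyHub:
    """Hypothetical stand-in that mirrors only the documented contract."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def add_message_handler(self, agent_id: str, handler: Handler) -> None:
        # Documented behaviour: reject a missing agent_id or handler.
        if not agent_id or handler is None:
            raise ValueError("agent_id and handler are required")
        self._handlers[agent_id].append(handler)

    async def dispatch(self, message: Message) -> None:
        # Assumption: handlers for the receiver are awaited in order.
        for handler in self._handlers.get(message.receiver_id, []):
            await handler(message)


received: List[str] = []


async def log_incoming(message: Message) -> None:
    # An async function that takes a Message and returns None.
    received.append(f"{message.sender_id}: {message.content}")


hub = ToyHub()
hub.add_message_handler("agent-b", log_incoming)
asyncio.run(hub.dispatch(Message("agent-a", "agent-b", "hello")))
```

With the real hub, the same log_incoming coroutine function would be passed to add_message_handler unchanged. How and when the hub invokes it is not specified in this reference.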

add_global_handler(handler)

Add a global message handler that receives all messages.

Parameters:

handler (Callable) – Async function that takes a Message and returns None

Raises:

ValueError – If handler is None

Return type:

None

remove_message_handler(agent_id, handler)

Remove a message handler for a specific agent.

Parameters:
  • agent_id (str) – The ID of the agent

  • handler (Callable) – The handler function to remove

Returns:

True if handler was removed, False if not found

Return type:

bool

remove_global_handler(handler)

Remove a global message handler.

Parameters:

handler (Callable) – The handler function to remove

Returns:

True if handler was removed, False if not found

Return type:

bool
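
The True/False return value of the two removal methods can be sketched as follows. This assumes handlers are kept in a plain list and matched by equality, which this reference does not confirm. The global_handlers list and the audit handler are hypothetical.

```python
from typing import Awaitable, Callable, List

Handler = Callable[[object], Awaitable[None]]

# Hypothetical stand-in for the hub's internal list of global handlers.
global_handlers: List[Handler] = []


def remove_global_handler(handler: Handler) -> bool:
    """Return True if the handler was registered and removed, else False."""
    try:
        global_handlers.remove(handler)
    except ValueError:
        return False
    return True


async def audit(message: object) -> None:
    """Example global handler; the body is irrelevant here."""


global_handlers.append(audit)

first = remove_global_handler(audit)   # handler was registered
second = remove_global_handler(audit)  # already gone
```

Under that assumption, the caller has to keep a reference to the exact function object it registered. A freshly created lambda or wrapper would not match and the call would return False.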

clear_agent_handlers(agent_id)

Clear all message handlers for a specific agent.

Parameters:

agent_id (str) – The ID of the agent

Return type:

None

async register_agent(agent)

Register an agent for active communication.

Parameters:

agent (BaseAgent)

Return type:

bool

async unregister_agent(agent_id)

Unregister an agent from active communication.

Parameters:

agent_id (str)

Return type:

bool

async route_message(message)

Route a message between agents without controlling agent behavior.

This method verifies message security, locates the receiver, delivers the message, and tracks it in message history. While it ensures delivery and tracks history, it does not dictate how agents respond to messages - each agent maintains its independence in processing and responding to messages.

Parameters:

message (Message) – The message to route

Returns:

True if message was successfully routed, False otherwise

Return type:

bool
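
The four steps described above (verify, locate, deliver, record) can be sketched with stand-in types. Message, ToyAgent, ToyHub, the verified flag, and the receive method are all hypothetical. The real security check and delivery mechanism are not described in this reference.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Message:
    """Hypothetical stand-in; the real Message carries more fields."""
    sender_id: str
    receiver_id: str
    content: str
    verified: bool = True  # placeholder for the real security check


class ToyAgent:
    """Hypothetical agent that just collects what it is sent."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.inbox: List[Message] = []

    async def receive(self, message: Message) -> None:
        # The agent decides what to do; here it only stores the message.
        self.inbox.append(message)


class ToyHub:
    def __init__(self) -> None:
        self.active_agents: Dict[str, ToyAgent] = {}
        self.history: List[Message] = []

    async def route_message(self, message: Message) -> bool:
        # 1. Verify message security (stubbed as a boolean flag).
        if not message.verified:
            return False
        # 2. Locate the receiver among active agents.
        receiver = self.active_agents.get(message.receiver_id)
        if receiver is None:
            return False
        # 3. Deliver; the hub does not inspect or shape the response.
        await receiver.receive(message)
        # 4. Record the message for auditability.
        self.history.append(message)
        return True


async def demo() -> List[bool]:
    hub = ToyHub()
    hub.active_agents["agent-b"] = ToyAgent("agent-b")
    return [
        await hub.route_message(Message("agent-a", "agent-b", "ping")),
        await hub.route_message(Message("agent-a", "agent-z", "ping")),
        await hub.route_message(
            Message("agent-a", "agent-b", "ping", verified=False)
        ),
    ]


results = asyncio.run(demo())
```

Only the first call succeeds in this sketch: the second names a receiver that is not active, and the third fails the stubbed verification.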

async get_agent(agent_id)

Get an active agent by ID.

Parameters:

agent_id (str)

Return type:

Optional[BaseAgent]

async get_all_agents()

Get all active agents.

Returns:

List of all active agents

Return type:

List[BaseAgent]

Note

This method returns a copy of the active agents list to prevent external modification of the internal state.
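
A minimal sketch of the copy-on-read behaviour described in the note. ToyHub and its _active_agents dictionary are hypothetical; the real hub's internal storage is not documented here.

```python
import asyncio
from typing import Dict, List


class ToyHub:
    """Hypothetical stand-in; only the copy-on-read behaviour is modelled."""

    def __init__(self) -> None:
        self._active_agents: Dict[str, object] = {
            "agent-a": object(),
            "agent-b": object(),
        }

    async def get_all_agents(self) -> List[object]:
        # Build a new list so callers cannot mutate the hub's own container.
        return list(self._active_agents.values())


async def demo() -> int:
    hub = ToyHub()
    snapshot = await hub.get_all_agents()
    snapshot.clear()  # empties only the caller's copy
    return len(await hub.get_all_agents())


remaining = asyncio.run(demo())
```

In this sketch the copy is shallow: the list is new, but the agent objects inside it are the same ones the hub holds.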

async is_agent_active(agent_id)

Check if an agent is active.

Parameters:

agent_id (str)

Return type:

bool

get_message_history()

Get the message history recorded by the hub.

Return type:

List[Message]

async send_message_and_wait_response(sender_id, receiver_id, content, message_type=MessageType.REQUEST_COLLABORATION, metadata={}, timeout=60)

Send a message and wait for a response.

Note on timeout behavior: When a timeout occurs, this method returns None, but the future remains in pending_responses. If a response arrives after the timeout, it will still be processed correctly by the hub, but the original caller will have already moved on. Consider using a message queue library like asyncio-nats, aiozmq, or AgentForum for more robust agent communication.

Parameters:
  • sender_id (str) – The ID of the sender agent

  • receiver_id (str) – The ID of the receiver agent

  • content (str) – The message content

  • message_type (MessageType) – The message type

  • metadata (Optional[Dict]) – Additional metadata to include with the message

  • timeout (int) – Maximum time to wait for response in seconds

Returns:

The response message, or None if no response was received

Return type:

Optional[Message]
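
The timeout behaviour noted above can be reproduced with plain asyncio. This is a sketch under stated assumptions, not the hub's implementation: the pending_responses dictionary here is a module-level stand-in, and send_and_wait, deliver_response, and the request IDs are hypothetical names.

```python
import asyncio
from typing import Dict, Optional

# Hypothetical stand-in for the hub's table of in-flight requests.
pending_responses: Dict[str, "asyncio.Future[str]"] = {}


async def send_and_wait(request_id: str, timeout: float) -> Optional[str]:
    loop = asyncio.get_running_loop()
    future: "asyncio.Future[str]" = loop.create_future()
    pending_responses[request_id] = future
    try:
        # shield() keeps the future alive when wait_for() times out.
        return await asyncio.wait_for(asyncio.shield(future), timeout)
    except asyncio.TimeoutError:
        return None  # the entry stays in pending_responses


def deliver_response(request_id: str, content: str) -> bool:
    # Resolve the waiting future, if one is still registered and unresolved.
    future = pending_responses.pop(request_id, None)
    if future is None or future.done():
        return False
    future.set_result(content)
    return True


async def demo():
    # No responder answers in time, so the caller gets None.
    result = await send_and_wait("req-1", timeout=0.05)
    still_pending = "req-1" in pending_responses
    # A late response is still accepted, but nobody is awaiting it.
    late_accepted = deliver_response("req-1", "late reply")
    return result, still_pending, late_accepted


outcome = asyncio.run(demo())
```

A caller that wants to avoid accumulating stale entries could remove the entry in the timeout branch. Whether the real hub cleans up in that way is not stated in this reference.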

async send_collaboration_request(sender_id, receiver_id, task_description, timeout=60, **kwargs)

Facilitate a collaboration request between agents based on capabilities.

This method creates and routes a collaboration request without controlling the outcome. The receiving agent independently decides whether and how to fulfill the request based on its own capabilities and decision-making processes.

Parameters:
  • sender_id (str) – ID of the requesting agent

  • receiver_id (str) – ID of the agent being requested to collaborate

  • task_description (str) – Description of the task to be performed

  • timeout (int) – How long to wait for a response in seconds

  • **kwargs – Additional parameters for the collaboration request

Returns:

The response content as a string, or an error message if the request failed

Return type:

str

Raises: