agentconnect.communication package

Decentralized communication infrastructure for the AgentConnect framework.

This package provides tools for peer-to-peer agent communication, message routing, and protocol handling. Its message routing system lets agents discover and interact with one another without any centralized control of agent behavior.

Key components:

  • CommunicationHub: Message routing and delivery system for peer-to-peer agent communication

  • Protocol implementations: Base protocol and specialized variants for different interaction types

class CommunicationHub(registry)

Bases: object

Message routing system that facilitates peer-to-peer agent communication.

The CommunicationHub is NOT a central controller of agent behavior, but rather:

  1. Routes messages between independent agents

  2. Facilitates agent discovery through registration

  3. Ensures secure message delivery without dictating responses

  4. Manages communication protocols for consistent messaging

  5. Tracks message history for auditability

Each agent connected to the hub maintains its autonomy and decision-making capability. The hub simply enables discovery and communication without controlling behavior.

Parameters:

registry (AgentRegistry)

add_global_handler(handler)

Add a global message handler that receives all messages.

Parameters:

handler (Callable) – Async function that takes a Message and returns None

Raises:

ValueError – If handler is None

Return type:

None

add_message_handler(agent_id, handler)

Add a message handler for a specific agent.

Parameters:
  • agent_id (str) – The ID of the agent to handle messages for

  • handler (Callable[[Message], Awaitable[None]]) – Async function that takes a Message and returns None

Raises:

ValueError – If agent_id is None or empty, or if handler is None

Return type:

None
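The registration contract above (a non-empty agent_id, a non-None async handler) can be sketched with a toy handler table. This is a minimal illustration, not the CommunicationHub implementation: HandlerTable, dispatch, and the dict-based Message stand-in are hypothetical names introduced for the example.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict, List

# Hypothetical stand-in for the framework's Message type.
Message = dict
Handler = Callable[[Message], Awaitable[None]]


class HandlerTable:
    """Toy per-agent handler table; not the CommunicationHub implementation."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def add_message_handler(self, agent_id: str, handler: Handler) -> None:
        # Mirror the documented contract: reject missing IDs and handlers.
        if not agent_id:
            raise ValueError("agent_id must be a non-empty string")
        if handler is None:
            raise ValueError("handler must not be None")
        self._handlers[agent_id].append(handler)

    async def dispatch(self, agent_id: str, message: Message) -> None:
        # Await each handler registered for this agent, in insertion order.
        for handler in list(self._handlers.get(agent_id, [])):
            await handler(message)


received: List[Message] = []


async def log_message(message: Message) -> None:
    received.append(message)


table = HandlerTable()
table.add_message_handler("agent-1", log_message)
asyncio.run(table.dispatch("agent-1", {"content": "hello"}))
```

Validating at registration time means a bad handler fails immediately at the call site rather than later, when a message is being delivered.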

clear_agent_handlers(agent_id)

Clear all message handlers for a specific agent.

Parameters:

agent_id (str) – The ID of the agent

Return type:

None

async get_agent(agent_id)

Get an active agent by ID.

Return type:

Optional[BaseAgent]

Parameters:

agent_id (str)

async get_all_agents()

Get all active agents.

Returns:

List of all active agents

Return type:

List[BaseAgent]

Note

This method returns a copy of the active agents list to prevent external modification of the internal state.
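The effect of returning a copy can be seen in a small sketch. AgentDirectory is a hypothetical stand-in written for this example; it is synchronous for brevity, uses plain strings in place of BaseAgent instances, and assumes the copy is a new list over the same agent objects.

```python
from typing import Dict, List


class AgentDirectory:
    """Toy container; the string values stand in for BaseAgent instances."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}

    def register(self, agent_id: str, agent: str) -> None:
        self._active[agent_id] = agent

    def get_all_agents(self) -> List[str]:
        # Build a new list so callers cannot mutate internal state.
        return list(self._active.values())


directory = AgentDirectory()
directory.register("a1", "agent-one")
snapshot = directory.get_all_agents()
snapshot.clear()  # Only the caller's copy is emptied.
remaining = directory.get_all_agents()
```

Under this assumption the returned list is a snapshot: agents registered or unregistered afterwards do not appear in or disappear from a list the caller already holds.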

get_message_history()

Get the message history.

Return type:

List[Message]

async is_agent_active(agent_id)

Check whether an agent is active.

Return type:

bool

Parameters:

agent_id (str)

async register_agent(agent)

Register an agent for active communication.

Return type:

bool

Parameters:

agent (BaseAgent)

remove_global_handler(handler)

Remove a global message handler.

Parameters:

handler (Callable) – The handler function to remove

Returns:

True if handler was removed, False if not found

Return type:

bool

remove_message_handler(agent_id, handler)

Remove a message handler for a specific agent.

Parameters:
  • agent_id (str) – The ID of the agent

  • handler (Callable) – The handler function to remove

Returns:

True if handler was removed, False if not found

Return type:

bool
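The boolean result described above can be illustrated with a toy removal function. This sketch assumes handlers are matched by comparing function objects; the module-level handlers dict and on_message are hypothetical names for the example, not part of the hub.

```python
from typing import Callable, Dict, List

# Hypothetical handler table keyed by agent ID.
handlers: Dict[str, List[Callable]] = {"agent-1": []}


def remove_message_handler(agent_id: str, handler: Callable) -> bool:
    """Toy version: report whether the handler was present and removed."""
    registered = handlers.get(agent_id, [])
    if handler in registered:
        registered.remove(handler)
        return True
    return False


async def on_message(message) -> None:
    pass


handlers["agent-1"].append(on_message)
first = remove_message_handler("agent-1", on_message)   # handler was registered
second = remove_message_handler("agent-1", on_message)  # already removed
```

If removal works this way, the caller needs to keep a reference to the exact function it registered; a freshly created lambda with the same body would not match.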

async route_message(message)

Route a message between agents without controlling agent behavior.

This method verifies message security, locates the receiver, delivers the message, and tracks it in message history. While it ensures delivery and tracks history, it does not dictate how agents respond to messages - each agent maintains its independence in processing and responding to messages.

Parameters:

message (Message) – The message to route

Return type:

bool

Returns:

True if message was successfully routed, False otherwise
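The four steps described above (verify, locate, deliver, record) can be sketched as follows. ToyMessage, ToyRouter, and the signed flag are hypothetical stand-ins; the real Message fields and security check are not shown in this reference and may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, List


@dataclass
class ToyMessage:
    # Hypothetical fields; the real Message type may differ.
    sender_id: str
    receiver_id: str
    content: str
    signed: bool = True  # Stand-in for a real signature check.


class ToyRouter:
    def __init__(self) -> None:
        self.inboxes: Dict[str, Callable[[ToyMessage], Awaitable[None]]] = {}
        self.history: List[ToyMessage] = []

    async def route_message(self, message: ToyMessage) -> bool:
        # 1. Verify message security (placeholder check).
        if not message.signed:
            return False
        # 2. Locate the receiver.
        deliver = self.inboxes.get(message.receiver_id)
        if deliver is None:
            return False
        # 3. Deliver; what the receiver does next is its own decision.
        await deliver(message)
        # 4. Record the message for auditability.
        self.history.append(message)
        return True


async def demo() -> List[bool]:
    router = ToyRouter()
    seen: List[str] = []

    async def inbox(message: ToyMessage) -> None:
        seen.append(message.content)

    router.inboxes["b"] = inbox
    ok = await router.route_message(ToyMessage("a", "b", "ping"))
    unknown = await router.route_message(ToyMessage("a", "zzz", "ping"))
    unsigned = await router.route_message(ToyMessage("a", "b", "ping", signed=False))
    return [ok, unknown, unsigned, len(router.history) == 1, seen == ["ping"]]


results = asyncio.run(demo())
```

In this sketch only delivered messages enter the history; whether the real hub also records rejected messages is not stated here.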

async send_collaboration_request(sender_id, receiver_id, task_description, timeout=60, **kwargs)

Facilitate a collaboration request between agents based on capabilities.

This method creates and routes a collaboration request without controlling the outcome. The receiving agent independently decides whether and how to fulfill the request based on its own capabilities and decision-making processes.

Parameters:
  • sender_id (str) – ID of the requesting agent

  • receiver_id (str) – ID of the agent being requested to collaborate

  • task_description (str) – Description of the task to be performed

  • timeout (int) – How long to wait for a response in seconds

  • **kwargs – Additional parameters for the collaboration request

Return type:

str

Returns:

The response content as a string, or an error message if the request failed
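Because the method returns a string in both the success and the failure case, callers have to inspect the text rather than catch an exception. A minimal sketch of that contract follows; request_collaboration, the Transport callable, and the error wording are all hypothetical and only illustrate the shape of the return value.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical transport: resolves to the reply text, or None on timeout.
Transport = Callable[[str, str, str, int], Awaitable[Optional[str]]]


async def request_collaboration(
    send: Transport, sender_id: str, receiver_id: str, task: str, timeout: int = 60
) -> str:
    reply = await send(sender_id, receiver_id, task, timeout)
    if reply is None:
        # Illustrative wording only; the real error text is not documented here.
        return f"Error: no response from {receiver_id} within {timeout} seconds"
    return reply


async def answering(sender_id: str, receiver_id: str, task: str, timeout: int) -> Optional[str]:
    return f"{receiver_id} accepted: {task}"


async def silent(sender_id: str, receiver_id: str, task: str, timeout: int) -> Optional[str]:
    return None


accepted = asyncio.run(request_collaboration(answering, "a", "b", "summarize report"))
failed = asyncio.run(request_collaboration(silent, "a", "b", "summarize report", timeout=5))
```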

async send_message_and_wait_response(sender_id, receiver_id, content, message_type=MessageType.REQUEST_COLLABORATION, metadata={}, timeout=60)

Send a message and wait for a response.

Note on timeout behavior: When a timeout occurs, this method returns None, but the future remains in pending_responses. If a response arrives after the timeout, it will still be processed correctly by the hub, but the original caller will have already moved on. Consider using a message queue library like asyncio-nats, aiozmq, or AgentForum for more robust agent communication.

Parameters:
  • sender_id (str) – The ID of the sender agent

  • receiver_id (str) – The ID of the receiver agent

  • content (str) – The message content

  • message_type (MessageType) – The message type

  • metadata (Optional[Dict]) – Additional metadata to include with the message

  • timeout (int) – Maximum time to wait for response in seconds

Return type:

Optional[Message]

Returns:

The response message, or None if no response was received
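The timeout behavior described in the note can be reproduced with plain asyncio. This is a sketch under assumptions, not the hub's code: the module-level pending_responses dict and wait_for_response are hypothetical, and asyncio.shield is used here as one way to keep the future pending after the wait times out.

```python
import asyncio
from typing import Dict, Optional

# Hypothetical table of in-flight requests, keyed by request ID.
pending_responses: Dict[str, asyncio.Future] = {}


async def wait_for_response(request_id: str, timeout: float) -> Optional[str]:
    future = asyncio.get_running_loop().create_future()
    pending_responses[request_id] = future
    try:
        # shield() keeps the future alive when wait_for() times out;
        # without it, wait_for() would cancel the future.
        return await asyncio.wait_for(asyncio.shield(future), timeout)
    except asyncio.TimeoutError:
        return None  # The future is still pending in pending_responses.


async def demo() -> tuple:
    result = await wait_for_response("req-1", timeout=0.01)
    still_pending = "req-1" in pending_responses and not pending_responses["req-1"].done()
    # A late response can still resolve the future, but nobody awaits it.
    pending_responses["req-1"].set_result("late reply")
    late = pending_responses.pop("req-1").result()
    return result, still_pending, late


outcome = asyncio.run(demo())
```

In a design like this, entries for timed-out requests accumulate until a late response arrives or something removes them, so a caller that times out often may want to clean up its own entry.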

async unregister_agent(agent_id)

Unregister an agent from active communication.

Return type:

bool

Parameters:

agent_id (str)

Subpackages

Submodules