Custom Agent Example¶
Creating a Custom Agent¶
This example demonstrates how to create a custom agent by extending the base agent classes.
Implementing a Custom Agent¶
Here’s how to implement a custom agent:
from agentconnect.core.agent import BaseAgent
from agentconnect.core.types import AgentType, AgentIdentity, Capability, InteractionMode
from agentconnect.core.message import Message
from typing import Optional
import logging
class CustomAgent(BaseAgent):
    """A custom agent implementation with specialized behavior.

    The agent advertises a single capability named ``custom_capability``,
    registers itself with the base class as an ``AgentType.AI`` agent that
    supports ``InteractionMode.AGENT_TO_AGENT``, and answers messages by
    prefixing their content with its ``special_feature`` label.
    """

    def __init__(
        self,
        agent_id: str,
        name: str,
        identity: AgentIdentity,
        special_feature: str,
        organization_id: Optional[str] = None,
    ):
        """Set up the capability list, the base agent, and local attributes.

        Args:
            agent_id: Identifier forwarded to ``BaseAgent`` and used in the
                logger name.
            name: Display name, stored on ``self.name``.
            identity: Identity object forwarded to ``BaseAgent``.
            special_feature: Label included in every processed reply.
            organization_id: Optional organization identifier forwarded to
                ``BaseAgent``.
        """
        # Define agent capabilities
        capabilities = [
            Capability(
                name="custom_capability",
                description="A specialized capability unique to this agent",
                input_schema={"input_data": "string"},
                output_schema={"result": "string"},
            )
        ]

        # Initialize base agent
        super().__init__(
            agent_id=agent_id,
            agent_type=AgentType.AI,  # Or custom type if needed
            identity=identity,
            capabilities=capabilities,
            interaction_modes=[InteractionMode.AGENT_TO_AGENT],
            organization_id=organization_id,
        )

        self.name = name
        self.special_feature = special_feature
        self.logger = logging.getLogger(f"CustomAgent-{agent_id}")

    async def process_message(self, message: Message) -> Optional[Message]:
        """Process an incoming message and build the reply.

        The base implementation runs first; if it produces a response, that
        response is returned unchanged. Otherwise a reply addressed to the
        original sender is created, carrying the incoming message type and
        the content prefixed with ``special_feature``.

        Args:
            message: The incoming message.

        Returns:
            The base-class response when there is one, otherwise the newly
            created reply.
        """
        # First call the base implementation for common processing
        response = await super().process_message(message)
        if response:
            return response

        # Lazy %-style arguments: the string is only formatted when the
        # INFO level is actually enabled for this logger.
        self.logger.info(
            "Processing message with special feature: %s", self.special_feature
        )

        # Custom processing logic here
        processed_content = f"Processed with {self.special_feature}: {message.content}"

        # Create and return response
        return Message.create(
            sender_id=self.agent_id,
            receiver_id=message.sender_id,
            content=processed_content,
            sender_identity=self.identity,
            message_type=message.message_type,
        )
Using the Custom Agent¶
Here’s how to use the custom agent:
import asyncio
import os

from agentconnect.agents import AIAgent
from agentconnect.core.message import Message
from agentconnect.core.types import AgentIdentity
from agentconnect.core.types import MessageType
from agentconnect.core.types import ModelName, ModelProvider


async def main() -> None:
    """Send one text message from a regular AI agent to the custom agent.

    ``CustomAgent`` is the class defined in the previous section. The
    example lives in a coroutine because ``process_message`` must be
    awaited, and ``await`` is not valid at module level in a regular script.
    """
    # Create a custom agent
    custom_agent = CustomAgent(
        agent_id="custom1",
        name="CustomProcessor",
        identity=AgentIdentity.create_key_based(),
        special_feature="advanced_nlp",
        organization_id="example_org",
    )

    # Create another agent to interact with the custom agent
    regular_agent = AIAgent(
        agent_id="regular1",
        name="RegularAgent",
        provider_type=ModelProvider.OPENAI,
        model_name=ModelName.GPT4O,
        api_key=os.getenv("OPENAI_API_KEY"),
        identity=AgentIdentity.create_key_based(),
        organization_id="example_org",
    )

    # Send a message from regular agent to custom agent
    message = Message.create(
        sender_id=regular_agent.agent_id,
        receiver_id=custom_agent.agent_id,
        content="Please process this text using your special capabilities",
        sender_identity=regular_agent.identity,
        message_type=MessageType.TEXT,
    )

    # Process the message
    response = await custom_agent.process_message(message)

    # process_message is annotated Optional[Message], so guard against None
    # before reading the content.
    if response is None:
        print("Custom agent returned no response")
    else:
        print(f"Response from custom agent: {response.content}")


if __name__ == "__main__":
    asyncio.run(main())
Integrating with Communication Hub¶
Here’s how to integrate the custom agent with the communication hub:
import asyncio
import os

from agentconnect.agents import AIAgent
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.message import Message
from agentconnect.core.registry import AgentRegistry
from agentconnect.core.types import (
    AgentIdentity,
    InteractionMode,
    ModelName,
    ModelProvider,
)


# Create message handler for tracking communication
async def message_handler(message: Message) -> None:
    """Print the first 50 characters of a message's content."""
    print(f"Message to custom agent: {message.content[:50]}...")


async def main() -> None:
    """Register both agents with a hub and send one collaboration request.

    ``CustomAgent`` is the class defined earlier on this page. The hub calls
    are awaited, so the example is wrapped in a coroutine and started with
    ``asyncio.run``; ``await`` is not valid at module level in a regular
    script.
    """
    # Create registry and hub
    registry = AgentRegistry()
    hub = CommunicationHub(registry)

    # Create and register agents
    custom_agent = CustomAgent(
        agent_id="custom_agent",
        name="CustomAgent",
        identity=AgentIdentity.create_key_based(),
        special_feature="data_transformation",
        organization_id="example_org",
    )
    regular_agent = AIAgent(
        agent_id="regular_agent",
        name="RegularAgent",
        provider_type=ModelProvider.GOOGLE,
        model_name=ModelName.GEMINI2_FLASH,
        api_key=os.getenv("GOOGLE_API_KEY"),
        identity=AgentIdentity.create_key_based(),
        organization_id="example_org",
        interaction_modes=[InteractionMode.AGENT_TO_AGENT],
    )

    # Register agents with the hub
    await hub.register_agent(custom_agent)
    await hub.register_agent(regular_agent)

    # Add message handler to track communication with custom agent
    hub.add_message_handler("custom_agent", message_handler)

    # Send collaboration request
    result = await hub.send_collaboration_request(
        sender_id="regular_agent",
        receiver_id="custom_agent",
        task_description="Transform this dataset using your custom capability",
        timeout=30,
    )
    print(f"Collaboration result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
Advanced Custom Agent Features¶
Here are some advanced features you can implement in your custom agent:
import os
import json
from typing import Dict, Any, List
class AdvancedCustomAgent(BaseAgent):
    """Advanced custom agent with memory and state management.

    The agent keeps a key/value ``memory`` dict and a ``conversation_history``
    list of incoming messages. Messages containing "remember that <key> is
    <value>" store a fact, messages containing "recall <key>" read it back,
    and everything else is delegated to the base class. Memory and history
    can be written to and read from a JSON file.
    """

    def __init__(self, agent_id: str, name: str, identity: AgentIdentity, **kwargs):
        """Initialize the base agent and start with empty memory and history.

        Args:
            agent_id: Identifier forwarded to ``BaseAgent``.
            name: Display name, stored on ``self.name``.
            identity: Identity object forwarded to ``BaseAgent``.
            **kwargs: Any further keyword arguments, passed to ``BaseAgent``
                unchanged.
        """
        super().__init__(agent_id=agent_id, identity=identity, **kwargs)
        self.name = name
        self.memory: Dict[str, Any] = {}
        self.conversation_history: List[Dict[str, str]] = []

    async def process_message(self, message: Message) -> Optional[Message]:
        """Record the message, then handle "remember"/"recall" requests.

        Args:
            message: The incoming message.

        Returns:
            A confirmation reply for a parsed "remember that ... is ..."
            request, a reply with the stored value for a "recall" request
            whose key is in memory, or otherwise whatever the base-class
            ``process_message`` returns.
        """
        # Imported here so the method does not rely on the surrounding
        # import block, which does not provide ``datetime``.
        from datetime import datetime

        # Track conversation history
        self.conversation_history.append(
            {
                "sender": message.sender_id,
                "content": message.content,
                "timestamp": str(datetime.now()),
            }
        )

        # Lowercase once; both keyword checks below are case-insensitive.
        lowered = message.content.lower()

        # Custom processing using memory
        if "remember" in lowered:
            # Extract what to remember
            key_value = self._extract_memory_request(message.content)
            if key_value:
                key, value = key_value
                self.memory[key] = value
                return Message.create(
                    sender_id=self.agent_id,
                    receiver_id=message.sender_id,
                    content=f"I've remembered that {key} is {value}",
                    sender_identity=self.identity,
                    message_type=MessageType.TEXT,
                )

        if "recall" in lowered:
            # Extract what to recall
            key = self._extract_recall_request(message.content)
            if key and key in self.memory:
                return Message.create(
                    sender_id=self.agent_id,
                    receiver_id=message.sender_id,
                    content=f"You asked me to recall {key}, it's {self.memory[key]}",
                    sender_identity=self.identity,
                    message_type=MessageType.TEXT,
                )

        # Fall back to regular processing
        return await super().process_message(message)

    def _extract_memory_request(self, content: str) -> Optional[tuple]:
        """Parse "remember that <key> is <value>" out of ``content``.

        The "remember that" marker and the " is " separator are matched
        case-insensitively. The key is lowercased so it matches the
        lowercased key produced by ``_extract_recall_request``; the value
        keeps the casing the sender used.

        Args:
            content: Raw message text.

        Returns:
            A ``(key, value)`` tuple, or ``None`` when the text does not
            contain the marker followed by a " is " separator.
        """
        import re

        # Simple parsing logic - in a real agent you might use NLP
        marker = re.search(r"remember that(.*)", content, flags=re.IGNORECASE | re.DOTALL)
        if marker is None:
            return None

        statement = marker.group(1).strip()
        pieces = re.split(r" is ", statement, maxsplit=1, flags=re.IGNORECASE)
        if len(pieces) != 2:
            return None

        key, value = pieces
        return (key.strip().lower(), value.strip())

    def _extract_recall_request(self, content: str) -> Optional[str]:
        """Return the lowercased text that follows the first "recall".

        Args:
            content: Raw message text.

        Returns:
            The stripped, lowercased remainder of the message after the
            first "recall" (possibly an empty string), or ``None`` when
            "recall" does not occur.
        """
        # Simple parsing logic
        lowered = content.lower()
        if "recall" not in lowered:
            return None
        return lowered.split("recall", 1)[1].strip()

    def save_state(self, filepath: str) -> None:
        """Save agent memory and history to file as JSON.

        Args:
            filepath: Destination path; an existing file is overwritten.
        """
        state = {
            "agent_id": self.agent_id,
            "memory": self.memory,
            "conversation_history": self.conversation_history,
        }
        # Explicit encoding so the file is read back the same way on every
        # platform, regardless of the locale default.
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)

    def load_state(self, filepath: str) -> None:
        """Load agent memory and history from file.

        A missing file is not an error: the current memory and history are
        left untouched.

        Args:
            filepath: Path of a JSON file written by ``save_state``.
        """
        # Try to open directly instead of checking for existence first, so
        # a file removed between the check and the open cannot cause a crash.
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                state = json.load(f)
        except FileNotFoundError:
            return

        self.memory = state.get("memory", {})
        self.conversation_history = state.get("conversation_history", [])