Advanced Agent Example

Creating Advanced Agents with Custom Tools

This example demonstrates how to create advanced AI agents with custom tools and specialized prompts.

Custom Tools with PromptTools

AgentConnect allows you to extend agent capabilities with custom tools:

import os
import asyncio
from typing import Dict, Any, List, Optional, Type, TypeVar, Callable, Awaitable
from dotenv import load_dotenv
from langchain_core.tools import BaseTool, StructuredTool
from pydantic import BaseModel, Field

from agentconnect.agents.ai_agent import AIAgent
from agentconnect.core.registry import AgentRegistry
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.types import (
    ModelProvider,
    ModelName,
    AgentIdentity,
    InteractionMode,
    MessageType,
    Capability
)
from agentconnect.prompts.tools import PromptTools
from agentconnect.prompts.templates.prompt_templates import PromptTemplates, SystemPromptConfig

# Load environment variables
load_dotenv()

# Define a schema for our custom tool
class WeatherLookupInput(BaseModel):
    """Input for weather lookup tool."""

    location: str = Field(description="The city or location to check weather for")
    units: str = Field(description="Temperature units (celsius/fahrenheit)", default="celsius")

class WeatherLookupOutput(BaseModel):
    """Output for weather lookup tool."""

    temperature: float = Field(description="Current temperature")
    condition: str = Field(description="Weather condition (sunny, cloudy, etc.)")
    humidity: float = Field(description="Humidity percentage")

# Create a registry and hub
registry = AgentRegistry()
hub = CommunicationHub(registry)

# Set up the PromptTools instance with our registry and hub
prompt_tools = PromptTools(agent_registry=registry, communication_hub=hub)

# Define our custom tool function
def weather_lookup(location: str, units: str = "celsius") -> Dict[str, Any]:
    """
    Look up the current weather for a location.

    Args:
        location: City or location to check
        units: Temperature units (celsius/fahrenheit)

    Returns:
        Dictionary with weather information
    """
    # In a real implementation, this would call a weather API
    # This is a mock implementation for demonstration
    weather_data = {
        "New York": {"temperature": 22.5, "condition": "Partly Cloudy", "humidity": 65.0},
        "London": {"temperature": 18.0, "condition": "Rainy", "humidity": 80.0},
        "Tokyo": {"temperature": 27.0, "condition": "Sunny", "humidity": 70.0},
        "Sydney": {"temperature": 24.5, "condition": "Clear", "humidity": 55.0},
    }

    # Default to a generic response if location not found
    result = weather_data.get(
        location,
        {"temperature": 20.0, "condition": "Unknown", "humidity": 60.0}
    )

    # Convert temperature if needed
    if units.lower() == "fahrenheit":
        result["temperature"] = (result["temperature"] * 9/5) + 32

    return result

# Create the asynchronous version of our tool
async def weather_lookup_async(location: str, units: str = "celsius") -> Dict[str, Any]:
    """Async version of the weather lookup tool."""
    return weather_lookup(location, units)

# Register our custom tool with PromptTools
T = TypeVar('T', bound=BaseModel)

weather_tool = prompt_tools.create_tool_from_function(
    func=weather_lookup,
    name="weather_lookup",
    description="Get current weather information for a location",
    args_schema=WeatherLookupInput,
    category="weather",
    coroutine=weather_lookup_async
)

Creating an Agent with Custom Tools

Now we’ll create an agent that can use our custom tool:

# Create an agent with our custom tools
weather_agent = AIAgent(
    agent_id="weather_assistant",
    name="Weather Assistant",
    provider_type=ModelProvider.GOOGLE,  # Or your preferred provider
    model_name=ModelName.GEMINI2_FLASH,  # Or your preferred model
    api_key=os.getenv("GOOGLE_API_KEY"),
    identity=AgentIdentity.create_key_based(),
    capabilities=[
        Capability(
            name="weather_forecasting",
            description="Can provide weather information for locations worldwide",
            input_schema={"location": "string", "units": "string"},
            output_schema={"forecast": "string"}
        )
    ],
    personality="helpful weather expert",
    organization_id="example_org",
    interaction_modes=[InteractionMode.HUMAN_TO_AGENT, InteractionMode.AGENT_TO_AGENT],
    prompt_tools=prompt_tools,  # Pass our customized PromptTools instance
    # Pass our custom tool in the custom_tools list
    custom_tools=[weather_tool],
)

# Register the agent with the hub
async def setup_agent():
    await hub.register_agent(weather_agent)
    print(f"Registered weather agent with custom tools")

# Run the setup
asyncio.run(setup_agent())

Using Custom Prompt Templates

You can also customize the agent’s behavior with specialized prompt templates:

from agentconnect.prompts.templates.prompt_templates import (
    PromptTemplates,
    SystemPromptConfig,
    PromptType
)

# Create custom prompt templates for our weather agent
prompt_templates = PromptTemplates()

# Create a specialized system prompt config
system_config = SystemPromptConfig(
    name="Weather Expert",
    capabilities=weather_agent.capabilities,
    personality="expert meteorologist who explains weather patterns clearly",
    temperature=0.3,  # Lower temperature for more precise answers
    additional_context={
        "expertise": "Weather forecasting and climate patterns",
        "data_sources": "Multiple international weather services",
        "specialty": "Translating complex weather data into understandable explanations"
    }
)

# Create a custom chat prompt template
custom_prompt = prompt_templates.create_prompt(
    prompt_type=PromptType.SYSTEM,
    config=system_config,
    include_history=True
)

# Update our agent with the custom prompt templates
weather_agent.prompt_templates = prompt_templates

Using Agent Workflows and Prompt Systems

AgentConnect provides powerful workflow capabilities to control agent behavior:

from agentconnect.prompts.agent_prompts import (
    AgentWorkflow,
    WorkflowState,
    AgentMode
)

# Create a custom workflow that specializes in weather analysis
class WeatherAnalysisWorkflow(AgentWorkflow):
    """Specialized workflow for weather analysis."""

    def __init__(
        self,
        agent_id: str,
        system_prompt_config: SystemPromptConfig,
        llm,
        tools: PromptTools,
        prompt_templates: PromptTemplates,
        custom_tools: Optional[List[BaseTool]] = None,
    ):
        super().__init__(
            agent_id=agent_id,
            llm=llm,
            tools=tools,
            prompt_templates=prompt_templates,
            custom_tools=custom_tools,
        )

        self.system_prompt_config = system_prompt_config

    def build_workflow(self):
        """Build a custom workflow for weather analysis."""
        workflow = super().build_workflow()

        # Here we could add custom nodes and edges to the workflow
        # For example, specialized error handling for weather data

        return workflow

# To use this custom workflow, you would modify the AIAgent initialization:
# weather_agent.workflow_agent_type = "weather_analysis"
# Then register a factory function for creating this workflow type

# Create a message handler for our weather agent
async def weather_message_handler(message):
    print(f"Weather agent received: {message.content[:50]}...")

    # Add specialized processing for weather queries
    if "forecast" in message.content.lower():
        print("Forecast request detected! Prioritizing...")

# Add the message handler to the hub
hub.add_message_handler("weather_assistant", weather_message_handler)

Complete Example with Task Decomposition Tool

Here’s a complete example that combines custom tools with task decomposition:

import os
import asyncio
from dotenv import load_dotenv
from langchain_core.tools import BaseTool
from typing import Dict, Any, List, Optional

from agentconnect.agents.ai_agent import AIAgent
from agentconnect.agents.human_agent import HumanAgent
from agentconnect.core.registry import AgentRegistry
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.types import (
    ModelProvider,
    ModelName,
    AgentIdentity,
    InteractionMode,
    Capability
)
from agentconnect.prompts.tools import PromptTools, Subtask
from agentconnect.prompts.templates.prompt_templates import PromptTemplates, SystemPromptConfig

async def run_advanced_agent_example():
    # Load environment variables
    load_dotenv()

    # Create registry and hub
    registry = AgentRegistry()
    hub = CommunicationHub(registry)

    # Create prompt tools with custom tools
    prompt_tools = PromptTools(agent_registry=registry, communication_hub=hub)

    # Define our custom data analysis tool function
    def analyze_weather_data(
        data: str,
        analysis_type: str = "trends"
    ) -> Dict[str, Any]:
        """
        Analyze weather data and extract insights.

        Args:
            data: Weather data in text format
            analysis_type: Type of analysis (trends, anomalies, forecast)

        Returns:
            Dictionary with analysis results
        """
        # In a real implementation, this would perform actual data analysis
        # This is a mock implementation for demonstration

        analysis_results = {
            "trends": {
                "summary": "Temperatures are trending 2°C higher than seasonal average",
                "confidence": 0.89,
                "key_points": ["Rising humidity levels", "Consistent pressure patterns"],
            },
            "anomalies": {
                "summary": "Detected unusual wind pattern shifts",
                "confidence": 0.76,
                "key_points": ["Rapid pressure changes", "Unseasonable precipitation"],
            },
            "forecast": {
                "summary": "Expect continued warming with periodic precipitation",
                "confidence": 0.82,
                "key_points": ["Temperature peaks mid-week", "Weekend cooling trend"],
            }
        }

        # Return the appropriate analysis or a default
        return analysis_results.get(
            analysis_type,
            {"summary": "Basic analysis completed", "confidence": 0.5, "key_points": []}
        )

    # Create the asynchronous version
    async def analyze_weather_data_async(
        data: str,
        analysis_type: str = "trends"
    ) -> Dict[str, Any]:
        return analyze_weather_data(data, analysis_type)

    # Register our custom analysis tool
    analysis_tool = prompt_tools.create_tool_from_function(
        func=analyze_weather_data,
        name="analyze_weather_data",
        description="Analyze weather data and extract insights",
        args_schema=type('AnalysisInput', (BaseModel,), {
            "data": (str, Field(description="Weather data to analyze")),
            "analysis_type": (str, Field(description="Type of analysis to perform"))
        }),
        category="analysis",
        coroutine=analyze_weather_data_async
    )

    # Create an advanced AI agent with custom tools
    advanced_agent = AIAgent(
        agent_id="weather_expert",
        name="Advanced Weather Expert",
        provider_type=ModelProvider.OPENAI,
        model_name=ModelName.GPT4O,
        api_key=os.getenv("OPENAI_API_KEY"),
        identity=AgentIdentity.create_key_based(),
        capabilities=[
            Capability(
                name="weather_analysis",
                description="Advanced analysis of weather patterns and data",
                input_schema={"data": "string", "location": "string"},
                output_schema={"analysis": "string", "recommendations": "string"}
            ),
            Capability(
                name="task_management",
                description="Can break down complex weather-related requests into subtasks",
                input_schema={"request": "string"},
                output_schema={"subtasks": "array"}
            )
        ],
        personality="methodical and detail-oriented weather scientist",
        organization_id="example_org",
        interaction_modes=[InteractionMode.HUMAN_TO_AGENT, InteractionMode.AGENT_TO_AGENT],
        prompt_tools=prompt_tools,
        custom_tools=[analysis_tool],  # Include our custom analysis tool
    )

    # Create a custom message handler for task decomposition
    async def task_decomposition_handler(message):
        if "analyze" in message.content.lower() and "weather" in message.content.lower():
            # This is a complex weather analysis task - decompose it
            decomposition_result = await prompt_tools.create_task_decomposition_tool().acoroutine(
                task_description=message.content,
                max_subtasks=3
            )

            print("Task decomposed into subtasks:")
            for idx, subtask in enumerate(decomposition_result.get("subtasks", [])):
                print(f"  {idx+1}. {subtask.get('title')}: {subtask.get('description')}")

    # Register the agent and add the message handler
    await hub.register_agent(advanced_agent)
    hub.add_message_handler("weather_expert", task_decomposition_handler)

    # Start the agent's processing
    agent_task = asyncio.create_task(advanced_agent.run())

    # Create a human agent for interaction
    human = HumanAgent(
        agent_id="user",
        name="Example User",
        identity=AgentIdentity.create_key_based(),
        organization_id="example_org",
    )

    await hub.register_agent(human)

    # Simulate a complex weather analysis request
    await human.send_message(
        "weather_expert",
        "Please analyze the recent weather patterns in the Northeastern United States "
        "and provide insights on how they compare to historical data. Also suggest "
        "what this might indicate for agricultural planning in the region."
    )

    # Allow time for processing and task decomposition
    await asyncio.sleep(10)

    # Clean up
    advanced_agent.is_running = False
    await agent_task
    await hub.unregister_agent(advanced_agent.agent_id)
    await hub.unregister_agent(human.agent_id)

if __name__ == "__main__":
    asyncio.run(run_advanced_agent_example())

Creating Custom Tool Registry and Workflow

For even more advanced use cases, you can create a custom tool registry:

from agentconnect.prompts.tools import ToolRegistry
from langchain_core.tools import BaseTool

# Create a custom tool registry
custom_registry = ToolRegistry()

# Define a simple custom tool
def generate_weather_report(location: str, time_period: str = "today") -> str:
    """Generate a weather report for a location."""
    reports = {
        "New York": {
            "today": "Sunny with a high of 75°F, light winds from the west.",
            "tomorrow": "Partly cloudy with a chance of afternoon showers, high of 72°F.",
            "week": "Mostly sunny throughout the week with temperatures between 70-80°F."
        },
        "London": {
            "today": "Overcast with light rain, high of 18°C, moderate humidity.",
            "tomorrow": "Continued light rain with fog in the morning, high of 17°C.",
            "week": "Clearing by mid-week with temperatures around 16-20°C."
        }
    }

    # Get the report for the location and time period or return a default
    location_reports = reports.get(location, {})
    return location_reports.get(
        time_period,
        f"Weather report for {location} ({time_period}): Generally favorable conditions."
    )

# Create a LangChain-compatible tool
from langchain.tools import Tool

report_tool = Tool.from_function(
    func=generate_weather_report,
    name="generate_weather_report",
    description="Generates a weather report for a specific location and time period",
)

# Register the tool with our custom registry
custom_registry.register_tool(report_tool)

# Now we can use this tool registry with our agents or workflows
# And access tools by name or category
weather_tools = custom_registry.get_tools_by_category("weather")

# Get a specific tool by name
report_tool = custom_registry.get_tool("generate_weather_report")

LangChain and LangGraph Integration

AgentConnect offers full compatibility with LangChain v0.3.x and LangGraph, allowing you to directly use their powerful tools and agents within the AgentConnect framework:

import os
import asyncio
from typing import List, Dict, Any
from dotenv import load_dotenv

# LangChain imports
from langchain_core.tools import BaseTool, StructuredTool, Tool
from langchain_openai import ChatOpenAI
from langchain.agents import tool
from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper
from langchain_community.tools.tavily_search import TavilySearchResults

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

# AgentConnect imports
from agentconnect.agents.ai_agent import AIAgent
from agentconnect.core.registry import AgentRegistry
from agentconnect.communication.hub import CommunicationHub
from agentconnect.core.types import (
    ModelProvider,
    ModelName,
    AgentIdentity,
    InteractionMode,
    Capability
)

# Load environment variables
load_dotenv()

# Initialize registry and hub
registry = AgentRegistry()
hub = CommunicationHub(registry)

# Create LangChain tools
# 1. Using the @tool decorator (langchain v0.3.x style)
@tool
def calculate_risk_score(market_data: str, risk_factors: List[str]) -> Dict[str, Any]:
    """Calculate investment risk score based on market data and risk factors."""
    # Simple mock implementation for demonstration
    risk_score = len(risk_factors) * 10
    return {
        "risk_score": risk_score,
        "risk_level": "high" if risk_score > 70 else "medium" if risk_score > 40 else "low",
        "recommendation": "Diversify" if risk_score > 70 else "Hold" if risk_score > 40 else "Invest"
    }

# 2. Using TavilySearchResults - a real external tool from LangChain
search_tool = TavilySearchResults(
    api_key=os.getenv("TAVILY_API_KEY", "your-tavily-api-key"),
    max_results=3
)

# Create an AIAgent that uses LangChain tools
langchain_agent = AIAgent(
    agent_id="investment_advisor",
    name="Investment Advisor",
    provider_type=ModelProvider.OPENAI,
    model_name=ModelName.GPT4O,
    api_key=os.getenv("OPENAI_API_KEY"),
    identity=AgentIdentity.create_key_based(),
    capabilities=[
        Capability(
            name="investment_advice",
            description="Provides investment advice based on market conditions",
            input_schema={"query": "string", "risk_profile": "string"},
            output_schema={"advice": "string", "risk_analysis": "string"}
        )
    ],
    personality="cautious and data-driven financial advisor",
    organization_id="example_org",
    interaction_modes=[InteractionMode.HUMAN_TO_AGENT],
    # Include LangChain tools directly in the custom_tools parameter
    custom_tools=[calculate_risk_score, search_tool],
)

# Register the agent
async def setup_langchain_agent():
    await hub.register_agent(langchain_agent)
    print("Registered investment advisor with LangChain tools")

# Run the setup
asyncio.run(setup_langchain_agent())

Creating Advanced Workflows with LangGraph

You can also use LangGraph for complex agent workflows within AgentConnect:

from typing import TypedDict, Sequence, Annotated
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages

# Define a state for our custom workflow
class InvestmentState(TypedDict):
    """State for the investment advisor workflow."""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    market_data: dict
    risk_profile: str
    recommendations: list
    current_step: str

# Create a custom workflow for investment advice
def create_investment_workflow(llm):
    # Create nodes for the workflow
    def get_market_data(state: InvestmentState) -> InvestmentState:
        """Get current market data."""
        # In a real implementation, this would call an API or database
        state["market_data"] = {
            "sp500": 4780.5,
            "nasdaq": 16950.2,
            "volatility_index": 18.2,
            "treasury_yield": 4.1,
            "sector_momentum": {
                "tech": "positive",
                "healthcare": "neutral",
                "energy": "negative",
                "financials": "positive",
            },
        }
        state["current_step"] = "analyze_risk"
        return state

    def analyze_risk(state: InvestmentState) -> InvestmentState:
        """Analyze risk based on market data and profile."""
        risk_score = 0

        # Simple risk calculation based on market data and profile
        if state["risk_profile"] == "conservative":
            risk_tolerance = 30
        elif state["risk_profile"] == "moderate":
            risk_tolerance = 60
        else:  # aggressive
            risk_tolerance = 90

        volatility = state["market_data"]["volatility_index"]

        # Basic risk calculation
        if volatility > 25:
            risk_score = 80
        elif volatility > 15:
            risk_score = 50
        else:
            risk_score = 30

        # Adjust based on profile
        adjusted_risk = min(100, risk_score * (risk_tolerance / 60))

        state["risk_analysis"] = {
            "score": adjusted_risk,
            "level": "high" if adjusted_risk > 70 else "medium" if adjusted_risk > 40 else "low",
        }

        state["current_step"] = "generate_recommendations"
        return state

    def generate_recommendations(state: InvestmentState) -> InvestmentState:
        """Generate investment recommendations."""
        # This would use the LLM in a real implementation
        risk_level = state["risk_analysis"]["level"]
        sector_momentum = state["market_data"]["sector_momentum"]

        recommendations = []
        if risk_level == "low":
            recommendations.append("Consider Treasury bonds with current yield of {:.1f}%".format(
                state["market_data"]["treasury_yield"]
            ))
            # Add more conservative recommendations...
        elif risk_level == "medium":
            # Add balanced recommendations...
            for sector, momentum in sector_momentum.items():
                if momentum == "positive":
                    recommendations.append(f"Consider moderate exposure to {sector} sector ETFs")
        else:  # high
            # Add aggressive recommendations...
            for sector, momentum in sector_momentum.items():
                if momentum == "positive":
                    recommendations.append(f"Consider significant exposure to {sector} sector individual stocks")

        state["recommendations"] = recommendations
        state["current_step"] = "summarize"
        return state

    def summarize(state: InvestmentState) -> InvestmentState:
        """Summarize the analysis and recommendations."""
        # Here we could use the LLM to generate a natural language summary
        summary = (
            f"Based on current market conditions and your {state['risk_profile']} risk profile, "
            f"your risk level is {state['risk_analysis']['level']} "
            f"with a score of {state['risk_analysis']['score']:.1f}/100.\n\n"
            "Recommendations:\n"
        )

        for rec in state["recommendations"]:
            summary += f"- {rec}\n"

        # Add the summary as an AI message
        state["messages"].append(AIMessage(content=summary))
        state["current_step"] = "complete"
        return state

    # Create the state graph
    workflow = StateGraph(InvestmentState)

    # Add nodes
    workflow.add_node("get_market_data", get_market_data)
    workflow.add_node("analyze_risk", analyze_risk)
    workflow.add_node("generate_recommendations", generate_recommendations)
    workflow.add_node("summarize", summarize)

    # Add edges
    workflow.add_edge("get_market_data", "analyze_risk")
    workflow.add_edge("analyze_risk", "generate_recommendations")
    workflow.add_edge("generate_recommendations", "summarize")
    workflow.add_edge("summarize", END)

    # Set the entry point
    workflow.set_entry_point("get_market_data")

    # Compile the workflow
    return workflow.compile()

# Create an AIAgent that uses a LangGraph workflow
async def create_langgraph_agent():
    # Create an OpenAI LLM
    llm = ChatOpenAI(
        api_key=os.getenv("OPENAI_API_KEY"),
        model_name="gpt-4o",
        temperature=0.2
    )

    # Create the investment workflow
    investment_workflow = create_investment_workflow(llm)

    # Create a custom agent that will use this workflow
    portfolio_advisor = AIAgent(
        agent_id="portfolio_advisor",
        name="Portfolio Advisor",
        provider_type=ModelProvider.OPENAI,
        model_name=ModelName.GPT4O,
        api_key=os.getenv("OPENAI_API_KEY"),
        identity=AgentIdentity.create_key_based(),
        capabilities=[
            Capability(
                name="portfolio_management",
                description="Creates personalized investment portfolios",
                input_schema={"risk_profile": "string", "goals": "string"},
                output_schema={"portfolio": "string", "rationale": "string"}
            )
        ],
        personality="methodical and data-driven investment advisor",
        organization_id="example_org",
        interaction_modes=[InteractionMode.HUMAN_TO_AGENT],
    )

    # Register the agent
    await hub.register_agent(portfolio_advisor)

    # In a real implementation, you would set up a message handler
    # that invokes the LangGraph workflow when appropriate

    async def portfolio_message_handler(message):
        if "portfolio" in message.content.lower() or "invest" in message.content.lower():
            # Initialize the workflow state
            initial_state = {
                "messages": [HumanMessage(content=message.content)],
                "market_data": {},
                "risk_profile": "moderate",  # Extract this from message in real implementation
                "recommendations": [],
                "current_step": "start"
            }

            # Run the workflow
            result = investment_workflow.invoke(initial_state)

            # Extract the response from the workflow
            response_content = result["messages"][-1].content

            print(f"Portfolio recommendation: {response_content[:100]}...")

            # Here you would send the response back to the user

    # Add the message handler
    hub.add_message_handler("portfolio_advisor", portfolio_message_handler)

    print("Registered portfolio advisor with LangGraph workflow")

# Run the setup
asyncio.run(create_langgraph_agent())