OmniCoreAgent

AgentiPy integrates with the OmniAgent framework, so you can build AI agents that interact with the Solana blockchain through natural language. The integration covers conversational tool calling, real-time event streaming, and conversation memory that persists across a session.

🚀 Quick Start

import asyncio
from mcpomni_connect.omni_agent import OmniAgent
from mcpomni_connect.memory_store.memory_router import MemoryRouter
from mcpomni_connect.events.event_router import EventRouter
from mcpomni_connect.agents.tools.local_tools_registry import ToolRegistry
from agentipy.agent import SolanaAgentKit
from agentipy.tools.get_balance import BalanceFetcher
from agentipy.tools.transfer import TokenTransferManager
from dotenv import load_dotenv
from typing import Optional
import os

load_dotenv()

# Initialize Solana agent
solana_agent = SolanaAgentKit(
    private_key=os.getenv("SOLANA_PRIVATE_KEY"),
    rpc_url="https://api.mainnet-beta.solana.com"
)

# Create tool registry
async def create_tool_registry() -> ToolRegistry:
    """Create a tool registry with Solana operations."""
    tool_registry = ToolRegistry()

    @tool_registry.register_tool("get_balance_solana")
    async def get_balance(address: Optional[str] = None) -> dict:
        """Get the balance of a Solana address."""
        try:
            balance_sol = await BalanceFetcher.get_balance(
                solana_agent, 
                token_address=address
            )
            return {"status": "success", "data": balance_sol}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @tool_registry.register_tool("transfer_solana")
    async def transfer_solana(address: str, amount: float) -> dict:
        """Transfer SOL to a Solana address."""
        try:
            sig = await TokenTransferManager.transfer(
                solana_agent, 
                to=address, 
                amount=amount
            )
            return {
                "status": "success",
                "message": "SOL transferred successfully",
                "signature": sig
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
    
    return tool_registry

# Initialize memory and event systems
memory_store = MemoryRouter(memory_store_type="in_memory")
event_router = EventRouter(event_store_type="in_memory")

# Create the OmniAgent
async def create_agent():
    tool_registry = await create_tool_registry()
    
    agent = OmniAgent(
        name="solana_agent",
        system_instruction="""
You are a Solana agent with the ability to check balances and transfer SOL.

CRITICAL RULES:
- Always check balance before transfers using get_balance_solana
- Verify sufficient funds before calling transfer_solana
- Only use available tools in the registry
        """,
        model_config={
            "provider": "openai",
            "model": "gpt-4.1",
            "temperature": 0.7,
            "max_context_length": 50000,
        },
        local_tools=tool_registry,
        agent_config={
            "max_steps": 15,
            "tool_call_timeout": 60,
            "request_limit": 1000,
            "memory_config": {"mode": "token_budget", "value": 10000},
        },
        memory_store=memory_store,
        event_router=event_router,
        debug=True,
    )
    
    return agent

# Run the agent
async def main():
    agent = await create_agent()
    
    # Check balance
    response = await agent.run(
        "What's my SOL balance?",
        session_id="session_001"
    )
    print(response)
    
    # Transfer SOL
    response = await agent.run(
        "Transfer 0.5 SOL to <address>",
        session_id="session_001"
    )
    print(response)

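# Guarded so that importing this module (e.g. from an API service) does not run the demo
if __name__ == "__main__":
    asyncio.run(main())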

🛠️ Key Components

1. Tool Registry

The ToolRegistry is the foundation of your agent's capabilities. It manages all the tools (functions) that your AI agent can execute.

tool_registry = ToolRegistry()

@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    """Tool description that the AI will use to understand when to call this."""
    # Your implementation
    return {"status": "success", "data": result}

Key Features:

  • Automatic function registration

  • Type hints for parameter validation

  • Docstrings used by AI for tool selection

  • Async/await support
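
To make the features above concrete, here is a minimal sketch of how a decorator-based registry could capture a tool's docstring, type hints, and async status. `ToyRegistry` and the `echo` tool are hypothetical stand-ins written for illustration; they are not the real `ToolRegistry`, whose internals may differ.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints


class ToyRegistry:
    """Hypothetical stand-in for a tool registry; not the real ToolRegistry."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def register_tool(self, name: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            hints = get_type_hints(func)
            hints.pop("return", None)  # keep parameter types only
            self.tools[name] = {
                "func": func,
                # The docstring is what a model would read to pick a tool
                "description": inspect.getdoc(func) or "",
                "parameters": hints,
                "is_async": inspect.iscoroutinefunction(func),
            }
            return func  # the function itself is left unchanged

        return decorator


registry = ToyRegistry()


@registry.register_tool("echo")
async def echo(text: str, times: int = 1) -> dict:
    """Repeat the given text a number of times."""
    return {"status": "success", "data": text * times}


spec = registry.tools["echo"]
print(spec["description"])
print(spec["parameters"])
print(spec["is_async"])
```

The sketch suggests why a clear docstring and accurate type hints matter: under this design they are the only description of the tool that the model ever sees.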

2. OmniAgent Configuration

The OmniAgent is the core component that orchestrates AI reasoning, tool execution, and memory management.

agent = OmniAgent(
    name="agent_name",
    system_instruction="Detailed instructions for the AI...",
    model_config={
        "provider": "openai",  # or "anthropic", "google"
        "model": "gpt-4.1",
        "temperature": 0.7,
        "max_context_length": 50000,
    },
    local_tools=tool_registry,
    agent_config={
        "max_steps": 15,              # Maximum reasoning steps
        "tool_call_timeout": 60,       # Timeout per tool call
        "request_limit": 1000,         # Rate limiting
        "memory_config": {
            "mode": "token_budget",    # or "sliding_window"
            "value": 10000
        },
    },
    memory_store=memory_store,
    event_router=event_router,
    debug=True,
)

Configuration Options:

Parameter            Description              Default
-------------------  -----------------------  --------
name                 Agent identifier         Required
system_instruction   AI behavior guidelines   Required
model_config         LLM configuration        Required
local_tools          Tool registry            None
agent_config         Execution settings       {}
memory_store         Conversation memory      None
event_router         Event tracking           None
debug                Enable debug logging     False

3. Memory Management

The memory system maintains conversation context across interactions:

# In-memory storage (development)
memory_store = MemoryRouter(memory_store_type="in_memory")

# Persistent storage (production)
memory_store = MemoryRouter(
    memory_store_type="redis",
    redis_config={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)

Memory Modes:

  • token_budget: Maintains last N tokens of conversation

  • sliding_window: Keeps last N messages
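
The difference between the two modes can be sketched with plain lists. This is an illustration only, not the framework's implementation: the helper names are hypothetical, tokens are approximated by whitespace-separated words, and whole messages are dropped rather than cut mid-message.

```python
from typing import Dict, List

Message = Dict[str, str]


def count_tokens(text: str) -> int:
    """Crude token estimate: whitespace-separated words (an assumption)."""
    return len(text.split())


def sliding_window(messages: List[Message], value: int) -> List[Message]:
    """Keep only the last `value` messages."""
    return messages[-value:] if value > 0 else []


def token_budget(messages: List[Message], value: int) -> List[Message]:
    """Keep the most recent messages whose combined size fits in `value` tokens."""
    kept: List[Message] = []
    used = 0
    for msg in reversed(messages):  # walk from newest to oldest
        cost = count_tokens(msg["content"])
        if used + cost > value:
            break
        kept.append(msg)
        used += cost
    kept.reverse()  # restore chronological order
    return kept


history = [
    {"role": "user", "content": "What is my SOL balance"},
    {"role": "assistant", "content": "Your balance is 2 SOL"},
    {"role": "user", "content": "Transfer half of it"},
]

print(sliding_window(history, 2))  # the last two messages
print(token_budget(history, 8))    # only the newest message fits in 8 "tokens"
```

A message-count window is predictable in length but not in size, while a token budget bounds what is sent to the model regardless of how long individual messages are.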

4. Event Streaming

Events provide real-time visibility into agent operations:

event_router = EventRouter(event_store_type="in_memory")

# Stream events from a session (async for must run inside an async function)
async def print_events(session_id: str) -> None:
    async for event in agent.stream_events(session_id):
        print(f"Event: {event.type}")
        print(f"Data: {event.data}")

Event Types:

  • agent.start - Agent begins processing

  • tool.call - Tool execution started

  • tool.result - Tool execution completed

  • agent.response - Agent generates response

  • agent.error - Error occurred
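
A consumer will usually branch on the event type. The sketch below does that against a canned stream so it runs without an agent; `FakeEvent`, `fake_stream`, and the keys inside `data` are assumptions made for illustration, and only the `type` and `data` attributes come from the example above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in exposing the `type` and `data` attributes."""
    type: str
    data: Dict[str, Any]


async def fake_stream() -> AsyncIterator[FakeEvent]:
    """Yield a canned sequence instead of calling agent.stream_events()."""
    yield FakeEvent("agent.start", {})
    yield FakeEvent("tool.call", {"tool": "get_balance_solana"})
    yield FakeEvent("tool.result", {"status": "success"})
    yield FakeEvent("agent.response", {"text": "done"})


async def summarize(stream: AsyncIterator[FakeEvent]) -> List[str]:
    """Turn a stream of events into short progress lines."""
    lines: List[str] = []
    async for event in stream:
        if event.type == "tool.call":
            lines.append(f"calling {event.data.get('tool', '?')}")
        elif event.type == "tool.result":
            lines.append(f"tool finished: {event.data.get('status', '?')}")
        elif event.type == "agent.error":
            lines.append("error")
            break  # stop consuming after a failure
        elif event.type == "agent.response":
            lines.append("response ready")
            break  # the final response ends this run
    return lines


summary = asyncio.run(summarize(fake_stream()))
print(summary)
```

With a real agent, `fake_stream()` would be replaced by `agent.stream_events(session_id)`; the branching logic stays the same.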

🌐 FastAPI Integration

Create a production-ready API service:

from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize agent on startup
    app.state.agent = await create_agent()
    yield
    # Cleanup on shutdown
    await app.state.agent.cleanup()

app = FastAPI(lifespan=lifespan)

# Declared as plain parameters, message and session_id are read from the query string
@app.post("/chat")
async def chat(message: str, session_id: Optional[str] = None):
    agent = app.state.agent
    response = await agent.run(message, session_id)
    return response

@app.get("/events/{session_id}")
async def get_events(session_id: str):
    agent = app.state.agent
    async def event_generator():
        try:
            async for event in agent.stream_events(session_id):
                yield f"event: {event.type}\ndata: {event.json()}\n\n"
        except Exception as e:
            # Report failures as a named SSE event; "error:" is not a recognized SSE field
            yield f"event: error\ndata: {str(e)}\n\n"
    
    return StreamingResponse(
        event_generator(), 
        media_type="text/event-stream"
    )
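
Server-Sent Events frames are line-oriented: each line of the payload needs its own `data:` prefix, and a blank line ends the frame. A small helper can take care of that, which matters when an error message or JSON payload spans several lines. `format_sse` is a hypothetical name used here for illustration; it is not part of FastAPI or OmniAgent.

```python
import json
from typing import Any


def format_sse(event_type: str, payload: Any) -> str:
    """Build one Server-Sent Events frame from an event name and a payload."""
    # Strings are sent as-is; anything else is serialized to JSON
    text = payload if isinstance(payload, str) else json.dumps(payload)
    lines = text.splitlines() or [""]  # an empty payload still needs one data line
    data_lines = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event_type}\n{data_lines}\n"


print(format_sse("tool.call", {"tool": "get_balance_solana"}), end="")
print(format_sse("agent.error", "line one\nline two"), end="")
```

Inside `event_generator`, a frame could then be produced with `yield format_sse(event.type, event.data)`, assuming `event.data` is JSON-serializable.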

📋 Available Tools

Get Balance

Check the SOL balance of any Solana address:

@tool_registry.register_tool("get_balance_solana")
async def get_balance(address: Optional[str] = None) -> dict:
    """
    Get the balance of a Solana address.
    If no address is provided, returns the agent's wallet balance.
    
    Args:
        address: Optional Solana address to check
        
    Returns:
        Dictionary with status and balance data
    """
    balance = await BalanceFetcher.get_balance(
        solana_agent, 
        token_address=address
    )
    return {"status": "success", "data": balance}

Transfer SOL

Send SOL to any Solana address:

@tool_registry.register_tool("transfer_solana")
async def transfer_solana(address: str, amount: float) -> dict:
    """
    Transfer SOL to a Solana address.
    
    Args:
        address: Destination Solana address
        amount: Amount of SOL to transfer
        
    Returns:
        Dictionary with status and transaction signature
    """
    signature = await TokenTransferManager.transfer(
        solana_agent,
        to=address,
        amount=amount
    )
    return {
        "status": "success",
        "signature": signature
    }
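
Because a transfer moves real funds, it can help to validate the request in code as well as in the prompt. The following is a sketch under stated assumptions: `check_transfer` is a hypothetical helper, and the `fee_reserve` default is an illustrative placeholder, not an actual network fee.

```python
import math
from typing import Dict, Union


def check_transfer(
    balance: float,
    amount: float,
    fee_reserve: float = 0.001,  # placeholder value, not a real fee
) -> Dict[str, Union[str, float]]:
    """Return a status dict saying whether a transfer of `amount` SOL may proceed."""
    if not math.isfinite(amount) or amount <= 0:
        return {"status": "error", "message": "Amount must be a positive number"}
    required = amount + fee_reserve
    if balance < required:
        return {
            "status": "error",
            "message": f"Insufficient funds: need {required} SOL, have {balance} SOL",
        }
    return {"status": "success", "remaining": balance - required}


print(check_transfer(balance=1.0, amount=0.5)["status"])
print(check_transfer(balance=0.2, amount=0.5)["status"])
```

A tool such as `transfer_solana` could call a check like this with the fetched balance and return the error dict early, so an incorrect model decision cannot trigger an underfunded transfer.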

🔒 Best Practices

1. Balance Verification

Always check balance before transfers:

system_instruction = """
CRITICAL: Before ANY SOL transfer:
1. Call get_balance_solana to check current balance
2. Compare balance with requested transfer amount
3. Only proceed if balance >= transfer amount
4. Inform user if insufficient funds
"""

2. Error Handling

Implement robust error handling in tools:

@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    try:
        result = await perform_operation(param)
        return {"status": "success", "data": result}
    except ValueError as e:
        return {"status": "error", "message": f"Invalid input: {str(e)}"}
    except Exception as e:
        return {"status": "error", "message": f"Operation failed: {str(e)}"}

3. Environment Variables

Store sensitive data securely:

# .env file
SOLANA_PRIVATE_KEY=your_private_key_here
OPENAI_API_KEY=your_openai_key_here

# Python: load the variables at startup
from dotenv import load_dotenv
import os

load_dotenv()

private_key = os.getenv("SOLANA_PRIVATE_KEY")

4. Session Management

Use session IDs for context preservation:

# Same session maintains conversation context
response1 = await agent.run("Check my balance", session_id="user_123")
response2 = await agent.run("Transfer 0.5 SOL to <address>", session_id="user_123")

🎯 Use Cases

Conversational Wallet

# Natural language interactions
"What's my current SOL balance?"
"Send 0.5 SOL to ABC123..."
"How much SOL do I have left after that transfer?"

Automated Trading Bot

# Conditional transfers based on balance
"If my balance is above 10 SOL, transfer 5 SOL to the treasury address"

Multi-User Support

# Different sessions for different users
await agent.run(message, session_id=f"user_{user_id}")
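
The isolation that per-user session IDs provide can be pictured with a toy store that keys history by session. `ToySessionStore` and `session_id_for` are hypothetical names used only for this sketch; the real memory store may organize data differently.

```python
from collections import defaultdict
from typing import DefaultDict, List


class ToySessionStore:
    """Hypothetical stand-in for session-scoped memory; not the real MemoryRouter."""

    def __init__(self) -> None:
        self._history: DefaultDict[str, List[str]] = defaultdict(list)

    def add(self, session_id: str, message: str) -> None:
        self._history[session_id].append(message)

    def get(self, session_id: str) -> List[str]:
        return list(self._history[session_id])  # copy, so callers cannot mutate state


def session_id_for(user_id: int) -> str:
    """Build the per-user session ID in the same f-string form as above."""
    return f"user_{user_id}"


store = ToySessionStore()
store.add(session_id_for(1), "Check my balance")
store.add(session_id_for(2), "Transfer 0.5 SOL")
store.add(session_id_for(1), "And after that transfer?")

print(store.get(session_id_for(1)))
print(store.get(session_id_for(2)))
```

Each user sees only their own messages, which is the property that keeps one user's conversation from leaking into another's context.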

🐛 Debugging

Enable debug mode for detailed logging:

agent = OmniAgent(
    # ... other config
    debug=True
)

This will log:

  • Tool selection reasoning

  • Parameter extraction

  • Execution results

  • Error traces

📚 API Reference

OmniAgent.run()

Process a user message and return the agent's response.

async def run(
    self,
    message: str,
    session_id: Optional[str] = None
) -> dict:
    """
    Process a user message and return the agent's response.
    
    Args:
        message: User input message
        session_id: Optional session identifier for context
        
    Returns:
        Dictionary containing agent response and metadata
    """

OmniAgent.stream_events()

Stream real-time events from a session.

async def stream_events(
    self,
    session_id: str
) -> AsyncIterator[Event]:
    """
    Stream events from a specific session.
    
    Args:
        session_id: Session identifier
        
    Yields:
        Event objects with type and data
    """

🚦 Next Steps

  • Explore CrewAI Integration

  • Learn about Google ADK Integration

  • Check Advanced Configuration

  • See Production Deployment Guide