OmniCoreAgent
AgentiPy integrates with the OmniAgent framework, letting you create AI agents that interact with the Solana blockchain through natural language. The integration gives your application conversational capabilities, real-time event streaming, and conversation memory management.
🚀 Quick Start
import asyncio
from mcpomni_connect.omni_agent import OmniAgent
from mcpomni_connect.memory_store.memory_router import MemoryRouter
from mcpomni_connect.events.event_router import EventRouter
from mcpomni_connect.agents.tools.local_tools_registry import ToolRegistry
from agentipy.agent import SolanaAgentKit
from agentipy.tools.get_balance import BalanceFetcher
from agentipy.tools.transfer import TokenTransferManager
from dotenv import load_dotenv
from typing import Optional
import os
load_dotenv()
# Initialize Solana agent
solana_agent = SolanaAgentKit(
    private_key=os.getenv("SOLANA_PRIVATE_KEY"),
    rpc_url="https://api.mainnet-beta.solana.com"
)
# Create tool registry
async def create_tool_registry() -> ToolRegistry:
    """Create a tool registry with Solana operations."""
    tool_registry = ToolRegistry()

    @tool_registry.register_tool("get_balance_solana")
    async def get_balance(address: Optional[str] = None) -> dict:
        """Get the balance of a Solana address."""
        try:
            balance_sol = await BalanceFetcher.get_balance(
                solana_agent,
                token_address=address
            )
            return {"status": "success", "data": balance_sol}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @tool_registry.register_tool("transfer_solana")
    async def transfer_solana(address: str, amount: float) -> dict:
        """Transfer SOL to a Solana address."""
        try:
            sig = await TokenTransferManager.transfer(
                solana_agent,
                to=address,
                amount=amount
            )
            return {
                "status": "success",
                "message": "SOL transferred successfully",
                "signature": sig
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    return tool_registry
# Initialize memory and event systems
memory_store = MemoryRouter(memory_store_type="in_memory")
event_router = EventRouter(event_store_type="in_memory")
# Create the OmniAgent
async def create_agent():
    tool_registry = await create_tool_registry()
    agent = OmniAgent(
        name="solana_agent",
        system_instruction="""
        You are a Solana agent with the ability to check balances and transfer SOL.
        CRITICAL RULES:
        - Always check balance before transfers using get_balance_solana
        - Verify sufficient funds before calling transfer_solana
        - Only use available tools in the registry
        """,
        model_config={
            "provider": "openai",
            "model": "gpt-4.1",
            "temperature": 0.7,
            "max_context_length": 50000,
        },
        local_tools=tool_registry,
        agent_config={
            "max_steps": 15,
            "tool_call_timeout": 60,
            "request_limit": 1000,
            "memory_config": {"mode": "token_budget", "value": 10000},
        },
        memory_store=memory_store,
        event_router=event_router,
        debug=True,
    )
    return agent
# Run the agent
async def main():
    agent = await create_agent()

    # Check balance
    response = await agent.run(
        "What's my SOL balance?",
        session_id="session_001"
    )
    print(response)

    # Transfer SOL
    response = await agent.run(
        "Transfer 0.5 SOL to <address>",
        session_id="session_001"
    )
    print(response)

asyncio.run(main())
🛠️ Key Components
Tool Registry
The ToolRegistry is the foundation of your agent's capabilities. It manages all the tools (functions) that your AI agent can execute.
tool_registry = ToolRegistry()

@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    """Tool description that the AI will use to understand when to call this."""
    result = param  # Replace with your implementation
    return {"status": "success", "data": result}
Key Features:
Automatic function registration
Type hints for parameter validation
Docstrings used by AI for tool selection
Async/await support
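To make these features concrete, here is a minimal sketch of how a decorator-based registry can capture a tool's docstring and type hints at registration time. `MiniRegistry` and its `call` method are hypothetical stand-ins written for illustration; they are not the actual `ToolRegistry` implementation, whose internals may differ.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict


class MiniRegistry:
    """Hypothetical stand-in for ToolRegistry, for illustration only."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def register_tool(self, name: str) -> Callable:
        def decorator(func: Callable[..., Awaitable[dict]]) -> Callable[..., Awaitable[dict]]:
            signature = inspect.signature(func)
            self.tools[name] = {
                "function": func,
                # The docstring is what a model would read to decide when to call the tool.
                "description": inspect.getdoc(func) or "",
                # Type hints describe the expected parameters.
                "parameters": {
                    param_name: param.annotation
                    for param_name, param in signature.parameters.items()
                },
            }
            return func

        return decorator

    async def call(self, name: str, **kwargs: Any) -> dict:
        # Look up the registered coroutine function and await it.
        return await self.tools[name]["function"](**kwargs)


registry = MiniRegistry()


@registry.register_tool("echo")
async def echo(text: str) -> dict:
    """Return the text unchanged."""
    return {"status": "success", "data": text}


result = asyncio.run(registry.call("echo", text="hello"))
print(registry.tools["echo"]["description"])
print(result)
```

The decorator returns the original function unchanged, so a registered tool can still be called and tested directly.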
OmniAgent Configuration
The OmniAgent is the core component that orchestrates AI reasoning, tool execution, and memory management.
agent = OmniAgent(
    name="agent_name",
    system_instruction="Detailed instructions for the AI...",
    model_config={
        "provider": "openai",  # or "anthropic", "google"
        "model": "gpt-4.1",
        "temperature": 0.7,
        "max_context_length": 50000,
    },
    local_tools=tool_registry,
    agent_config={
        "max_steps": 15,  # Maximum reasoning steps
        "tool_call_timeout": 60,  # Timeout per tool call
        "request_limit": 1000,  # Rate limiting
        "memory_config": {
            "mode": "token_budget",  # or "sliding_window"
            "value": 10000
        },
    },
    memory_store=memory_store,
    event_router=event_router,
    debug=True,
)
Configuration Options:
| Parameter | Description | Default |
| --- | --- | --- |
| name | Agent identifier | Required |
| system_instruction | AI behavior guidelines | Required |
| model_config | LLM configuration | Required |
| local_tools | Tool registry | None |
| agent_config | Execution settings | {} |
| memory_store | Conversation memory | None |
| event_router | Event tracking | None |
| debug | Enable debug logging | False |
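If you assemble these options dynamically (for example from a config file), it can help to validate them before constructing the agent. The sketch below is one way to do that; `build_agent_kwargs`, `REQUIRED_OPTIONS`, and `OPTIONAL_DEFAULTS` are hypothetical names introduced here, and the defaults simply mirror the options listed above rather than anything read from the library.

```python
import copy
from typing import Any, Dict

# Assumption: these mirror the required options and defaults listed above.
REQUIRED_OPTIONS = ("name", "system_instruction", "model_config")
OPTIONAL_DEFAULTS: Dict[str, Any] = {
    "local_tools": None,
    "agent_config": {},
    "memory_store": None,
    "event_router": None,
    "debug": False,
}


def build_agent_kwargs(**options: Any) -> Dict[str, Any]:
    """Check required options and fill in defaults for the optional ones."""
    missing = [key for key in REQUIRED_OPTIONS if key not in options]
    if missing:
        raise ValueError("Missing required options: " + ", ".join(missing))

    unknown = sorted(set(options) - set(REQUIRED_OPTIONS) - set(OPTIONAL_DEFAULTS))
    if unknown:
        raise ValueError("Unknown options: " + ", ".join(unknown))

    # Deep-copy so callers never share the mutable default dict.
    kwargs = copy.deepcopy(OPTIONAL_DEFAULTS)
    kwargs.update(options)
    return kwargs


kwargs = build_agent_kwargs(
    name="solana_agent",
    system_instruction="You are a Solana agent.",
    model_config={"provider": "openai", "model": "gpt-4.1"},
)
print(kwargs["debug"], kwargs["agent_config"])
```

The resulting dictionary could then be passed along as `OmniAgent(**kwargs)`.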
Memory Management
The memory system maintains conversation context across interactions:
# In-memory storage (development)
memory_store = MemoryRouter(memory_store_type="in_memory")

# Persistent storage (production)
memory_store = MemoryRouter(
    memory_store_type="redis",
    redis_config={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)
Memory Modes:
token_budget: Keeps the last N tokens of the conversation
sliding_window: Keeps the last N messages
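The difference between the two modes can be sketched in a few lines. This is an illustration of the idea only, not the library's implementation: the `count_tokens` helper is a hypothetical whitespace-based approximation (real tokenizers count differently), and trimming whole messages rather than partial ones is an assumption.

```python
from typing import Dict, List

Message = Dict[str, str]


def count_tokens(text: str) -> int:
    # Assumption: one whitespace-separated word is roughly one token.
    return len(text.split())


def sliding_window(messages: List[Message], value: int) -> List[Message]:
    """Keep only the last `value` messages."""
    return messages[-value:] if value > 0 else []


def token_budget(messages: List[Message], value: int) -> List[Message]:
    """Keep the most recent messages whose combined size fits in `value` tokens."""
    kept: List[Message] = []
    used = 0
    # Walk backwards from the newest message until the budget is exhausted.
    for message in reversed(messages):
        cost = count_tokens(message["content"])
        if used + cost > value:
            break
        kept.append(message)
        used += cost
    kept.reverse()  # Restore chronological order.
    return kept


history = [
    {"role": "user", "content": "What is my SOL balance?"},
    {"role": "assistant", "content": "Your balance is 2 SOL."},
    {"role": "user", "content": "Send 0.5 SOL to the treasury."},
]

print(len(sliding_window(history, 2)))  # message-count limit
print(len(token_budget(history, 10)))   # token-count limit
```

A sliding window is predictable in message count but not in size, while a token budget bounds the context size regardless of how long individual messages are.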
Event Streaming
Events provide real-time visibility into agent operations:
event_router = EventRouter(event_store_type="in_memory")

# Stream events from a session
async for event in agent.stream_events(session_id):
    print(f"Event: {event.type}")
    print(f"Data: {event.data}")
Event Types:
agent.start - Agent begins processing
tool.call - Tool execution started
tool.result - Tool execution completed
agent.response - Agent generates response
agent.error - Error occurred
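A consumer will usually branch on the event type. The sketch below shows one way to do that using a stand-in stream so it runs without an agent; `FakeEvent`, `fake_stream`, and the keys inside `data` are hypothetical, and the real event payloads may be shaped differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for the events yielded by agent.stream_events()."""
    type: str
    data: Dict[str, Any] = field(default_factory=dict)


async def fake_stream() -> AsyncIterator[FakeEvent]:
    # Assumption: the payload keys below are invented for this example.
    for event in [
        FakeEvent("agent.start"),
        FakeEvent("tool.call", {"tool": "get_balance_solana"}),
        FakeEvent("tool.result", {"status": "success", "data": 2.0}),
        FakeEvent("agent.response", {"text": "Your balance is 2.0 SOL."}),
    ]:
        yield event


def describe(event: FakeEvent) -> str:
    """Map an event to a short log line based on its type."""
    if event.type == "agent.start":
        return "agent started"
    if event.type == "tool.call":
        return f"calling {event.data.get('tool')}"
    if event.type == "tool.result":
        return f"tool returned {event.data.get('status')}"
    if event.type == "agent.response":
        return "agent responded"
    if event.type == "agent.error":
        return f"error: {event.data.get('message')}"
    return f"unhandled event: {event.type}"


async def summarize(events: AsyncIterator[FakeEvent]) -> List[str]:
    return [describe(event) async for event in events]


summary = asyncio.run(summarize(fake_stream()))
print("\n".join(summary))
```

With a real agent, `fake_stream()` would be replaced by `agent.stream_events(session_id)`.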
🌐 FastAPI Integration
Create a production-ready API service:
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize agent on startup
    app.state.agent = await create_agent()
    yield
    # Cleanup on shutdown
    await app.state.agent.cleanup()

app = FastAPI(lifespan=lifespan)

@app.post("/chat")
async def chat(message: str, session_id: Optional[str] = None):
    agent = app.state.agent
    response = await agent.run(message, session_id=session_id)
    return response

@app.get("/events/{session_id}")
async def get_events(session_id: str):
    agent = app.state.agent

    async def event_generator():
        try:
            async for event in agent.stream_events(session_id):
                yield f"event: {event.type}\ndata: {event.json()}\n\n"
        except Exception as e:
            # Send failures as a named SSE event so clients can handle them
            yield f"event: error\ndata: {str(e)}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
📋 Available Tools
Get Balance
Check the SOL balance of any Solana address:
@tool_registry.register_tool("get_balance_solana")
async def get_balance(address: Optional[str] = None) -> dict:
    """
    Get the balance of a Solana address.
    If no address is provided, returns the agent's wallet balance.

    Args:
        address: Optional Solana address to check

    Returns:
        Dictionary with status and balance data
    """
    balance = await BalanceFetcher.get_balance(
        solana_agent,
        token_address=address
    )
    return {"status": "success", "data": balance}
Transfer SOL
Send SOL to any Solana address:
@tool_registry.register_tool("transfer_solana")
async def transfer_solana(address: str, amount: float) -> dict:
    """
    Transfer SOL to a Solana address.

    Args:
        address: Destination Solana address
        amount: Amount of SOL to transfer

    Returns:
        Dictionary with status and transaction signature
    """
    signature = await TokenTransferManager.transfer(
        solana_agent,
        to=address,
        amount=amount
    )
    return {
        "status": "success",
        "signature": signature
    }
🔒 Best Practices
Balance Verification
Always check balance before transfers:
system_instruction = """
CRITICAL: Before ANY SOL transfer:
1. Call get_balance_solana to check current balance
2. Compare balance with requested transfer amount
3. Only proceed if balance >= transfer amount
4. Inform user if insufficient funds
"""
Error Handling
Implement robust error handling in tools:
@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    try:
        result = await perform_operation(param)
        return {"status": "success", "data": result}
    except ValueError as e:
        return {"status": "error", "message": f"Invalid input: {str(e)}"}
    except Exception as e:
        return {"status": "error", "message": f"Operation failed: {str(e)}"}
🎯 Use Cases
Conversational Wallet
# Natural language interactions
"What's my current SOL balance?"
"Send 0.5 SOL to ABC123..."
"How much SOL do I have left after that transfer?"
Automated Trading Bot
# Conditional transfers based on balance
"If my balance is above 10 SOL, transfer 5 SOL to the treasury address"
Multi-User Support
# Different sessions for different users
await agent.run(message, session_id=f"user_{user_id}")
🐛 Debugging
Enable debug mode for detailed logging:
agent = OmniAgent(
    # ... other config
    debug=True
)
This will log:
Tool selection reasoning
Parameter extraction
Execution results
Error traces
📚 API Reference
OmniAgent.run()
Execute a user message and return the response.
async def run(
    self,
    message: str,
    session_id: Optional[str] = None
) -> dict:
    """
    Process a user message and return the agent's response.

    Args:
        message: User input message
        session_id: Optional session identifier for context

    Returns:
        Dictionary containing agent response and metadata
    """
OmniAgent.stream_events()
Stream real-time events from a session.
async def stream_events(
    self,
    session_id: str
) -> AsyncIterator[Event]:
    """
    Stream events from a specific session.

    Args:
        session_id: Session identifier

    Yields:
        Event objects with type and data
    """
🚦 Next Steps
Explore CrewAI Integration
Learn about Google ADK Integration
Check Advanced Configuration
See Production Deployment Guide
