# OmniCoreAgent

AgentiPy integrates with the OmniAgent framework so that you can build AI agents that interact with the Solana blockchain through natural language. The integration covers conversational tool calling, real-time event streaming, and conversation memory.

## 🚀 Quick Start

```python
import asyncio
from mcpomni_connect.omni_agent import OmniAgent
from mcpomni_connect.memory_store.memory_router import MemoryRouter
from mcpomni_connect.events.event_router import EventRouter
from mcpomni_connect.agents.tools.local_tools_registry import ToolRegistry
from agentipy.agent import SolanaAgentKit
from agentipy.tools.get_balance import BalanceFetcher
from agentipy.tools.transfer import TokenTransferManager
from dotenv import load_dotenv
from typing import Optional
import os

load_dotenv()

# Initialize Solana agent
solana_agent = SolanaAgentKit(
    private_key=os.getenv("SOLANA_PRIVATE_KEY"),
    rpc_url="https://api.mainnet-beta.solana.com"
)

# Create tool registry
async def create_tool_registry() -> ToolRegistry:
    """Create a tool registry with Solana operations."""
    tool_registry = ToolRegistry()

    @tool_registry.register_tool("get_balance_solana")
    async def get_balance(address: Optional[str] = None) -> dict:
        """Get the balance of a Solana address."""
        try:
            balance_sol = await BalanceFetcher.get_balance(
                solana_agent, 
                token_address=address
            )
            return {"status": "success", "data": balance_sol}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    @tool_registry.register_tool("transfer_solana")
    async def transfer_solana(address: str, amount: float) -> dict:
        """Transfer SOL to a Solana address."""
        try:
            sig = await TokenTransferManager.transfer(
                solana_agent, 
                to=address, 
                amount=amount
            )
            return {
                "status": "success",
                "message": "SOL transferred successfully",
                "signature": sig
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}
    
    return tool_registry

# Initialize memory and event systems
memory_store = MemoryRouter(memory_store_type="in_memory")
event_router = EventRouter(event_store_type="in_memory")

# Create the OmniAgent
async def create_agent():
    tool_registry = await create_tool_registry()
    
    agent = OmniAgent(
        name="solana_agent",
        system_instruction="""
You are a Solana agent with the ability to check balances and transfer SOL.

CRITICAL RULES:
- Always check balance before transfers using get_balance_solana
- Verify sufficient funds before calling transfer_solana
- Only use available tools in the registry
        """,
        model_config={
            "provider": "openai",
            "model": "gpt-4.1",
            "temperature": 0.7,
            "max_context_length": 50000,
        },
        local_tools=tool_registry,
        agent_config={
            "max_steps": 15,
            "tool_call_timeout": 60,
            "request_limit": 1000,
            "memory_config": {"mode": "token_budget", "value": 10000},
        },
        memory_store=memory_store,
        event_router=event_router,
        debug=True,
    )
    
    return agent

# Run the agent
async def main():
    agent = await create_agent()
    
    # Check balance
    response = await agent.run(
        "What's my SOL balance?",
        session_id="session_001"
    )
    print(response)
    
    # Transfer SOL
    response = await agent.run(
        "Transfer 0.5 SOL to <address>",
        session_id="session_001"
    )
    print(response)

asyncio.run(main())
```

## 🛠️ Key Components

{% stepper %}
{% step %}

### Tool Registry

The `ToolRegistry` is the foundation of your agent's capabilities. It manages all the tools (functions) that your AI agent can execute.

```python
tool_registry = ToolRegistry()

@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    """Tool description that the AI will use to understand when to call this."""
    # Replace this placeholder with your implementation
    result = f"processed {param}"
    return {"status": "success", "data": result}
```

Key Features:

* Automatic function registration
* Type hints for parameter validation
* Docstrings used by AI for tool selection
* Async/await support
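
To make the features above concrete, here is a minimal sketch of how a decorator-based registry can capture a tool's docstring and type hints. `MiniRegistry` and its `call` method are hypothetical stand-ins written for illustration; they are not the `ToolRegistry` class from `mcpomni_connect`, whose internals may differ.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict


class MiniRegistry:
    """Hypothetical stand-in for a tool registry (not the mcpomni_connect class)."""

    def __init__(self) -> None:
        self.tools: Dict[str, Dict[str, Any]] = {}

    def register_tool(self, name: str) -> Callable:
        def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
            signature = inspect.signature(func)
            self.tools[name] = {
                "func": func,
                # The docstring is what a model would read to choose a tool.
                "description": inspect.getdoc(func) or "",
                # Type hints become a simple name -> type-name parameter schema.
                "params": {
                    p.name: getattr(p.annotation, "__name__", str(p.annotation))
                    for p in signature.parameters.values()
                },
            }
            return func  # The function itself is returned unchanged.

        return decorator

    async def call(self, name: str, **kwargs: Any) -> Any:
        # Look the tool up by name and await it with the given arguments.
        return await self.tools[name]["func"](**kwargs)


registry = MiniRegistry()


@registry.register_tool("echo")
async def echo(text: str) -> dict:
    """Return the given text unchanged."""
    return {"status": "success", "data": text}


result = asyncio.run(registry.call("echo", text="hello"))
print(registry.tools["echo"]["description"], result)
```

Because the description comes straight from the docstring, a vague docstring gives the model little to go on when selecting a tool.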

{% endstep %}

{% step %}

### OmniAgent Configuration

The `OmniAgent` is the core component that orchestrates AI reasoning, tool execution, and memory management.

```python
agent = OmniAgent(
    name="agent_name",
    system_instruction="Detailed instructions for the AI...",
    model_config={
        "provider": "openai",  # or "anthropic", "google"
        "model": "gpt-4.1",
        "temperature": 0.7,
        "max_context_length": 50000,
    },
    local_tools=tool_registry,
    agent_config={
        "max_steps": 15,              # Maximum reasoning steps
        "tool_call_timeout": 60,       # Timeout per tool call
        "request_limit": 1000,         # Rate limiting
        "memory_config": {
            "mode": "token_budget",    # or "sliding_window"
            "value": 10000
        },
    },
    memory_store=memory_store,
    event_router=event_router,
    debug=True,
)
```
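
A typo in `agent_config` can be hard to spot once the agent is running, so it may help to check the dictionary before passing it in. The sketch below is an assumption-laden illustration: `check_agent_config` is a hypothetical helper, and the set of memory modes is limited to the two named on this page (the framework may accept others).

```python
from typing import Any, Dict, List

# Modes named on this page; the real framework may accept more.
KNOWN_MEMORY_MODES = {"token_budget", "sliding_window"}


def check_agent_config(agent_config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in an agent_config-style dict."""
    problems: List[str] = []
    # Numeric limits, when present, should be positive numbers.
    for key in ("max_steps", "tool_call_timeout", "request_limit"):
        value = agent_config.get(key)
        if value is not None and (not isinstance(value, (int, float)) or value <= 0):
            problems.append(f"{key} must be a positive number, got {value!r}")
    memory = agent_config.get("memory_config")
    if memory is not None:
        if memory.get("mode") not in KNOWN_MEMORY_MODES:
            problems.append(f"unknown memory mode: {memory.get('mode')!r}")
        if not isinstance(memory.get("value"), int) or memory["value"] <= 0:
            problems.append("memory_config.value must be a positive integer")
    return problems


print(check_agent_config({"max_steps": 0, "memory_config": {"mode": "ring", "value": -1}}))
```

An empty list means nothing suspicious was found; it does not guarantee that the framework accepts the configuration.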

Configuration Options:

| Parameter            | Description            | Default  |
| -------------------- | ---------------------- | -------- |
| `name`               | Agent identifier       | Required |
| `system_instruction` | AI behavior guidelines | Required |
| `model_config`       | LLM configuration      | Required |
| `local_tools`        | Tool registry          | None     |
| `agent_config`       | Execution settings     | {}       |
| `memory_store`       | Conversation memory    | None     |
| `event_router`       | Event tracking         | None     |
| `debug`              | Enable debug logging   | False    |

{% endstep %}

{% step %}

### Memory Management

The memory system maintains conversation context across interactions:

```python
# In-memory storage (development)
memory_store = MemoryRouter(memory_store_type="in_memory")

# Persistent storage (production)
memory_store = MemoryRouter(
    memory_store_type="redis",
    redis_config={
        "host": "localhost",
        "port": 6379,
        "db": 0
    }
)
```

Memory Modes:

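* `token_budget`: Keeps the most recent N tokens of the conversation, where N is the `value` set in `memory_config`
* `sliding_window`: Keeps the most recent N messages, where N is the `value` set in `memory_config`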
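
The difference between the two modes can be sketched in plain Python. This is an illustration only, not the framework's implementation: tokens are approximated by whitespace-separated words, and the token budget is assumed to trim at message boundaries rather than mid-message.

```python
from typing import List


def count_tokens(message: str) -> int:
    # Crude approximation: one token per whitespace-separated word.
    return len(message.split())


def sliding_window(messages: List[str], value: int) -> List[str]:
    """Keep the last `value` messages."""
    return messages[-value:] if value > 0 else []


def token_budget(messages: List[str], value: int) -> List[str]:
    """Keep the newest messages whose combined token count fits in `value`."""
    kept: List[str] = []
    used = 0
    # Walk from newest to oldest and stop at the first message that does not fit.
    for message in reversed(messages):
        cost = count_tokens(message)
        if used + cost > value:
            break
        kept.append(message)
        used += cost
    return list(reversed(kept))


history = ["check my balance", "you have 2 SOL", "send 0.5 SOL to a friend please"]
print(sliding_window(history, 2))
print(token_budget(history, 7))
```

A token budget adapts to message length, while a sliding window keeps a fixed number of turns regardless of how long each one is.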

{% endstep %}

{% step %}

### Event Streaming

Events provide real-time visibility into agent operations:

```python
event_router = EventRouter(event_store_type="in_memory")

# Stream events from a session
async for event in agent.stream_events(session_id):
    print(f"Event: {event.type}")
    print(f"Data: {event.data}")
```

Event Types:

* `agent.start` - Agent begins processing
* `tool.call` - Tool execution started
* `tool.result` - Tool execution completed
* `agent.response` - Agent generates response
* `agent.error` - Error occurred
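
As a rough sketch of how these event types might be consumed, the example below filters a stream by type. `FakeEvent` and `fake_stream` are hypothetical stand-ins for the framework's event objects and for `agent.stream_events(session_id)`; only the `type` and `data` attributes shown earlier are assumed.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List


@dataclass
class FakeEvent:
    """Hypothetical event shape with the `type` and `data` fields used above."""
    type: str
    data: Dict[str, Any] = field(default_factory=dict)


async def fake_stream() -> AsyncIterator[FakeEvent]:
    # Stand-in for agent.stream_events(session_id); yields a fixed sequence.
    for event in (
        FakeEvent("agent.start"),
        FakeEvent("tool.call", {"tool": "get_balance_solana"}),
        FakeEvent("tool.result", {"status": "success"}),
        FakeEvent("agent.response", {"text": "done"}),
    ):
        yield event


async def collect_tool_activity(stream: AsyncIterator[FakeEvent]) -> List[str]:
    """Return one log line per tool-related event in the stream."""
    lines: List[str] = []
    async for event in stream:
        if event.type.startswith("tool."):
            lines.append(f"{event.type}: {event.data}")
        elif event.type == "agent.error":
            lines.append(f"error: {event.data}")
            break  # Stop reading after a failure.
    return lines


tool_log = asyncio.run(collect_tool_activity(fake_stream()))
print(tool_log)
```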

{% endstep %}
{% endstepper %}

## 🌐 FastAPI Integration

Expose the agent over HTTP with a chat endpoint and a Server-Sent Events stream:

```python
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

# create_agent() is the coroutine defined in the Quick Start above.

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize agent on startup
    app.state.agent = await create_agent()
    yield
    # Cleanup on shutdown
    await app.state.agent.cleanup()

app = FastAPI(lifespan=lifespan)

@app.post("/chat")
async def chat(message: str, session_id: Optional[str] = None):
    agent = app.state.agent
    response = await agent.run(message, session_id)
    return response

@app.get("/events/{session_id}")
async def get_events(session_id: str):
    agent = app.state.agent
    async def event_generator():
        try:
            async for event in agent.stream_events(session_id):
                yield f"event: {event.type}\ndata: {event.json()}\n\n"
        except Exception as e:
            # Send errors as a well-formed SSE frame so clients can parse them
            yield f"event: error\ndata: {str(e)}\n\n"
    
    return StreamingResponse(
        event_generator(), 
        media_type="text/event-stream"
    )
```
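
The `event_generator` above builds each Server-Sent Events frame by hand. A small helper can keep the framing consistent; the sketch below uses only the standard library, and `format_sse` is a hypothetical name rather than part of FastAPI or OmniAgent.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events frame."""
    # Strings are sent as-is; everything else is JSON-encoded.
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = [f"event: {event}"] if event else []
    # A payload containing newlines must be split across several data: lines.
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


print(format_sse({"ok": True}, event="tool.result"), end="")
```

Inside a generator, each frame would then be produced with something like `yield format_sse(payload, event=event.type)`, where `payload` is whatever serializable form of the event data you choose.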

## 📋 Available Tools

### Get Balance

Check the SOL balance of any Solana address:

```python
@tool_registry.register_tool("get_balance_solana")
async def get_balance(address: Optional[str] = None) -> dict:
    """
    Get the balance of a Solana address.
    If no address is provided, returns the agent's wallet balance.
    
    Args:
        address: Optional Solana address to check
        
    Returns:
        Dictionary with status and balance data
    """
    balance = await BalanceFetcher.get_balance(
        solana_agent, 
        token_address=address
    )
    return {"status": "success", "data": balance}
```

### Transfer SOL

Send SOL to any Solana address:

```python
@tool_registry.register_tool("transfer_solana")
async def transfer_solana(address: str, amount: float) -> dict:
    """
    Transfer SOL to a Solana address.
    
    Args:
        address: Destination Solana address
        amount: Amount of SOL to transfer
        
    Returns:
        Dictionary with status and transaction signature
    """
    signature = await TokenTransferManager.transfer(
        solana_agent,
        to=address,
        amount=amount
    )
    return {
        "status": "success",
        "signature": signature
    }
```
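
Since a transfer cannot be undone, it can be worth rejecting obviously bad arguments before calling `TokenTransferManager.transfer`. The sketch below is one possible pre-check, not part of AgentiPy: `base58_decode` and `validate_transfer_args` are hypothetical helpers that rely on the fact that a Solana address is a base58-encoded 32-byte public key.

```python
BASE58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def base58_decode(text: str) -> bytes:
    """Decode a base58 string; raises ValueError on characters outside the alphabet."""
    number = 0
    for char in text:
        index = BASE58_ALPHABET.find(char)
        if index < 0:
            raise ValueError(f"invalid base58 character: {char!r}")
        number = number * 58 + index
    body = number.to_bytes((number.bit_length() + 7) // 8, "big")
    # Each leading '1' encodes one leading zero byte.
    leading_zeros = len(text) - len(text.lstrip("1"))
    return b"\x00" * leading_zeros + body


def validate_transfer_args(address: str, amount: float) -> None:
    """Raise ValueError if the address or amount is obviously wrong."""
    if len(base58_decode(address)) != 32:
        raise ValueError("address must decode to a 32-byte public key")
    if not (amount > 0):  # Also rejects NaN, which fails every comparison.
        raise ValueError("amount must be greater than zero")


# The all-ones address decodes to 32 zero bytes, so it passes the shape check.
validate_transfer_args("1" * 32, 0.5)
```

This only checks the shape of the inputs. It cannot tell whether the address belongs to the intended recipient.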

## 🔒 Best Practices

{% stepper %}
{% step %}

### Balance Verification

Always check balance before transfers:

```python
system_instruction = """
CRITICAL: Before ANY SOL transfer:
1. Call get_balance_solana to check current balance
2. Compare balance with requested transfer amount
3. Only proceed if balance >= transfer amount
4. Inform user if insufficient funds
"""
```
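
A prompt instruction is a request to the model, not a guarantee, so the same rule can also be enforced in code inside the transfer tool. The following is a minimal sketch under stated assumptions: `can_transfer` is a hypothetical helper, and the default fee reserve is a placeholder value that you should set for your own use.

```python
from decimal import Decimal


def can_transfer(
    balance_sol: Decimal,
    amount_sol: Decimal,
    fee_reserve_sol: Decimal = Decimal("0.00001"),  # Placeholder reserve, an assumption.
) -> bool:
    """True when the balance covers the amount plus a reserve for fees."""
    if amount_sol <= 0:
        return False
    return balance_sol >= amount_sol + fee_reserve_sol


print(can_transfer(Decimal("1"), Decimal("0.5")))
print(can_transfer(Decimal("0.5"), Decimal("0.5")))
```

`Decimal` is used here so that comparisons near the boundary are not affected by binary floating-point rounding.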

{% endstep %}

{% step %}

### Error Handling

Implement robust error handling in tools:

```python
@tool_registry.register_tool("tool_name")
async def tool_function(param: str) -> dict:
    try:
        # perform_operation is a placeholder for your own coroutine
        result = await perform_operation(param)
        return {"status": "success", "data": result}
    except ValueError as e:
        return {"status": "error", "message": f"Invalid input: {str(e)}"}
    except Exception as e:
        return {"status": "error", "message": f"Operation failed: {str(e)}"}
```
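
When several tools share this try/except shape, it can be factored into a decorator. The sketch below is one way to do that with the standard library; `safe_tool` and `parse_amount` are hypothetical names, and whether the registry's own decorator composes cleanly with a wrapper like this is an assumption to verify.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict


def safe_tool(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Dict[str, Any]]]:
    """Wrap an async tool so it always returns a status dictionary."""

    @functools.wraps(func)  # Preserves the name and docstring of the wrapped tool.
    async def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return {"status": "success", "data": await func(*args, **kwargs)}
        except ValueError as exc:
            return {"status": "error", "message": f"Invalid input: {exc}"}
        except Exception as exc:
            return {"status": "error", "message": f"Operation failed: {exc}"}

    return wrapper


@safe_tool
async def parse_amount(text: str) -> float:
    """Hypothetical tool body."""
    return float(text)


ok = asyncio.run(parse_amount("1.5"))
failed = asyncio.run(parse_amount("abc"))
print(ok, failed)
```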

{% endstep %}

{% step %}

### Environment Variables

Store sensitive data securely:

```bash
# .env file
SOLANA_PRIVATE_KEY=your_private_key_here
OPENAI_API_KEY=your_openai_key_here
```

```python
from dotenv import load_dotenv
import os

load_dotenv()

private_key = os.getenv("SOLANA_PRIVATE_KEY")
```
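
`os.getenv` returns `None` when a variable is missing, which tends to surface later as a confusing error. A small helper can fail fast instead; `require_env` below is a hypothetical name used for illustration.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise if it is unset or blank."""
    value = os.getenv(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


os.environ["DEMO_KEY"] = "abc"  # Set only so this example runs standalone.
print(require_env("DEMO_KEY"))
```

The error message names the variable but never includes its value, so secrets are not written to logs.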

{% endstep %}

{% step %}

### Session Management

Use session IDs for context preservation:

```python
# Same session maintains conversation context
response1 = await agent.run("Check my balance", session_id="user_123")
response2 = await agent.run("Transfer 0.5 SOL to <address>", session_id="user_123")
```
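
The reason a shared `session_id` preserves context can be shown with a toy message log keyed by session. `SessionHistory` is a hypothetical class for illustration only; it is not the framework's `MemoryRouter`.

```python
from collections import defaultdict
from typing import DefaultDict, List


class SessionHistory:
    """Toy per-session message log (not the framework's MemoryRouter)."""

    def __init__(self) -> None:
        self._messages: DefaultDict[str, List[str]] = defaultdict(list)

    def append(self, session_id: str, message: str) -> None:
        self._messages[session_id].append(message)

    def context(self, session_id: str) -> List[str]:
        # Return a copy so callers cannot mutate the stored history.
        return list(self._messages.get(session_id, []))


history_store = SessionHistory()
history_store.append("user_123", "Check my balance")
history_store.append("user_123", "Transfer 0.5 SOL to <address>")
history_store.append("user_456", "Check my balance")
print(history_store.context("user_123"))
```

Each session sees only its own messages, which is why reusing one `session_id` for unrelated users would mix their conversations.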

{% endstep %}
{% endstepper %}

## 🎯 Use Cases

### Conversational Wallet

```python
# Natural language interactions
"What's my current SOL balance?"
"Send 0.5 SOL to ABC123..."
"How much SOL do I have left after that transfer?"
```

### Automated Trading Bot

```python
# Conditional transfers based on balance
"If my balance is above 10 SOL, transfer 5 SOL to the treasury address"
```

### Multi-User Support

```python
# Different sessions for different users
await agent.run(message, session_id=f"user_{user_id}")
```

## 🐛 Debugging

Enable debug mode for detailed logging:

```python
agent = OmniAgent(
    # ... other config
    debug=True
)
```

This will log:

* Tool selection reasoning
* Parameter extraction
* Execution results
* Error traces

## 📚 API Reference

### `OmniAgent.run()`

Process a user message and return the agent's response.

```python
async def run(
    self,
    message: str,
    session_id: Optional[str] = None
) -> dict:
    """
    Process a user message and return the agent's response.
    
    Args:
        message: User input message
        session_id: Optional session identifier for context
        
    Returns:
        Dictionary containing agent response and metadata
    """
```

### `OmniAgent.stream_events()`

Stream real-time events from a session.

```python
async def stream_events(
    self,
    session_id: str
) -> AsyncIterator[Event]:
    """
    Stream events from a specific session.
    
    Args:
        session_id: Session identifier
        
    Yields:
        Event objects with type and data
    """
```

## 🚦 Next Steps

* Explore CrewAI Integration
* Learn about Google ADK Integration
* Check Advanced Configuration
* See Production Deployment Guide


