# Crypto Market Dashboard Example

This example demonstrates how to use the `agentipy` framework to build a crypto market dashboard that:

* Retrieves market prices from Pyth feeds.
* Gets AI-based price predictions via Allora.
* Formats and sends styled embed messages to Discord using a webhook.

## Features

* **Price Updates:** Fetches live prices and confidence intervals for assets such as SOL/USD, BTC/USD, ETH/USD, and TRUMP/USD.
* **AI Forecast:** Retrieves short-term (5min) and longer-term (8hr) predictions for BTC and ETH.
* **Discord Integration:** Sends the updates as a rich embed message to a Discord channel.
* **Robust Logging & Error Handling:** Provides system notices and handles partial outages gracefully.

## Code (`main.py`)

````python
import asyncio
import os
import logging
import datetime
from typing import Dict, List, Any

import aiohttp

from agentipy.tools.use_pyth import PythManager
from agentipy.agent import SolanaAgentKit
from agentipy.tools.use_allora import (
    AlloraManager,
    PriceInferenceToken,
    PriceInferenceTimeframe,
    ChainSlug
)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)

# Configuration
SLEEP_INTERVAL = int(os.getenv("SLEEP_INTERVAL", 300))
ALLORA_ENV = os.getenv("ALLORA_ENV", "MAINNET")
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
CHAIN = ChainSlug.MAINNET if ALLORA_ENV.upper() == "MAINNET" else ChainSlug.TESTNET

PYTH_FEEDS: Dict[str, str] = {
    "SOL/USD": "H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG",
    "BTC/USD": "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
    "ETH/USD": "JBu1AL4obBcCMqKBBxhpWCNUt136ijcuMZLFvTP7iWdB",
    "TRUMP/USD": "A8G6XyA6fSrsavG63ssAGU3Hnt2oDZARxefREzAY5axH",
}

class MarketMonitor:
    def __init__(self):
        self.agent = SolanaAgentKit(
            allora_api_key=os.getenv('ALLORA_API_KEY', 'UP-Allora_API_Key')
        )
        self.session = aiohttp.ClientSession()
        self.last_update = datetime.datetime.utcnow()

    async def close(self):
        await self.session.close()

    def _format_prediction(self, prediction_data: Dict[str, Any]) -> str:
        """Format prediction data with confidence interval"""
        try:
            price = prediction_data.get('price_prediction', 0)
            confidence = prediction_data.get('confidence_interval', [0.0])[0]
            return f"${float(price):.2f} (±${float(confidence):.2f})"
        except Exception as e:
            logging.error(f"Error formatting prediction: {str(e)}")
            return "N/A"

    async def create_embed(self, message_data: dict) -> dict:
        """Create a rich Discord embed with modern UI elements"""
        status_color = 0x00FF88 if message_data.get('status') == "success" else 0xFF3300
        timestamp = datetime.datetime.utcnow().isoformat()

        embed = {
            "title": "🌐 Crypto Market Dashboard",
            "color": status_color,
            "thumbnail": {"url": os.getenv("EMBED_THUMBNAIL", "")},
            "footer": {
                "text": "Market Intelligence v2.4",
                "icon_url": os.getenv("FOOTER_ICON", "")
            },
            "timestamp": timestamp,
            "fields": []
        }

        # Price Section
        price_fields = []
        for symbol, data in message_data.get('prices', {}).items():
            price_fields.append({
                "name": f"🔸 {symbol}",
                "value": (
                    f"```diff\n"
                    f"+ Price: ${data['price']:.4f}\n"
                    f"± Confidence: ${data['confidence']:.4f}\n"
                    f"🕒 Updated: {data['timestamp']}\n"
                    f"```"
                ),
                "inline": True
            })

        # Add prices in grid layout
        for i in range(0, len(price_fields), 2):
            embed["fields"].extend(price_fields[i:i+2])
            if i+2 < len(price_fields):
                embed["fields"].append({"name": "\u200b", "value": "\u200b", "inline": False})

        # Prediction Section
        if message_data.get('predictions'):
            prediction_groups = {}
            for pred in message_data['predictions']:
                key = pred['token']
                if key not in prediction_groups:
                    prediction_groups[key] = []
                prediction_groups[key].append(pred)

            for token, preds in prediction_groups.items():
                pred_lines = [f"**{token} Predictions**"]
                for p in preds:
                    formatted = self._format_prediction(p['data'])
                    pred_lines.append(f"▫️ {p['timeframe']}: {formatted}")

                embed["fields"].append({
                    "name": "🔮 AI Forecast",
                    "value": "\n".join(pred_lines),
                    "inline": False
                })

        # Warnings Section
        if message_data.get('warnings'):
            embed["fields"].append({
                "name": "⚠️ System Notices",
                "value": "\n".join([f"▫️ {warn}" for warn in message_data['warnings']]),
                "inline": False
            })

        return embed

    async def send_discord_update(self, message_data: dict):
        """Send styled embed message to Discord"""
        if not DISCORD_WEBHOOK_URL:
            logging.warning("Discord webhook not configured")
            return

        try:
            embed = await self.create_embed(message_data)
            payload = {
                "embeds": [embed],
                "username": "Crypto Sentinel",
                "avatar_url": os.getenv("BOT_AVATAR", "")
            }

            async with self.session.post(DISCORD_WEBHOOK_URL, json=payload) as resp:
                if resp.status not in (200, 204):
                    error = await resp.text()
                    logging.error(f"Discord API error: {resp.status} - {error}")
                else:
                    logging.info("Sent market update successfully")

        except Exception as e:
            logging.error(f"Notification error: {str(e)}")

    async def fetch_market_data(self) -> dict:
        """Aggregate data from all sources"""
        data = {
            "status": "success",
            "prices": {},
            "predictions": [],
            "warnings": []
        }

        # Fetch Pyth prices
        for symbol, address in PYTH_FEEDS.items():
            try:
                result = await PythManager.get_price(address)
                if result["status"] == "TRADING":
                    data["prices"][symbol] = {
                        "price": result["price"],
                        "confidence": result["confidence_interval"],
                        "timestamp": datetime.datetime.utcnow().strftime("%H:%M:%S UTC")
                    }
                else:
                    data["warnings"].append(f"{symbol} in {result['status']} status")
            except Exception as e:
                data["warnings"].append(f"{symbol} data unavailable")
                logging.error(f"Price error ({symbol}): {str(e)}")

        # Fetch Allora predictions
        try:
            allora = AlloraManager(agent=self.agent, chain=CHAIN)
            timeframes = [
                (PriceInferenceTimeframe.FIVE_MIN, "5min"),
                (PriceInferenceTimeframe.EIGHT_HOURS, "8hr")
            ]

            predictions = []
            for token in [PriceInferenceToken.BTC, PriceInferenceToken.ETH]:
                for timeframe, label in timeframes:
                    try:
                        prediction = await allora.get_price_prediction(token, timeframe)
                        predictions.append({
                            "token": token.name,
                            "timeframe": label,
                            "data": prediction
                        })
                    except Exception as e:
                        data["warnings"].append(f"Prediction failed for {token.name} {label}")
                        logging.error(f"Prediction error ({token.name} {label}): {str(e)}")

            data["predictions"] = predictions

        except Exception as e:
            data["warnings"].append("Prediction service unavailable")
            logging.error(f"Allora connection error: {str(e)}")

        # Update system status
        if len(data["warnings"]) > 2:
            data["status"] = "partial_outage"
        elif len(data["warnings"]) > 0:
            data["status"] = "warning"

        return data

    async def monitor_loop(self):
        """Main monitoring loop"""
        while True:
            try:
                market_data = await self.fetch_market_data()
                await self.send_discord_update(market_data)
                logging.info(f"Sleeping for {SLEEP_INTERVAL}s...")
                await asyncio.sleep(SLEEP_INTERVAL)

            except Exception as e:
                logging.error(f"Critical error: {str(e)}")
                emergency_data = {
                    "status": "error",
                    "warnings": ["System malfunction - attempting recovery"]
                }
                await self.send_discord_update(emergency_data)
                await asyncio.sleep(60)

async def main():
    monitor = MarketMonitor()
    try:
        await monitor.monitor_loop()
    except KeyboardInterrupt:
        await monitor.close()
        logging.info("Graceful shutdown complete")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Monitoring stopped by user")
````

## Source Files

You can find the source files for this example (including `readme.md` with setup and usage instructions) on GitHub: <https://github.com/niceberginc/agentipy/blob/main/examples/CryptoMarketDashboard/>


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.agentipy.fun/examples/crypto-market-dashboard-example.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
