Crypto Market Dashboard Example
This example demonstrates how to use the agentipy
framework to build a crypto market dashboard that:
Retrieves market prices from Pyth feeds.
Gets AI-based price predictions via Allora.
Formats and sends styled embed messages to Discord using a webhook.
Features
Price Updates: Fetches live prices and confidence intervals for assets such as SOL/USD, BTC/USD, ETH/USD, and TRUMP/USD.
AI Forecast: Retrieves short-term (5min) and longer-term (8hr) predictions for BTC and ETH.
Discord Integration: Sends the updates as a rich embed message to a Discord channel.
Robust Logging & Error Handling: Provides system notices and handles partial outages gracefully.
Code (main.py
)
main.py
)import asyncio
import os
import logging
import datetime
from typing import Dict, List, Any
import aiohttp
from agentipy.tools.use_pyth import PythManager
from agentipy.agent import SolanaAgentKit
from agentipy.tools.use_allora import (
AlloraManager,
PriceInferenceToken,
PriceInferenceTimeframe,
ChainSlug
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
# Configuration
SLEEP_INTERVAL = int(os.getenv("SLEEP_INTERVAL", 300))
ALLORA_ENV = os.getenv("ALLORA_ENV", "MAINNET")
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
CHAIN = ChainSlug.MAINNET if ALLORA_ENV.upper() == "MAINNET" else ChainSlug.TESTNET
PYTH_FEEDS: Dict[str, str] = {
"SOL/USD": "H6ARHf6YXhGYeQfUzQNGk6rDNnLBQKrenN712K4AQJEG",
"BTC/USD": "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
"ETH/USD": "JBu1AL4obBcCMqKBBxhpWCNUt136ijcuMZLFvTP7iWdB",
"TRUMP/USD": "A8G6XyA6fSrsavG63ssAGU3Hnt2oDZARxefREzAY5axH",
}
class MarketMonitor:
def __init__(self):
self.agent = SolanaAgentKit(
allora_api_key=os.getenv('ALLORA_API_KEY', 'UP-Allora_API_Key')
)
self.session = aiohttp.ClientSession()
self.last_update = datetime.datetime.utcnow()
async def close(self):
await self.session.close()
def _format_prediction(self, prediction_data: Dict[str, Any]) -> str:
"""Format prediction data with confidence interval"""
try:
price = prediction_data.get('price_prediction', 0)
confidence = prediction_data.get('confidence_interval', [0.0])[0]
return f"${float(price):.2f} (±${float(confidence):.2f})"
except Exception as e:
logging.error(f"Error formatting prediction: {str(e)}")
return "N/A"
async def create_embed(self, message_data: dict) -> dict:
"""Create a rich Discord embed with modern UI elements"""
status_color = 0x00FF88 if message_data.get('status') == "success" else 0xFF3300
timestamp = datetime.datetime.utcnow().isoformat()
embed = {
"title": "🌐 Crypto Market Dashboard",
"color": status_color,
"thumbnail": {"url": os.getenv("EMBED_THUMBNAIL", "")},
"footer": {
"text": "Market Intelligence v2.4",
"icon_url": os.getenv("FOOTER_ICON", "")
},
"timestamp": timestamp,
"fields": []
}
# Price Section
price_fields = []
for symbol, data in message_data.get('prices', {}).items():
price_fields.append({
"name": f"🔸 {symbol}",
"value": (
f"```diff\n"
f"+ Price: ${data['price']:.4f}\n"
f"± Confidence: ${data['confidence']:.4f}\n"
f"🕒 Updated: {data['timestamp']}\n"
f"```"
),
"inline": True
})
# Add prices in grid layout
for i in range(0, len(price_fields), 2):
embed["fields"].extend(price_fields[i:i+2])
if i+2 < len(price_fields):
embed["fields"].append({"name": "\u200b", "value": "\u200b", "inline": False})
# Prediction Section
if message_data.get('predictions'):
prediction_groups = {}
for pred in message_data['predictions']:
key = pred['token']
if key not in prediction_groups:
prediction_groups[key] = []
prediction_groups[key].append(pred)
for token, preds in prediction_groups.items():
pred_lines = [f"**{token} Predictions**"]
for p in preds:
formatted = self._format_prediction(p['data'])
pred_lines.append(f"▫️ {p['timeframe']}: {formatted}")
embed["fields"].append({
"name": "🔮 AI Forecast",
"value": "\n".join(pred_lines),
"inline": False
})
# Warnings Section
if message_data.get('warnings'):
embed["fields"].append({
"name": "⚠️ System Notices",
"value": "\n".join([f"▫️ {warn}" for warn in message_data['warnings']]),
"inline": False
})
return embed
async def send_discord_update(self, message_data: dict):
"""Send styled embed message to Discord"""
if not DISCORD_WEBHOOK_URL:
logging.warning("Discord webhook not configured")
return
try:
embed = await self.create_embed(message_data)
payload = {
"embeds": [embed],
"username": "Crypto Sentinel",
"avatar_url": os.getenv("BOT_AVATAR", "")
}
async with self.session.post(DISCORD_WEBHOOK_URL, json=payload) as resp:
if resp.status not in (200, 204):
error = await resp.text()
logging.error(f"Discord API error: {resp.status} - {error}")
else:
logging.info("Sent market update successfully")
except Exception as e:
logging.error(f"Notification error: {str(e)}")
async def fetch_market_data(self) -> dict:
"""Aggregate data from all sources"""
data = {
"status": "success",
"prices": {},
"predictions": [],
"warnings": []
}
# Fetch Pyth prices
for symbol, address in PYTH_FEEDS.items():
try:
result = await PythManager.get_price(address)
if result["status"] == "TRADING":
data["prices"][symbol] = {
"price": result["price"],
"confidence": result["confidence_interval"],
"timestamp": datetime.datetime.utcnow().strftime("%H:%M:%S UTC")
}
else:
data["warnings"].append(f"{symbol} in {result['status']} status")
except Exception as e:
data["warnings"].append(f"{symbol} data unavailable")
logging.error(f"Price error ({symbol}): {str(e)}")
# Fetch Allora predictions
try:
allora = AlloraManager(agent=self.agent, chain=CHAIN)
timeframes = [
(PriceInferenceTimeframe.FIVE_MIN, "5min"),
(PriceInferenceTimeframe.EIGHT_HOURS, "8hr")
]
predictions = []
for token in [PriceInferenceToken.BTC, PriceInferenceToken.ETH]:
for timeframe, label in timeframes:
try:
prediction = await allora.get_price_prediction(token, timeframe)
predictions.append({
"token": token.name,
"timeframe": label,
"data": prediction
})
except Exception as e:
data["warnings"].append(f"Prediction failed for {token.name} {label}")
logging.error(f"Prediction error ({token.name} {label}): {str(e)}")
data["predictions"] = predictions
except Exception as e:
data["warnings"].append("Prediction service unavailable")
logging.error(f"Allora connection error: {str(e)}")
# Update system status
if len(data["warnings"]) > 2:
data["status"] = "partial_outage"
elif len(data["warnings"]) > 0:
data["status"] = "warning"
return data
async def monitor_loop(self):
"""Main monitoring loop"""
while True:
try:
market_data = await self.fetch_market_data()
await self.send_discord_update(market_data)
logging.info(f"Sleeping for {SLEEP_INTERVAL}s...")
await asyncio.sleep(SLEEP_INTERVAL)
except Exception as e:
logging.error(f"Critical error: {str(e)}")
emergency_data = {
"status": "error",
"warnings": ["System malfunction - attempting recovery"]
}
await self.send_discord_update(emergency_data)
await asyncio.sleep(60)
async def main():
monitor = MarketMonitor()
try:
await monitor.monitor_loop()
except KeyboardInterrupt:
await monitor.close()
logging.info("Graceful shutdown complete")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Monitoring stopped by user")
Source Files
You can find the source files for this example (including readme.md
with setup and usage instructions) on GitHub: https://github.com/niceberginc/agentipy/blob/main/examples/CryptoMarketDashboard/