Agent Framework
Build sophisticated AI agents with our powerful framework designed for scale and flexibility.
Agent Architecture
BaseAgent Class
All agents inherit from the BaseAgent class, which provides core functionality:
    from artcafe_agent import BaseAgent


    class MyAgent(BaseAgent):
        def __init__(self):
            super().__init__(
                agent_id="my-agent",
                name="My Agent",
                capabilities=["text_analysis", "data_processing"],
            )

        async def setup(self):
            """Initialize agent resources."""
            await self.subscribe("tasks/analysis")
            await self.subscribe("commands/process")

        async def process_message(self, message):
            """Handle incoming messages."""
            # analyze_text and process_data are not shown here; define them on your agent.
            if message.topic == "tasks/analysis":
                return await self.analyze_text(message.data)
            elif message.topic == "commands/process":
                return await self.process_data(message.data)

        async def teardown(self):
            """Clean up resources."""
            await self.unsubscribe_all()
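The three hooks above suggest a lifecycle of setup, then one process_message call per message, then teardown. A minimal sketch of that flow, runnable without a broker, is shown here. `Message`, `FakeAgent`, and `run_once` are hypothetical stand-ins for illustration and are not part of artcafe_agent; the real BaseAgent may drive these hooks differently.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Hypothetical stand-in for the framework's message object."""
    topic: str
    data: Any


class FakeAgent:
    """Hypothetical stand-in mirroring the three lifecycle hooks."""

    def __init__(self):
        self.subscriptions = set()

    async def setup(self):
        self.subscriptions.update({"tasks/analysis", "commands/process"})

    async def process_message(self, message):
        # Dispatch on topic; unknown topics raise instead of returning None.
        if message.topic == "tasks/analysis":
            return {"words": len(message.data.split())}
        if message.topic == "commands/process":
            return {"items": len(message.data)}
        raise ValueError(f"unhandled topic: {message.topic}")

    async def teardown(self):
        self.subscriptions.clear()


async def run_once(agent, messages):
    """Run setup, feed messages in order, and always run teardown."""
    await agent.setup()
    try:
        return [await agent.process_message(m) for m in messages]
    finally:
        await agent.teardown()
```

Raising on an unknown topic is a design choice of this sketch: the handler in the example above falls through and returns None when no branch matches, which can hide routing mistakes.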
Agent Patterns
1. Worker Pattern
Process tasks from a queue with automatic load balancing:
    class WorkerAgent(BaseAgent):
        async def setup(self):
            # Workers compete for tasks
            await self.subscribe("tasks/queue", competing=True)

        async def process_message(self, message):
            task = message.data
            result = await self.process_task(task)
            # Publish result
            await self.publish(f"tasks/results/{task['id']}", result)
            return {"status": "completed"}
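To illustrate what "workers compete for tasks" means, here is a minimal sketch that uses `asyncio.Queue` as a stand-in for the broker. It assumes `competing=True` gives competing-consumer semantics, where each task is delivered to exactly one worker; `worker` and `run_pool` are hypothetical names, not framework APIs.

```python
import asyncio


async def worker(name, queue, results):
    """Pull tasks until cancelled; each task goes to exactly one worker."""
    while True:
        task = await queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for real work
            results[task["id"]] = name
        finally:
            queue.task_done()


async def run_pool(tasks, worker_count=3):
    """Process all tasks with a fixed pool and report who handled each one."""
    queue = asyncio.Queue()
    results = {}
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(worker_count)
    ]
    for task in tasks:
        queue.put_nowait(task)
    await queue.join()  # wait until every task has been marked done
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results
```

Calling `asyncio.run(run_pool([{"id": i} for i in range(10)]))` returns a mapping from task id to the worker that handled it, with every id present exactly once.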
2. Orchestrator Pattern
Coordinate multiple agents to achieve complex goals:
    class OrchestratorAgent(BaseAgent):
        async def orchestrate_workflow(self, workflow):
            steps = workflow['steps']
            context = {}
            for step in steps:
                # Assign task to specialized agent
                await self.publish(f"tasks/{step['type']}", {
                    "step": step,
                    "context": context
                })
                # Wait for result
                result = await self.wait_for_message(
                    f"tasks/results/{step['id']}",
                    timeout=30
                )
                context[step['name']] = result.data
            return context
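The loop above does not show what happens when a step exceeds its timeout. One way to handle that, sketched with plain asyncio: `run_workflow` and `demo_dispatch` are hypothetical, and `dispatch` stands in for the publish / wait_for_message pair. How the framework itself reports a timeout is not stated in this document.

```python
import asyncio


async def run_workflow(steps, dispatch, timeout=30):
    """Run steps in order, passing the accumulated context to each one.

    `dispatch` is a hypothetical coroutine function (step, context) -> result.
    """
    context = {}
    for step in steps:
        try:
            # Pass a copy so a step cannot mutate the orchestrator's context.
            result = await asyncio.wait_for(dispatch(step, dict(context)), timeout)
        except asyncio.TimeoutError:
            # Stop at the first stalled step and report where the workflow stopped.
            return {"status": "timeout", "failed_step": step["name"], "context": context}
        context[step["name"]] = result
    return {"status": "completed", "context": context}


async def demo_dispatch(step, context):
    """Stand-in for a specialized agent: optionally slow, then returns a string."""
    await asyncio.sleep(step.get("delay", 0))
    return f"{step['type']} done after {len(context)} prior step(s)"
```

Returning a status dictionary rather than raising keeps the partial context available to the caller, which can then decide whether to retry the failed step or abandon the workflow.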
3. Monitor Pattern
Monitor system health and react to anomalies:
    class MonitorAgent(BaseAgent):
        async def setup(self):
            # Subscribe to all agent status updates
            await self.subscribe("agents/status/+")
            # Set up periodic health checks
            self.schedule_task(self.check_system_health, interval=60)

        async def process_message(self, message):
            if "error" in message.data:
                await self.handle_error(message)
            elif message.data.get("status") == "offline":
                await self.handle_agent_offline(message)
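The subscription `agents/status/+` uses a wildcard. The sketch below assumes `+` behaves like the MQTT single-level wildcard, matching exactly one topic level; this document does not confirm the framework's matching rules, and `topic_matches` is a hypothetical helper.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches `pattern`, where `+` matches one level."""
    pattern_parts = pattern.split("/")
    topic_parts = topic.split("/")
    # A single-level wildcard never spans levels, so the depths must be equal.
    if len(pattern_parts) != len(topic_parts):
        return False
    return all(p == "+" or p == t for p, t in zip(pattern_parts, topic_parts))
```

Under this assumption the monitor would receive `agents/status/worker-1` but not `agents/status/worker-1/cpu`, which is one level deeper.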
Advanced Features
Shared Memory
Access a shared knowledge graph for context:
    # Store knowledge
    await self.memory.set("user_123", {
        "preferences": ["tech", "science"],
        "history": []
    })

    # Retrieve knowledge
    user_data = await self.memory.get("user_123")
State Management
Persistent state across restarts:
    from datetime import datetime

    # Save state
    await self.save_state({
        "processed_count": 1000,
        "last_run": datetime.now()
    })

    # Load state on startup
    state = await self.load_state()
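The state above contains a `datetime`, and this document does not say how save_state serializes values. If state is persisted as JSON (an assumption), datetimes need an explicit encoding. A minimal sketch with hypothetical helpers `encode_state` and `decode_state`:

```python
import json
from datetime import datetime, timezone


def encode_state(state: dict) -> str:
    """Serialize state to JSON, writing datetimes as tagged ISO 8601 strings."""
    def default(value):
        if isinstance(value, datetime):
            return {"__datetime__": value.isoformat()}
        raise TypeError(f"cannot serialize {type(value).__name__}")
    return json.dumps(state, default=default)


def decode_state(payload: str) -> dict:
    """Inverse of encode_state: restore tagged strings to datetime objects."""
    def hook(obj):
        if set(obj) == {"__datetime__"}:
            return datetime.fromisoformat(obj["__datetime__"])
        return obj
    return json.loads(payload, object_hook=hook)


# Round trip: a timezone-aware timestamp survives encoding and decoding.
state = {"processed_count": 1000, "last_run": datetime.now(timezone.utc)}
restored = decode_state(encode_state(state))
```

The sketch uses a timezone-aware timestamp so that a restored value compares unambiguously across hosts; a naive `datetime.now()` round-trips as well but carries no offset.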
Error Handling
Built-in retry and circuit breaker:
    @retry(max_attempts=3, backoff=2)
    @circuit_breaker(failure_threshold=5)
    async def process_with_retry(self, data):
        return await external_api.process(data)
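To show roughly how decorators like these can work, here is a simplified sketch. It is not the framework's implementation. It assumes `backoff` is an exponential multiplier and adds a hypothetical `base_delay` parameter and `CircuitOpenError` exception. The breaker has no reset timeout or half-open state, so once open it stays open.

```python
import asyncio
import functools


class CircuitOpenError(RuntimeError):
    """Raised when the breaker is open and the call is rejected without running."""


def retry(max_attempts=3, backoff=2, base_delay=0.01):
    """Retry a coroutine, sleeping base_delay * backoff**n between attempts."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except CircuitOpenError:
                    raise  # retrying against an open circuit cannot succeed
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the last error
                    await asyncio.sleep(base_delay * backoff ** attempt)
        return wrapper
    return decorator


def circuit_breaker(failure_threshold=5):
    """Reject calls after `failure_threshold` consecutive failures."""
    def decorator(func):
        failures = 0

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal failures
            if failures >= failure_threshold:
                raise CircuitOpenError(f"{func.__name__}: circuit open")
            try:
                result = await func(*args, **kwargs)
            except Exception:
                failures += 1
                raise
            failures = 0  # a success resets the consecutive-failure count
            return result
        return wrapper
    return decorator


attempts = {"count": 0}


@retry(max_attempts=3, backoff=2)
@circuit_breaker(failure_threshold=5)
async def flaky():
    """Fail twice with a transient error, then succeed."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(flaky())
```

With the decorators stacked in this order, every retry attempt passes through the breaker, so each failed attempt counts toward the failure threshold.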
Metrics & Monitoring
Automatic performance tracking:
    # Track custom metrics
    self.metrics.increment("tasks_processed")
    self.metrics.gauge("queue_size", len(queue))
    # Metrics auto-published to dashboard
Best Practices
- ✓ Idempotent Operations: Design message handlers to be idempotent
- ✓ Graceful Shutdown: Always implement teardown() for cleanup
- ✓ Error Boundaries: Catch and handle errors to prevent agent crashes
- ✓ Meaningful Logs: Use structured logging for debugging
- ✓ Resource Limits: Set appropriate timeouts and memory limits
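As an illustration of the first practice, an idempotent handler can be sketched by caching results per message id. This assumes each message carries a unique id, which this document does not state; `IdempotentHandler` is a hypothetical wrapper, and its in-memory cache is unbounded and does not guard against two concurrent deliveries of the same id.

```python
import asyncio


class IdempotentHandler:
    """Wrap a handler so a redelivered message is processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._results = {}  # message id -> cached result

    async def handle(self, message_id, payload):
        if message_id in self._results:
            return self._results[message_id]  # duplicate: reuse the prior result
        result = await self._handler(payload)
        self._results[message_id] = result
        return result
```

A production version would typically keep the processed ids in durable storage and expire old entries, so that duplicates are still detected after a restart.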
Ready to build?
Check out our example agents and start building your own intelligent systems.