Examples
Real-world examples and patterns for building intelligent agent systems with ArtCafe.ai
Data Processing Pipeline
Build a scalable data processing pipeline with multiple specialized agents.
PythonIntermediateWorker Pattern
View example →Customer Support Bot
Multi-agent system for intelligent customer support with escalation.
PythonAdvancedOrchestrator
View example →Real-time Analytics
Stream processing and real-time analytics with agent collaboration.
PythonIntermediateStream Processing
View example →Swarm Intelligence
Collaborative problem-solving with multiple agents sharing knowledge.
PythonExpertSwarm Pattern
View example →Data Processing Pipeline
Overview
This example demonstrates a complete data processing pipeline with agents for ingestion, validation, transformation, and storage. The system automatically scales based on load.
1. Ingestion Agent
from artcafe_agent import BaseAgent
import json
class IngestionAgent(BaseAgent):
def __init__(self):
super().__init__(
agent_id="ingestion-agent",
name="Data Ingestion Agent",
capabilities=["data_ingestion", "validation"]
)
async def setup(self):
await self.subscribe("data/incoming")
self.metrics.create_counter("records_ingested")
async def process_message(self, message):
data = message.data
# Basic validation
if not self.validate_schema(data):
await self.publish("data/errors", {
"error": "Invalid schema",
"data": data
})
return {"status": "rejected"}
# Add metadata
enriched_data = {
**data,
"ingested_at": datetime.now().isoformat(),
"ingestion_agent": self.agent_id
}
# Send to processing queue
await self.publish("data/processing", enriched_data)
self.metrics.increment("records_ingested")
return {"status": "accepted"}
2. Transformation Agent
class TransformationAgent(BaseAgent):
def __init__(self):
super().__init__(
agent_id="transform-agent",
name="Data Transformation Agent",
capabilities=["data_transformation", "enrichment"]
)
self.knowledge_graph = None
async def setup(self):
# Multiple workers compete for tasks
await self.subscribe("data/processing", competing=True)
self.knowledge_graph = await self.connect_knowledge_graph()
async def process_message(self, message):
data = message.data
# Apply transformations
transformed = await self.transform_data(data)
# Enrich with knowledge graph
context = await self.knowledge_graph.query(
f"MATCH (e:Entity) WHERE e.id = '{data['entity_id']}' RETURN e"
)
enriched = {
**transformed,
"context": context,
"ml_features": await self.extract_features(transformed)
}
# Send to storage
await self.publish("data/storage", enriched)
return {"status": "transformed", "record_id": data.get('id')}
3. Orchestrator
class PipelineOrchestrator(BaseAgent):
def __init__(self):
super().__init__(
agent_id="pipeline-orchestrator",
name="Pipeline Orchestrator",
capabilities=["orchestration", "monitoring"]
)
async def setup(self):
await self.subscribe("pipeline/commands")
await self.subscribe("data/errors")
# Monitor agent health
self.schedule_task(self.monitor_pipeline, interval=30)
async def monitor_pipeline(self):
# Check agent statuses
agents = ["ingestion-agent", "transform-agent", "storage-agent"]
for agent_id in agents:
status = await self.check_agent_health(agent_id)
if status != "healthy":
await self.handle_unhealthy_agent(agent_id)
# Check processing metrics
metrics = await self.get_pipeline_metrics()
if metrics['queue_size'] > 1000:
# Scale up workers
await self.scale_workers('transform-agent', count=3)
async def handle_batch_job(self, job):
# Coordinate batch processing
batch_id = str(uuid.uuid4())
for record in job['records']:
await self.publish("data/incoming", {
**record,
"batch_id": batch_id
})
# Track batch progress
await self.track_batch(batch_id, len(job['records']))
Customer Support Bot System
Architecture
Multi-agent system with specialized agents for different support tasks, automatic escalation, and knowledge base integration.
# Main support agent that handles initial contact
class SupportAgent(BaseAgent):
def __init__(self):
super().__init__(
agent_id="support-main",
name="Customer Support Agent",
capabilities=["nlp", "classification", "routing"]
)
self.classifier = None
async def setup(self):
await self.subscribe("support/incoming")
self.classifier = await self.load_model("support_classifier")
async def process_message(self, message):
inquiry = message.data
# Classify inquiry type
category = await self.classifier.predict(inquiry['text'])
# Route to specialized agent
if category == "technical":
await self.publish("support/technical", inquiry)
elif category == "billing":
await self.publish("support/billing", inquiry)
elif category == "general":
await self.handle_general_inquiry(inquiry)
else:
# Escalate to human
await self.escalate_to_human(inquiry)
# Technical support specialist
class TechnicalSupportAgent(BaseAgent):
async def setup(self):
await self.subscribe("support/technical")
self.kb = await self.connect_knowledge_base()
async def process_message(self, message):
inquiry = message.data
# Search knowledge base
solutions = await self.kb.search(inquiry['text'], limit=5)
if solutions and solutions[0].confidence > 0.8:
response = await self.generate_response(solutions[0])
await self.send_response(inquiry['customer_id'], response)
else:
# Need human help
await self.escalate_with_context(inquiry, solutions)
Want to see more examples?
Check out our GitHub repository for complete, runnable examples.
View on GitHub →