Examples

Real-world examples and patterns for building intelligent agent systems with ArtCafe.ai

Data Processing Pipeline

Build a scalable data processing pipeline with multiple specialized agents.

Python · Intermediate · Worker Pattern
View example →

Customer Support Bot

Multi-agent system for intelligent customer support with escalation.

Python · Advanced · Orchestrator
View example →

Real-time Analytics

Stream processing and real-time analytics with agent collaboration.

Python · Intermediate · Stream Processing
View example →

Swarm Intelligence

Collaborative problem-solving with multiple agents sharing knowledge.

Python · Expert · Swarm Pattern
View example →

Data Processing Pipeline

Overview

This example demonstrates a complete data processing pipeline with agents for ingestion, validation, transformation, and storage. The system automatically scales based on load.

1. Ingestion Agent

import json
import uuid
from datetime import datetime, timezone

from artcafe_agent import BaseAgent

class IngestionAgent(BaseAgent):
    """First pipeline stage: validate incoming records and forward them.

    Listens on ``data/incoming``. Records that pass schema validation are
    stamped with ingestion metadata and published to ``data/processing``;
    records that fail are reported on ``data/errors``.
    """

    def __init__(self):
        super().__init__(
            agent_id="ingestion-agent",
            name="Data Ingestion Agent",
            capabilities=["data_ingestion", "validation"],
        )

    async def setup(self):
        """Subscribe to the incoming topic and register the ingestion counter."""
        await self.subscribe("data/incoming")
        self.metrics.create_counter("records_ingested")

    async def process_message(self, message):
        """Validate one record and pass it on to the processing topic.

        Args:
            message: Incoming message; its ``data`` attribute is unpacked
                with ``**``, so it must be a mapping.

        Returns:
            ``{"status": "rejected"}`` when schema validation fails,
            otherwise ``{"status": "accepted"}``.
        """
        data = message.data

        # NOTE(review): validate_schema is not defined in this example;
        # presumably it is provided by BaseAgent or must be implemented
        # by the user. Confirm before running.
        if not self.validate_schema(data):
            await self.publish("data/errors", {
                "error": "Invalid schema",
                "data": data,
            })
            return {"status": "rejected"}

        # Stamp the record with a timezone-aware UTC timestamp so that
        # downstream consumers never have to guess the local zone of the
        # ingesting host.
        enriched_data = {
            **data,
            "ingested_at": datetime.now(timezone.utc).isoformat(),
            "ingestion_agent": self.agent_id,
        }

        # Hand the record to the processing queue, then count it.
        await self.publish("data/processing", enriched_data)
        self.metrics.increment("records_ingested")

        return {"status": "accepted"}

2. Transformation Agent

class TransformationAgent(BaseAgent):
    """Second pipeline stage: transform records and enrich them with context.

    Consumes ``data/processing`` as a competing subscriber, applies
    transformations, looks up the record's entity in the knowledge graph,
    and publishes the enriched record to ``data/storage``.
    """

    def __init__(self):
        super().__init__(
            agent_id="transform-agent",
            name="Data Transformation Agent",
            capabilities=["data_transformation", "enrichment"],
        )
        # Populated in setup(); None until the agent has been set up.
        self.knowledge_graph = None

    @staticmethod
    def _cypher_string(value):
        """Render ``value`` as a single-quoted, escaped Cypher string literal."""
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    async def setup(self):
        """Subscribe to the processing topic and connect the knowledge graph."""
        # Multiple workers compete for tasks
        await self.subscribe("data/processing", competing=True)
        self.knowledge_graph = await self.connect_knowledge_graph()

    async def process_message(self, message):
        """Transform and enrich one record, then publish it for storage.

        Args:
            message: Incoming message; ``message.data`` must contain an
                ``entity_id`` key (a missing key raises ``KeyError``).

        Returns:
            ``{"status": "transformed", "record_id": ...}`` where
            ``record_id`` is ``data.get('id')`` (``None`` when absent).
        """
        data = message.data

        # Apply transformations
        transformed = await self.transform_data(data)

        # Enrich with knowledge graph. The entity id comes from message
        # data, so it is escaped before being placed in the query text;
        # an unescaped quote would otherwise let a record rewrite the query.
        # NOTE(review): if the knowledge-graph client supports query
        # parameters, prefer those over string interpolation.
        entity_literal = self._cypher_string(data['entity_id'])
        context = await self.knowledge_graph.query(
            f"MATCH (e:Entity) WHERE e.id = {entity_literal} RETURN e"
        )

        enriched = {
            **transformed,
            "context": context,
            "ml_features": await self.extract_features(transformed),
        }

        # Send to storage
        await self.publish("data/storage", enriched)

        return {"status": "transformed", "record_id": data.get('id')}

3. Orchestrator

class PipelineOrchestrator(BaseAgent):
    """Supervises the pipeline: health checks, scaling, and batch fan-out.

    Subscribes to ``pipeline/commands`` and ``data/errors`` and schedules
    ``monitor_pipeline`` as a recurring task.
    """

    def __init__(self):
        super().__init__(
            agent_id="pipeline-orchestrator",
            name="Pipeline Orchestrator",
            capabilities=["orchestration", "monitoring"],
        )

    async def setup(self):
        """Subscribe to control topics and start the periodic monitor."""
        await self.subscribe("pipeline/commands")
        await self.subscribe("data/errors")

        # Monitor agent health
        # NOTE(review): interval=30 is presumably seconds; confirm against
        # the schedule_task documentation.
        self.schedule_task(self.monitor_pipeline, interval=30)

    async def monitor_pipeline(self):
        """Check each pipeline agent's health and scale workers under load."""
        # Check agent statuses
        agents = ["ingestion-agent", "transform-agent", "storage-agent"]

        for agent_id in agents:
            status = await self.check_agent_health(agent_id)
            if status != "healthy":
                await self.handle_unhealthy_agent(agent_id)

        # Check processing metrics
        metrics = await self.get_pipeline_metrics()
        if metrics['queue_size'] > 1000:
            # Scale up workers
            await self.scale_workers('transform-agent', count=3)

    async def handle_batch_job(self, job):
        """Publish every record of a batch job under one shared batch id.

        Args:
            job: Mapping with a ``records`` entry; each record is unpacked
                with ``**`` and its length is taken, so ``records`` must be
                a sized collection of mappings.
        """
        # One random id ties all records of this batch together.
        batch_id = str(uuid.uuid4())
        records = job['records']

        for record in records:
            await self.publish("data/incoming", {
                **record,
                "batch_id": batch_id,
            })

        # Track batch progress
        await self.track_batch(batch_id, len(records))

Customer Support Bot System

Architecture

Multi-agent system with specialized agents for different support tasks, automatic escalation, and knowledge base integration.

# Main support agent that handles initial contact
class SupportAgent(BaseAgent):
    """Front-line agent: classifies each inquiry and routes it onward."""

    def __init__(self):
        super().__init__(
            agent_id="support-main",
            name="Customer Support Agent",
            capabilities=["nlp", "classification", "routing"],
        )
        # Loaded in setup(); None until then.
        self.classifier = None

    async def setup(self):
        """Subscribe to incoming inquiries and load the classifier model."""
        await self.subscribe("support/incoming")
        self.classifier = await self.load_model("support_classifier")

    async def process_message(self, message):
        """Classify one inquiry and dispatch it by predicted category.

        Technical and billing inquiries are published to their specialist
        topics, general inquiries are handled locally, and anything else is
        escalated to a human.
        """
        ticket = message.data

        # Predict the category from the inquiry text.
        label = await self.classifier.predict(ticket['text'])

        if label == "technical":
            await self.publish("support/technical", ticket)
            return

        if label == "billing":
            await self.publish("support/billing", ticket)
            return

        if label == "general":
            await self.handle_general_inquiry(ticket)
            return

        # Unrecognised category: hand over to a human.
        await self.escalate_to_human(ticket)

# Technical support specialist
class TechnicalSupportAgent(BaseAgent):
    """Specialist agent answering technical inquiries from a knowledge base."""

    async def setup(self):
        """Subscribe to the technical topic and connect the knowledge base."""
        await self.subscribe("support/technical")
        self.kb = await self.connect_knowledge_base()

    async def process_message(self, message):
        """Answer the inquiry if the top search hit is confident enough.

        Searches the knowledge base for up to five candidate solutions.
        When the first hit's confidence exceeds 0.8 a response is generated
        and sent to the customer; otherwise the inquiry is escalated along
        with whatever candidates were found.
        """
        ticket = message.data

        # Look up candidate solutions for the inquiry text.
        hits = await self.kb.search(ticket['text'], limit=5)

        # No hits, or the best hit is not confident enough: ask a human.
        if not hits or not (hits[0].confidence > 0.8):
            await self.escalate_with_context(ticket, hits)
            return

        reply = await self.generate_response(hits[0])
        await self.send_response(ticket['customer_id'], reply)

Want to see more examples?

Check out our GitHub repository for complete, runnable examples.

View on GitHub →