Back to Tutorials
Advanced
45 min
Advanced Patterns

Building a Swarm

Create collaborative agent swarms with shared intelligence

You'll Learn

  • Swarm architecture
  • Consensus mechanisms
  • Task distribution
  • Collective memory

Introduction to Agent Swarms

Agent swarms are collections of agents that work together to solve complex problems. Like swarms in nature, they exhibit emergent intelligence through simple individual behaviors and communication.

Swarm Architecture

Core Components

  1. Scout Agents: Discover and analyze tasks
  2. Worker Agents: Execute specific operations
  3. Coordinator Agents: Orchestrate collaboration
  4. Memory Agents: Maintain shared state

Building Your First Swarm

Step 1: Define the Swarm Protocol

# swarm_protocol.py
from dataclasses import dataclass
from typing import List, Dict, Any
from enum import Enum

class SwarmRole(Enum):
    SCOUT = "scout"
    WORKER = "worker"
    COORDINATOR = "coordinator"
    MEMORY = "memory"

@dataclass
class SwarmTask:
    id: str
    type: str
    data: Dict[str, Any]
    requirements: List[str]
    priority: int = 0

@dataclass
class SwarmState:
    active_tasks: List[SwarmTask]
    completed_tasks: List[str]
    agent_capabilities: Dict[str, List[str]]
    performance_metrics: Dict[str, float]

Step 2: Implement Scout Agents

from artcafe_sdk import Agent
import asyncio

class ScoutAgent(Agent):
    def __init__(self, scout_id: int):
        super().__init__(
            name=f"scout-{scout_id}",
            capabilities=["discovery", "analysis"]
        )
        
    async def on_start(self):
        await self.subscribe("environment.events")
        await self.subscribe("swarm.coordination.requests")
        
        # Start periodic environment scanning
        asyncio.create_task(self.scan_environment())
    
    async def scan_environment(self):
        while True:
            # Simulate environment scanning
            await asyncio.sleep(5)
            
            # Discover new tasks
            task = await self.discover_task()
            if task:
                await self.publish("swarm.tasks.discovered", {
                    "task": task,
                    "scout_id": self.name,
                    "timestamp": datetime.now().isoformat()
                })
    
    async def discover_task(self):
        # Implement task discovery logic
        # This could involve API calls, sensor readings, etc.
        pass

Step 3: Create Worker Agents

class WorkerAgent(Agent):
    def __init__(self, worker_id: int, capabilities: List[str]):
        super().__init__(
            name=f"worker-{worker_id}",
            capabilities=capabilities
        )
        self.current_task = None
        
    async def on_start(self):
        # Subscribe to task assignments
        await self.subscribe(f"swarm.tasks.assign.{self.name}")
        await self.subscribe("swarm.tasks.available")
        
        # Announce capabilities
        await self.publish("swarm.agents.online", {
            "agent_id": self.name,
            "capabilities": self.capabilities,
            "status": "ready"
        })
    
    async def on_message(self, message):
        if message.topic.startswith("swarm.tasks.assign"):
            await self.handle_task_assignment(message.data)
        elif message.topic == "swarm.tasks.available":
            await self.bid_on_task(message.data)
    
    async def bid_on_task(self, task_data):
        task = SwarmTask(**task_data)
        
        # Check if we can handle this task
        if self.can_handle_task(task):
            bid_score = self.calculate_bid_score(task)
            await self.publish("swarm.tasks.bids", {
                "task_id": task.id,
                "agent_id": self.name,
                "bid_score": bid_score,
                "estimated_time": self.estimate_completion_time(task)
            })
    
    def can_handle_task(self, task: SwarmTask) -> bool:
        return all(req in self.capabilities for req in task.requirements)

Step 4: Implement the Coordinator

class CoordinatorAgent(Agent):
    def __init__(self):
        super().__init__(
            name="coordinator-prime",
            capabilities=["orchestration", "decision-making"]
        )
        self.swarm_state = SwarmState(
            active_tasks=[],
            completed_tasks=[],
            agent_capabilities={},
            performance_metrics={}
        )
        
    async def on_start(self):
        await self.subscribe("swarm.tasks.discovered")
        await self.subscribe("swarm.tasks.bids")
        await self.subscribe("swarm.tasks.completed")
        await self.subscribe("swarm.agents.online")
        await self.subscribe("swarm.agents.offline")
    
    async def on_message(self, message):
        if message.topic == "swarm.tasks.discovered":
            await self.handle_new_task(message.data)
        elif message.topic == "swarm.tasks.bids":
            await self.handle_task_bid(message.data)
        elif message.topic == "swarm.tasks.completed":
            await self.handle_task_completion(message.data)
    
    async def handle_new_task(self, data):
        task = SwarmTask(**data["task"])
        self.swarm_state.active_tasks.append(task)
        
        # Announce task availability
        await self.publish("swarm.tasks.available", task.__dict__)
        
        # Start bid collection timer
        asyncio.create_task(self.collect_bids(task.id))
    
    async def collect_bids(self, task_id: str):
        # Wait for bids
        await asyncio.sleep(2.0)
        
        # Select best bidder
        best_agent = await self.select_best_bidder(task_id)
        if best_agent:
            await self.assign_task(task_id, best_agent)

Step 5: Add Collective Memory

class MemoryAgent(Agent):
    def __init__(self):
        super().__init__(
            name="memory-core",
            capabilities=["storage", "retrieval", "analysis"]
        )
        self.shared_memory = {}
        self.task_history = []
        
    async def on_start(self):
        await self.subscribe("swarm.memory.store")
        await self.subscribe("swarm.memory.query")
        await self.subscribe("swarm.tasks.completed")
    
    async def on_message(self, message):
        if message.topic == "swarm.memory.store":
            await self.store_memory(message.data)
        elif message.topic == "swarm.memory.query":
            await self.retrieve_memory(message.data)
        elif message.topic == "swarm.tasks.completed":
            await self.learn_from_task(message.data)
    
    async def learn_from_task(self, task_data):
        # Extract patterns and insights
        self.task_history.append(task_data)
        
        # Update performance metrics
        if len(self.task_history) > 10:
            insights = self.analyze_performance()
            await self.publish("swarm.insights.performance", insights)

Advanced Swarm Patterns

Consensus Mechanisms

class ConsensusManager:
    def __init__(self, agent: Agent):
        self.agent = agent
        self.votes = {}
        
    async def propose_action(self, action: str, data: Any):
        proposal_id = str(uuid.uuid4())
        await self.agent.publish("swarm.consensus.proposal", {
            "id": proposal_id,
            "action": action,
            "data": data,
            "proposer": self.agent.name
        })
        
        # Collect votes
        votes = await self.collect_votes(proposal_id, timeout=5.0)
        return self.evaluate_consensus(votes)
    
    def evaluate_consensus(self, votes: List[Dict]) -> bool:
        # Simple majority
        yes_votes = sum(1 for v in votes if v["vote"] == "yes")
        return yes_votes > len(votes) / 2

Dynamic Role Assignment

class AdaptiveAgent(Agent):
    def __init__(self, agent_id: int):
        super().__init__(name=f"adaptive-{agent_id}")
        self.current_role = SwarmRole.WORKER
        self.performance_score = 0.5
        
    async def evaluate_role_change(self):
        # Analyze swarm needs
        swarm_metrics = await self.get_swarm_metrics()
        
        # Determine optimal role
        if swarm_metrics["scout_shortage"] and self.performance_score > 0.7:
            await self.switch_role(SwarmRole.SCOUT)
        elif swarm_metrics["coordinator_needed"] and self.performance_score > 0.9:
            await self.switch_role(SwarmRole.COORDINATOR)

Running the Swarm

# launch_swarm.py
import asyncio
from swarm_agents import ScoutAgent, WorkerAgent, CoordinatorAgent, MemoryAgent

async def launch_swarm():
    # Create coordinator and memory
    coordinator = CoordinatorAgent()
    memory = MemoryAgent()
    
    # Create scouts
    scouts = [ScoutAgent(i) for i in range(3)]
    
    # Create workers with different capabilities
    workers = [
        WorkerAgent(0, ["image_processing", "ocr"]),
        WorkerAgent(1, ["nlp", "translation"]),
        WorkerAgent(2, ["data_analysis", "ml"]),
        WorkerAgent(3, ["web_scraping", "api_integration"]),
        WorkerAgent(4, ["image_processing", "ml"])
    ]
    
    # Start all agents
    all_agents = [coordinator, memory] + scouts + workers
    await asyncio.gather(*[agent.start() for agent in all_agents])
    
    print(f"Swarm launched with {len(all_agents)} agents")
    
    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(launch_swarm())

Monitoring and Optimization

Swarm Dashboard Metrics

  • Active agents by role
  • Task completion rate
  • Average task duration
  • Communication overhead
  • Consensus success rate

Performance Optimization

  1. Reduce Message Overhead

    • Use topic hierarchies efficiently
    • Batch related messages
    • Implement message priorities
  2. Balance Work Distribution

    • Monitor agent utilization
    • Implement work stealing
    • Use predictive task assignment
  3. Optimize Consensus

    • Use quorum-based decisions
    • Implement timeout mechanisms
    • Cache frequent decisions

Next Steps

  • Implement specialized swarm behaviors (foraging, flocking)
  • Add machine learning for task prediction
  • Create hybrid swarms with different agent types
  • Build fault-tolerant consensus protocols

Resources