Beginner
20 min
Getting Started

Understanding Channels

Learn how to use channels for agent communication

You'll Learn

  • Pub/sub concepts
  • Topic hierarchies
  • Subscriptions
  • Publishing messages

What are Channels?

Channels in ArtCafe.ai are named communication paths that agents use to exchange messages. Think of them as topic-based message streams: an agent publishes messages to a topic, and every agent subscribed to that topic receives them.

Channel Hierarchy

ArtCafe uses a hierarchical topic structure with dots as separators:

tasks.vision.ocr
tasks.vision.detection
tasks.nlp.translation
tasks.nlp.sentiment

Subscription Patterns

Exact Match

# Only receives messages on this exact topic
await agent.subscribe("tasks.vision.ocr")

Wildcards

Single Level (*)

# * matches exactly one level at that position
await agent.subscribe("tasks.*.ocr")
# Matches: tasks.vision.ocr, tasks.experimental.ocr
# NOT: tasks.vision.advanced.ocr

Multi Level (>)

# Matches any number of levels
await agent.subscribe("tasks.vision.>")
# Matches: tasks.vision.ocr, tasks.vision.detection.faces
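
To check a pattern against a topic without connecting to anything, the matching rules above can be sketched as a small local function. This is an illustration only, not ArtCafe's matcher: `topic_matches` is a hypothetical helper, and it assumes that `>` must be the last level of the pattern and needs at least one remaining level to match, which the text above does not spell out.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated topic matches a subscription pattern."""
    pattern_levels = pattern.split(".")
    topic_levels = topic.split(".")

    for index, level in enumerate(pattern_levels):
        if level == ">":
            # Assumption: ">" is only valid as the last level and needs at
            # least one remaining topic level to match.
            return index == len(pattern_levels) - 1 and len(topic_levels) > index
        if index >= len(topic_levels):
            return False  # pattern has more levels than the topic
        if level != "*" and level != topic_levels[index]:
            return False  # literal level differs

    # Without a trailing ">", both sides must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches("tasks.*.ocr", "tasks.vision.ocr"))
print(topic_matches("tasks.*.ocr", "tasks.vision.advanced.ocr"))
print(topic_matches("tasks.vision.>", "tasks.vision.detection.faces"))
```

The first and third calls print True and the second prints False, matching the examples listed in the comments above.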

Publishing Messages

Basic Publishing

await agent.publish("tasks.vision.ocr", {
    "image_url": "https://example.com/image.jpg",
    "language": "en"
})

With Headers

await agent.publish(
    topic="tasks.vision.ocr",
    data={"image_url": "..."},
    headers={
        "priority": "high",
        "deadline": "2024-01-20T10:00:00Z"
    }
)
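
Headers are only useful if the receiving side acts on them. As a rough sketch, a consumer could skip work whose deadline has already passed. The helper below is hypothetical: `is_past_deadline` is not part of any ArtCafe API, and it takes a plain dict because this tutorial does not show how headers are exposed on a received message.

```python
from datetime import datetime, timezone
from typing import Optional


def is_past_deadline(headers: dict, now: Optional[datetime] = None) -> bool:
    """Return True if the "deadline" header names a time that has passed."""
    deadline_text = headers.get("deadline")
    if deadline_text is None:
        return False  # no deadline header, so the message never expires

    # Replace the trailing "Z" because older Python versions do not accept
    # it in datetime.fromisoformat.
    deadline = datetime.fromisoformat(deadline_text.replace("Z", "+00:00"))
    current = now if now is not None else datetime.now(timezone.utc)
    return current > deadline


headers = {"priority": "high", "deadline": "2024-01-20T10:00:00Z"}
one_hour_early = datetime(2024, 1, 20, 9, 0, tzinfo=timezone.utc)
print(is_past_deadline(headers, now=one_hour_early))
```

Passing `now` explicitly keeps the check easy to test; in a real handler you would normally omit it and let the function use the current UTC time.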

Channel Patterns

Request-Reply

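import uuid

# Request: include a unique reply topic so the responder knows where to answer
reply_topic = f"replies.{agent.id}.{uuid.uuid4()}"
await agent.publish("services.translate", {
    "text": "Hello",
    "target": "es",
    "reply_to": reply_topic
})

# Wait up to 5 seconds for the reply on that topic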
response = await agent.wait_for_message(reply_topic, timeout=5.0)
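
To see both sides of the exchange, here is a self-contained sketch that runs the pattern in memory with asyncio. `InMemoryBus` and its `request` method are hypothetical stand-ins written for this illustration, not the ArtCafe API, and the responder "translates" with a one-entry lookup table.

```python
import asyncio
import uuid


class InMemoryBus:
    """Hypothetical stand-in for the messaging layer; not the ArtCafe API."""

    def __init__(self):
        self.handlers = {}  # topic -> async handler
        self.pending = {}   # reply topic -> Future waiting for one reply

    def subscribe(self, topic, handler):
        self.handlers[topic] = handler

    async def publish(self, topic, data):
        if topic in self.pending:
            # A requester is waiting on this reply topic: hand over the reply.
            self.pending.pop(topic).set_result(data)
        elif topic in self.handlers:
            await self.handlers[topic](data)

    async def request(self, topic, data, timeout=5.0):
        reply_topic = f"replies.{uuid.uuid4()}"
        future = asyncio.get_running_loop().create_future()
        # Register the reply slot before publishing so a fast reply is not lost.
        self.pending[reply_topic] = future
        try:
            await self.publish(topic, {**data, "reply_to": reply_topic})
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(reply_topic, None)


async def main():
    bus = InMemoryBus()

    async def translate(request):
        # Toy responder: answers on the topic named in "reply_to".
        translated = {"Hello": "Hola"}.get(request["text"], request["text"])
        await bus.publish(request["reply_to"], {"text": translated})

    bus.subscribe("services.translate", translate)
    return await bus.request("services.translate", {"text": "Hello", "target": "es"})


reply = asyncio.run(main())
print(reply)
```

The detail worth copying is the ordering: the requester starts listening on the reply topic before it publishes the request, so a fast responder cannot answer before anyone is waiting.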

Broadcast

# All subscribed agents receive this
await agent.publish("alerts.system", {
    "level": "warning",
    "message": "High memory usage detected"
})

Work Queue

# Workers share a queue group; each message is delivered to only one of them
await agent.subscribe("tasks.process", queue_group="workers")
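
The difference between broadcast and work-queue delivery can be modeled in a few lines. The sketch below is a hypothetical illustration, not ArtCafe's implementation: `QueueGroupRouter` is an invented name, and it assumes round-robin selection inside a group, whereas a real broker may pick members differently.

```python
from collections import defaultdict
from itertools import count


class QueueGroupRouter:
    """Hypothetical model of queue-group delivery; not the ArtCafe API."""

    def __init__(self):
        self.plain = []                     # subscribers without a group
        self.groups = defaultdict(list)     # group name -> member names
        self.counters = defaultdict(count)  # group name -> round-robin counter

    def subscribe(self, name, queue_group=None):
        if queue_group is None:
            self.plain.append(name)
        else:
            self.groups[queue_group].append(name)

    def route(self):
        """Return the subscribers that receive the next message."""
        receivers = list(self.plain)  # every ungrouped subscriber gets a copy
        for group, members in self.groups.items():
            # Exactly one member of each group receives the message.
            turn = next(self.counters[group])
            receivers.append(members[turn % len(members)])
        return receivers


router = QueueGroupRouter()
router.subscribe("worker-a", queue_group="workers")
router.subscribe("worker-b", queue_group="workers")
router.subscribe("auditor")  # no group, so it sees every message

deliveries = [router.route() for _ in range(3)]
print(deliveries)
```

Across the three messages, `auditor` appears in every delivery while `worker-a` and `worker-b` take turns.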

Best Practices

  1. Use Clear Hierarchies

    • company.department.service.action
    • domain.category.subcategory.specific

  2. Avoid Deep Nesting

    • Keep to 3-4 levels maximum
    • Put finer-grained detail in the message data instead of adding topic levels

  3. Plan for Growth

    • Design topics to accommodate new services
    • Use versioning when needed: api.v1.users
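
Guidelines like these are easier to keep when they are checked automatically, for example in a unit test. The following is a minimal sketch: `check_topic` is a hypothetical helper, and the character rule (lowercase letters, digits, underscore, hyphen) is an assumed convention rather than a documented ArtCafe requirement.

```python
import re

MAX_LEVELS = 4  # upper bound from the "3-4 levels maximum" guideline
LEVEL_PATTERN = re.compile(r"[a-z0-9_-]+")  # assumed naming rule


def check_topic(topic: str) -> list:
    """Return guideline violations; an empty list means the topic passes."""
    problems = []
    levels = topic.split(".")
    if len(levels) > MAX_LEVELS:
        problems.append(f"{len(levels)} levels exceeds the maximum of {MAX_LEVELS}")
    for level in levels:
        if not LEVEL_PATTERN.fullmatch(level):
            problems.append(f"invalid level {level!r}")
    return problems


print(check_topic("api.v1.users"))
print(check_topic("tasks.vision.ocr.fast.gpu"))
```

The first topic passes with an empty list; the second reports that five levels exceed the maximum.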

Example: Multi-Agent System

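# Agent is the agent base class from the ArtCafe SDK (import not shown here)

# Coordinator agent
class Coordinator(Agent):
    async def on_start(self):
        # Receive every result, whatever its sub-topic
        await self.subscribe("results.>")

    async def distribute_work(self, tasks):
        # Spread tasks round-robin over work.batch.0 ... work.batch.4;
        # this assumes workers with IDs 0-4 are running
        for i, task in enumerate(tasks):
            await self.publish(f"work.batch.{i % 5}", task)

# Worker agents
class Worker(Agent):
    def __init__(self, worker_id):
        super().__init__(name=f"worker-{worker_id}")
        self.worker_id = worker_id

    async def on_start(self):
        # Join the work queue group on this worker's batch topic
        await self.subscribe(
            f"work.batch.{self.worker_id}",
            queue_group="workers"
        )

    async def on_message(self, message):
        # process_task is your own task logic (not shown)
        result = await self.process_task(message.data)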
        await self.publish("results.processed", result)
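
Because the coordinator publishes to `work.batch.{i % 5}`, each of the five batch topics needs a worker whose `worker_id` matches, or messages on that topic have no subscriber. The distribution can be previewed offline with a small sketch; `plan_distribution` and `BATCH_COUNT` are hypothetical names used only for this illustration.

```python
from collections import defaultdict

BATCH_COUNT = 5  # mirrors the `i % 5` used by the Coordinator


def plan_distribution(tasks):
    """Group tasks by the batch topic the coordinator would publish them to."""
    plan = defaultdict(list)
    for i, task in enumerate(tasks):
        plan[f"work.batch.{i % BATCH_COUNT}"].append(task)
    return dict(plan)


plan = plan_distribution([f"task-{n}" for n in range(7)])
for topic, batch in plan.items():
    print(topic, batch)
```

With seven tasks, `work.batch.0` and `work.batch.1` each receive two tasks and the remaining three topics receive one each.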

Monitoring Channels

Use the ArtCafe dashboard to:

  • View active channels
  • Monitor message flow
  • Debug subscription issues
  • Analyze performance metrics

Next Steps

Now that you understand channels, learn about: