What are Channels?
Channels in ArtCafe.ai are named communication paths that agents use to exchange messages. Think of them as topic-based message streams that agents publish to and subscribe to.
Channel Hierarchy
ArtCafe uses a hierarchical topic structure with dots as separators:
tasks.vision.ocr
tasks.vision.detection
tasks.nlp.translation
tasks.nlp.sentiment
Subscription Patterns
Exact Match
# Only receives messages on this exact topic
await agent.subscribe("tasks.vision.ocr")
Wildcards
Single Level (*)
# Matches any single level
await agent.subscribe("tasks.*.ocr")
# Matches: tasks.vision.ocr, tasks.experimental.ocr
# NOT: tasks.vision.advanced.ocr
Multi Level (>)
# Matches any number of levels
await agent.subscribe("tasks.vision.>")
# Matches: tasks.vision.ocr, tasks.vision.detection.faces
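To make the wildcard rules concrete, here is a minimal sketch of a matcher that follows the behavior described above. It is not ArtCafe's implementation: `topic_matches` is a hypothetical helper, and it assumes that `>` may only appear as the last token and needs at least one further level to match, which the description above does not spell out.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches `pattern` under the wildcard rules above."""
    pattern_parts = pattern.split(".")
    topic_parts = topic.split(".")
    for i, part in enumerate(pattern_parts):
        if part == ">":
            # Assumption: '>' must be the final token and needs one or more levels
            return i == len(pattern_parts) - 1 and len(topic_parts) > i
        if i >= len(topic_parts):
            return False  # pattern is longer than the topic
        if part != "*" and part != topic_parts[i]:
            return False  # literal level does not match
    # Without '>', pattern and topic must have the same number of levels
    return len(pattern_parts) == len(topic_parts)


print(topic_matches("tasks.*.ocr", "tasks.vision.ocr"))
print(topic_matches("tasks.*.ocr", "tasks.vision.advanced.ocr"))
print(topic_matches("tasks.vision.>", "tasks.vision.detection.faces"))
```

A helper like this can be handy in unit tests to check which subscriptions a new topic name would reach before deploying it.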
Publishing Messages
Basic Publishing
await agent.publish("tasks.vision.ocr", {
    "image_url": "https://example.com/image.jpg",
    "language": "en"
})
With Headers
await agent.publish(
    topic="tasks.vision.ocr",
    data={"image_url": "..."},
    headers={
        "priority": "high",
        "deadline": "2024-01-20T10:00:00Z"
    }
)
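On the receiving side, a header such as `deadline` only helps if the consumer acts on it. The sketch below shows one way a consumer might check it. `is_expired` is a hypothetical helper that takes the headers as a plain dict; how ArtCafe exposes headers on a received message is not shown in this guide, so that part is left out.

```python
from datetime import datetime, timezone
from typing import Optional


def is_expired(headers: dict, now: Optional[datetime] = None) -> bool:
    """Return True if the 'deadline' header (ISO 8601, UTC) is in the past."""
    deadline = headers.get("deadline")
    if deadline is None:
        return False  # no deadline means the message never expires
    # fromisoformat() only accepts a trailing "Z" on Python 3.11+, so normalize it
    deadline_dt = datetime.fromisoformat(deadline.replace("Z", "+00:00"))
    current = now or datetime.now(timezone.utc)
    return current > deadline_dt


headers = {"priority": "high", "deadline": "2024-01-20T10:00:00Z"}
just_after = datetime(2024, 1, 20, 10, 0, 1, tzinfo=timezone.utc)
print(is_expired(headers, now=just_after))
```

A worker could call a check like this first and skip work whose deadline has already passed.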
Channel Patterns
Request-Reply
import uuid

# Request: include a unique reply topic so the responder knows where to answer
reply_topic = f"replies.{agent.id}.{uuid.uuid4()}"
await agent.publish("services.translate", {
    "text": "Hello",
    "target": "es",
    "reply_to": reply_topic
})

# Listen for reply (waits up to 5 seconds)
response = await agent.wait_for_message(reply_topic, timeout=5.0)
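The snippet above covers only the requesting side. A rough sketch of the responding side could look like the following. `handle_translate_request` is a hypothetical function, the "translation" is a placeholder, and `publish` stands in for any coroutine with the same `(topic, data)` shape as `agent.publish`, so the example runs without the ArtCafe SDK.

```python
import asyncio


async def handle_translate_request(publish, data: dict) -> None:
    """Answer one request by publishing to the topic named in 'reply_to'."""
    reply_topic = data.get("reply_to")
    if reply_topic is None:
        return  # no reply topic: nobody is waiting for an answer
    # Placeholder result; a real service would call a translation backend here
    translated = f"[{data['target']}] {data['text']}"
    await publish(reply_topic, {"text": translated})


async def demo() -> list:
    sent = []

    async def fake_publish(topic, payload):
        # Stand-in for agent.publish that records what would be sent
        sent.append((topic, payload))

    request = {"text": "Hello", "target": "es", "reply_to": "replies.agent-1.abc"}
    await handle_translate_request(fake_publish, request)
    return sent


print(asyncio.run(demo()))
```

Because the reply topic travels inside the request, the responder needs no prior knowledge of who is asking.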
Broadcast
# All subscribed agents receive this
await agent.publish("alerts.system", {
    "level": "warning",
    "message": "High memory usage detected"
})
Work Queue
# Multiple workers, but each message processed once
await agent.subscribe("tasks.process", queue_group="workers")
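The difference between a plain subscription and a queue group is easiest to see in a toy model. The in-memory broker below is purely illustrative and is not how ArtCafe is implemented: `ToyBroker` is a hypothetical class, it handles a single topic, and it assumes round-robin selection within a group, whereas a real broker may choose members differently.

```python
from collections import defaultdict


class ToyBroker:
    """Single-topic broker illustrating broadcast vs. queue-group delivery."""

    def __init__(self):
        self._plain = []                   # subscribers without a queue group
        self._groups = defaultdict(list)   # queue_group name -> member handlers
        self._next = defaultdict(int)      # queue_group name -> delivery counter

    def subscribe(self, handler, queue_group=None):
        if queue_group is None:
            self._plain.append(handler)
        else:
            self._groups[queue_group].append(handler)

    def publish(self, message):
        # Every plain subscriber receives every message (broadcast)
        for handler in self._plain:
            handler(message)
        # Each queue group receives the message exactly once
        for group, members in self._groups.items():
            index = self._next[group] % len(members)  # assumed round-robin
            self._next[group] += 1
            members[index](message)


broker = ToyBroker()
worker_a, worker_b, auditor = [], [], []
broker.subscribe(worker_a.append, queue_group="workers")
broker.subscribe(worker_b.append, queue_group="workers")
broker.subscribe(auditor.append)  # plain subscriber sees everything
for n in range(4):
    broker.publish(n)
print(worker_a, worker_b, auditor)
```

In this model the two workers split the four messages between them, while the plain subscriber receives all four.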
Best Practices
- Use Clear Hierarchies
  company.department.service.action
  domain.category.subcategory.specific
- Avoid Deep Nesting
  - Keep topics to 3-4 levels at most
  - Put finer-grained details in the message data instead of adding levels
- Plan for Growth
  - Design topics to accommodate new services
  - Use versioning when needed:
    api.v1.users
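Naming guidelines like these are easier to keep if they are checked automatically. The following is a minimal sketch of such a check. `check_topic` is a hypothetical helper, the four-level limit is taken from the guideline above, and the allowed character set is an assumption rather than a documented ArtCafe rule.

```python
import re
from typing import List

MAX_LEVELS = 4  # upper end of the "3-4 levels" guideline
LEVEL_PATTERN = re.compile(r"[a-z0-9_-]+")  # assumed character set for one level


def check_topic(topic: str) -> List[str]:
    """Return a list of problems found in a topic name (empty if none)."""
    problems = []
    levels = topic.split(".")
    if len(levels) > MAX_LEVELS:
        problems.append(f"too deep: {len(levels)} levels (max {MAX_LEVELS})")
    for level in levels:
        if not LEVEL_PATTERN.fullmatch(level):
            problems.append(f"invalid level: {level!r}")
    return problems


print(check_topic("api.v1.users"))
print(check_topic("tasks.vision.detection.faces.small"))
```

A check of this kind could run in code review or at agent startup, before any topic name reaches production.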
Example: Multi-Agent System
# Coordinator agent
class Coordinator(Agent):
    async def on_start(self):
        # Collect results published by any worker
        await self.subscribe("results.>")

    async def distribute_work(self, tasks):
        # Spread tasks round-robin across five batch topics (work.batch.0 to work.batch.4)
        for i, task in enumerate(tasks):
            await self.publish(f"work.batch.{i % 5}", task)

# Worker agents
class Worker(Agent):
    def __init__(self, worker_id):
        # worker_id should be 0-4 so it matches a batch topic used by the coordinator
        super().__init__(name=f"worker-{worker_id}")
        self.worker_id = worker_id

    async def on_start(self):
        # Join work queue group
        await self.subscribe(
            f"work.batch.{self.worker_id}",
            queue_group="workers"
        )

    async def on_message(self, message):
        result = await self.process_task(message.data)
        await self.publish("results.processed", result)
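The `i % 5` expression in `distribute_work` decides which batch topic each task lands on. The sketch below reproduces only that routing step so the resulting distribution can be inspected without running any agents. `plan_batches` is a hypothetical helper, not part of the ArtCafe SDK.

```python
from collections import defaultdict


def plan_batches(tasks, num_batches=5):
    """Group tasks by the batch topic that round-robin routing would pick."""
    plan = defaultdict(list)
    for i, task in enumerate(tasks):
        plan[f"work.batch.{i % num_batches}"].append(task)
    return dict(plan)


for topic, batch in plan_batches(["a", "b", "c", "d", "e", "f", "g"]).items():
    print(topic, batch)
```

One consequence of this scheme is that each of the five batch topics needs a subscribed worker. Tasks published to a batch topic nobody listens on would not be processed by any worker in this example.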
Monitoring Channels
Use the ArtCafe dashboard to:
- View active channels
- Monitor message flow
- Debug subscription issues
- Analyze performance metrics
Next Steps
Now that you understand channels, learn about: