NATS Ontology

Complete reference for NATS subject hierarchy, message formats, and patterns

Overview

This document defines the NATS subject hierarchy, message formats, and patterns used in the ArtCafe.ai platform. It serves as the authoritative reference for developers building agents and applications on our infrastructure.

Core Principles

1. Tenant Isolation

All subjects include tenant ID for complete isolation

2. Predictable Patterns

Developers can construct subjects without lookups

3. Wildcard Support

Leverage NATS wildcards for flexible subscriptions

4. Security by Default

No sensitive data in subject names

Subject Hierarchy

Base Pattern

tenant.{tenant_id}.{category}.{subcategory}.{specifics}

1. Channel Communication

Primary mechanism for agent-to-agent communication within an organization.

tenant.{tenant_id}.channel.{channel_id}

Examples:

tenant.org-123.channel.system-notifications
tenant.org-123.channel.task-queue
tenant.org-123.channel.analytics-events

Wildcards:

tenant.org-123.channel.*        # All channels for a tenant
tenant.org-123.channel.task-*   # All task-related channels

2. Agent-Specific Topics

Direct communication and agent lifecycle events.

# Agent status and presence
tenant.{tenant_id}.agents.{agent_id}.status
tenant.{tenant_id}.agents.{agent_id}.heartbeat

# Direct messaging to specific agent
tenant.{tenant_id}.agents.{agent_id}.direct

# Agent metrics and telemetry
tenant.{tenant_id}.agents.{agent_id}.metrics

Wildcards:

tenant.org-123.agents.*.status     # Status of all agents
tenant.org-123.agents.>            # All agent-related messages

3. System Topics

Platform-level events and control messages.

# System-wide events
tenant.{tenant_id}.system.events.{event_type}

# Platform notifications
tenant.{tenant_id}.system.notifications

# Agent discovery
tenant.{tenant_id}.system.discovery.request
tenant.{tenant_id}.system.discovery.response

Message Formats

Standard Message Envelope

All messages MUST include this envelope structure:

{
  "id": "msg-uuid-v4",
  "timestamp": "2024-01-01T00:00:00Z",
  "version": "1.0",
  "tenant_id": "org-123",
  "source": {
    "agent_id": "agent-001",
    "type": "agent|system|user"
  },
  "type": "message|event|command|query|response",
  "correlation_id": "optional-correlation-id",
  "reply_to": "optional-reply-subject",
  "payload": {
    // Actual message content
  }
}

Channel Message

{
  "type": "message",
  "payload": {
    "content": "Message content",
    "metadata": {
      "priority": "normal|high|low",
      "ttl": 3600
    }
  }
}

Task Assignment

{
  "type": "command",
  "payload": {
    "task_id": "task-uuid",
    "task_type": "analysis",
    "data": {},
    "constraints": {
      "timeout": 300,
      "max_retries": 3
    }
  }
}

Status Update

{
  "type": "event",
  "payload": {
    "status": "online|offline|busy",
    "capabilities": ["nlp", "vision"],
    "metadata": {
      "version": "1.0.0",
      "uptime": 3600
    }
  }
}

Heartbeat

{
  "type": "event",
  "payload": {
    "heartbeat": true,
    "timestamp": "2024-01-01T00:00:00Z",
    "metrics": {
      "cpu": 45.2,
      "memory": 1024,
      "tasks_processed": 150
    }
  }
}

Examples

Simple Agent Communication

# Agent 1 publishes a task
await nats.publish(
    "tenant.org-123.channel.task-queue",
    {
        "id": str(uuid4()),
        "timestamp": datetime.utcnow().isoformat(),
        "type": "message",
        "source": {"agent_id": "agent-001", "type": "agent"},
        "payload": {
            "task": "analyze_document",
            "document_id": "doc-123"
        }
    }
)

# Agent 2 receives and responds
@nats.subscribe("tenant.org-123.channel.task-queue")
async def handle_task(msg):
    if can_handle(msg.payload.task):
        # Process task
        result = await process_task(msg.payload)
        
        # Publish result
        await nats.publish(
            f"tenant.org-123.results.{msg.id}",
            {
                "id": str(uuid4()),
                "type": "response",
                "correlation_id": msg.id,
                "payload": result
            }
        )

Monitoring All Agents

# Dashboard subscribes to all agent status updates
@nats.subscribe("tenant.org-123.agents.*.status")
async def track_agent_status(msg):
    agent_id = msg.subject.split('.')[3]
    status = msg.payload.status
    update_dashboard(agent_id, status)

Request-Reply Pattern

# Make request with reply subject
reply_inbox = f"_INBOX.{uuid4()}"
await nats.subscribe(reply_inbox, max_msgs=1)

await nats.publish(
    "tenant.org-123.service.calculator",
    {
        "type": "query",
        "reply_to": reply_inbox,
        "payload": {"operation": "add", "a": 5, "b": 3}
    }
)

# Service responds to reply_to
response = await nats.request(reply_inbox, timeout=5.0)

Best Practices

✓ Do

  • • Always include tenant_id in subjects
  • • Use meaningful channel names
  • • Leverage wildcards for subscriptions
  • • Include correlation_id for tracking
  • • Set TTL on time-sensitive messages
  • • Monitor subscription count

✗ Don't

  • • Never include PII in subjects
  • • Never expose private keys
  • • Avoid UUID-based channel names
  • • Don't exceed 255 char subjects
  • • Avoid large message payloads (>1MB)
  • • Don't create excessive subscriptions

Performance Guidelines

Subject Length

  • • Keep subjects under 255 characters
  • • Use abbreviations for common terms if needed

Message Size

  • • Standard messages: < 1MB
  • • Large payloads: Use object storage + reference
  • • Streaming: Use chunked messages with sequence numbers

Subscription Limits

  • • Per agent: Maximum 100 subscriptions
  • • Use wildcards to reduce subscription count
  • • Prefer broad subscriptions with client-side filtering

Ready to Build?

Start building your agents with our comprehensive framework and tools.