
Interrupts DB

1. Introduction

The Interrupts DB provides a structured backend service for storing and querying two types of objects critical to coordinating distributed interrupt handling:

  • InterruptMessage: Represents an interrupt sent in the system, with metadata such as TTL, type, and routing policy.
  • InterruptWaiter: Tracks, for a given interrupt message, which subject initiated it, which subjects are expected to respond, and the responses received so far.

This database interface is backed by TimescaleDB and exposes a Flask-based REST API for interacting with these objects programmatically.


Architecture

The Interrupts DB System is a distributed event-coordination and tracking service responsible for managing asynchronous interrupt messages and their acknowledgments in multi-agent environments. It ensures reliable persistence, policy-driven forwarding, and acknowledgment routing of interrupt and feedback messages between agents or services.

This system integrates a TimescaleDB backend, Flask REST and GraphQL query layers, and a NATS-based messaging fabric, enabling real-time interrupt dispatch, TTL-based expiry, and multi-target response tracking with policy enforcement.

Figure: interrupts-system (architecture diagram)


1. Persistence and TTL Lifecycle Engine

This subsystem manages the creation, querying, and expiration of InterruptMessage and InterruptWaiter entries stored in TimescaleDB. It ensures time-sensitive delivery and tracks all response states.

InterruptMessage Table

Stores metadata about each interrupt including its ID, type, TTL, status, and forwarding policy. TTL is used for auto-expiry and to enforce time-bounded coordination.

InterruptWaiter Table

Tracks which subjects are expected to respond to an interrupt and maintains a mapping of received responses. This enables downstream fulfillment and resolution checks.

TTL Checker / Expiry Event Engine

Periodically scans for expired messages. When TTL elapses:

  • Updates the message status to "expired"
  • Emits expiry notifications to both source and target subjects
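The expiry rule can be sketched in Python as below. It assumes a message counts as expired once creation_ts + ttl (both in seconds) has been reached, and that only messages still in the pending status are eligible. MessageRow, is_expired, find_expired and expire are hypothetical names; the real checker reads rows from TimescaleDB and also emits the NATS notifications, which this sketch leaves out.

```python
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class MessageRow:
    # Hypothetical stand-in for the InterruptMessage fields the checker needs.
    message_id: str
    status: str
    ttl: int          # seconds
    creation_ts: int  # Unix timestamp, seconds


def is_expired(row: MessageRow, now: Optional[float] = None) -> bool:
    """Assumed rule: expired once creation_ts + ttl has been reached."""
    now = time.time() if now is None else now
    return now >= row.creation_ts + row.ttl


def find_expired(rows: Iterable[MessageRow], now: Optional[float] = None) -> List[MessageRow]:
    """Return pending messages whose TTL has elapsed.

    Resolved or already expired rows are skipped, so a message is never
    expired twice by successive scans.
    """
    now = time.time() if now is None else now
    return [r for r in rows if r.status == "pending" and is_expired(r, now)]


def expire(rows: Iterable[MessageRow], now: Optional[float] = None) -> List[str]:
    """Mark expired rows in place and return their IDs (notifications omitted)."""
    expired = find_expired(rows, now)
    for r in expired:
        r.status = "expired"
    return [r.message_id for r in expired]
```

In the database, the same predicate would naturally become a WHERE clause over status, creation_ts and ttl, so each periodic scan touches only candidate rows.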

DB Client API

Provides internal access for creating messages and waiters, updating responses, and checking completion criteria. It’s the primary interface used by all messaging and query layers.

DB Query Module

Exposes REST and GraphQL endpoints to query interrupt messages and waiters based on attributes such as type, subject, or status.


2. Messaging and ACK Coordination Engine

This subsystem handles interrupt emission, feedback acknowledgment, success routing, and policy validation using a NATS-based communication layer.

Interrupt/Feedback ACK Events Listener

A persistent NATS subscription that listens on the interrupt.ack topic. When an ACK is received, it:

  • Validates the message ID and subject
  • Stores the response in the corresponding InterruptWaiter
  • Checks if forwarding policy criteria have been fulfilled
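The three listener steps can be sketched as one function over a waiter record in the JSON layout shown in the Schema section. record_ack and AckRejected are hypothetical names, and overwriting a subject's earlier response when it ACKs twice is an assumption; the real listener would also persist the change to TimescaleDB.

```python
from typing import Any, Dict, List


class AckRejected(Exception):
    """Raised when an ACK does not match the waiter it claims to answer."""


def record_ack(waiter: Dict[str, Any], ack: Dict[str, Any]) -> List[str]:
    """Validate an ACK against a waiter record, store it, and return pending subjects.

    `waiter` uses the InterruptWaiter layout: message_id, source_subject_id,
    destination_subject_ids and responses.
    """
    # 1. The ACK must refer to this waiter's message.
    if ack.get("message_id") != waiter["message_id"]:
        raise AckRejected(f"unknown message_id {ack.get('message_id')!r}")
    # 2. Only subjects listed as destinations may respond.
    subject = ack.get("subject_id")
    if subject not in waiter["destination_subject_ids"]:
        raise AckRejected(f"unexpected subject {subject!r}")
    # 3. Store the response (assumption: a repeated ACK overwrites the earlier one).
    waiter["responses"][subject] = ack.get("response")
    # Subjects that have not answered yet feed into the forwarding policy check.
    return [s for s in waiter["destination_subject_ids"] if s not in waiter["responses"]]
```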

Interrupt/Feedback Forwarding Policy Checker

Evaluates the message’s forwarding_policy (e.g., broadcast, all, any). Once conditions are met:

  • A success ACK is sent back to the source subject
  • The InterruptMessage status is updated
  • Additional hooks (e.g., constraint enforcement) may be triggered
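A minimal policy check, assuming all means every destination subject has responded and any means at least one has. policy_satisfied is a hypothetical name. This page does not define what broadcast (or direct) requires, so the sketch raises for those values rather than guessing.

```python
from typing import Any, Dict, List


def policy_satisfied(policy: str, destinations: List[str], responses: Dict[str, Any]) -> bool:
    """Return True once the responses meet the message's forwarding_policy."""
    if policy == "all":
        # Assumed meaning: every expected destination subject has responded.
        return all(s in responses for s in destinations)
    if policy == "any":
        # Assumed meaning: one response from any expected destination is enough.
        return any(s in responses for s in destinations)
    # Semantics of other policies (e.g. broadcast) are not specified here.
    raise ValueError(f"unsupported forwarding_policy: {policy!r}")
```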

Interrupt/Feedback ACK Handler

Determines whether the source subject should be notified based on partial or complete responses, and delegates event emission to the forwarder or expiry handler.

Expiry Handler

When TTL expires before the forwarding policy is satisfied, this module:

  • Marks the message as "expired" in the DB
  • Sends expiry notifications to all relevant subjects via NATS

Interrupts Feedback Forwarder

Publishes success ACKs and feedback payloads to the source subject once the forwarding policy is fulfilled. Acts as the dispatch point for final delivery.


3. Constraint and Validation Layer

This optional layer integrates with the broader constraint checker architecture to validate the content of responses or enforce rejection rules before an ACK is forwarded.

Constraint Checker Integration

Intercepts interrupt/feedback responses before ACK delivery. If the response violates DSL-defined constraints:

  • Sends a reject ACK to the source subject
  • Flags the interrupt as failed or unresolved
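The shape of this check can be sketched under loud assumptions: the constraint DSL is not described on this page, so each rule is stood in for by a named Python predicate over one response, and the reject ACK fields (event, violations) are hypothetical, modeled on the expiry notification format. check_constraints is likewise an illustrative name.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for a DSL rule: a name plus a predicate over one response.
Constraint = Tuple[str, Callable[[Dict[str, Any]], bool]]


def check_constraints(message_id: str, responses: Dict[str, Dict[str, Any]],
                      constraints: List[Constraint]) -> Optional[Dict[str, Any]]:
    """Return a reject ACK if any response violates a constraint, else None."""
    violations = [
        {"subject_id": subject, "constraint": name}
        for subject, response in responses.items()
        for name, predicate in constraints
        if not predicate(response)
    ]
    if not violations:
        return None  # Nothing violated: the success ACK may be forwarded.
    # Payload shape is an assumption modeled on the expiry notification.
    return {"message_id": message_id, "event": "rejected", "violations": violations}
```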

4. Public Query Interfaces

This subsystem allows agents and administrators to query the state of interrupts, waiters, and acknowledgments in both REST and GraphQL formats.

Query REST API

Exposes endpoints for searching by:

  • message_id
  • status
  • source_subject_id
  • type, ttl, creation_ts

Query GraphQL Middleware

Enables expressive filtering of interrupt and waiter entities using GraphQL schema. Supports nested search, partial field selection, and projection-based queries.


Schema

InterruptMessage

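from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class InterruptMessage:
    message_id: str
    type: str
    internal_data: Dict[str, Any]  # stored as JSON
    status: str
    ttl: int                       # seconds
    creation_ts: int               # Unix timestamp
    forwarding_policy: str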

Field | Type | Description
--- | --- | ---
message_id | str | Unique ID of the interrupt message
type | str | Type or category of the interrupt
internal_data | JSON | Arbitrary structured data used internally by the system
status | str | Status of the message (e.g., pending, resolved, expired)
ttl | int | Time to live in seconds
creation_ts | int | Unix timestamp of when the message was created
forwarding_policy | str | Policy describing how to forward the interrupt (e.g., broadcast, direct)

InterruptWaiter

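from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class InterruptWaiter:
    message_id: str
    source_subject_id: str
    destination_subject_ids: List[str]
    responses: Dict[str, Any]  # subject ID -> response payload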

Field | Type | Description
--- | --- | ---
message_id | str | ID of the interrupt message this waiter belongs to
source_subject_id | str | ID of the subject that initiated the interrupt
destination_subject_ids | List[str] | List of subject IDs expected to respond to this interrupt
responses | JSON | Responses from the destination subjects, keyed by subject ID
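Because the REST API exchanges these records as JSON, a dataclass maps directly onto a request body. A small sketch using the standard dataclasses.asdict; to_json and waiter_from_json are hypothetical helpers, and the code assumes the GET endpoints return objects with exactly these four fields.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class InterruptWaiter:
    message_id: str
    source_subject_id: str
    destination_subject_ids: List[str]
    responses: Dict[str, Any]


def to_json(obj) -> str:
    """Serialize a schema dataclass into a JSON request body."""
    return json.dumps(asdict(obj))


def waiter_from_json(body: str) -> InterruptWaiter:
    """Rebuild an InterruptWaiter from a JSON document (assumed GET response shape)."""
    return InterruptWaiter(**json.loads(body))
```

to_json on the waiter below produces the same body as the Insert InterruptWaiter cURL example.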

API Documentation with cURL Examples

Insert InterruptMessage

POST /interrupt/message

curl -X POST http://localhost:8000/interrupt/message \
     -H "Content-Type: application/json" \
     -d '{
           "message_id": "msg123",
           "type": "shutdown",
           "internal_data": {"reason": "maintenance"},
           "status": "pending",
           "ttl": 60,
           "creation_ts": 1717234567,
           "forwarding_policy": "broadcast"
         }'

Update InterruptMessage

PUT /interrupt/message/:message_id

curl -X PUT http://localhost:8000/interrupt/message/msg123 \
     -H "Content-Type: application/json" \
     -d '{
           "status": "resolved",
           "ttl": 120
         }'

Delete InterruptMessage

DELETE /interrupt/message/:message_id

curl -X DELETE http://localhost:8000/interrupt/message/msg123

Get InterruptMessage by ID

GET /interrupt/message/:message_id

curl -X GET http://localhost:8000/interrupt/message/msg123

Query InterruptMessages

POST /interrupt/message/query

curl -X POST http://localhost:8000/interrupt/message/query \
     -H "Content-Type: application/json" \
     -d '{"status": "pending"}'

Insert InterruptWaiter

POST /interrupt/waiter

curl -X POST http://localhost:8000/interrupt/waiter \
     -H "Content-Type: application/json" \
     -d '{
           "message_id": "msg123",
           "source_subject_id": "subjectA",
           "destination_subject_ids": ["subjectB", "subjectC"],
           "responses": {}
         }'

Update InterruptWaiter

PUT /interrupt/waiter/:message_id

curl -X PUT http://localhost:8000/interrupt/waiter/msg123 \
     -H "Content-Type: application/json" \
     -d '{
           "responses": {
             "subjectB": {"ack": true}
           }
         }'

Delete InterruptWaiter

DELETE /interrupt/waiter/:message_id

curl -X DELETE http://localhost:8000/interrupt/waiter/msg123

Get InterruptWaiter by ID

GET /interrupt/waiter/:message_id

curl -X GET http://localhost:8000/interrupt/waiter/msg123

Query InterruptWaiters

POST /interrupt/waiter/query

curl -X POST http://localhost:8000/interrupt/waiter/query \
     -H "Content-Type: application/json" \
     -d '{"source_subject_id": "subjectA"}'
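The cURL calls above can also be issued from Python with only the standard library. A minimal sketch: build_request and call are hypothetical helpers, the base URL is the localhost:8000 address used in the examples, and the assumption that every endpoint replies with a JSON body (or an empty one) is not confirmed by this page.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

BASE_URL = "http://localhost:8000"  # Same host and port as the cURL examples.


def build_request(method: str, path: str,
                  body: Optional[Dict[str, Any]] = None) -> urllib.request.Request:
    """Build (but do not send) a request mirroring one of the cURL calls."""
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def call(req: urllib.request.Request) -> Any:
    """Send a request and decode the reply, assuming it is JSON or empty."""
    with urllib.request.urlopen(req) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else None
```

For example, call(build_request("POST", "/interrupt/message/query", {"status": "pending"})) mirrors the Query InterruptMessages request, and call(build_request("DELETE", "/interrupt/waiter/msg123")) mirrors Delete InterruptWaiter.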



Creating a Message (Interrupt Registration)

To initiate a new interrupt, use the send_interrupt() function from interrupt_emitter.py. It registers the message in the database and publishes it to each designated target subject via NATS.

Python Example

from interrupt_emitter import send_interrupt

send_interrupt(
    message_id="msg456",
    type_="reboot-request",
    source_subject_id="controller-A",
    destination_subject_ids=["agent-1", "agent-2"],
    internal_data={"reason": "config mismatch"},
    ttl=120,
    forwarding_policy="all"
)

This will:

  • Save the InterruptMessage and corresponding InterruptWaiter to TimescaleDB.
  • Send NATS messages to subjects: interrupt.agent-1 and interrupt.agent-2.
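The work send_interrupt() performs can be approximated as below. This is a hypothetical reconstruction, not the code in interrupt_emitter.py: plan_interrupt and publish_all are illustrative names, and publishing the full message record as JSON on each interrupt.<destination> subject is an assumption about the payload format. nc is expected to be a connected nats-py client, whose publish() coroutine takes a subject and a bytes payload.

```python
import json
import time
from typing import Any, Dict, List, Optional, Tuple


def plan_interrupt(message_id: str, type_: str, source_subject_id: str,
                   destination_subject_ids: List[str], internal_data: Dict[str, Any],
                   ttl: int, forwarding_policy: str,
                   now: Optional[int] = None) -> Tuple[Dict, Dict, List[Tuple[str, bytes]]]:
    """Return the message row, the waiter row, and (NATS subject, payload) pairs."""
    message = {
        "message_id": message_id,
        "type": type_,
        "internal_data": internal_data,
        "status": "pending",  # New interrupts start out pending.
        "ttl": ttl,
        "creation_ts": int(time.time()) if now is None else now,
        "forwarding_policy": forwarding_policy,
    }
    waiter = {
        "message_id": message_id,
        "source_subject_id": source_subject_id,
        "destination_subject_ids": list(destination_subject_ids),
        "responses": {},  # Filled in later by the ACK listener.
    }
    payload = json.dumps(message).encode()  # Assumed payload format.
    # One publish per destination, on interrupt.<destination>.
    dispatch = [(f"interrupt.{dest}", payload) for dest in destination_subject_ids]
    return message, waiter, dispatch


async def publish_all(nc, dispatch: List[Tuple[str, bytes]]) -> None:
    """Publish each planned message using a connected nats-py client."""
    for subject, payload in dispatch:
        await nc.publish(subject, payload)
```

Separating the pure planning step from the publishing step keeps the record construction testable without a running NATS server.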

Flow Explanation

The full interrupt lifecycle works as follows:

  1. Message Registration: The source (e.g., controller-A) calls send_interrupt() with the message ID, target subjects, type, and payload.
  2. Persistence and Dispatch
    • The message is saved in the interrupt_messages table.
    • A waiter entry is created to track which subjects are expected to respond.
    • NATS messages are dispatched to each interrupt.<destination> subject.
  3. ACK Listener Handling
    • Subjects send responses to the interrupt.ack topic.
    • The listener updates the InterruptWaiter.responses map with each subject's response.
  4. Success Criteria Evaluation
    • As responses arrive, the forwarding_policy is checked; with the all policy used above, it is satisfied once every destination subject has responded.
    • If the policy is satisfied, a success ACK is published back to the source.
  5. Constraint Enforcement (optional)
    • Constraint checks can reject a message even if all responses are received.
  6. Expiry Logic
    • If the TTL expires before all expected responses arrive:
      • The message status is updated to expired.
      • Expiry notifications are sent to the source and target subjects.
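The statuses an interrupt moves through can be captured as a small transition table. This is a hypothetical sketch: pending, resolved and expired come from the schema, failed from the constraint layer, and the rule that terminal statuses never change (so a late ACK cannot resolve an already expired message) is an assumption rather than documented behavior. ALLOWED and transition are illustrative names.

```python
# Hypothetical transition table: only pending messages may change status.
ALLOWED = {
    "pending": {"resolved", "expired", "failed"},
    "resolved": set(),
    "expired": set(),
    "failed": set(),
}


def transition(current: str, new: str) -> str:
    """Return the new status, or raise if the lifecycle forbids the change."""
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal status change {current!r} -> {new!r}")
    return new
```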

Sending a Response (Interrupt ACK Format)

Each responding subject must publish to the interrupt.ack subject using the format below:

{
  "message_id": "msg456",
  "subject_id": "agent-1",
  "response": {
    "accepted": true,
    "details": "Reboot scheduled"
  }
}
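A listener will usually decode and sanity-check each payload before touching the database. A minimal sketch: parse_ack is a hypothetical helper, and requiring response to be a JSON object (as in the example) rather than any JSON value is an assumption.

```python
import json
from typing import Any, Dict


def parse_ack(raw: bytes) -> Dict[str, Any]:
    """Decode an interrupt.ack payload and check the required fields."""
    ack = json.loads(raw)  # Raises json.JSONDecodeError (a ValueError) on bad JSON.
    if not isinstance(ack, dict):
        raise ValueError("ACK must be a JSON object")
    for key in ("message_id", "subject_id"):
        if not isinstance(ack.get(key), str) or not ack[key]:
            raise ValueError(f"ACK field {key!r} must be a non-empty string")
    if not isinstance(ack.get("response"), dict):
        raise ValueError("ACK field 'response' must be a JSON object")
    return ack
```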

Python Example to Send ACK

import asyncio
import json
from nats.aio.client import Client as NATS

async def send_ack():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    ack_msg = {
        "message_id": "msg456",
        "subject_id": "agent-1",
        "response": {
            "accepted": True,
            "details": "Reboot scheduled"
        }
    }

    await nc.publish("interrupt.ack", json.dumps(ack_msg).encode())
    await nc.drain()

asyncio.run(send_ack())

Receiving a Notification as a Target Subject

All target subjects must subscribe to interrupt.<your_subject_id> to receive interrupt messages.

Python Example to Listen for Interrupts

import asyncio
from nats.aio.client import Client as NATS

async def listen_interrupts():
    nc = NATS()
    await nc.connect(servers=["nats://localhost:4222"])

    async def callback(msg):
        data = msg.data.decode()
        print("Received interrupt:", data)

    await nc.subscribe("interrupt.agent-1", cb=callback)

    print("Listening for interrupts...")
    while True:
        await asyncio.sleep(1)

asyncio.run(listen_interrupts())

Expiry Notifications

If a message expires before all subjects respond (based on ttl):

  • The system updates the message status to "expired" in the database.
  • It sends expiry notices to:
    • interrupt.<source_subject_id>
    • interrupt.<each_target_subject_id>

Example Expiry Notification

{
  "message_id": "msg456",
  "event": "expired"
}
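The notification fan-out for one expired message can be sketched as follows. expiry_notifications is a hypothetical helper; the payload matches the example above, while notifying the source first and skipping duplicate subjects are assumptions.

```python
import json
from typing import List, Tuple


def expiry_notifications(message_id: str, source_subject_id: str,
                         destination_subject_ids: List[str]) -> List[Tuple[str, bytes]]:
    """Return (NATS subject, payload) pairs for one expired message."""
    payload = json.dumps({"message_id": message_id, "event": "expired"}).encode()
    # Source first, then every target; a subject listed twice is notified once.
    subjects: List[str] = []
    for subject_id in [source_subject_id, *destination_subject_ids]:
        subject = f"interrupt.{subject_id}"
        if subject not in subjects:
            subjects.append(subject)
    return [(s, payload) for s in subjects]
```

Each pair can then be passed to a connected NATS client's publish() call.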

These notifications allow components to handle stale interrupts proactively.

