EVENTS_STREAMING

Events & Streaming

Your knowledge graph isn't static. Stream every change to Kafka, webhooks, or time-series databases. Give your agents the full history they need.

Route Events Anywhere

Konnektr acts as a nervous system, routing graph changes to downstream systems in real-time.

Kafka / Event Hubs

High-throughput streaming for microservices. Native support for Azure Event Hubs and Confluent.

Webhooks

Trigger serverless functions or notify external systems. Works with Azure Functions, AWS Lambda, or any HTTP endpoint.

Azure Data Explorer

Native sink for Kusto. Turn graph updates into an append-only historical log for analytics and auditing.

MQTT

Broadcast updates to IoT devices or edge gateways. Lightweight pub/sub for constrained environments.

CLOUDEVENTS_1.0

Industry-Standard Format

No proprietary formats. We emit CloudEvents 1.0, the CNCF standard for event data. Consume with any standard library in Python, Go, Node.js, or Java.
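As a rough sketch of what consuming this format looks like, here's how you might validate a CloudEvents 1.0 envelope using nothing but the standard library. The required attribute names come from the CloudEvents 1.0 spec; the helper function is illustrative:

```python
import json

# CloudEvents 1.0 requires these context attributes on every event
REQUIRED_ATTRIBUTES = {"specversion", "id", "source", "type"}

def parse_cloudevent(raw: bytes) -> dict:
    """Parse a JSON-encoded CloudEvent and check its required attributes."""
    event = json.loads(raw)
    missing = REQUIRED_ATTRIBUTES - event.keys()
    if missing:
        raise ValueError(f"not a valid CloudEvent, missing: {sorted(missing)}")
    if event["specversion"] != "1.0":
        raise ValueError(f"unsupported specversion: {event['specversion']}")
    return event

raw = (
    b'{"specversion": "1.0", "type": "Konnektr.Graph.Twin.Update",'
    b' "source": "https://api.graph.konnektr.io", "id": "abc-123",'
    b' "subject": "customer-acme-corp", "data": {}}'
)
event = parse_cloudevent(raw)
print(event["type"])  # Konnektr.Graph.Twin.Update
```

In practice you would reach for a CloudEvents SDK instead, but the point is that the envelope is plain JSON with a fixed set of attributes, so any stack can consume it.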

Konnektr.Graph.Twin.Update

Emitted when node properties change. Payload contains JSON Patch operations.

Konnektr.Graph.Relationship.Create

Emitted when new relationships are formed between nodes.

Konnektr.Graph.Twin.Delete

Emitted when nodes are removed. Includes a full snapshot for audit trails.

Node.Update.json
{
  "specversion": "1.0",
  "type": "Konnektr.Graph.Twin.Update",
  "source": "https://api.graph.konnektr.io",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "time": "2025-01-20T10:30:00Z",
  "subject": "customer-acme-corp",
  "datacontenttype": "application/json",
  "data": {
    "modelId": "dtmi:konnektr:Customer;1",
    "patch": [
      {
        "op": "replace",
        "path": "/healthScore",
        "value": 72
      }
    ]
  }
}
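The patch array in the event above is standard JSON Patch (RFC 6902), so a consumer can apply it directly to a local copy of the node. A minimal sketch, handling only the top-level `replace` op used in the sample (a full implementation would use a JSON Patch library):

```python
# Local copy of the node, e.g. from a cache or read model
node = {"name": "Acme Corp", "healthScore": 85}

# The JSON Patch operations from the event's data.patch field
patch = [{"op": "replace", "path": "/healthScore", "value": 72}]

def apply_patch(doc: dict, operations: list) -> dict:
    """Apply top-level JSON Patch 'replace' operations to a document."""
    for op in operations:
        if op["op"] == "replace":
            # "/healthScore" -> "healthScore"; nested paths omitted in this sketch
            key = op["path"].lstrip("/")
            doc[key] = op["value"]
    return doc

apply_patch(node, patch)
print(node["healthScore"])  # 72
```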

Consume Events Your Way

Standard CloudEvents format means you can use your existing tools and libraries.

Kafka Consumer

consumer.py
from cloudevents.http import from_json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'konnektr-graph-events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: from_json(m)
)

for message in consumer:
    event = message.value

    if event['type'] == 'Konnektr.Graph.Twin.Update':
        node_id = event['subject']
        changes = event['data']['patch']

        # Trigger downstream actions
        if any(p['path'] == '/healthScore' for p in changes):
            notify_customer_success(node_id, changes)

Webhook Handler

graphWebhook.ts
// Azure Function triggered by Konnektr webhook
import { app, HttpRequest, HttpResponseInit } from "@azure/functions";

app.http('graphWebhook', {
  methods: ['POST'],
  async handler(request: HttpRequest): Promise<HttpResponseInit> {
    const event = await request.json() as Record<string, any>;

    // CloudEvents standard format
    const { type, subject, data } = event;

    if (type === 'Konnektr.Graph.Relationship.Create') {
      // New relationship detected
      await updateSearchIndex(subject, data);
      await notifySlack(`New connection: ${subject}`);
    }

    return { status: 200 };
  }
});

TEMPORAL_CONTEXT

Give Agents Historical Context

AI agents often need to know what happened, not just what is. By routing events to a time-series database, you give your agents long-term memory they can query.

Operational Context

"What changed in the customer's account before they churned?"

Agents can query historical events to understand the sequence of changes that led to an outcome.

Audit Trail

"Who modified this record and when?"

Every change is logged with user attribution, timestamps, and the full JSON Patch for compliance.

Time Travel Queries

"Show me the state of this entity 30 days ago"

Reconstruct any point-in-time view of your graph by replaying events from your history store.

Trend Analysis

"How has this metric changed over the past quarter?"

Aggregate historical events to identify patterns and trends that inform agent decisions.
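The replay idea behind time-travel queries can be sketched in a few lines: start from a known snapshot and apply each stored update event, in time order, up to the cutoff. The event shape follows the sample payload above; the storage layout and cutoff logic are illustrative:

```python
from datetime import datetime, timezone

def state_at(snapshot: dict, events: list, cutoff: datetime) -> dict:
    """Reconstruct a node's state at `cutoff` by replaying patch events in time order."""
    state = dict(snapshot)
    for event in sorted(events, key=lambda e: e["time"]):
        if datetime.fromisoformat(event["time"].replace("Z", "+00:00")) > cutoff:
            break  # events are sorted, so everything after this is too late
        for op in event["data"]["patch"]:
            if op["op"] == "replace":
                state[op["path"].lstrip("/")] = op["value"]
    return state

events = [
    {"time": "2025-01-10T00:00:00Z",
     "data": {"patch": [{"op": "replace", "path": "/healthScore", "value": 85}]}},
    {"time": "2025-01-20T10:30:00Z",
     "data": {"patch": [{"op": "replace", "path": "/healthScore", "value": 72}]}},
]

# State as of Jan 15: only the first event has been applied
past = state_at({"healthScore": 90}, events, datetime(2025, 1, 15, tzinfo=timezone.utc))
print(past["healthScore"])  # 85
```

In production this replay would run against your history store (Kusto, a time-series database, or compacted Kafka topics) rather than an in-memory list, but the mechanics are the same.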

Real-Time Intelligence

Event streaming enables reactive workflows that respond to graph changes instantly.

Real-Time Alerts

Trigger notifications when specific patterns appear in your graph. Alert on anomalies before they become problems.

Index Synchronization

Keep search indices, caches, and derived data stores in sync with your authoritative graph.

Agent Memory Updates

Push graph changes to your agents so they always have the latest context without polling.
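One way to wire these reactive workflows together is a small dispatcher that routes each incoming event type to the handlers that care about it. The handler names and registration decorator here are illustrative, not part of any Konnektr SDK:

```python
from collections import defaultdict

# event type -> list of handler callables
handlers = defaultdict(list)

def on(event_type):
    """Register a handler for a given CloudEvents type."""
    def register(fn):
        handlers[event_type].append(fn)
        return fn
    return register

def dispatch(event):
    """Fan one event out to every handler registered for its type."""
    for handler in handlers[event["type"]]:
        handler(event)

@on("Konnektr.Graph.Twin.Update")
def sync_search_index(event):
    print(f"reindexing {event['subject']}")

@on("Konnektr.Graph.Twin.Update")
def refresh_agent_context(event):
    print(f"pushing {event['subject']} to agent memory")

dispatch({"type": "Konnektr.Graph.Twin.Update", "subject": "customer-acme-corp"})
```

Because every sink sees the same CloudEvents envelope, the same dispatch pattern works whether events arrive via Kafka, a webhook, or MQTT.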

Ready to stream your graph?

Configure event routing in minutes. Connect to Kafka, webhooks, or your time-series database of choice.
