Events & Streaming
Your knowledge graph isn't static. Stream every change to Kafka, webhooks, or time-series databases, and give your agents the full history they need.
Route Events Anywhere
Konnektr acts as a nervous system, routing graph changes to downstream systems in real time.
Kafka / Event Hubs
High-throughput streaming for microservices. Native support for Azure Event Hubs and Confluent.
Webhooks
Trigger serverless functions or notify external systems. Works with Azure Functions, AWS Lambda, or any HTTP endpoint.
Azure Data Explorer
Native sink for Kusto. Turn graph updates into an append-only historical log for analytics and auditing.
MQTT
Broadcast updates to IoT devices or edge gateways. Lightweight pub/sub for constrained environments.
Industry-Standard Format
No proprietary formats. We emit CloudEvents 1.0, the CNCF standard for event data. Consume with any standard library in Python, Go, Node.js, or Java.
Konnektr.Graph.Twin.Update
Emitted when node properties change. Payload contains JSON Patch operations.
Konnektr.Graph.Relationship.Create
Emitted when new relationships are formed between nodes.
Konnektr.Graph.Twin.Delete
Emitted when nodes are removed. Includes a full snapshot of the removed node for audit trails.
{
  "specversion": "1.0",
  "type": "Konnektr.Graph.Twin.Update",
  "source": "https://api.graph.konnektr.io",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "time": "2025-01-20T10:30:00Z",
  "subject": "customer-acme-corp",
  "datacontenttype": "application/json",
  "data": {
    "modelId": "dtmi:konnektr:Customer;1",
    "patch": [
      {
        "op": "replace",
        "path": "/healthScore",
        "value": 72
      }
    ]
  }
}

Consume Events Your Way
Standard CloudEvents format means you can use your existing tools and libraries.
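As a rough illustration of how little tooling this requires, the sketch below parses a structured-mode event shaped like the sample Konnektr.Graph.Twin.Update payload using only Python's standard library, then applies its JSON Patch operations to a local copy of the node. The helper names (parse_cloudevent, apply_patch) and the local_copy dictionary are assumptions made for this example, not part of any Konnektr SDK, and the patch handling covers only top-level add, replace, and remove operations.

```python
import json

# Sample payload with the same shape as the Konnektr.Graph.Twin.Update event above.
RAW_EVENT = """
{
  "specversion": "1.0",
  "type": "Konnektr.Graph.Twin.Update",
  "source": "https://api.graph.konnektr.io",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "time": "2025-01-20T10:30:00Z",
  "subject": "customer-acme-corp",
  "datacontenttype": "application/json",
  "data": {
    "modelId": "dtmi:konnektr:Customer;1",
    "patch": [{"op": "replace", "path": "/healthScore", "value": 72}]
  }
}
"""

# Context attributes that CloudEvents 1.0 marks as required.
REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def parse_cloudevent(raw):
    """Decode a structured-mode JSON CloudEvent and check its required attributes."""
    event = json.loads(raw)
    missing = [name for name in REQUIRED_ATTRIBUTES if name not in event]
    if missing:
        raise ValueError(f"missing CloudEvents attributes: {missing}")
    return event


def apply_patch(node, patch):
    """Hypothetical helper: apply top-level add/replace/remove operations to a copy of node."""
    updated = dict(node)
    for operation in patch:
        key = operation["path"].lstrip("/")
        if operation["op"] in ("add", "replace"):
            updated[key] = operation["value"]
        elif operation["op"] == "remove":
            updated.pop(key, None)
        else:
            raise ValueError(f"unsupported operation: {operation['op']}")
    return updated


event = parse_cloudevent(RAW_EVENT)
local_copy = {"healthScore": 90, "tier": "gold"}  # assumed local state, not from Konnektr
local_copy = apply_patch(local_copy, event["data"]["patch"])
print(event["subject"], local_copy)
```

For nested paths or the remaining JSON Patch operations (move, copy, test), a full RFC 6902 implementation would be a better fit than this helper.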
Kafka Consumer
from cloudevents.http import from_json
from kafka import KafkaConsumer

# Each message value is parsed as a structured-mode (JSON) CloudEvent
consumer = KafkaConsumer(
    'konnektr-graph-events',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: from_json(m)
)

for message in consumer:
    event = message.value
    if event['type'] == 'Konnektr.Graph.Twin.Update':
        node_id = event['subject']
        # The payload lives on event.data, not among the context attributes
        changes = event.data['patch']
        # Trigger downstream actions
        if any(p['path'] == '/healthScore' for p in changes):
            # notify_customer_success is a placeholder for your own handler
            notify_customer_success(node_id, changes)

Webhook Handler
// Azure Function triggered by Konnektr webhook
import { app, HttpRequest, HttpResponseInit } from "@azure/functions";

app.http('graphWebhook', {
  methods: ['POST'],
  async handler(request: HttpRequest): Promise<HttpResponseInit> {
    // request.json() resolves to unknown, so narrow it to the fields used here
    const event = (await request.json()) as { type: string; subject: string; data: unknown };
    // CloudEvents standard format
    const { type, subject, data } = event;
    if (type === 'Konnektr.Graph.Relationship.Create') {
      // New relationship detected
      // updateSearchIndex and notifySlack are placeholders for your own helpers
      await updateSearchIndex(subject, data);
      await notifySlack(`New connection: ${subject}`);
    }
    return { status: 200 };
  }
});

Give Agents Historical Context
AI agents often need to know what happened, not just what is. By routing events to a time-series database, you give your agents long-term memory they can query.
Operational Context
"What changed in the customer's account before they churned?"
Agents can query historical events to understand the sequence of changes that led to an outcome.
Audit Trail
"Who modified this record and when?"
Every change is logged with user attribution, timestamps, and the full JSON Patch for compliance.
Time Travel Queries
"Show me the state of this entity 30 days ago"
Reconstruct any point-in-time view of your graph by replaying events from your history store.
Trend Analysis
"How has this metric changed over the past quarter?"
Aggregate historical events to identify patterns and trends that inform agent decisions.
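The time travel idea above can be sketched as a replay over stored events. The example below is a minimal illustration under several assumptions: the history store returns events as plain dictionaries in CloudEvents shape, a baseline snapshot of the node is available from before the first recorded event, and patches only touch top-level properties. The HISTORY rows, the update_event builder, and the state_at function are hypothetical names for this sketch, not a Konnektr API.

```python
from datetime import datetime

UPDATE = "Konnektr.Graph.Twin.Update"
DELETE = "Konnektr.Graph.Twin.Delete"


def parse_time(value):
    """Parse an RFC 3339 timestamp such as 2025-01-20T10:30:00Z."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def update_event(time, value):
    """Build a hypothetical healthScore update for customer-acme-corp."""
    return {
        "type": UPDATE,
        "subject": "customer-acme-corp",
        "time": time,
        "data": {"patch": [{"op": "replace", "path": "/healthScore", "value": value}]},
    }


# Hypothetical rows as they might come back from a history store, in no particular order.
HISTORY = [
    update_event("2025-02-10T09:00:00Z", 60),
    update_event("2025-01-05T08:00:00Z", 85),
    update_event("2025-01-20T10:30:00Z", 72),
]


def state_at(baseline, events, subject, as_of):
    """Replay events for subject up to as_of on top of a baseline snapshot."""
    cutoff = parse_time(as_of)
    relevant = [
        e for e in events
        if e["subject"] == subject and parse_time(e["time"]) <= cutoff
    ]
    state = dict(baseline)
    for event in sorted(relevant, key=lambda e: parse_time(e["time"])):
        if event["type"] == DELETE:
            return None  # in this sketch a deleted node stays deleted
        if event["type"] == UPDATE:
            for operation in event["data"]["patch"]:
                key = operation["path"].lstrip("/")
                if operation["op"] in ("add", "replace"):
                    state[key] = operation["value"]
                elif operation["op"] == "remove":
                    state.pop(key, None)
                # other JSON Patch operations are ignored here
    return state


baseline = {"healthScore": 90}  # assumed state before the first recorded event
print(state_at(baseline, HISTORY, "customer-acme-corp", "2025-01-31T00:00:00Z"))
```

Sorting by the event time before replaying matters because a history store may not return rows in emission order. The same ordered sequence of healthScore values could also feed a trend query.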
Real-Time Intelligence
Event streaming enables reactive workflows that respond to graph changes instantly.
Real-Time Alerts
Trigger notifications when specific patterns appear in your graph. Alert on anomalies before they become problems.
Index Synchronization
Keep search indices, caches, and derived data stores in sync with your authoritative graph.
Agent Memory Updates
Push graph changes to your agents so they always have the latest context without polling.
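A real-time alert of the kind described above can be as small as a predicate over incoming events. The sketch below assumes events arrive as dictionaries in the CloudEvents shape shown earlier. The health_alert function, the threshold of 75, and the sample events are illustrative assumptions, not Konnektr features.

```python
def health_alert(event, threshold=75):
    """Return an alert message if the event sets healthScore below threshold, else None."""
    if event.get("type") != "Konnektr.Graph.Twin.Update":
        return None
    for operation in event.get("data", {}).get("patch", []):
        sets_value = operation.get("op") in ("add", "replace")
        if sets_value and operation.get("path") == "/healthScore":
            if operation["value"] < threshold:
                return (
                    f"{event['subject']}: healthScore dropped to "
                    f"{operation['value']} (threshold {threshold})"
                )
    return None


# Sample events: the update from the example payload and a relationship event.
update = {
    "type": "Konnektr.Graph.Twin.Update",
    "subject": "customer-acme-corp",
    "data": {"patch": [{"op": "replace", "path": "/healthScore", "value": 72}]},
}
relationship = {
    "type": "Konnektr.Graph.Relationship.Create",
    "subject": "customer-acme-corp",
    "data": {},
}

for incoming in (update, relationship):
    message = health_alert(incoming)
    if message:
        print(message)
```

In practice this function would be called from whichever consumer receives the events, such as the Kafka loop or the webhook handler shown earlier, with the message forwarded to a notification channel of your choice.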
Ready to stream your graph?
Configure event routing in minutes. Connect to Kafka, webhooks, or your time-series database of choice.