azure-eventhub-ts
High-throughput event streaming and real-time data ingestion.
- risk: unknown
- source: community
- date added: 2026-02-27
Azure Event Hubs SDK for TypeScript
Installation
npm install @azure/event-hubs @azure/identity
For checkpointing with consumer groups:
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
Environment Variables
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
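The snippets in this document read these variables with non-null assertions, which hides a missing value until the first network call. A minimal sketch of an up-front check follows. The `loadConfig` and `requireVar` helpers and the `EventHubConfig` shape are hypothetical names introduced for illustration, not part of the SDK, and the sketch assumes all four variables are required (the storage ones only matter when checkpointing is used).

```typescript
// Hypothetical config shape; the field names are illustrative, not SDK types.
interface EventHubConfig {
  fullyQualifiedNamespace: string;
  eventHubName: string;
  storageAccountName: string;
  storageContainerName: string;
}

type Env = Record<string, string | undefined>;

// Read one variable and fail fast with a clear message if it is unset or blank.
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value.trim() === "") {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value.trim();
}

// Build the config from an environment map (pass process.env in Node.js).
function loadConfig(env: Env): EventHubConfig {
  return {
    fullyQualifiedNamespace: requireVar(env, "EVENTHUB_NAMESPACE"),
    eventHubName: requireVar(env, "EVENTHUB_NAME"),
    storageAccountName: requireVar(env, "STORAGE_ACCOUNT_NAME"),
    storageContainerName: requireVar(env, "STORAGE_CONTAINER_NAME"),
  };
}
```

Taking the environment as a parameter rather than reading `process.env` inside the helper keeps the check easy to exercise in tests.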
Authentication
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);
Core Workflow
Send Events
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Create a batch and add events; tryAdd returns false when the event does not fit
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();
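Since `tryAdd` reports a full batch by returning false rather than throwing, a longer stream of events needs a loop that sends the current batch and starts a new one. The following is a sketch of that loop, not the SDK's own helper. `sendAll` is a hypothetical name, and `EventLike`, `BatchLike`, and `ProducerLike` are minimal stand-in interfaces (assumed to mirror only the `count`, `tryAdd`, `createBatch`, and `sendBatch` members used here) so the block compiles without the SDK installed.

```typescript
// Minimal structural stand-ins for the SDK's batch and producer types.
interface EventLike {
  body: unknown;
}
interface BatchLike {
  readonly count: number;
  tryAdd(event: EventLike): boolean;
}
interface ProducerLike {
  createBatch(): Promise<BatchLike>;
  sendBatch(batch: BatchLike): Promise<void>;
}

// Hypothetical helper: sends every event, starting a new batch whenever
// tryAdd reports that the current one is full. Returns the number of batches sent.
async function sendAll(producer: ProducerLike, events: EventLike[]): Promise<number> {
  let batch = await producer.createBatch();
  let batchesSent = 0;

  for (const event of events) {
    if (batch.tryAdd(event)) continue;

    if (batch.count === 0) {
      // An empty batch rejected the event, so it can never fit.
      throw new Error("Event is too large for an empty batch");
    }

    // Flush the full batch, then retry the event in a fresh one.
    await producer.sendBatch(batch);
    batchesSent++;
    batch = await producer.createBatch();
    if (!batch.tryAdd(event)) {
      throw new Error("Event is too large for an empty batch");
    }
  }

  // Send whatever is left in the final, partially filled batch.
  if (batch.count > 0) {
    await producer.sendBatch(batch);
    batchesSent++;
  }
  return batchesSent;
}
```

Throwing on an event that does not fit an empty batch is one possible policy; an application might instead log and skip such events.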
Send to Specific Partition
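// Set either partitionId or partitionKey on a batch, not both

// By partition ID
const batchById = await producer.createBatch({ partitionId: "0" });

// By partition key (events with the same key go to the same partition)
const batchByKey = await producer.createBatch({ partitionKey: "device-123" });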
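A partition key is set once per batch, so a mixed stream of events has to be split by key before batching. A small sketch of that grouping step, assuming a hypothetical `KeyedEvent` shape in which each event carries the key it should be sent with:

```typescript
// Hypothetical event shape: the key travels with the event until batching.
interface KeyedEvent {
  partitionKey: string;
  body: unknown;
}

// Group events by partition key, preserving arrival order within each key.
function groupByPartitionKey(events: KeyedEvent[]): Map<string, KeyedEvent[]> {
  const groups = new Map<string, KeyedEvent[]>();
  for (const event of events) {
    const group = groups.get(event.partitionKey);
    if (group) {
      group.push(event);
    } else {
      groups.set(event.partitionKey, [event]);
    }
  }
  return groups;
}

// Each group would then go into its own batch, for example:
//   const batch = await producer.createBatch({ partitionKey: key });
```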
Receive Events (Simple)
const consumer = new EventHubConsumerClient("$Default", fullyQualifiedNamespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after 60 seconds
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);
Receive with Checkpointing (Production)
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { DefaultAzureCredential } from "@azure/identity";

const credential = new DefaultAzureCredential();
const storageAccount = process.env.STORAGE_ACCOUNT_NAME!;
const containerName = process.env.STORAGE_CONTAINER_NAME!;

// The blob container must already exist
const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  fullyQualifiedNamespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing the batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
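Checkpointing after every batch, as above, writes to blob storage on each call. One common variation is to checkpoint only after a fixed number of events per partition, accepting that more events are replayed after a restart. The sketch below illustrates that trade-off; `CheckpointPolicy` is a hypothetical helper, not an SDK class.

```typescript
// Hypothetical helper: decides, per partition, when a checkpoint is due,
// so that a blob write happens once every `interval` events.
class CheckpointPolicy {
  private readonly pending = new Map<string, number>();
  private readonly interval: number;

  constructor(interval: number) {
    if (interval < 1) throw new RangeError("interval must be at least 1");
    this.interval = interval;
  }

  // Record `count` processed events for a partition.
  // Returns true when the running total reaches the interval, and resets it.
  record(partitionId: string, count: number): boolean {
    const total = (this.pending.get(partitionId) ?? 0) + count;
    if (total >= this.interval) {
      this.pending.set(partitionId, 0);
      return true;
    }
    this.pending.set(partitionId, total);
    return false;
  }
}

// Inside processEvents this might be used as:
//   if (policy.record(context.partitionId, events.length)) {
//     await context.updateCheckpoint(events[events.length - 1]);
//   }
```

An empty batch adds zero to the running total, so it never triggers a checkpoint on its own.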
Receive from Specific Position
import { earliestEventPosition, latestEventPosition } from "@azure/event-hubs";

const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    // Keyed by partition ID; pass a single position to apply it to all partitions
    startPosition: {
      // Start from beginning
      "0": earliestEventPosition,
      // Start from end (new events only)
      "1": latestEventPosition,
      // Start from specific offset (a string in SDK v6; earlier versions used a number)
      "2": { offset: "12345" },
      // Start from specific time
      "3": { enqueuedOn: new Date("2024-01-01") },
    },
  }
);
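When an application tracks its own progress instead of using a checkpoint store, the per-partition map can be built from the last sequence number it processed. This is a sketch under that assumption. `resumePositions` is a hypothetical helper, and `PositionLike` is a stand-in that mirrors only the position fields used in this document plus `sequenceNumber` and `isInclusive`.

```typescript
// Structural stand-in for the SDK's event position type.
interface PositionLike {
  sequenceNumber?: number;
  isInclusive?: boolean;
  offset?: string;
  enqueuedOn?: Date;
}

// Hypothetical helper: resume each partition just after its last processed
// sequence number, or from `fallback` when nothing has been recorded for it.
function resumePositions(
  partitionIds: string[],
  lastProcessed: Record<string, number | undefined>,
  fallback: PositionLike
): Record<string, PositionLike> {
  const positions: Record<string, PositionLike> = {};
  for (const id of partitionIds) {
    const sequenceNumber = lastProcessed[id];
    positions[id] =
      sequenceNumber === undefined
        ? fallback
        : { sequenceNumber, isInclusive: false }; // exclusive: skip the event already handled
  }
  return positions;
}
```

The result would be passed as `startPosition` in the subscribe options, with the partition IDs taken from the client's `getPartitionIds()`.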
Event Hub Properties
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds.join(", ")}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
Batch Processing Options
const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100, // Max events per batch
    maxWaitTimeInSeconds: 30, // Max wait for batch
  }
);
Key Types
import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
Event Properties
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
  // processError is a required handler
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});
Error Handling
consumer.subscribe({
  processEvents: async (events, context) => {
    // The handler can be called with an empty array when the wait time elapses
    if (events.length === 0) return;
    try {
      for (const event of events) {
        await processEvent(event); // application-defined handler
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Skip the checkpoint so these events are replayed after a restart.
      // A later successful checkpoint still moves past them.
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Service or connection error, often transient
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});
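Skipping the checkpoint only helps if the failed events are eventually handled, so it can be worth retrying a failing call in place before giving up on the batch. A minimal sketch of exponential backoff around an arbitrary async function follows; `withRetry` and `sleep` are hypothetical helpers, and the attempt count and delay are illustrative values, not SDK defaults.

```typescript
// Resolve after the given number of milliseconds.
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: call `fn` up to `attempts` times, doubling the delay
// after each failure. Rethrows the last error if every attempt fails.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts: number,
  baseDelayMs: number
): Promise<T> {
  if (attempts < 1) throw new RangeError("attempts must be at least 1");
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Wait before the next attempt, but not after the final one.
      if (attempt < attempts - 1) {
        await sleep(baseDelayMs * 2 ** attempt);
      }
    }
  }
  throw lastError;
}

// Inside processEvents this might wrap the per-event call:
//   await withRetry(() => processEvent(event), 3, 200);
```

Retries inside the handler delay the rest of the partition, so the attempt count is best kept small.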
Best Practices
- Use checkpointing - Always checkpoint in production; checkpoints give at-least-once delivery, so handlers should tolerate duplicate events
- Batch sends - Use createBatch() for efficient sending
- Partition keys - Use partition keys to ensure ordering for related events
- Consumer groups - Use separate consumer groups for different processing pipelines
- Handle errors gracefully - Don't checkpoint on processing failures
- Close clients - Always close producer/consumer when done
- Monitor lag - Track lastEnqueuedSequenceNumber vs processed sequence
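The lag check in the last item reduces to a subtraction per partition. A small sketch, assuming the application records the sequence number of the last event it processed on each partition; `PartitionProgress`, `LagReport`, and `computeLag` are hypothetical names, while `lastEnqueuedSequenceNumber` is the value returned by `getPartitionProperties()` as shown earlier.

```typescript
// Hypothetical snapshot of one partition's progress.
interface PartitionProgress {
  partitionId: string;
  lastEnqueuedSequenceNumber: number; // from getPartitionProperties()
  lastProcessedSequenceNumber: number; // tracked by the application
}

interface LagReport {
  perPartition: Record<string, number>;
  total: number;
}

// Lag is the number of events enqueued but not yet processed.
function computeLag(progress: PartitionProgress[]): LagReport {
  const perPartition: Record<string, number> = {};
  let total = 0;
  for (const p of progress) {
    // Clamp at zero: the two numbers are sampled at different moments.
    const lag = Math.max(0, p.lastEnqueuedSequenceNumber - p.lastProcessedSequenceNumber);
    perPartition[p.partitionId] = lag;
    total += lag;
  }
  return { perPartition, total };
}
```

A total that keeps growing between samples suggests the consumers are falling behind the producers.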
When to Use
Use this skill when a TypeScript application needs to send events to Azure Event Hubs or receive them, with or without checkpointing, as described in the workflow above.