Overview
The Streaming Helper provides utilities for working with streaming data, including JSON line streaming, Server-Sent Events (SSE), and streaming file downloads with progress tracking.Interfaces
StreamOptions
Configuration options for streaming operations.Copy
interface StreamOptions {
timeout?: number; // Default: 30000ms
headers?: Record<string, string>;
onChunk?: (chunk: string) => void; // Called for each chunk
onError?: (error: Error) => void;
onComplete?: () => void;
}
StreamResponse
Response object from streaming operations.Copy
interface StreamResponse<T> {
data: T[]; // Accumulated data items
complete: boolean; // Whether stream completed successfully
error?: Error; // Error if stream failed
}
Class: StreamingHelper
streamJsonLines
Stream newline-delimited JSON (NDJSON) from an endpoint.Copy
static async streamJsonLines<T>(
endpoint: string,
options?: StreamOptions
): Promise<StreamResponse<T>>
endpoint- URL to stream fromoptions- Streaming options (optional)
Copy
import { StreamingHelper } from '@bytekit/utils';
interface LogEntry {
timestamp: string;
level: string;
message: string;
}
// Stream log entries
const response = await StreamingHelper.streamJsonLines<LogEntry>(
'https://api.example.com/logs/stream',
{
onChunk: (chunk) => {
const entry = JSON.parse(chunk);
console.log(`[${entry.level}] ${entry.message}`);
},
onError: (error) => {
console.error('Stream error:', error);
},
onComplete: () => {
console.log('Stream completed');
}
}
);
if (response.complete) {
console.log(`Received ${response.data.length} log entries`);
}
streamSSE
Stream Server-Sent Events with automatic reconnection.Copy
static streamSSE<T>(
endpoint: string,
options?: StreamOptions & { eventType?: string }
): {
subscribe: (callback: (data: T) => void) => () => void;
close: () => void;
}
endpoint- SSE endpoint URLoptions- Streaming options with optional event type (default: “message”)
Copy
import { StreamingHelper } from '@bytekit/utils';
interface NotificationData {
id: string;
title: string;
message: string;
timestamp: number;
}
// Create SSE connection
const stream = StreamingHelper.streamSSE<NotificationData>(
'https://api.example.com/notifications/stream',
{
eventType: 'notification',
onError: (error) => {
console.error('SSE error:', error);
},
onComplete: () => {
console.log('SSE connection closed');
}
}
);
// Subscribe to events
const unsubscribe = stream.subscribe((notification) => {
console.log('New notification:', notification.title);
showNotification(notification);
});
// Later: unsubscribe or close
// unsubscribe(); // Just this subscriber
// stream.close(); // Close entire connection
downloadStream
Download a file as a stream with progress tracking.Copy
static async downloadStream(
endpoint: string,
options?: StreamOptions & {
onProgress?: (progress: number) => void;
}
): Promise<Blob>
endpoint- File download URLoptions- Streaming options with progress callback
Copy
import { StreamingHelper } from '@bytekit/utils';
// Download file with progress
const blob = await StreamingHelper.downloadStream(
'https://api.example.com/files/large-file.zip',
{
onProgress: (percentage) => {
console.log(`Download progress: ${percentage}%`);
updateProgressBar(percentage);
},
onError: (error) => {
console.error('Download failed:', error);
},
onComplete: () => {
console.log('Download completed');
},
timeout: 60000 // 1 minute timeout
}
);
// Save or process the blob
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'downloaded-file.zip';
a.click();
URL.revokeObjectURL(url);
Complete Examples
Real-Time Dashboard with SSE
Copy
import { StreamingHelper } from '@bytekit/utils';
interface MetricUpdate {
metric: string;
value: number;
timestamp: number;
}
class RealtimeDashboard {
private stream: ReturnType<typeof StreamingHelper.streamSSE<MetricUpdate>>;
private metrics: Map<string, number[]> = new Map();
connect() {
this.stream = StreamingHelper.streamSSE<MetricUpdate>(
'https://api.example.com/metrics/stream',
{
onError: (error) => {
console.error('Connection error:', error);
this.showConnectionError();
},
onComplete: () => {
console.log('Stream closed');
this.showConnectionClosed();
}
}
);
this.stream.subscribe((update) => {
this.updateMetric(update);
});
}
private updateMetric(update: MetricUpdate) {
// Store metric history
if (!this.metrics.has(update.metric)) {
this.metrics.set(update.metric, []);
}
const history = this.metrics.get(update.metric)!;
history.push(update.value);
// Keep only last 100 values
if (history.length > 100) {
history.shift();
}
// Update UI
this.renderMetric(update.metric, update.value, history);
}
private renderMetric(name: string, value: number, history: number[]) {
const element = document.getElementById(`metric-${name}`);
if (element) {
element.querySelector('.value')!.textContent = value.toString();
this.updateChart(element, history);
}
}
private updateChart(element: HTMLElement, history: number[]) {
// Update chart visualization
const chart = element.querySelector('canvas');
// ... chart update logic
}
private showConnectionError() {
document.getElementById('status')!.textContent = 'Connection error';
}
private showConnectionClosed() {
document.getElementById('status')!.textContent = 'Connection closed';
}
disconnect() {
this.stream?.close();
}
}
// Usage
const dashboard = new RealtimeDashboard();
dashboard.connect();
// Cleanup on page unload
window.addEventListener('beforeunload', () => {
dashboard.disconnect();
});
Log Streaming Viewer
Copy
import { StreamingHelper } from '@bytekit/utils';
interface LogEntry {
timestamp: string;
level: 'info' | 'warn' | 'error';
service: string;
message: string;
}
class LogViewer {
private container: HTMLElement;
private filters = {
level: null as string | null,
service: null as string | null
};
constructor(containerId: string) {
this.container = document.getElementById(containerId)!;
}
async streamLogs(endpoint: string) {
const response = await StreamingHelper.streamJsonLines<LogEntry>(
endpoint,
{
onChunk: (chunk) => {
const entry = JSON.parse(chunk);
this.addLogEntry(entry);
},
onError: (error) => {
this.showError(error.message);
},
onComplete: () => {
this.showComplete();
},
timeout: 0 // No timeout for continuous streaming
}
);
if (!response.complete) {
console.error('Stream failed:', response.error);
}
}
private addLogEntry(entry: LogEntry) {
// Apply filters
if (this.filters.level && entry.level !== this.filters.level) {
return;
}
if (this.filters.service && entry.service !== this.filters.service) {
return;
}
// Create log element
const logEl = document.createElement('div');
logEl.className = `log-entry log-${entry.level}`;
logEl.innerHTML = `
<span class="timestamp">${entry.timestamp}</span>
<span class="level">${entry.level.toUpperCase()}</span>
<span class="service">${entry.service}</span>
<span class="message">${this.escapeHtml(entry.message)}</span>
`;
// Add to container
this.container.appendChild(logEl);
// Auto-scroll to bottom
this.container.scrollTop = this.container.scrollHeight;
// Limit entries (keep last 1000)
const entries = this.container.children;
if (entries.length > 1000) {
entries[0].remove();
}
}
setFilter(type: 'level' | 'service', value: string | null) {
this.filters[type] = value;
this.clearLogs();
}
clearLogs() {
this.container.innerHTML = '';
}
private escapeHtml(text: string): string {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
private showError(message: string) {
const errorEl = document.createElement('div');
errorEl.className = 'log-error';
errorEl.textContent = `Error: ${message}`;
this.container.appendChild(errorEl);
}
private showComplete() {
const completeEl = document.createElement('div');
completeEl.className = 'log-complete';
completeEl.textContent = 'Stream completed';
this.container.appendChild(completeEl);
}
}
// Usage
const viewer = new LogViewer('log-container');
viewer.streamLogs('https://api.example.com/logs/stream');
// Add filter controls
document.getElementById('level-filter')!.addEventListener('change', (e) => {
const select = e.target as HTMLSelectElement;
viewer.setFilter('level', select.value || null);
});
File Download Manager
Copy
import { StreamingHelper } from '@bytekit/utils';
interface Download {
id: string;
url: string;
filename: string;
progress: number;
status: 'pending' | 'downloading' | 'completed' | 'failed';
}
class DownloadManager {
private downloads: Map<string, Download> = new Map();
private progressElements: Map<string, HTMLElement> = new Map();
async downloadFile(url: string, filename: string) {
const id = this.generateId();
const download: Download = {
id,
url,
filename,
progress: 0,
status: 'pending'
};
this.downloads.set(id, download);
this.createProgressElement(download);
download.status = 'downloading';
this.updateProgressElement(download);
try {
const blob = await StreamingHelper.downloadStream(url, {
onProgress: (percentage) => {
download.progress = percentage;
this.updateProgressElement(download);
},
timeout: 120000 // 2 minutes
});
download.status = 'completed';
this.updateProgressElement(download);
// Save file
this.saveBlob(blob, filename);
} catch (error) {
download.status = 'failed';
this.updateProgressElement(download);
console.error('Download failed:', error);
}
}
private createProgressElement(download: Download) {
const el = document.createElement('div');
el.className = 'download-item';
el.id = `download-${download.id}`;
el.innerHTML = `
<div class="filename">${download.filename}</div>
<progress max="100" value="0"></progress>
<div class="status">Pending...</div>
`;
document.getElementById('downloads')!.appendChild(el);
this.progressElements.set(download.id, el);
}
private updateProgressElement(download: Download) {
const el = this.progressElements.get(download.id);
if (!el) return;
const progress = el.querySelector('progress')!;
const status = el.querySelector('.status')!;
progress.value = download.progress;
switch (download.status) {
case 'downloading':
status.textContent = `Downloading... ${download.progress}%`;
break;
case 'completed':
status.textContent = 'Completed';
el.classList.add('completed');
break;
case 'failed':
status.textContent = 'Failed';
el.classList.add('failed');
break;
}
}
private saveBlob(blob: Blob, filename: string) {
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = filename;
a.click();
URL.revokeObjectURL(url);
}
private generateId(): string {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
// Usage
const manager = new DownloadManager();
document.getElementById('download-btn')!.addEventListener('click', () => {
manager.downloadFile(
'https://api.example.com/files/large-report.pdf',
'report.pdf'
);
});
Best Practices
- Always implement error handling for stream failures
- Provide progress feedback for long-running streams
- Consider implementing reconnection logic for SSE
- Set appropriate timeouts based on expected stream duration
- Clean up resources (close streams) when no longer needed
- Handle backpressure for high-throughput streams
- Validate and sanitize streamed data before processing