Skip to main content

Overview

The Streaming Helper provides utilities for working with streaming data, including JSON line streaming, Server-Sent Events (SSE), and streaming file downloads with progress tracking.

Interfaces

StreamOptions

Configuration options for streaming operations.
/** Configuration options for streaming operations. */
interface StreamOptions {
  /** Timeout in milliseconds. Default: 30000ms. */
  timeout?: number;
  /** Header name/value pairs — presumably sent with the request; confirm against the implementation. */
  headers?: Record<string, string>;
  /** Called for each chunk; receives the chunk as a string. */
  onChunk?: (chunk: string) => void;
  /** Receives an Error — presumably invoked when the stream fails; confirm against the implementation. */
  onError?: (error: Error) => void;
  /** Takes no arguments — presumably invoked once the stream has finished; confirm against the implementation. */
  onComplete?: () => void;
}

StreamResponse

Response object from streaming operations.
/** Response object from streaming operations. */
interface StreamResponse<T> {
  /** Accumulated data items. */
  data: T[];
  /** Whether the stream completed successfully. */
  complete: boolean;
  /** Error if the stream failed. */
  error?: Error;
}

Class: StreamingHelper

streamJsonLines

Stream newline-delimited JSON (NDJSON) from an endpoint.
static async streamJsonLines<T>(
  endpoint: string,
  options?: StreamOptions
): Promise<StreamResponse<T>>
Parameters:
  • endpoint - URL to stream from
  • options - Streaming options (optional)
Returns: Promise resolving to a stream response containing the accumulated data.
Example:
import { StreamingHelper } from '@bytekit/utils';

/** Shape of one log record carried on each NDJSON line. */
interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
}

// Stream log entries
const response = await StreamingHelper.streamJsonLines<LogEntry>(
  'https://api.example.com/logs/stream',
  {
    onChunk: (chunk) => {
      // Parse defensively so a malformed line does not throw out of the callback,
      // and give the result a type instead of leaving it as `any`.
      let entry: LogEntry;
      try {
        entry = JSON.parse(chunk) as LogEntry;
      } catch (parseError: unknown) {
        console.error('Skipping malformed log line:', parseError);
        return;
      }
      console.log(`[${entry.level}] ${entry.message}`);
    },
    onError: (error) => {
      console.error('Stream error:', error);
    },
    onComplete: () => {
      console.log('Stream completed');
    }
  }
);

if (response.complete) {
  console.log(`Received ${response.data.length} log entries`);
} else {
  // Surface the failure instead of ignoring an incomplete stream.
  console.error('Stream failed:', response.error);
}

streamSSE

Stream Server-Sent Events with automatic reconnection.
static streamSSE<T>(
  endpoint: string,
  options?: StreamOptions & { eventType?: string }
): {
  subscribe: (callback: (data: T) => void) => () => void;
  close: () => void;
}
Parameters:
  • endpoint - SSE endpoint URL
  • options - Streaming options with optional event type (default: “message”)
Returns: An object with subscribe and close methods.
Example:
import { StreamingHelper } from '@bytekit/utils';

/** Payload delivered with each `notification` event in this example. */
interface NotificationData {
  id: string;
  title: string;
  message: string;
  timestamp: number;
}

/** Logs an error reported by the SSE connection. */
const reportSseError = (error: Error): void => {
  console.error('SSE error:', error);
};

/** Logs that the SSE connection has been closed. */
const reportSseClosed = (): void => {
  console.log('SSE connection closed');
};

/** Logs the notification title and hands the notification to the UI. */
const handleNotification = (notification: NotificationData): void => {
  console.log('New notification:', notification.title);
  showNotification(notification);
};

// Create SSE connection
const stream = StreamingHelper.streamSSE<NotificationData>(
  'https://api.example.com/notifications/stream',
  {
    eventType: 'notification',
    onError: reportSseError,
    onComplete: reportSseClosed
  }
);

// Subscribe to events; the returned function removes only this subscriber
const unsubscribe = stream.subscribe(handleNotification);

// Later: unsubscribe or close
// unsubscribe(); // Just this subscriber
// stream.close(); // Close entire connection

downloadStream

Download a file as a stream with progress tracking.
static async downloadStream(
  endpoint: string,
  options?: StreamOptions & {
    onProgress?: (progress: number) => void;
  }
): Promise<Blob>
Parameters:
  • endpoint - File download URL
  • options - Streaming options with progress callback
Returns: Promise resolving to the downloaded file as a Blob.
Example:
import { StreamingHelper } from '@bytekit/utils';

// Download file with progress
const blob = await StreamingHelper.downloadStream(
  'https://api.example.com/files/large-file.zip',
  {
    onProgress: (percentage) => {
      console.log(`Download progress: ${percentage}%`);
      updateProgressBar(percentage);
    },
    onError: (error) => {
      console.error('Download failed:', error);
    },
    onComplete: () => {
      console.log('Download completed');
    },
    timeout: 60000 // 1 minute timeout
  }
);

// Save or process the blob
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'downloaded-file.zip';
a.click();
// Revoke after a short delay rather than synchronously: the download started
// by click() may not have read the object URL yet in every browser.
setTimeout(() => URL.revokeObjectURL(url), 1000);

Complete Examples

Real-Time Dashboard with SSE

import { StreamingHelper } from '@bytekit/utils';

/** A single metric reading received from the metrics stream. */
interface MetricUpdate {
  /** Metric name; used as the history key and in the `metric-<name>` element id. */
  metric: string;
  /** Value appended to the metric's history and shown in the UI. */
  value: number;
  timestamp: number; // units are not shown here — TODO confirm (ms vs. s)
}

/**
 * Subscribes to a metrics SSE stream, keeps a bounded history per metric and
 * mirrors the latest values into the DOM.
 */
class RealtimeDashboard {
  /** Maximum number of samples retained per metric. */
  private static readonly MAX_HISTORY = 100;

  /** Active SSE connection; undefined until connect() has been called. */
  private stream?: ReturnType<typeof StreamingHelper.streamSSE<MetricUpdate>>;
  private readonly metrics: Map<string, number[]> = new Map();

  /**
   * Opens the metrics stream and starts applying updates.
   * Any connection opened by an earlier call is closed first so it is not leaked.
   */
  connect() {
    this.disconnect();

    const stream = StreamingHelper.streamSSE<MetricUpdate>(
      'https://api.example.com/metrics/stream',
      {
        onError: (error) => {
          console.error('Connection error:', error);
          this.showConnectionError();
        },
        onComplete: () => {
          console.log('Stream closed');
          this.showConnectionClosed();
        }
      }
    );

    stream.subscribe((update) => {
      this.updateMetric(update);
    });

    this.stream = stream;
  }

  /** Records the new value in the metric's history and refreshes its UI. */
  private updateMetric(update: MetricUpdate) {
    // Store metric history, creating the series on first sight of the metric
    let history = this.metrics.get(update.metric);
    if (!history) {
      history = [];
      this.metrics.set(update.metric, history);
    }
    history.push(update.value);

    // Keep only the most recent values
    if (history.length > RealtimeDashboard.MAX_HISTORY) {
      history.shift();
    }

    // Update UI
    this.renderMetric(update.metric, update.value, history);
  }

  /** Writes the latest value into `#metric-<name>` and redraws its chart, if the element exists. */
  private renderMetric(name: string, value: number, history: number[]) {
    const element = document.getElementById(`metric-${name}`);
    if (!element) {
      return;
    }

    // A missing `.value` child should not throw inside the stream callback.
    const valueEl = element.querySelector('.value');
    if (valueEl) {
      valueEl.textContent = value.toString();
    }
    this.updateChart(element, history);
  }

  /** Placeholder for redrawing the metric's chart from its history. */
  private updateChart(element: HTMLElement, history: number[]) {
    // Update chart visualization
    const chart = element.querySelector('canvas');
    if (!chart) {
      return;
    }
    // ... chart update logic
  }

  /** Sets the text of the `#status` element, if it is present. */
  private setStatus(text: string) {
    const status = document.getElementById('status');
    if (status) {
      status.textContent = text;
    }
  }

  private showConnectionError() {
    this.setStatus('Connection error');
  }

  private showConnectionClosed() {
    this.setStatus('Connection closed');
  }

  /** Closes the active connection, if any. Safe to call repeatedly. */
  disconnect() {
    this.stream?.close();
    this.stream = undefined;
  }
}

// Usage: create the dashboard and open its stream
const dashboard = new RealtimeDashboard();
dashboard.connect();

/** Closes the dashboard's stream when the page is about to unload. */
function handleBeforeUnload(): void {
  dashboard.disconnect();
}

// Cleanup on page unload
window.addEventListener('beforeunload', handleBeforeUnload);

Log Streaming Viewer

import { StreamingHelper } from '@bytekit/utils';

/** One parsed log record rendered by the viewer. */
interface LogEntry {
  timestamp: string;
  /** Severity; compared against the level filter and used to build the `log-<level>` CSS class. */
  level: 'info' | 'warn' | 'error';
  /** Service name; compared against the service filter. */
  service: string;
  /** Log text shown in the row. */
  message: string;
}

/**
 * Renders streamed log entries into a container element, with optional
 * level/service filtering and a cap on the number of retained rows.
 */
class LogViewer {
  /** Maximum number of child elements kept in the container. */
  private static readonly MAX_ENTRIES = 1000;

  private container: HTMLElement;
  private filters = {
    level: null as string | null,
    service: null as string | null
  };

  /**
   * @param containerId - id of the element that receives the log rows
   * @throws Error if no element with that id exists
   */
  constructor(containerId: string) {
    const container = document.getElementById(containerId);
    if (!container) {
      // Fail fast with a clear message instead of a later null dereference.
      throw new Error(`LogViewer: no element found with id "${containerId}"`);
    }
    this.container = container;
  }

  /**
   * Streams NDJSON log entries from `endpoint` and renders each one.
   * @param endpoint - URL to stream from
   */
  async streamLogs(endpoint: string): Promise<void> {
    const response = await StreamingHelper.streamJsonLines<LogEntry>(
      endpoint,
      {
        onChunk: (chunk) => {
          // Keep a malformed line from throwing out of the callback.
          try {
            this.addLogEntry(JSON.parse(chunk) as LogEntry);
          } catch (error: unknown) {
            const reason = error instanceof Error ? error.message : String(error);
            this.showError(`Skipped malformed log line (${reason})`);
          }
        },
        onError: (error) => {
          this.showError(error.message);
        },
        onComplete: () => {
          this.showComplete();
        },
        timeout: 0 // No timeout for continuous streaming
      }
    );

    if (!response.complete) {
      console.error('Stream failed:', response.error);
    }
  }

  /** Appends one entry to the container unless the active filters exclude it. */
  private addLogEntry(entry: LogEntry) {
    // Apply filters
    if (this.filters.level && entry.level !== this.filters.level) {
      return;
    }
    if (this.filters.service && entry.service !== this.filters.service) {
      return;
    }

    // Create log element. Every field comes from the stream, so all of them
    // are escaped before being interpolated into HTML, not just the message.
    const logEl = document.createElement('div');
    logEl.className = `log-entry log-${entry.level}`;
    logEl.innerHTML = `
      <span class="timestamp">${this.escapeHtml(entry.timestamp)}</span>
      <span class="level">${this.escapeHtml(entry.level.toUpperCase())}</span>
      <span class="service">${this.escapeHtml(entry.service)}</span>
      <span class="message">${this.escapeHtml(entry.message)}</span>
    `;

    // Add to container
    this.container.appendChild(logEl);

    // Auto-scroll to bottom
    this.container.scrollTop = this.container.scrollHeight;

    // Limit entries: drop the oldest children until the cap is respected
    // (error/complete rows also count towards the container's children).
    let oldest = this.container.firstElementChild;
    while (oldest && this.container.children.length > LogViewer.MAX_ENTRIES) {
      oldest.remove();
      oldest = this.container.firstElementChild;
    }
  }

  /**
   * Sets or clears a filter and empties the view.
   * @param type - which filter to change
   * @param value - value an entry must match, or null to disable the filter
   */
  setFilter(type: 'level' | 'service', value: string | null) {
    this.filters[type] = value;
    this.clearLogs();
  }

  /** Removes everything currently rendered in the container. */
  clearLogs() {
    this.container.innerHTML = '';
  }

  /** Returns `text` with HTML-significant characters escaped, using the DOM's own serializer. */
  private escapeHtml(text: string): string {
    const div = document.createElement('div');
    div.textContent = text;
    return div.innerHTML;
  }

  /** Appends an error row; uses textContent, so the message needs no escaping. */
  private showError(message: string) {
    const errorEl = document.createElement('div');
    errorEl.className = 'log-error';
    errorEl.textContent = `Error: ${message}`;
    this.container.appendChild(errorEl);
  }

  /** Appends a row marking the end of the stream. */
  private showComplete() {
    const completeEl = document.createElement('div');
    completeEl.className = 'log-complete';
    completeEl.textContent = 'Stream completed';
    this.container.appendChild(completeEl);
  }
}

// Usage
const viewer = new LogViewer('log-container');
// streamLogs() returns a promise: handle rejection explicitly so a failure is
// reported instead of surfacing as an unhandled promise rejection.
viewer.streamLogs('https://api.example.com/logs/stream').catch((error: unknown) => {
  console.error('Log streaming failed:', error);
});

// Add filter controls
document.getElementById('level-filter')!.addEventListener('change', (e) => {
  const select = e.target as HTMLSelectElement;
  // An empty selection clears the level filter
  viewer.setFilter('level', select.value || null);
});

File Download Manager

import { StreamingHelper } from '@bytekit/utils';

/** State tracked for a single file download. */
interface Download {
  /** Identifier used as the map key and in the `download-<id>` element id. */
  id: string;
  url: string;
  /** Name shown in the progress UI and used when saving the file. */
  filename: string;
  /** Last reported progress; shown as a percentage (0–100 range assumed — confirm against downloadStream). */
  progress: number;
  status: 'pending' | 'downloading' | 'completed' | 'failed';
}

/**
 * Starts streamed downloads, tracks their state and renders a progress row
 * for each one inside the `#downloads` element.
 */
class DownloadManager {
  /** Delay before an object URL is revoked after the save has been triggered. */
  private static readonly REVOKE_DELAY_MS = 1000;

  private downloads: Map<string, Download> = new Map();
  private progressElements: Map<string, HTMLElement> = new Map();

  /**
   * Downloads `url`, reporting progress in the UI, then saves it as `filename`.
   * Failures are logged and reflected in the row's status; the promise does not
   * reject for a failed download.
   * @param url - file download URL
   * @param filename - name to save the file under
   */
  async downloadFile(url: string, filename: string): Promise<void> {
    const id = this.generateId();
    const download: Download = {
      id,
      url,
      filename,
      progress: 0,
      status: 'pending'
    };

    this.downloads.set(id, download);
    this.createProgressElement(download);

    download.status = 'downloading';
    this.updateProgressElement(download);

    try {
      const blob = await StreamingHelper.downloadStream(url, {
        onProgress: (percentage) => {
          download.progress = percentage;
          this.updateProgressElement(download);
        },
        timeout: 120000 // 2 minutes
      });

      // Fill the bar even if no final progress callback reported 100.
      download.progress = 100;
      download.status = 'completed';
      this.updateProgressElement(download);

      // Save file
      this.saveBlob(blob, filename);

    } catch (error: unknown) {
      download.status = 'failed';
      this.updateProgressElement(download);
      console.error('Download failed:', error);
    }
  }

  /** Builds the progress row for a download and registers it for later updates. */
  private createProgressElement(download: Download) {
    const el = document.createElement('div');
    el.className = 'download-item';
    el.id = `download-${download.id}`;
    // Static markup only: the filename is set through textContent below so
    // that it is never parsed as HTML.
    el.innerHTML = `
      <div class="filename"></div>
      <progress max="100" value="0"></progress>
      <div class="status">Pending...</div>
    `;
    const nameEl = el.querySelector('.filename');
    if (nameEl) {
      nameEl.textContent = download.filename;
    }

    document.getElementById('downloads')!.appendChild(el);
    this.progressElements.set(download.id, el);
  }

  /** Syncs a download's progress bar and status text with its current state. */
  private updateProgressElement(download: Download) {
    const el = this.progressElements.get(download.id);
    if (!el) return;

    const progress = el.querySelector('progress')!;
    const status = el.querySelector('.status')!;

    progress.value = download.progress;

    switch (download.status) {
      case 'downloading':
        status.textContent = `Downloading... ${download.progress}%`;
        break;
      case 'completed':
        status.textContent = 'Completed';
        el.classList.add('completed');
        break;
      case 'failed':
        status.textContent = 'Failed';
        el.classList.add('failed');
        break;
    }
  }

  /** Triggers a browser save of `blob` under `filename` via a temporary object URL. */
  private saveBlob(blob: Blob, filename: string) {
    const url = URL.createObjectURL(blob);
    const a = document.createElement('a');
    a.href = url;
    a.download = filename;
    a.click();
    // Revoke after a short delay rather than synchronously: the download started
    // by click() may not have read the object URL yet in every browser.
    setTimeout(() => URL.revokeObjectURL(url), DownloadManager.REVOKE_DELAY_MS);
  }

  /** Returns an id built from the current time plus a random base-36 suffix. */
  private generateId(): string {
    // slice(2, 11) takes the same nine characters as the deprecated substr(2, 9).
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }
}

// Usage: one manager instance drives every download on the page
const manager = new DownloadManager();

const REPORT_URL = 'https://api.example.com/files/large-report.pdf';
const REPORT_FILENAME = 'report.pdf';

/** Starts the report download; the returned promise is intentionally not awaited. */
function startReportDownload(): void {
  void manager.downloadFile(REPORT_URL, REPORT_FILENAME);
}

document.getElementById('download-btn')!.addEventListener('click', startReportDownload);

Best Practices

  1. Always implement error handling for stream failures
  2. Provide progress feedback for long-running streams
  3. Consider implementing reconnection logic for SSE
  4. Set appropriate timeouts based on expected stream duration
  5. Clean up resources (close streams) when no longer needed
  6. Handle backpressure for high-throughput streams
  7. Validate and sanitize streamed data before processing