Skip to main content

Real-Time Data

Implement real-time features using ByteKit’s WebSocketHelper for WebSocket connections and PollingHelper for polling-based updates.

WebSocket Basics

Simple WebSocket Connection

import { WebSocketHelper } from '@bytekit/helpers';

// Reconnect and heartbeat settings handed to the helper.
const connectionOptions = {
  reconnect: true,
  maxReconnectAttempts: 5,
  reconnectDelayMs: 3000,
  heartbeatIntervalMs: 30000,
};

const ws = new WebSocketHelper('wss://api.example.com/ws', connectionOptions);

// Open the socket before sending or subscribing.
await ws.connect();
console.log('WebSocket connected');

// Emit a 'greeting' message.
ws.send('greeting', { message: 'Hello, server!' });

// Register a 'notification' listener; ws.on() hands back the function that removes it.
const unsubscribe = ws.on('notification', (payload) => {
  console.log('Received notification:', payload);
});

// Remove the listener again.
unsubscribe();

// Shut the connection down.
ws.close();

Message Types and Handlers

/** Payload type used for the 'user:status' subscription. */
interface UserStatusMessage {
  userId: string;
  /** One of the three presence states listed in the union. */
  status: 'online' | 'offline' | 'away';
  // NOTE(review): presumably a timestamp string; the format is not shown here — confirm.
  lastSeen: string;
}

/** Payload type used for the 'chat:message' subscription. */
interface ChatMessage {
  id: string;
  // NOTE(review): `from` / `to` look like user identifiers — confirm against the server contract.
  from: string;
  to: string;
  /** Message text. */
  message: string;
  // NOTE(review): presumably an ISO-8601 string (senders use Date#toISOString) — confirm.
  timestamp: string;
}

const ws = new WebSocketHelper('wss://chat.example.com');
await ws.connect();

// Presence changes: log them and forward to the UI helper.
ws.on<UserStatusMessage>('user:status', ({ userId, status }) => {
  console.log(`User ${userId} is ${status}`);
  updateUserStatus(userId, status);
});

// Incoming chat messages: log the sender and text, then render the whole message.
ws.on<ChatMessage>('chat:message', (chatMessage) => {
  console.log(`Message from ${chatMessage.from}: ${chatMessage.message}`);
  displayMessage(chatMessage);
});

// Typing indicators carry an ad-hoc inline payload type.
ws.on<{ userId: string; isTyping: boolean }>('user:typing', ({ userId, isTyping }) => {
  showTypingIndicator(userId, isTyping);
});

Request-Response Pattern

/** Request payload sent with the 'profile:get' message. */
interface ProfileRequest {
  userId: string;
}

/** Response payload expected back for a 'profile:get' request. */
interface ProfileResponse {
  id: string;
  name: string;
  email: string;
  // NOTE(review): presumably an image URL — the format is not shown here, confirm.
  avatar: string;
}

// Issue a 'profile:get' request and await the matching reply.
try {
  const profileRequest: ProfileRequest = { userId: '123' };

  // The third argument names the response message type (optional).
  const profile = await ws.request<ProfileRequest, ProfileResponse>(
    'profile:get',
    profileRequest,
    'profile:response'
  );

  console.log('Profile:', profile);
} catch (requestError) {
  console.error('Request failed:', requestError);
}

Error Handling

Connection Errors

const ws = new WebSocketHelper('wss://api.example.com/ws');

// Error callback: always log, then branch on whether the message mentions authentication.
ws.onError((err) => {
  console.error('WebSocket error:', err);

  const isAuthFailure = err.message.includes('authentication');
  if (isAuthFailure) {
    // Auth problems send the user back to the login flow.
    redirectToLogin();
    return;
  }

  // Anything else is surfaced as a generic notification.
  showNotification('Connection error', 'error');
});

try {
  await ws.connect();
} catch (connectError) {
  console.error('Failed to connect:', connectError);
}

Automatic Reconnection

// Single source of truth for the retry budget: it is passed to the helper AND
// used as the threshold for showing the offline message, so the two cannot drift.
const MAX_RECONNECT_ATTEMPTS = 10;

const ws = new WebSocketHelper('wss://api.example.com/ws', {
  reconnect: true,
  maxReconnectAttempts: MAX_RECONNECT_ATTEMPTS,
  reconnectDelayMs: 2000,
});

// Number of errors reported since the last successful connection.
let connectionAttempts = 0;

ws.onError((error) => {
  connectionAttempts++;
  console.log(`Connection attempt ${connectionAttempts} failed:`, error);

  if (connectionAttempts >= MAX_RECONNECT_ATTEMPTS) {
    console.error('Max reconnection attempts reached');
    showOfflineMessage();
  }
});

// A successful connection resets the counter and clears the offline banner.
ws.on('connected', () => {
  connectionAttempts = 0;
  console.log('Connected successfully');
  hideOfflineMessage();
});

Real-Time Chat Application

/**
 * Chat client built on WebSocketHelper.
 *
 * Incoming 'chat:message' and 'user:status' messages are fanned out to the
 * handlers registered through `onMessage` / `onStatusChange`.
 */
class ChatService {
  private readonly ws: WebSocketHelper;
  private readonly messageHandlers: Set<(msg: ChatMessage) => void> = new Set();
  private readonly statusHandlers: Set<(status: UserStatusMessage) => void> = new Set();

  /**
   * @param wsUrl WebSocket endpoint; may already carry a query string.
   * @param authToken Token appended to the URL as the `token` query parameter.
   */
  constructor(wsUrl: string, authToken: string) {
    // Encode the token so characters such as '+', '&' or '=' cannot corrupt
    // the query string, and pick the separator that fits the given URL.
    const separator = wsUrl.includes('?') ? '&' : '?';
    const url = `${wsUrl}${separator}token=${encodeURIComponent(authToken)}`;

    this.ws = new WebSocketHelper(url, {
      reconnect: true,
      maxReconnectAttempts: 5,
      heartbeatIntervalMs: 30000,
    });

    this.setupListeners();
  }

  /** Wires the socket's messages and errors to the registered handler sets. */
  private setupListeners(): void {
    this.ws.on<ChatMessage>('chat:message', (message) => {
      this.messageHandlers.forEach(handler => handler(message));
    });

    this.ws.on<UserStatusMessage>('user:status', (status) => {
      this.statusHandlers.forEach(handler => handler(status));
    });

    this.ws.onError((error) => {
      console.error('Chat connection error:', error);
    });
  }

  /** Opens the underlying connection. */
  async connect(): Promise<void> {
    await this.ws.connect();
    console.log('Chat connected');
  }

  /** Sends a 'chat:send' message stamped with the current time (ISO-8601). */
  sendMessage(to: string, message: string): void {
    this.ws.send('chat:send', {
      to,
      message,
      timestamp: new Date().toISOString(),
    });
  }

  /** Sends a 'user:typing' message for the given recipient. */
  sendTypingIndicator(to: string, isTyping: boolean): void {
    this.ws.send('user:typing', { to, isTyping });
  }

  /**
   * Registers a chat-message handler.
   * @returns A function that removes the handler again.
   */
  onMessage(handler: (msg: ChatMessage) => void) {
    this.messageHandlers.add(handler);
    return () => this.messageHandlers.delete(handler);
  }

  /**
   * Registers a status-change handler.
   * @returns A function that removes the handler again.
   */
  onStatusChange(handler: (status: UserStatusMessage) => void) {
    this.statusHandlers.add(handler);
    return () => this.statusHandlers.delete(handler);
  }

  /** Closes the underlying connection. */
  disconnect(): void {
    this.ws.close();
    console.log('Chat disconnected');
  }

  /** Reports the helper's connection state. */
  isConnected(): boolean {
    return this.ws.isConnected();
  }
}

// Usage
const chat = new ChatService('wss://chat.example.com/ws', authToken);
await chat.connect();

// Log and render every incoming message; keep the returned remover for later.
const unsubscribe = chat.onMessage((incoming) => {
  console.log('New message:', incoming);
  displayMessage(incoming);
});

// Deliver a message to user-456.
chat.sendMessage('user-456', 'Hello!');

// Tell user-456 that we are typing.
chat.sendTypingIndicator('user-456', true);

Polling for Updates

Basic Polling

import { PollingHelper } from '@bytekit/helpers';
import { ApiClient } from '@bytekit/core';

const apiClient = new ApiClient({
  baseUrl: 'https://api.example.com',
});

// Fetches the current status of order 123.
async function checkOrderStatus() {
  return apiClient.get<{ status: string }>('/orders/123/status');
}

const outcome = await PollingHelper.poll(checkOrderStatus, {
  interval: 2000, // check every 2 seconds
  maxAttempts: 30, // give up after 30 attempts
  // Stop as soon as the order reaches a terminal state.
  stopCondition: (latest) => latest.status === 'completed' || latest.status === 'failed',
  onAttempt: (attemptNumber, latest) => {
    console.log(`Attempt ${attemptNumber}:`, latest?.status);
  },
});

if (!outcome.success) {
  console.error('Polling failed:', outcome.error);
} else {
  console.log('Order completed:', outcome.result);
}

Polling with Exponential Backoff

const taskPoller = new PollingHelper(
  () => apiClient.get('/api/long-running-task/status'),
  {
    interval: 1000, // initial delay: 1 second
    maxAttempts: 20,
    backoffMultiplier: 2, // interval doubles after each attempt
    maxBackoffInterval: 30000, // never wait longer than 30 seconds
    jitter: true, // randomize the delay
    stopCondition: (latest) => latest.status === 'complete',
    onAttempt: (attemptNumber, latest) => {
      console.log(`Checking (attempt ${attemptNumber}):`, latest);
    },
  }
);

const outcome = await taskPoller.start();

// Only a successful run reports its duration and metrics.
if (outcome.success) {
  const { duration, metrics } = outcome;
  console.log('Task completed in', duration, 'ms');
  console.log('Metrics:', metrics);
}

Abortable Polling

const poller = new PollingHelper(
  () => apiClient.get('/api/processing'),
  {
    interval: 2000,
    maxAttempts: 100,
    stopCondition: (result) => result.complete,
  }
);

// Start polling with abort capability
const resultPromise = poller.startWithAbort();

// Abort after 10 seconds. Keep the timer id so it can be cancelled once
// polling settles; otherwise the callback would still fire later, call
// abort() on a finished poller and log a misleading message.
const abortTimer = setTimeout(() => {
  poller.abort();
  console.log('Polling aborted by user');
}, 10000);

try {
  const result = await resultPromise;
  console.log('Result:', result);
} catch (error) {
  console.error('Polling failed:', error);
} finally {
  clearTimeout(abortTimer);
}

Combining WebSocket and Polling

Fallback Strategy

/** Bookkeeping for one `subscribe()` call. */
interface ChannelSubscription {
  channel: string;
  handler: (data: any) => void;
  /** Removes the WebSocket listener while one is attached. */
  detachSocket: (() => void) | null;
  /** Poller feeding this subscription while polling is active. */
  poller: PollingHelper | null;
}

/**
 * Delivers channel updates over a WebSocket and falls back to HTTP polling of
 * `/updates/<channel>` when the socket cannot be created, fails to connect, or
 * reports an error.
 *
 * Every subscription is tracked, so a fallback re-attaches ALL active
 * subscriptions to polling and each unsubscribe stops exactly its own
 * listener or poller.
 */
class RealTimeDataService {
  private ws: WebSocketHelper | null = null;
  private readonly apiClient: ApiClient;
  private useWebSocket: boolean = true;
  private readonly subscriptions = new Set<ChannelSubscription>();

  /**
   * @param baseUrl Base URL for the polling API client.
   * @param wsUrl WebSocket endpoint.
   */
  constructor(baseUrl: string, wsUrl: string) {
    this.apiClient = new ApiClient({ baseUrl });

    try {
      this.ws = new WebSocketHelper(wsUrl);
      this.setupWebSocket();
    } catch (error) {
      console.warn('WebSocket not available, falling back to polling');
      this.useWebSocket = false;
    }
  }

  /** Registers the error callback that triggers the polling fallback. */
  private setupWebSocket(): void {
    if (!this.ws) return;

    this.ws.onError((error) => {
      console.error('WebSocket error, switching to polling:', error);
      this.switchToPolling();
    });
  }

  /**
   * Subscribes `handler` to updates for `channel`.
   *
   * A failed WebSocket connect no longer rejects; the subscription is served
   * by polling instead.
   *
   * @returns A function that cancels this subscription.
   */
  async subscribe(channel: string, handler: (data: any) => void): Promise<() => void> {
    const subscription: ChannelSubscription = {
      channel,
      handler,
      detachSocket: null,
      poller: null,
    };
    // Track it before connecting so an error raised during connect() already
    // moves this subscription to polling.
    this.subscriptions.add(subscription);

    if (this.useWebSocket && this.ws) {
      try {
        await this.ws.connect();
      } catch (error) {
        console.error('WebSocket connect failed, switching to polling:', error);
        this.switchToPolling();
      }
    }

    // Skip attaching if the service was disconnected while connecting.
    if (this.subscriptions.has(subscription)) {
      if (this.useWebSocket && this.ws) {
        subscription.detachSocket = this.ws.on(channel, handler);
      } else {
        this.startPolling(subscription);
      }
    }

    return () => this.release(subscription);
  }

  /** Moves every active subscription from the socket to polling. */
  private switchToPolling(): void {
    this.useWebSocket = false;

    for (const subscription of this.subscriptions) {
      // Drop the socket listener so an update is never delivered twice.
      subscription.detachSocket?.();
      subscription.detachSocket = null;
      this.startPolling(subscription);
    }
  }

  /** Starts a poller for one subscription; does nothing if it already has one. */
  private startPolling(subscription: ChannelSubscription): void {
    if (subscription.poller) return;

    const poller = new PollingHelper(
      () => this.apiClient.get(`/updates/${subscription.channel}`),
      {
        interval: 5000,
        maxAttempts: Infinity,
        onAttempt: (_attempt, result) => {
          if (result) {
            subscription.handler(result);
          }
        },
      }
    );
    subscription.poller = poller;

    // Use the abortable start so that abort() in release() actually stops the
    // loop. Fire-and-forget: report a failure unless we stopped it ourselves.
    poller.startWithAbort().catch((error: unknown) => {
      if (subscription.poller === poller) {
        console.error(`Polling for ${subscription.channel} failed:`, error);
      }
    });
  }

  /** Stops whichever transport is serving the subscription and forgets it. */
  private release(subscription: ChannelSubscription): void {
    if (!this.subscriptions.delete(subscription)) return;

    subscription.detachSocket?.();
    subscription.detachSocket = null;

    const poller = subscription.poller;
    // Clear the reference first so an abort-induced rejection is not logged.
    subscription.poller = null;
    poller?.abort();
  }

  /** Cancels every subscription and closes the socket. */
  disconnect(): void {
    for (const subscription of [...this.subscriptions]) {
      this.release(subscription);
    }
    if (this.ws) {
      this.ws.close();
    }
  }
}

Live Dashboard Example

/** Metrics snapshot delivered to dashboard update handlers. */
interface DashboardData {
  // NOTE(review): units and aggregation windows for these counters are not shown here — confirm.
  users: number;
  revenue: number;
  orders: number;
  activeUsers: number;
}

/**
 * Streams dashboard metrics over a WebSocket and fans each snapshot out to
 * the handlers registered through `onUpdate`.
 */
class LiveDashboard {
  private readonly ws: WebSocketHelper;
  private readonly updateHandlers: Set<(data: DashboardData) => void> = new Set();

  /**
   * @param wsUrl WebSocket endpoint; may already carry a query string.
   * @param authToken Token appended to the URL as the `token` query parameter.
   */
  constructor(wsUrl: string, authToken: string) {
    // Encode the token so characters such as '+', '&' or '=' cannot corrupt
    // the query string, and pick the separator that fits the given URL.
    const separator = wsUrl.includes('?') ? '&' : '?';
    const url = `${wsUrl}${separator}token=${encodeURIComponent(authToken)}`;

    this.ws = new WebSocketHelper(url, {
      reconnect: true,
      heartbeatIntervalMs: 30000,
    });

    this.setupListeners();
  }

  /** Wires metric updates, the 'connected' message and errors. */
  private setupListeners(): void {
    // Real-time metric updates
    this.ws.on<DashboardData>('metrics:update', (data) => {
      this.updateHandlers.forEach(handler => handler(data));
    });

    // Connection status
    this.ws.on('connected', () => {
      console.log('Dashboard connected - receiving live updates');
    });

    this.ws.onError((error) => {
      console.error('Dashboard connection error:', error);
    });
  }

  /**
   * Connects, requests the current metrics once, and delivers that snapshot
   * to the handlers registered so far.
   */
  async start(): Promise<void> {
    await this.ws.connect();

    // Request initial data
    const initialData = await this.ws.request<void, DashboardData>(
      'metrics:get',
      undefined,
      'metrics:response'
    );

    this.updateHandlers.forEach(handler => handler(initialData));
  }

  /**
   * Registers an update handler.
   * @returns A function that removes the handler again.
   */
  onUpdate(handler: (data: DashboardData) => void) {
    this.updateHandlers.add(handler);
    return () => this.updateHandlers.delete(handler);
  }

  /** Closes the underlying connection. */
  stop(): void {
    this.ws.close();
  }
}

// Usage
const dashboard = new LiveDashboard('wss://api.example.com/ws', authToken);

// Register the handler first so it also receives the initial snapshot.
dashboard.onUpdate((metrics) => {
  console.log('Dashboard updated:', metrics);
  updateUI(metrics);
});

// Connect and begin receiving updates.
await dashboard.start();

Server-Sent Events Alternative

/**
 * Small Server-Sent Events client that parses each event's JSON payload and
 * dispatches it to handlers registered per event name.
 *
 * - 'message' handlers receive unnamed events (via `onmessage`).
 * - 'error' handlers receive connection errors and payload parse failures.
 * - Any other name is bound to the EventSource as a named event.
 */
class SSEClient {
  private eventSource: EventSource | null = null;
  private handlers: Map<string, Set<(data: any) => void>> = new Map();
  // Named events that already have a DOM listener on the current EventSource.
  private boundEvents: Set<string> = new Set();

  constructor(private url: string) {}

  /** Opens the stream, replacing any connection that is already open. */
  connect() {
    // Close a previous source so repeated connect() calls do not leak connections.
    this.eventSource?.close();
    this.boundEvents.clear();

    const source = new EventSource(this.url);
    this.eventSource = source;

    source.onmessage = (event) => {
      this.emitParsed('message', event.data);
    };

    source.onerror = (error) => {
      console.error('SSE error:', error);
      this.emit('error', error);
    };

    // Events subscribed before connect() had no source to attach to; bind them now.
    for (const event of this.handlers.keys()) {
      this.bind(event);
    }
  }

  /**
   * Registers a handler for an event name.
   * @returns A function that removes the handler again.
   */
  on(event: string, handler: (data: any) => void) {
    let listeners = this.handlers.get(event);
    if (!listeners) {
      listeners = new Set();
      this.handlers.set(event, listeners);
    }
    listeners.add(handler);

    this.bind(event);

    return () => {
      this.handlers.get(event)?.delete(handler);
    };
  }

  /** Attaches one DOM listener per named event on the current source. */
  private bind(event: string) {
    // 'message' and 'error' are already delivered through onmessage/onerror;
    // binding them again would double-deliver messages and try to JSON-parse
    // error events, which carry no data.
    if (event === 'message' || event === 'error') return;
    if (!this.eventSource || this.boundEvents.has(event)) return;

    this.boundEvents.add(event);
    this.eventSource.addEventListener(event, (e: any) => {
      this.emitParsed(event, e.data);
    });
  }

  /** Parses a raw payload and emits it; parse failures go to 'error' handlers. */
  private emitParsed(event: string, raw: string) {
    let data: unknown;
    try {
      data = JSON.parse(raw);
    } catch (error) {
      console.error('SSE error:', error);
      this.emit('error', error);
      return;
    }
    // Emit outside the try block so a throwing handler is not mistaken for a parse error.
    this.emit(event, data);
  }

  private emit(event: string, data: any) {
    this.handlers.get(event)?.forEach(handler => handler(data));
  }

  /** Closes the stream and drops every registered handler. */
  close() {
    this.eventSource?.close();
    this.eventSource = null;
    this.boundEvents.clear();
    this.handlers.clear();
  }
}

// Usage
const sse = new SSEClient('https://api.example.com/events');
sse.connect();

// Log the parsed payload of every 'update' event.
sse.on('update', (payload) => {
  console.log('Received update:', payload);
});

See Also