import { NgZone } from '@angular/core';
import { EventSourcePolyfill } from 'event-source-polyfill';
import { Observable, Subject } from 'rxjs';

/**
 * Wraps a server-sent-events connection (opened through `EventSourcePolyfill`
 * so that custom request headers can be sent) and exposes the incoming
 * messages as an RxJS stream.
 *
 * Messages are re-emitted inside the Angular zone so that subscribers trigger
 * change detection.
 */
export class SseStream {
  /** `EventSource.readyState` value meaning "closed, will not reconnect" (HTML spec). */
  private static readonly CLOSED = 2;

  private readonly eventSubject: Subject<MessageEvent>;
  private readonly eventSource: EventSource;

  /**
   * Opens the connection immediately.
   *
   * @param zone Angular zone in which notifications are delivered to subscribers.
   * @param url Endpoint of the event stream.
   * @param headers HTTP headers sent with the request; copied, so later
   *   mutations of the passed object have no effect on this stream.
   */
  constructor(private zone: NgZone, private url: string, headers: { [name: string]: string }) {
    this.eventSubject = new Subject();

    this.eventSource = new EventSourcePolyfill(url, {
      headers: { ...headers },
    });

    this.addMessageHandler();
    this.addErrorHandler();
  }

  /** Forwards every (unnamed) message event to the subscribers, inside the Angular zone. */
  private addMessageHandler(): void {
    this.eventSource.onmessage = (event) => {
      this.zone.run(() => {
        this.eventSubject.next(event);
      });
    };
  }

  /**
   * Handles `error` events of the underlying connection.
   *
   * An `error` event is fired both for transient connection drops (the source
   * then goes back to CONNECTING and retries on its own) and for fatal
   * failures (the source is CLOSED). Only the fatal case is propagated to
   * subscribers: erroring the subject terminates it, which would silently
   * discard every message received after an automatic reconnect.
   */
  private addErrorHandler(): void {
    this.eventSource.onerror = (event) => {
      if (this.eventSource.readyState === SseStream.CLOSED) {
        // No reconnect will follow: let subscribers know the stream is dead.
        this.zone.run(() => {
          this.eventSubject.error(event);
        });
        return;
      }

      console.warn(`Connection to EventSource ${this.url} lost, reconnect should happen automatically.`);
    };
  }

  /**
   * @returns The stream of received messages. It completes when {@link close}
   *   is called and errors when the connection fails permanently.
   */
  public message$(): Observable<MessageEvent> {
    return this.eventSubject.asObservable();
  }

  /** Closes the connection, detaches the handlers and completes the message stream. */
  public close(): void {
    // Detach first so that nothing fired while closing reaches the subject.
    this.eventSource.onmessage = null;
    this.eventSource.onerror = null;
    this.eventSource.close();

    this.eventSubject.complete();
  }
}
