import { HttpTransportType, HubConnection, HubConnectionBuilder } from "@microsoft/signalr";
import { Observable, of } from "rxjs";

/**
 * Base class that wraps a single SignalR {@link HubConnection} for one hub
 * route and exposes its lifecycle and server-to-client events as RxJS
 * observables.
 *
 * At most one connection is held at a time: it is created by
 * {@link establishConnection} and released by {@link closeConnection}.
 */
export abstract class SignalrServiceWrapper {
  private readonly route: string;

  /**
   * @param route URL of the hub, passed unchanged to `HubConnectionBuilder.withUrl`.
   */
  public constructor(route: string) {
    this.route = route;
  }

  /** The current hub connection, or `null` when none has been established. */
  protected _connection: HubConnection | null = null;

  /**
   * The current hub connection.
   *
   * @throws Error if no connection has been established.
   */
  public get connection(): HubConnection {
    this.assertConnection();
    return this._connection!;
  }

  /**
   * Establishes a connection to the real-time status hub.
   *
   * The connection object is created immediately, but `start()` is only
   * invoked when the returned observable is subscribed to.
   *
   * @param msalToken Access token handed to SignalR through `accessTokenFactory`.
   * @returns An observable that emits once and completes when the connection
   *   has started, or errors with the rejection reason of `start()`. If a
   *   connection object already exists it emits and completes immediately
   *   without checking that connection's state.
   */
  public establishConnection(msalToken: string): Observable<void> {
    if (this._connection) {
      // `of()` without arguments would complete without emitting, unlike the
      // fresh-connection path below; emit once so both paths look the same.
      return of<void>(undefined);
    }

    const connection = new HubConnectionBuilder()
      .withUrl(this.route, {
        skipNegotiation: true,
        transport: HttpTransportType.WebSockets,
        // NOTE(review): the token is captured once, so every later call to
        // this factory returns the same value — confirm that token expiry
        // is handled by the caller.
        accessTokenFactory: (): string | Promise<string> => msalToken
      })
      .withAutomaticReconnect()
      .build();
    this._connection = connection;

    return new Observable<void>(observer => {
      connection.start().then(
        () => {
          observer.next();
          observer.complete();
        },
        (error: unknown) => {
          // Forget the connection that failed to start; otherwise every
          // later call would take the "already connected" shortcut above
          // and the caller could never retry.
          if (this._connection === connection) {
            this._connection = null;
          }
          observer.error(error);
        }
      );
    });
  }

  /**
   * Listens for events from the SignalR hub.
   * Keeps the subscription open until explicitly unsubscribed.
   *
   * @param methodName Name of the hub method to register a handler for.
   * @returns An observable that emits the first argument of every invocation
   *   of `methodName`; unsubscribing removes the handler again.
   * @throws Error if no connection has been established.
   */
  public listen<T>(methodName: string): Observable<T> {
    // Capture the connection now so that unsubscribing still works (and does
    // not dereference null) after the wrapper has dropped its connection.
    const connection = this.connection;

    return new Observable<T>(observer => {
      const handler = (data: T): void => observer.next(data);

      // Register the handler for the specified method
      connection.on(methodName, handler);

      // Handle unsubscribe to remove listener
      return () => {
        connection.off(methodName, handler);
      };
    });
  }

  /**
   * Removes every handler registered for the given hub method.
   *
   * @param methodName Name of the hub method whose handlers are removed.
   * @throws Error if no connection has been established.
   */
  public removeListener(methodName: string): void {
    this.connection.off(methodName);
  }

  /**
   * Closes the connection to the real-time status hub.
   *
   * `stop()` is only invoked when the returned observable is subscribed to.
   *
   * @returns An observable that emits once and completes when the connection
   *   has stopped, or errors with the rejection reason of `stop()`.
   * @throws Error if no connection has been established.
   */
  public closeConnection(): Observable<void> {
    // Capture the connection so a repeated subscription does not dereference
    // the field after it has been cleared.
    const connection = this.connection;

    return new Observable<void>(observer => {
      connection.stop().then(
        () => {
          // Clear the field before notifying so that subscribers already see
          // the wrapper as disconnected; leave a newer connection untouched.
          if (this._connection === connection) {
            this._connection = null;
          }
          observer.next();
          observer.complete();
        },
        (error: unknown) => observer.error(error)
      );
    });
  }

  /**
   * Guards operations that need a connection.
   *
   * @throws Error if no connection has been established.
   */
  protected assertConnection(): void {
    if (!this._connection) {
      throw new Error('Connection to real-time status hub has not been established.');
    }
  }
}
