import {HttpTransportType, HubConnection, HubConnectionBuilder} from "@microsoft/signalr";
import {BehaviorSubject, distinctUntilChanged, filter, Observable, of, Subject, takeUntil, tap} from "rxjs";
import {ConnectionStatusEnum} from "./enums/connection-status.enum";
import {DestroyRef, inject} from "@angular/core";
import {takeUntilDestroyed} from "@angular/core/rxjs-interop";

/**
 * A hub invocation that is waiting to be sent, together with the subject used to
 * report its outcome back to the caller of `invoke`.
 */
interface QueuedInvocation {
  methodName: string;
  completion$: Subject<void>;
  args: unknown[];
}

/**
 * Base class for services that talk to a SignalR hub.
 *
 * It owns a single `HubConnection`, publishes the connection status as an observable,
 * exposes hub events as observables, and queues outgoing invocations so that they are
 * only sent while the status is `connected`.
 *
 * Must be constructed inside an Angular injection context because it calls `inject(DestroyRef)`.
 */
export abstract class SignalrServiceWrapper {
  private readonly destroyRef = inject(DestroyRef);
  private readonly route: string;
  private readonly _connectionStatus$ = new BehaviorSubject<ConnectionStatusEnum>(ConnectionStatusEnum.idle);
  /**
   * Emits the current connection status on subscription and again every time a status is published.
   */
  public connectionStatus$ = this._connectionStatus$.asObservable();
  /** Fires whenever the (de-duplicated) status changes to anything other than `connected`. */
  private readonly _connectionLost$ = new Subject<void>();
  /** Invocations requested through `invoke` that have not been handed to the hub yet. */
  private readonly _invocationQueue$ = new BehaviorSubject<QueuedInvocation[]>([]);

  /**
   * @param route URL of the hub this wrapper connects to.
   */
  protected constructor(route: string) {
    this.route = route;
    this.registerDequeueListener();
  }

  /** The current hub connection, or `null` before `establishConnection` / after `closeConnection`. */
  protected _connection: HubConnection | null = null;

  /**
   * Provides a reference to the underlying HubConnection.
   *
   * @deprecated This getter should not be used directly; instead, operations should be executed through the wrapper's methods.
   * @throws Error when no connection has been created yet.
   */
  public get connection(): HubConnection {
    return this.requireConnection();
  }

  /**
   * Establishes a connection to the real-time status hub.
   *
   * The connection object is built immediately; `start()` is called when the returned
   * observable is subscribed. If a connection object already exists, the returned
   * observable emits once and completes without touching the connection.
   *
   * NOTE(review): when `start()` rejects, the connection object is kept, so a later call
   * takes the "already exists" path and does not call `start()` again.
   *
   * @param msalToken Access token handed to SignalR through `accessTokenFactory`.
   * @returns An observable that emits once and completes when the connection has started,
   *          or errors with the rejection reason of `start()`.
   */
  public establishConnection(msalToken: string): Observable<void> {
    if (this._connection) {
      // `of()` without arguments would complete without emitting, which silently skips
      // callers' `next` handlers; emit once so this path matches the success path below.
      return of(undefined);
    }

    const connection = new HubConnectionBuilder()
      .withUrl(this.route, {
        skipNegotiation: true,
        transport: HttpTransportType.WebSockets,
        accessTokenFactory: (): string | Promise<string> => msalToken
      })
      .withAutomaticReconnect()
      .build();
    this._connection = connection;

    return new Observable<void>(observer => {
      connection.start()
        .then(() => {
          this._establishConnectionStatusListener(true);

          observer.next();
          observer.complete();
        })
        .catch(error => {
          this._establishConnectionStatusListener(false);

          // `error` is terminal; no `complete` call is needed (or has any effect) afterwards.
          observer.error(error);
        });
    });
  }

  /**
   * Listens for events from the SignalR hub, emitting the first argument of each event.
   * The hub handler is registered on subscribe and removed on unsubscribe.
   *
   * @param methodName Name of the hub method to listen to.
   * @throws Error when no connection has been created yet.
   */
  public listen<T>(methodName: string): Observable<T> {
    this.assertConnection();

    return this.observeHubMethod<T>(methodName, args => args[0] as T);
  }

  /**
   * Same as `listen`, but emits all arguments of each event as a tuple.
   *
   * @param methodName Name of the hub method to listen to.
   * @throws Error when no connection has been created yet.
   */
  public listenMultiple<TArgs extends any[]>(methodName: string): Observable<TArgs> {
    this.assertConnection();

    return this.observeHubMethod<TArgs>(methodName, args => args as TArgs);
  }

  /**
   * Removes every handler registered for the given hub method.
   *
   * @param methodName Name of the hub method whose handlers are removed.
   * @throws Error when no connection has been created yet.
   */
  public removeListener(methodName: string): void {
    this.requireConnection().off(methodName);
  }

  /**
   * Closes the connection to the real-time status hub.
   *
   * `stop()` is called when the returned observable is subscribed.
   *
   * @returns An observable that emits once and completes after the connection has stopped,
   *          or errors with the rejection reason of `stop()`.
   * @throws Error when no connection has been created yet.
   */
  public closeConnection(): Observable<void> {
    this.assertConnection();

    return new Observable<void>(observer => {
      const connection = this.requireConnection();

      connection.stop()
        .then(() => {
          // Clear the reference BEFORE notifying, so a subscriber that reacts by calling
          // `establishConnection` does not hit the "already exists" early return.
          if (this._connection === connection) {
            this._connection = null;
          }

          observer.next();
          observer.complete();
        })
        .catch(error => observer.error(error));
    });
  }

  /**
   * Queues an invocation of a hub method. It is sent immediately while the status is
   * `connected`, otherwise as soon as the status becomes `connected`.
   *
   * The returned observable is backed by a plain `Subject`, so subscribe right away:
   * notifications sent before subscription are not replayed.
   *
   * @param methodName Name of the hub method to invoke.
   * @param args Arguments forwarded to the hub method.
   * @returns An observable that emits once and completes when the hub call resolves,
   *          or errors with its rejection reason.
   */
  public invoke(methodName: string, ...args: any[]): Observable<void> {
    const completion$ = new Subject<void>();
    this._invocationQueue$.next([
      ...this._invocationQueue$.value,
      {
        methodName,
        completion$,
        args
      }
    ]);

    return completion$.asObservable();
  }

  /**
   * @throws Error when no connection has been created yet.
   */
  protected assertConnection(): void {
    if (!this._connection) {
      throw new Error('Connection to real-time status hub has not been established.');
    }
  }

  /** Returns the current connection, throwing (via `assertConnection`) when there is none. */
  private requireConnection(): HubConnection {
    this.assertConnection();
    return this._connection!;
  }

  /**
   * Wraps `on`/`off` for one hub method in an observable.
   * The connection is captured at subscribe time so that teardown removes the handler from
   * the same connection it was registered on, even if `_connection` has been cleared since.
   */
  private observeHubMethod<T>(methodName: string, project: (args: unknown[]) => T): Observable<T> {
    return new Observable<T>(observer => {
      const connection = this.requireConnection();
      const handler = (...args: unknown[]): void => observer.next(project(args));

      connection.on(methodName, handler);

      return () => connection.off(methodName, handler);
    });
  }

  /**
   * Wires the invocation queue to the connection status: while the status is `connected`
   * the queue is observed and drained; any other status stops the draining until the
   * status becomes `connected` again.
   */
  private registerDequeueListener(): void {
    this._connectionStatus$.pipe(
      distinctUntilChanged(),
      tap(status => {
        if (status !== ConnectionStatusEnum.connected) {
          this._connectionLost$.next();
        }
      }),
      filter(status => status === ConnectionStatusEnum.connected),
      tap(() => {
        this._invocationQueue$.pipe(
          takeUntil(this._connectionLost$),
          takeUntilDestroyed(this.destroyRef)
        ).subscribe(pending => {
          if (pending.length === 0) {
            return;
          }

          // Publish an empty queue first (instead of mutating the emitted array) and then
          // send EVERYTHING that was pending. Taking only the head would leave a backlog
          // built up while disconnected stuck until further `invoke` calls arrive.
          this._invocationQueue$.next([]);
          pending.forEach(workItem => this.dispatch(workItem));
        });
      }),
      takeUntilDestroyed(this.destroyRef)
    ).subscribe();
  }

  /** Sends one queued invocation to the hub and reports the outcome on its completion subject. */
  private dispatch(workItem: QueuedInvocation): void {
    const connection = this._connection;

    if (!connection) {
      workItem.completion$.error(new Error('Connection to real-time status hub has not been established.'));
      return;
    }

    connection.invoke(workItem.methodName, ...workItem.args)
      .then(() => {
        workItem.completion$.next();
        workItem.completion$.complete();
      })
      .catch(error => workItem.completion$.error(error));
  }

  /**
   * Publishes the initial status and forwards the connection's close / reconnecting /
   * reconnected callbacks to the status subject.
   *
   * @param initialStatus `true` publishes `connected`, `false` publishes `disconnected`.
   */
  private _establishConnectionStatusListener(initialStatus: boolean): void {
    const connection = this.requireConnection();

    this._connectionStatus$.next(initialStatus ? ConnectionStatusEnum.connected : ConnectionStatusEnum.disconnected);

    connection.onclose(() => {
      this._connectionStatus$.next(ConnectionStatusEnum.disconnected);
    });

    connection.onreconnected(() => {
      this._connectionStatus$.next(ConnectionStatusEnum.connected);
    });

    connection.onreconnecting(() => {
      this._connectionStatus$.next(ConnectionStatusEnum.reconnecting);
    });
  }
}
