import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { environment as env } from '../environments/environment';
import {
  Observable,
  timer,
  Subject,
  EMPTY,
  interval,
  Subscription,
  ReplaySubject,
} from 'rxjs';
import {
  retryWhen,
  tap,
  delayWhen,
  switchAll,
  catchError,
  takeWhile,
  map,
  filter,
  shareReplay,
} from 'rxjs/operators';
import { AuthService } from '@auth0/auth0-angular';
import { NotificationService } from './notification.service';
import { UserService } from './user.service';
import { logIt } from './utils';

export const WS_ENDPOINT = env.wsEndpoint;
export const RECONNECT_INTERVAL = env.reconnectInterval;

export enum CMD_MESSAGE_TYPES {
  DISCONNECT = 'disconnect',
}

export interface CmdMessage {
  cmd: string;
}

export interface PingMessage {
  client_ts: number;
}

export interface PongMessage {
  server_ts: number;
  client_ts: number;
}

export interface WsMessage {
  status: boolean;
  action: string;
  cmd?: string;
  data: any;
}

export enum MESSAGE_TYPES {
  CHAT_MESSAGE = 'chatMessage',
  CMD = 'cmd',
  HELLO = 'hello',
  WELCOME = 'welcome',
  PING = 'ping',
  PONG = 'pong',
  DISCONNECT = 'disconnect',
  STATUS_MESSAGE = 'statusMessage',
}

@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  private socket$: WebSocketSubject<WsMessage> | undefined;
  private connected$: ReplaySubject<boolean> = new ReplaySubject<boolean>(1);
  public readonly socketConnected$: Observable<boolean>;
  private messagesSubject$ = new Subject<Observable<WsMessage>>();

  public readonly messages$: Observable<WsMessage>;
  public readonly pongMessages$: Observable<PongMessage>;

  private tryReconnect = false;
  private pings: Subscription | undefined;

  constructor(
    private http: HttpClient,
    private userService: UserService,
    private toaster: NotificationService,
    private authService: AuthService
  ) {
    this.socketConnected$ = this.connected$.asObservable();
    this.connected$.next(false);
    this.messages$ = this.messagesSubject$.pipe(
      switchAll(),
      // catchError(e => {
      //   throw e;
      // }),
      shareReplay(1)
    );
    this.pongMessages$ = this.messages$.pipe(
      filter((msg) => msg.action === MESSAGE_TYPES.PONG),
      map((msg: WsMessage) => msg.data as PongMessage)
    );
    this.pongMessages$.subscribe((msg) => {
      const server_to_client = Date.now() - msg.server_ts;
      const total_round_trip = Date.now() - msg.client_ts;
      const client_to_server = msg.server_ts - msg.client_ts;
      logIt('info', {
        ...msg,
        client_to_server,
        server_to_client,
        total_round_trip,
      });
    });

    userService.accessTokenRaw$.subscribe((rawToken) => {
      if (rawToken) {
        this.connect();
      } else {
        this.tryReconnect = false;
        this.close();
      }
    });
  }

  /**
   * Creates a new WebSocket subject and send it to the messages subject
   * @param cfg if true the observable will be retried.
   */
  public connect(cfg: { reconnect: boolean } = { reconnect: false }): void {
    if (!this.userService.accessTokenRaw) {
      logIt('warn', 'tried to connect ws before access token available');
      return;
    }
    if (!this.socket$ || this.socket$.closed) {
      this.socket$ = this.getNewWebSocket();
      const messages = this.socket$.pipe(
        catchError((err) => {
          logIt('error', '1****', err);
          throw err;
        }),
        cfg.reconnect ? this.reconnect : (o) => o,
        tap({
          next: (msg) => {
            if (msg.action === MESSAGE_TYPES.DISCONNECT) {
              this.tryReconnect = false;
              this.close();
            }
          },
        }),
        catchError((_) => {
          logIt('error', '2****', _);
          return EMPTY;
        })
      );
      //TODO only next an observable if a new subscription was made double-check this
      this.messagesSubject$.next(messages);
    }
  }

  /**
   * Retry a given observable by a time span
   * @param observable the observable to be retried
   */
  private reconnect(observable: Observable<WsMessage>): Observable<WsMessage> {
    return observable.pipe(
      retryWhen((errors) =>
        errors.pipe(
          tap((val) => logIt('info', '[Chat Service] Try to reconnect', val)),
          delayWhen((_) => timer(RECONNECT_INTERVAL))
        )
      )
    );
  }

  close() {
    this.socket$?.complete();
    this.socket$ = undefined;
  }

  sendMessage(msg: WsMessage) {
    this.socket$?.next(msg);
  }

  /**
   * Return a custom WebSocket subject which reconnects after failure
   */
  private getNewWebSocket(): WebSocketSubject<WsMessage> {
    logIt(
      'info',
      'getNewWebSocket',
      `${WS_ENDPOINT}`,
      this.userService.accessTokenRaw
    );
    return webSocket<WsMessage>({
      url: WS_ENDPOINT,
      openObserver: {
        next: () => {
          logIt('info', '[wsService]: connection ok');
          this.connected$.next(true);
          this.startPing();
        },
      },
      closeObserver: {
        next: () => {
          logIt('info', '[wsService]: connection closed');
          this.connected$.next(false);
          this.stopPing();
          this.socket$ = undefined;
          if (this.tryReconnect) {
            this.connect({ reconnect: true });
          }
        },
      },
      protocol: [
        'auth',
        (this.userService.accessTokenRaw || ' ').split(' ')[1],
      ],
    });
  }

  private stopPing() {
    if (this.pings && !this.pings.closed) {
      this.pings.unsubscribe();
    }
    this.pings = undefined;
  }

  private startPing() {
    this.stopPing();
    this.pings = interval(5 * 60000)
      .pipe(takeWhile(() => this.socket$ !== undefined))
      .subscribe(() => this.sendPing());
  }

  private sendPing() {
    this.sendMessage({
      status: true,
      action: MESSAGE_TYPES.PING,
      data: { client_ts: Date.now() },
    });
  }
}
