import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { interval, Observable, ReplaySubject } from 'rxjs';
import { filter, map, shareReplay } from 'rxjs/operators';
import { NotificationService } from './notification.service';
import { UserService } from './user.service';
import { downloadUrl, logIt } from './utils';
import {
  MESSAGE_TYPES,
  WebsocketService,
  WsMessage,
} from './websocket.service';

/**
 * Status update describing one tracked task.
 *
 * In this file, values of this shape are taken from the `data` payload of
 * websocket messages whose action is `MESSAGE_TYPES.STATUS_MESSAGE`, and are
 * then stored and mutated by `TaskService` (progress, timeout, and on timeout
 * also message/status).
 */
export interface StatusMessage {
  /** Task identifier; used as the key of `TaskService.taskHash` and as the entry in its task id list. */
  id: string;
  /** NOTE(review): presumably identifies the principal (user) owning the task — not read in this file; confirm against the backend. */
  principal_id: string;
  /** NOTE(review): name of the operation the task performs — not read in this file; confirm against the backend. */
  action: string;
  /** NOTE(review): start timestamp; unit/epoch not visible in this file — confirm. */
  started_at: number;
  /** NOTE(review): finish timestamp, `null` presumably meaning "still running" — not read in this file; confirm. */
  finished_at: number | null;
  /** NOTE(review): file associated with the task — not read in this file. */
  file?: string;
  /** NOTE(review): company associated with the task — not read in this file. */
  company?: string | null;
  /** NOTE(review): vendor associated with the task — not read in this file. */
  vendor_id?: string | null;
  /** When truthy on an incoming status message, `TaskService` passes it to `downloadUrl`. */
  url?: string | null;
  /** Human-readable text; pushed to the task log and shown in the toaster. */
  message: string;
  /** `'success'` and `'error'` select the matching toaster style; any other value is shown as info. */
  status: string;
  /** Progress value; `TaskService` defaults a falsy value to 0 and clamps updates to the range 0..100. */
  progress: number;
  /** Handle returned by `setTimeout` for the pending "timed out" timer; assigned by `TaskService.addTask`. */
  timeout?: number;
}

/** Timer length in milliseconds used when a task is added for the first time. */
const TASK_START_TIMEOUT = 2 * 60 * 1000;
/** Timer length in milliseconds used when an already-tracked task receives an update. */
const TASK_COMPLETE_TIMEOUT = 2 * 60 * 1000;

/**
 * Tracks tasks reported through websocket status messages.
 *
 * For every task the service keeps the latest `StatusMessage` in `taskHash`,
 * publishes the list of known task ids on `tasks$`, keeps a short log of the
 * most recent messages on `taskLog$`, and shows each message in the toaster.
 * A task that makes no further progress before its timer expires is marked
 * as failed with a "Task timed out..." message.
 */
@Injectable({
  providedIn: 'root',
})
export class TaskService {
  /** Known task ids, newest first. Replaced (never mutated) on change. */
  private tasks: string[] = [];
  /** The most recent task messages, newest first, at most 10 entries. */
  private taskLog: string[] = [];
  /** Latest known state of every task, keyed by task id. */
  public readonly taskHash: Record<string, StatusMessage> = {};

  /** Status messages extracted from the websocket stream (last one replayed). */
  public readonly statusMessages$: Observable<StatusMessage>;
  /** Emits the task id list whenever a task's progress is set. */
  public readonly tasks$: ReplaySubject<string[]> = new ReplaySubject<string[]>(
    1
  );
  /** Emits the message log whenever a message is logged. */
  public readonly taskLog$: ReplaySubject<string[]> = new ReplaySubject<
    string[]
  >(1);

  constructor(
    private http: HttpClient,
    private userService: UserService,
    private toaster: NotificationService,
    private wsService: WebsocketService
  ) {
    this.statusMessages$ = wsService.messages$.pipe(
      filter((msg) => msg.action === MESSAGE_TYPES.STATUS_MESSAGE),
      map((msg: WsMessage) => msg.data as StatusMessage),
      shareReplay(1)
    );

    this.statusMessages$.subscribe((msg) => {
      logIt('info', msg);
      this.addTask(msg);
      // Every status message moves the task forward: first make sure it is at
      // least at 33, then add another 33 (clamped to 100 by setTaskProgress).
      const task: StatusMessage = this.taskHash[msg.id];
      if (task && task.progress < 33) {
        task.progress = 33;
      }
      this.setTaskProgress(msg.id, 33, true);
      if (msg.url) {
        downloadUrl(msg.url);
      }
    });
  }

  /**
   * Records a task message in the log (keeping the 10 newest entries),
   * publishes the log and shows the message in the toaster.
   *
   * @param msg Message to log; `status` selects the toaster style
   *   (`'success'`, `'error'`, anything else is shown as info).
   */
  public logTaskMsg(msg: StatusMessage): void {
    // Emit a fresh array (like `tasks`) so subscribers that compare by
    // reference notice the change.
    this.taskLog = [msg.message, ...this.taskLog].slice(0, 10);
    this.taskLog$.next(this.taskLog);
    if (msg.status === 'success') {
      this.toaster.success(msg.message, 10000);
    } else if (msg.status === 'error') {
      this.toaster.error(msg.message, 10000);
    } else {
      this.toaster.info(msg.message, 10000);
    }
  }

  /**
   * Starts tracking a task, or merges an update into an already tracked one.
   *
   * A new task is stored as-is (the passed object becomes the tracked state),
   * gets a start timer and a ticker that advances its progress by one every
   * three seconds until it reaches 100. An update is copied onto the tracked
   * object while keeping the locally maintained `progress` and `timeout`, and
   * restarts the timer — unless the task is already at 100, in which case no
   * ticker is left to cancel the timer and re-arming it would later report a
   * bogus timeout for a finished task.
   *
   * @param task Task state to add or merge. Its `progress` (and, for updates,
   *   `timeout`) fields are overwritten.
   * @returns `true` if the task was not known before, `false` if it was updated.
   */
  public addTask(task: StatusMessage): boolean {
    task.progress = task.progress || 0;
    const existing: StatusMessage | undefined = this.taskHash[task.id];

    if (!existing) {
      this.tasks = [task.id, ...this.tasks];
      this.taskHash[task.id] = task;
      this.armTimeout(task, TASK_START_TIMEOUT);

      const ticker = interval(3000).subscribe(() => {
        this.setTaskProgress(task.id, task.progress + 1);
        if (task.progress >= 100) {
          ticker.unsubscribe();
          this.disarmTimeout(task);
        }
      });

      this.logTaskMsg(task);
      return true;
    }

    // Carry the locally maintained state over first so the bulk copy below
    // does not replace it with whatever the incoming message contains.
    task.progress = existing.progress;
    task.timeout = existing.timeout;
    Object.assign(existing, task);

    this.disarmTimeout(existing);
    if (existing.progress < 100) {
      this.armTimeout(existing, TASK_COMPLETE_TIMEOUT);
    }
    this.logTaskMsg(existing);
    return false;
  }

  /**
   * Sets or increments the progress of a tracked task and publishes the task
   * list. Unknown task ids are ignored.
   *
   * @param task_id Id of the task to update.
   * @param progress New progress value, or the amount to add when `inc` is true.
   * @param inc When true, `progress` is added to the current value.
   */
  public setTaskProgress(
    task_id: string,
    progress: number,
    inc: boolean = false
  ): void {
    const task: StatusMessage = this.taskHash[task_id];
    if (!task) return;
    task.progress = Math.max(
      0,
      Math.min(100, inc ? task.progress + progress : progress)
    );
    this.tasks$.next(this.tasks);
  }

  /** Starts the timer that marks `task` as failed if it fires after `ms` milliseconds. */
  private armTimeout(task: StatusMessage, ms: number): void {
    task.timeout = setTimeout(() => {
      task.timeout = undefined;
      task.progress = 100;
      task.message = 'Task timed out...';
      task.status = 'error';
      this.setTaskProgress(task.id, task.progress);
      this.logTaskMsg(task);
    }, ms);
  }

  /** Cancels the pending timer of `task`, if any. */
  private disarmTimeout(task: StatusMessage): void {
    if (task.timeout) {
      clearTimeout(task.timeout);
      task.timeout = undefined;
    }
  }
}
