import { HttpClient } from '@angular/common/http';
import { Injectable, OnDestroy } from '@angular/core';
import {
	AsyncSubject,
	BehaviorSubject,
	NEVER,
	Observable,
	Subject,
	asapScheduler,
	firstValueFrom,
	of,
	scheduled,
	timer,
} from 'rxjs';
import {
	distinctUntilChanged,
	filter,
	mergeAll,
	switchMap,
	take,
	takeUntil,
	tap,
} from 'rxjs/operators';
import { Permits } from 'src/lib/constants/permissions';
import { commonGlobal } from 'src/lib/environment/common-global';
import { getEnvironment } from 'src/lib/environment/environment';
import { convertSetToArray } from 'src/lib/utilities/convert';
import { SessionLogger } from 'src/lib/utilities/logging/session-logger';
import * as wampyLib from 'wampy';
import { LoginService } from '../api/generic/login/login.service';
import { PermissionStoreService } from '../stores/permission-store/permission-store.service';
import { InitializationService } from '../utility/initialization/initialization.service';
import { Payload, Wampy, WampyStatic } from './wampy';

type SessionKeyObject = Record<number, number>;

type UserSessionSet = Record<number, SessionKeyObject>;

interface WampTraceLog {
	topic: string;
	obj?: any;
}

// Build Trace Log
function btl(topic: string, obj?: any): WampTraceLog {
	return {
		topic: topic,
		obj: obj,
	};
}

let initialized = false;

@Injectable({
	providedIn: 'root',
})
export class WampService implements OnDestroy {
	private _unsubscribe$ = new AsyncSubject<null>();

	private _logStop = new Subject<null>();

	private _hasConnected = false;
	private _isConnected = new BehaviorSubject<boolean>(false);
	private _ws: Wampy;
	private _onConnect = new Subject<any>();
	private _onClose = new Subject<null>();
	private _onError = new Subject<any>();
	private _onReconnect = new Subject<null>();
	private _onReconnectSuccess = new Subject<any>();

	private __onSubscribeHooked = new Subject<WampTraceLog>();
	private __onSubscribeUnhooked = new Subject<WampTraceLog>();
	private __onCallHooked = new Subject<WampTraceLog>();
	private __onCallUnhooked = new Subject<WampTraceLog>();
	private __onPublishHooked = new Subject<WampTraceLog>();
	private __onPublishUnhooked = new Subject<WampTraceLog>();

	private __onCallPushed = new Subject<WampTraceLog>();
	private __onCallSucceed = new Subject<WampTraceLog>();
	private __onCallErrored = new Subject<WampTraceLog>();
	private __onPublishPushed = new Subject<WampTraceLog>();
	private __onPublishSucceed = new Subject<WampTraceLog>();
	private __onPublishErrored = new Subject<WampTraceLog>();
	private __onSubscribeReceived = new Subject<WampTraceLog>();
	private __onSubscribeErrored = new Subject<WampTraceLog>();

	private __hookStats = {
		subscribe: {},
		call: {},
		publish: {},
	};

	get isConnected$() {
		return this._isConnected.asObservable();
	}
	get onConnect$() {
		return this._onConnect.asObservable();
	}
	get onClose$() {
		return this._onClose.asObservable();
	}
	get onError$() {
		return this._onError.asObservable();
	}
	get onReconnect$() {
		return this._onReconnect.asObservable();
	}
	get onReconnectSuccess$() {
		return this._onReconnectSuccess.asObservable();
	}

	private _currentUserLinkId: number;
	private _reconnectDelayScale = 1;
	private _autoReconnectInterval =
		getEnvironment().settings.services.wamp.hotRetryDelay;
	private _autoReconnectMaxTries = 12; // 1 minute at the default of 5 seconds.

	constructor(
		private httpClient: HttpClient,
		private initService: InitializationService,
		private permissionService: PermissionStoreService,
		private loginService: LoginService,
	) {
		this._currentUserLinkId = this.initService.link_id;

		this.initGlobals();
		this.initSubscriptions();

		this.onClose$
			.pipe(
				filter(() => this._hasConnected && this._ws != null),
				switchMap(() => {
					if (getEnvironment().production) {
						const retryDelay = this.getScaledRetryDelay();
						console.info(
							`Wamp connection failed, attempting reconnect in ${retryDelay / 1000} seconds.`,
						);
						return timer(retryDelay);
					} else return NEVER;
				}),
				switchMap(async () => {
					try {
						console.info('Wamp attempting reconnection.');
						await this._ws.connect();
					} catch {
						console.info('Wamp reconnect failed.');
					}
				}),
				takeUntil(this._unsubscribe$),
			)
			.subscribe();

		this.isConnected$
			.pipe(
				distinctUntilChanged(),
				filter((x) => x),
				takeUntil(this._unsubscribe$),
			)
			.subscribe({
				next: () => {
					this._reconnectDelayScale = 1;
					console.info('Wamp successfully connected.');
				},
			});

		this.subscribe$('ping').subscribe();
	}

	public init = () => {
		if (initialized) {
			return;
		}
		initialized = true;

		this.permissionService
			.getFieldSet$()
			.pipe(
				take(1),
				filter((x) =>
					x.canDo(Permits['ga_direct_message|gabby|participate gabby']),
				),
				switchMap(async () => {
					await this.initWamp(0);
				}),
			)
			.subscribe();
	};

	private waitForConnection = async () => {
		await firstValueFrom(
			this.isConnected$.pipe(
				filter((x) => x),
				take(1),
			),
		);
	};

	private connectWamp = async () => {
		const ws = new (wampyLib.Wampy as WampyStatic)(
			this.initService.websocket.url,
			{
				autoReconnect: true,
				reconnectInterval: this._autoReconnectInterval,
				maxRetries: this._autoReconnectMaxTries,
				realm: this.initService.websocket.realm,
				authid: 'fakeid',
				authmethods: ['ga.garealm'],
				onChallenge: ((auth_method: string) => {
					if (auth_method === 'ga.garealm') {
						return firstValueFrom(
							this.loginService.getLoginStatus().pipe(
								filter((l) => l?.uid != null),
								switchMap(() => {
									return this.httpClient.get('/api/v1/gradchat/token', {
										withCredentials: true,
									});
								}),
							),
						);
					} else {
						console.error(`Unknown auth method attempted, `, auth_method);
						throw Error();
					}
				}) as any,
				onClose: () => {
					if (this._ws === ws) {
						this._onClose.next(null);
					}
				},
				onError: ((error) => {
					if (this._ws === ws) {
						this._onError.next(error);
					}
				}) as any,
				onReconnect: () => {
					if (this._ws === ws) {
						this._onReconnect.next(null);
					}
				},
				onReconnectSuccess: ((welcomeDetails) => {
					if (this._ws === ws) {
						this._onReconnectSuccess.next(welcomeDetails);
					}
				}) as any,
			},
		);

		this._ws = ws;
		const welcomeDetails = await this._ws.connect();

		this._hasConnected = true;
		this._onConnect.next(welcomeDetails);
	};

	private initWamp = async (attemptCount: number): Promise<boolean> => {
		console.info('Wamp attempting to initialize.');

		try {
			await this._ws?.abort();
		} catch {
			// Do nothing.
		}

		try {
			await this.connectWamp();
		} catch {
			const retryDelay =
				attemptCount < this._autoReconnectMaxTries
					? this._autoReconnectInterval
					: this.getScaledRetryDelay();
			console.info(
				`Wamp init failed, trying again in ${retryDelay / 1000} seconds.`,
			);
			await new Promise((resolve) => setTimeout(resolve, retryDelay)); // Wait before trying again.
			await this.initWamp(++attemptCount);
			return false;
		}

		return true;
	};

	private getScaledRetryDelay = () => {
		const reconnectDelay =
			getEnvironment().settings.services.wamp.scaledRetryDelay *
			this._reconnectDelayScale;
		this._reconnectDelayScale = Math.min(this._reconnectDelayScale * 5, 60 * 5); // Max out at 5 minutes seconds.

		return reconnectDelay;
	};

	public call$ = <T>(topic: string, args: Payload): Observable<T> => {
		return new Observable<T>((o) => {
			this.__onCallHooked.next(btl(topic));

			this.__onCallPushed.next(btl(topic, args));

			(async () => {
				try {
					await this.waitForConnection();
					const result = await this._ws.call(topic, args);
					this.__onCallSucceed.next(btl(topic, result.argsList));
					o.next(result.argsList);
					o.complete();
				} catch (err) {
					this.__onCallErrored.next(btl(topic, err));
					o.error(err);
				}
			})();

			return () => {
				this.__onCallUnhooked.next(btl(topic));
			};
		});
	};

	public publish$ = (
		topic: string,
		args: Payload,
		targetLinkIds?: number[],
		sendToSelfSession?: boolean,
	): Observable<unknown> => {
		targetLinkIds = targetLinkIds || [this._currentUserLinkId];
		sendToSelfSession = sendToSelfSession || false;
		return this.call$<UserSessionSet[]>(
			'ga.get_link_sessions',
			targetLinkIds,
		).pipe(
			switchMap((response) => {
				const sessionIdSet = new Set<number>();

				if (response == null) {
					throw new Error('Could not get sessionIds for publish');
				} else {
					response.forEach((set) => {
						for (const k in set) {
							if (set.hasOwnProperty(k)) {
								for (const l in set[k]) {
									if (set[k].hasOwnProperty(l)) {
										sessionIdSet.add(set[k][l]);
									}
								}
							}
						}
					});
				}

				return of(null).pipe(
					tap(() => {
						this.__onPublishHooked.next(btl(topic));
						this.__onPublishPushed.next(btl(topic, args));
					}),
					switchMap(() =>
						this.call$<any>('ga.publish', [
							topic,
							args,
							{
								eligible: convertSetToArray(sessionIdSet),
								exclude_me: !sendToSelfSession,
							},
						]),
					),
				);
			}),
		);
	};

	public publishGlobal$ = (
		topic: string,
		args: Payload,
	): Observable<unknown> => {
		return of(null).pipe(
			tap(() => {
				this.__onPublishHooked.next(btl(topic));
				this.__onPublishPushed.next(btl(topic, args));
			}),
			switchMap(() => this.call$<any>('ga.publish', [topic, args])),
		);
	};

	public subscribe$ = <T>(topic: string): Observable<T> => {
		return new Observable<T>((o) => {
			this.__onSubscribeHooked.next(btl(topic));

			const onEvent = (result: any) => {
				this.__onSubscribeReceived.next(btl(topic, result.argsList));
				o.next(result.argsList);
			};

			(async () => {
				try {
					await this.waitForConnection();
					await this._ws.subscribe(topic, onEvent);
				} catch (err) {
					this.__onSubscribeErrored.next(btl(topic, err));
					o.error(err);
				}
			})();

			return async () => {
				try {
					this.__onSubscribeUnhooked.next(btl(topic));
					await this.waitForConnection();
					await this._ws.unsubscribe(topic, onEvent);
				} catch (err) {
					console.error(err);
				}
			};
		});
	};

	private initSubscriptions = () => {
		// Hook stats
		this.__onSubscribeHooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.subscribe[x.topic] =
				(this.__hookStats.subscribe[x.topic] || 0) + 1;
		});
		this.__onSubscribeUnhooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.subscribe[x.topic] =
				(this.__hookStats.subscribe[x.topic] || 0) - 1;
		});
		this.__onCallHooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.call[x.topic] =
				(this.__hookStats.call[x.topic] || 0) + 1;
		});
		this.__onCallUnhooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.call[x.topic] =
				(this.__hookStats.call[x.topic] || 0) - 1;
		});
		this.__onPublishHooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.publish[x.topic] =
				(this.__hookStats.publish[x.topic] || 0) + 1;
		});
		this.__onPublishUnhooked.pipe(takeUntil(this._logStop)).subscribe((x) => {
			this.__hookStats.publish[x.topic] =
				(this.__hookStats.publish[x.topic] || 0) - 1;
		});

		// Session logger
		this.onConnect$
			.pipe(
				SessionLogger.logObservableTap((l, x) =>
					l('WampService', 'onConnect$', x),
				),
			)
			.subscribe();
		this.onClose$
			.pipe(SessionLogger.logObservableNext('WampService', 'onClose$'))
			.subscribe();
		this.onError$
			.pipe(
				SessionLogger.logObservableTap((l, x) =>
					l('WampService', 'onError$', x),
				),
			)
			.subscribe();
		this.onReconnect$
			.pipe(SessionLogger.logObservableNext('WampService', 'onReconnect$'))
			.subscribe();
		this.onReconnectSuccess$
			.pipe(
				SessionLogger.logObservableTap((l, x) =>
					l('WampService', 'onReconnectSuccess$', x),
				),
			)
			.subscribe();

		scheduled([this.onConnect$, this.onReconnectSuccess$], asapScheduler)
			.pipe(mergeAll())
			.subscribe(() => this._isConnected.next(true));
		scheduled([this.onClose$, this.onError$], asapScheduler)
			.pipe(mergeAll())
			.subscribe(() => this._isConnected.next(false));
	};

	private initGlobals = () => {
		commonGlobal.wampStats = () => {
			return this.__hookStats;
		};
		commonGlobal.registerLogging((enable) => {
			if (enable) {
				// Public logs
				this.onConnect$
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onConnect', x));
				this.onClose$
					.pipe(takeUntil(this._logStop))
					.subscribe(() => console.debug('onClose'));
				this.onError$
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onError', x));
				this.onReconnect$
					.pipe(takeUntil(this._logStop))
					.subscribe(() => console.debug('onReconnect'));
				this.onReconnectSuccess$
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onReconnectSuccess', x));

				// Hook logs
				this.__onSubscribeHooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onSubscribeHooked', x.topic, x.obj));
				this.__onSubscribeUnhooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) =>
						console.debug('onSubscribeUnhooked', x.topic, x.obj),
					);
				this.__onCallHooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onCallHooked', x.topic, x.obj));
				this.__onCallUnhooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onCallUnhooked', x.topic, x.obj));
				this.__onPublishHooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onPublishHooked', x.topic, x.obj));
				this.__onPublishUnhooked
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onPublishUnhooked', x.topic, x.obj));

				// Receiving logs
				this.__onCallPushed
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.info('onCallPushed', x.topic, x.obj));
				this.__onCallSucceed
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onCallSucceed', x.topic, x.obj));
				this.__onCallErrored
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.error('onCallErrored', x.topic, x.obj));
				this.__onPublishPushed
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.info('onPublishPushed', x.topic, x.obj));
				this.__onPublishSucceed
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.debug('onPublishSucceed', x.topic, x.obj));
				this.__onPublishErrored
					.pipe(takeUntil(this._logStop))
					.subscribe((x) => console.error('onPublishErrored', x.topic, x.obj));
				this.__onSubscribeReceived
					.pipe(takeUntil(this._logStop))
					.subscribe((x) =>
						console.info('onSubscribeReceived', x.topic, x.obj),
					);
				this.__onSubscribeErrored
					.pipe(takeUntil(this._logStop))
					.subscribe((x) =>
						console.error('onSubscribeErrored', x.topic, x.obj),
					);
			} else {
				this._logStop.next(null);
			}
		});
	};

	ngOnDestroy() {
		try {
			this._ws?.abort();
		} catch {
			// Do nothing.
		}

		this._unsubscribe$.next(null);
		this._unsubscribe$.complete();
		this._unsubscribe$ = null;
	}
}
