import { Observable, Subject } from 'rxjs';
import { concatMap, distinct, filter, map, windowTime } from 'rxjs/operators';
import { Injectable } from '@angular/core';

/**
 * Shape of a single message travelling over the broadcaster's event bus.
 */
interface BroadcastEvent {
    /** Name of the event; used to route the message to interested listeners. */
    key: any;
    /** Optional payload delivered together with the event. */
    data?: any;
}

/**
 * Minimal publish/subscribe event bus.
 *
 * Producers call `broadcast()` with a key and an optional payload; consumers
 * call `on()` with the same key to receive the payloads as an observable.
 */
@Injectable()
export class Broadcaster {
    /** Length, in milliseconds, of each de-duplication window applied by `on()`. */
    private static readonly DEDUPE_WINDOW_MS = 100;

    /** Shared bus carrying every broadcast event; never reassigned. */
    private readonly eventBus = new Subject<BroadcastEvent>();

    /**
     * Emit an event on the bus.
     *
     * @param key  Event name; listeners registered via `on()` with a strictly
     *             equal (`===`) key receive the event.
     * @param data Optional payload delivered to the listeners.
     */
    broadcast(key: unknown, data?: unknown): void {
        this.eventBus.next({key, data});
    }

    /**
     * Return an observable of the payloads broadcast under the given key.
     *
     * Events are grouped into consecutive time windows; inside each window a
     * payload whose JSON serialization was already seen is dropped, so rapid
     * identical broadcasts reach the subscriber only once per window.
     *
     * @param key Event name to listen for (compared with `===`).
     * @returns Observable emitting each matching event's `data`, cast to `T`.
     */
    on<T>(key: unknown): Observable<T> {
        return this.eventBus.asObservable().pipe(
            filter(event => event.key === key),
            windowTime(Broadcaster.DEDUPE_WINDOW_MS),
            // A fresh `distinct` per window keeps de-duplication scoped to that window.
            concatMap(window$ => window$.pipe(distinct(Broadcaster.dedupeKey))),
            map(event => event.data as T),
        );
    }

    /**
     * Compute the value `distinct` uses to recognise duplicate events.
     *
     * The event key is deliberately left out: the stream is already filtered
     * to a single key, and concatenating it with a string would throw for
     * `Symbol` keys.
     */
    private static dedupeKey(event: BroadcastEvent): unknown {
        try {
            return JSON.stringify(event.data);
        } catch {
            // Unserializable payload (e.g. circular reference, BigInt): use the
            // event object itself so it is never mistaken for a duplicate and dropped.
            return event;
        }
    }
}
