import {Observable, Observer, Subject, Subscription, timer} from 'rxjs';
import {debounceTime, filter, share} from 'rxjs/operators';
import {RxWebSocket} from './rx-web-socket';
import {WS_CHANNELS} from './web-socket.interface';

/** A single filter change that is sent to the server as part of a channel message. */
interface IFiltersToSend {
  /** Action name for this change; filled from `options.filterJoinAction` or `options.filterLeaveAction`. */
  action: string;
  /** The filter value the action applies to (opaque to this module). */
  filter: any;
}

/** Bookkeeping record for one filter subscribed on a channel. */
interface IFilterSubscription {
  /** The filter value this record was created for. */
  filter: any;
  /** Shared stream of the channel's messages that match `filter`; assigned right after the record is created. */
  observable?: Observable<any>;
}

/** Bookkeeping record for one channel and the filters subscribed on it. */
interface IChannelSubscription {
  /** Channel identifier; also used as the key in the channel registry. */
  name: WS_CHANNELS;
  /** Filters that currently have an active subscription on this channel. */
  filters: IFilterSubscription[];
  /** Filter changes queued since the last debounced flush to the server. */
  filtersToSend: IFiltersToSend[];
  /** Shared stream of the incoming messages that belong to this channel. */
  observable?: Observable<any>;
  /** Observer of the debounced notification pipeline; calling `next(name)` schedules a flush of `filtersToSend`. */
  observer?: Observer<any>;
  /** Subscription that keeps the debounced notification pipeline alive; released when the channel is torn down. */
  serverNotification?: Subscription;
}

/**
 * Multiplexes channel and per-channel filter subscriptions over a single
 * {@link RxWebSocket} connection.
 *
 * Join/leave requests are written to `connection.subject`; incoming messages are
 * read from `connection.observable` and routed to subscribers by channel and filter.
 * Filter changes are batched (debounced) before they are sent.
 */
export class RxWebSocketChannels {
  /** Decides whether an incoming message's channel belongs to a subscribed channel. */
  private readonly channelsMatch: (source: any, target: any) => boolean;
  /** Decides whether a subscribed filter (`source`) matches another filter (`target`). */
  private readonly filtersMatch: (source: any, target: any) => boolean;
  private connection: RxWebSocket;
  /** Stream of all incoming messages, taken from the connection. */
  private observable: Observable<any>;
  /** Registry of channels that currently have at least one subscriber, keyed by channel name. */
  private subs: Map<string, IChannelSubscription> = new Map<string, IChannelSubscription>();

  /**
   * @param connection Connection to multiplex. Its `options` may override the
   *   channel/filter matchers; otherwise the defaults of this class are used.
   */
  constructor(connection: RxWebSocket) {
    this.connection = connection;
    this.channelsMatch = this.connection.options.channelsMatch || RxWebSocketChannels.channelsMatchDefault;
    this.filtersMatch = this.connection.options.filtersMatch || RxWebSocketChannels.filtersMatchDefault;
    this.observable = this.connection.observable;
    // NOTE(review): this subscription is never released; the class exposes no dispose hook.
    this.connection.connectionStatus.subscribe(state => this.reSubscribe(state));
  }

  /** Default channel matcher: strict equality. */
  private static channelsMatchDefault(source: any, target: any): boolean {
    return source === target;
  }

  /**
   * Default filter matcher: every enumerable property of `source` must be strictly
   * equal to the same property of `target`. The check is one-directional, so
   * `target` may carry extra properties. A falsy `target` never matches.
   */
  private static filtersMatchDefault(source: any, target: any): boolean {
    if (!target) {
      return false;
    }
    for (const key in source) {
      if (source[key] !== target[key]) {
        return false;
      }
    }
    return true;
  }

  /**
   * Subscribes to every message of a channel.
   *
   * @param channel Channel to join.
   * @param responseHandle Called with each message of that channel.
   * @returns Subscription; unsubscribing the last subscriber leaves the channel.
   */
  public subscribe(channel: WS_CHANNELS, responseHandle: (message: any) => void): Subscription {
    return this.initChannel(channel).observable.subscribe(responseHandle);
  }

  /**
   * Subscribes to the messages of a channel that match a filter.
   *
   * @param channel Channel the filter belongs to.
   * @param chFilter Filter value; an already subscribed matching filter is reused.
   * @param responseHandle Next-callback or observer that receives the matching messages.
   * @returns Subscription for this subscriber.
   */
  public subscribeFilter(channel: WS_CHANNELS, chFilter: any, responseHandle: any): Subscription {
    return this.initFilter(channel, chFilter).observable.subscribe(responseHandle);
  }

  /** Re-joins every registered channel (with its active filters) once the connection reopens after a retry. */
  private reSubscribe(state: string): void {
    if (state !== this.connection.connectionStatusOptions.openedAfterRetry) {
      return;
    }
    this.subs.forEach(channelSub => {
      this.joinChannel(channelSub.name, this.getChannelFilters(channelSub));
    });
  }

  /**
   * Returns the registered record for `channel`, or builds a new one.
   * A new record is only added to the registry when its observable is first subscribed.
   *
   * @param fromFilter When true, subscribing does not send a plain channel join;
   *   the join is sent later together with the queued filters.
   */
  private initChannel(channel: WS_CHANNELS, fromFilter?: boolean): IChannelSubscription {
    const registered = this.subs.get(channel);
    if (registered) {
      return registered;
    }
    const channelSub: IChannelSubscription = {
      filters: [],
      filtersToSend: [],
      name: channel,
      observable: undefined,
      observer: undefined,
      serverNotification: undefined
    };
    channelSub.observable = this.createNewChannelObservable(channel, channelSub, fromFilter);
    channelSub.serverNotification = this.createNewChannelServerSubscriptionObservable(channelSub);
    return channelSub;
  }

  /**
   * Starts the debounced notification pipeline of a channel: values pushed through
   * `channelSub.observer` trigger one flush of the queued filter changes after
   * 10 ms without further pushes.
   */
  private createNewChannelServerSubscriptionObservable(channelSub: IChannelSubscription): Subscription {
    return new Observable(observer => {
      // Expose the observer so filter (un)subscriptions can schedule a flush.
      channelSub.observer = observer;
    })
      .pipe(debounceTime(10))
      .subscribe(channel => this.debounceChannelNotifications(channel as string));
  }

  /**
   * Builds the shared message stream of a channel. The first subscriber joins the
   * channel (unless it was created for a filter) and registers the record; when the
   * last subscriber leaves, the channel is left and the record is removed.
   */
  private createNewChannelObservable(channel: string, channelSub: IChannelSubscription, fromFilter?: boolean) {
    return new Observable(observer => {
      if (!fromFilter) {
        this.joinChannel(channel);
      }
      this.subs.set(channel, channelSub);
      const channelMessages = this.observable.pipe(filter(this.createChannelFilter(channel)));
      const channelsSubscription = channelMessages.subscribe(observer);
      return () => {
        this.leaveChannel(channel);
        channelsSubscription.unsubscribe();
        channelSub.serverNotification.unsubscribe();
        this.subs.delete(channel);
      };
    }).pipe(share());
  }

  /** Returns the record of an already subscribed matching filter on the channel, or builds a new one. */
  private initFilter(channel: WS_CHANNELS, chFilter: any): IFilterSubscription {
    const channelSub = this.initChannel(channel, true);
    const existing = this.findExistingFilter(channelSub.filters, chFilter);
    if (existing !== undefined) {
      return existing;
    }
    const filterSub: IFilterSubscription = {filter: chFilter, observable: undefined};
    filterSub.observable = this.createNewFilterObservable(channelSub, chFilter, filterSub);
    return filterSub;
  }

  /**
   * Finds the first entry whose filter matches `chFilter` according to `filtersMatch`
   * (entry filter as source, `chFilter` as target).
   *
   * @returns The matching entry, or `undefined` when there is none.
   */
  private findExistingFilter<T extends {filter: any}>(channelFilters: T[], chFilter: any): T | undefined {
    return channelFilters.find(entry => this.filtersMatch(entry.filter, chFilter));
  }

  /**
   * Builds the shared message stream of one filter. The first subscriber queues a
   * filter-join for the server; the filter-leave is queued when the stream is torn
   * down, which happens 5000 ms after the last subscriber left (so quick
   * unsubscribe/resubscribe cycles do not reach the server).
   */
  private createNewFilterObservable(channelSub: IChannelSubscription, chFilter: any, filterSub: IFilterSubscription): Observable<any> {
    return new Observable(observer => {
      // The notification is debounced, so the entry queued right below is part of the flush.
      channelSub.observer.next(channelSub.name);
      channelSub.filtersToSend.push({
        action: this.connection.options.filterJoinAction,
        filter: chFilter
      });
      channelSub.filters.push(filterSub);
      const matchingMessages = channelSub.observable.pipe(filter(this.createFilterFunction(chFilter)));
      const filterSubscription = matchingMessages.subscribe(observer);
      return () => {
        const position = channelSub.filters.indexOf(filterSub);
        // Guard against -1: splice(-1, 1) would drop the last, unrelated filter.
        if (position !== -1) {
          channelSub.filters.splice(position, 1);
        }
        channelSub.observer.next(channelSub.name);
        channelSub.filtersToSend.push({
          action: this.connection.options.filterLeaveAction,
          filter: chFilter
        });
        filterSubscription.unsubscribe();
      };
    }).pipe(share({connector: () => new Subject(), resetOnRefCountZero: () => timer(5000)}));
  }

  /** Maps the active filters of a channel to filter-join entries (used when re-joining). */
  private getChannelFilters(channelSub: IChannelSubscription): IFiltersToSend[] {
    return channelSub.filters.map(filterSub => ({
      action: this.connection.options.filterJoinAction,
      filter: filterSub.filter
    }));
  }

  /** Sends a channel-join message, optionally carrying filter changes. */
  private joinChannel(channel: string, filters?: IFiltersToSend[]) {
    this.connection.subject.next({
      action: this.connection.options.channelJoinAction,
      channel,
      filters
    });
  }

  /**
   * Flushes the queued filter changes of a channel to the server.
   *
   * A queued entry and a later entry cancel each other out only when they carry
   * opposite actions for the same filter (join followed by leave, or vice versa).
   * Entries with the same action, or filters that match in one direction only,
   * are unrelated and are all sent. Nothing is sent when no entries remain.
   */
  private debounceChannelNotifications(channel: string) {
    const channelSub = this.subs.get(channel);
    if (!channelSub) {
      return;
    }
    const filtersToSend: IFiltersToSend[] = [];
    for (const queued of channelSub.filtersToSend) {
      const cancelledAt = filtersToSend.findIndex(pending => this.cancelsOut(pending, queued));
      if (cancelledAt !== -1) {
        // Join + leave (or leave + join) of the same filter: the server needs neither.
        filtersToSend.splice(cancelledAt, 1);
      } else {
        filtersToSend.push({
          action: queued.action,
          filter: queued.filter
        });
      }
    }

    channelSub.filtersToSend = [];
    if (filtersToSend.length) {
      this.joinChannel(channel, filtersToSend);
    }
  }

  /** True when two queued entries are opposite actions on the same filter (matching in both directions). */
  private cancelsOut(pending: IFiltersToSend, incoming: IFiltersToSend): boolean {
    return (
      pending.action !== incoming.action &&
      this.filtersMatch(pending.filter, incoming.filter) &&
      this.filtersMatch(incoming.filter, pending.filter)
    );
  }

  /** Sends a channel-leave message. */
  private leaveChannel(channel: string) {
    this.connection.subject.next({
      action: this.connection.options.channelLeaveAction,
      channel
    });
  }

  /** Builds the predicate that keeps the incoming messages whose `channel` matches. */
  private createChannelFilter(channel: string) {
    return (data: any) => this.channelsMatch(channel, data.channel);
  }

  /** Builds the predicate that keeps the incoming messages whose `filter` matches `chFilter`. */
  private createFilterFunction(chFilter: any) {
    return (data: any) => this.filtersMatch(chFilter, data.filter);
  }
}
