import { Observable, Subscriber } from 'rxjs';

/**
 * A push-based observable that buffers values while nobody is subscribed.
 *
 * - While there is no active subscriber, every value passed to {@link next}
 *   is stored in an internal FIFO buffer holding at most `maxSize` items.
 * - When a subscriber arrives, the buffered values are replayed to it in
 *   insertion order and removed from the buffer as they are delivered. If the
 *   subscriber unsubscribes part-way through the replay, the values it did not
 *   consume stay buffered for the next subscriber.
 * - While at least one subscriber is active, values are forwarded to every
 *   active subscriber and nothing is buffered.
 *
 * @typeParam T - Type of the values carried by the stream.
 */
export class BufferSubject<T> extends Observable<T> {
  /** Values received while no subscriber was active, oldest first. */
  private buffer: T[] = [];
  /** Subscribers that are currently active and receive values directly. */
  private readonly subscribers = new Set<Subscriber<T>>();

  /**
   * @param maxSize - Maximum number of values that may be held in the buffer
   *   while there is no subscriber.
   */
  constructor(private readonly maxSize: number) {
    super((subscriber) => {
      // Replay directly from the live buffer and stop as soon as the subscriber
      // closes, so values it never consumed are not lost. The subscriber is not
      // registered yet, which means a value pushed re-entrantly from inside a
      // handler is appended to the buffer and delivered in order by this loop.
      while (!subscriber.closed && this.buffer.length > 0) {
        subscriber.next(this.buffer.shift() as T);
      }

      // Only register subscribers that are still open after the replay.
      if (!subscriber.closed) {
        this.subscribers.add(subscriber);
      }

      // Teardown: stop forwarding values to this subscriber.
      return () => {
        this.subscribers.delete(subscriber);
      };
    });
  }

  /**
   * Pushes a value into the stream.
   *
   * The value is forwarded to all active subscribers; if there are none it is
   * appended to the buffer instead.
   *
   * @param value - The value to emit or buffer.
   * @throws Error if there is no active subscriber and the buffer already
   *   holds `maxSize` values.
   */
  next(value: T): void {
    if (this.subscribers.size > 0) {
      // Iterate over a snapshot so that subscribers added or removed by a
      // handler during this emission do not affect the current fan-out.
      for (const subscriber of Array.from(this.subscribers)) {
        subscriber.next(value);
      }
      return;
    }

    if (this.buffer.length >= this.maxSize) {
      throw new Error(`Max buffer size reached:  ${this.maxSize}`);
    }

    this.buffer.push(value);
  }
}
