import { EMPTY, from, merge, Observable, of } from 'rxjs';
import { concatMap, endWith, map, scan, takeWhile } from 'rxjs/operators';

const COMPLETED = Symbol('COMPLETED');
/**
 * Buffers source values and emits them based on a notifier. Notifier values are not buffered.
 *
 * Use this operator if you have a dependency between the processing of the source values and your notifier observable,
 * that is, if you need the re-evalute a notifier each time a source value has been emitted and/or acted upon.
 *
 * WARNING
 * If not used correctly, this operator may buffer your source values until the end of days (or memory runs out - whichever comes first).
 *
 * This operator is similar to "rxjs/operators/zip", but does not buffer notifiers.
 * I.e. even if the notifier emits multiple values, before the source emits multiple values,
 * only a single source value is emitted until a new value is emitted by the notifier.
 *
 * Example: Only emit a source value if there are less than 10 source values "doingSomething"
 * @example
 *
 * waitUntil(
 *  source,
 *  notifier.pipe(startWith()).pipe(sum(0)).pipe(filter((sum) => sum < 10)),
 * )
 *   .pipe(tap({ next: (buffer) => notifier.next(+1) }))
 *   .pipe(mergeMap(() => doSomething()))
 *   .pipe(tap({ next: (buffer) => notifier.next(-1) }))
 *
 * @param source
 * @param notifier
 */
export function waitUntil<T>(
  source$: Observable<T>,
  notifier$: Observable<void>,
): Observable<T> {
  const initialMerge: [
    number,
    { value: T | typeof COMPLETED; index: number } | undefined,
  ] = [-1, undefined];
  const initialBuffer: {
    isCompleted: boolean;
    bufferedIndex: number;
    emittedIndex: number;
    buffer: ReadonlyArray<T>;
    emit: { value: T } | false;
  } = {
    isCompleted: false,
    bufferedIndex: -1,
    emittedIndex: -1,
    buffer: [],
    emit: false,
  };

  return merge(
    notifier$
      .pipe(map((_, index) => index))
      .pipe(map((value) => ({ type: 'notifier', value } as const))),
    source$
      .pipe(map((value, index) => ({ value, index })))
      .pipe(map((value) => ({ type: 'data', value } as const)))
      .pipe(endWith({ type: 'data', value: COMPLETED } as const)),
  )
    .pipe(
      scan(
        (
          acc: readonly [
            number,
            { value: T | typeof COMPLETED; index: number } | undefined,
          ],
          event,
        ) =>
          event.type === 'notifier'
            ? ([event.value, acc[1]] as const)
            : ([
                acc[0],
                event.value === COMPLETED
                  ? ({
                      value: COMPLETED,
                      index: acc[1] !== undefined ? acc[1].index : -1,
                    } as const)
                  : event.value,
              ] as const),
        initialMerge,
      ),
    )
    .pipe(
      concatMap(([notifierIndex, data]) =>
        data ? of([notifierIndex, data] as const) : EMPTY,
      ),
    )
    .pipe(
      scan(
        (
          {
            buffer,
            bufferedIndex: previousBufferedIndex,
            emittedIndex: previousEmittedIndex,
          },
          [notifierIndex, { value, index: valueIndex }],
        ) => {
          const isAlreadyBuffered = previousBufferedIndex >= valueIndex;
          const canEmit = previousEmittedIndex < notifierIndex;

          if (canEmit && isAlreadyBuffered && buffer.length > 0) {
            return {
              isCompleted: value === COMPLETED && buffer.length <= 1,
              bufferedIndex: previousBufferedIndex,
              emittedIndex: notifierIndex,
              buffer: buffer.slice(1),
              emit: { value: buffer[0] },
            } as const;
          } else if (value !== COMPLETED && canEmit && !isAlreadyBuffered) {
            return {
              isCompleted: false,
              bufferedIndex: valueIndex,
              emittedIndex: notifierIndex,
              buffer: buffer.length > 0 ? [...buffer.slice(1), value] : [],
              emit: { value: buffer.length > 0 ? buffer[0] : value },
            } as const;
          } else if (value !== COMPLETED && !isAlreadyBuffered) {
            return {
              isCompleted: false,
              bufferedIndex: valueIndex,
              emittedIndex: previousEmittedIndex,
              buffer: [...buffer, value],
              emit: false,
            } as const;
          } else {
            return {
              isCompleted: value === COMPLETED && buffer.length === 0,
              bufferedIndex: previousBufferedIndex,
              emittedIndex: previousEmittedIndex,
              buffer,
              emit: false,
            } as const;
          }
        },
        initialBuffer,
      ),
    )
    .pipe(
      concatMap((v) => (v.isCompleted ? from([v, COMPLETED] as const) : of(v))),
    )
    .pipe(takeWhile((v) => v !== COMPLETED))
    .pipe(
      concatMap((v) => (v !== COMPLETED && v.emit ? of(v.emit.value) : EMPTY)),
    );
}
