import { mergeImmutable } from '@bbraun/shared/util-lang';
import { EMPTY, merge, Observable, of, Subject } from 'rxjs';
import {
  catchError,
  distinctUntilChanged,
  first,
  map,
  scan,
  share,
  switchMap,
  tap,
} from 'rxjs/operators';
import { onSubscription } from '../operators/on-subscription';
import { blockingOperationHandler } from '../blocking-operation-handler/blocking-operation-handler';

/**
 * Derives a shared stream of `V` values from `observable` by running
 * `mapping` on its values and caching the most recent outcome.
 *
 * A new operation is requested whenever `observable` emits a distinct value
 * or a new subscriber attaches to the returned observable. Each operation
 * reads the first value of `observable`; when that input is strictly equal to
 * the cached input, the cached outcome is reused instead of calling `mapping`
 * again. Otherwise only the first emission of `mapping(input)` is used.
 *
 * Successive results are combined with `mergeImmutable` (mode `'unsafe'`),
 * shared between subscribers via `share()` and de-duplicated with
 * `distinctUntilChanged()`.
 *
 * @param observable Source of the input values.
 * @param mapping Produces the value stream for a given input.
 * @param options.onError Supplies the replacement stream when reading the
 *   input or running `mapping` failed; receives the caught error.
 * @returns An observable of the merged, de-duplicated values.
 */
export function withSharedValue<T, V>(
  observable: Observable<T>,
  mapping: (v: T) => Observable<V>,
  {
    onError,
  }: {
    onError: (error: unknown | undefined) => Observable<V>;
  },
): Observable<V> {
  // Outcome of the most recent operation; written by the handler's `next`
  // callback below and consulted to skip redundant `mapping` calls.
  let latestValue:
    | { hasValue: false; error?: unknown }
    | { hasValue: true; input: T; value: V } = { hasValue: false };

  // Emits once per subscription to the returned observable (see the
  // `afterSubscribe` hook at the bottom) to request another operation.
  const resubscribeSubject = new Subject<void>();

  // Builds a fresh operation: a function that, when invoked, resolves the
  // current input into either a value outcome or an error outcome.
  const createOperation = () => () =>
    observable.pipe(
      // NOTE(review): presumably relies on `observable` replaying its current
      // value on subscription - confirm against callers.
      first(),
      switchMap((input) => {
        // Same input as last time: hand back the cached outcome as is.
        if (latestValue.hasValue && latestValue.input === input) {
          return of(latestValue);
        }

        return mapping(input).pipe(
          first(),
          map((value) => ({ hasValue: true, input, value } as const)),
        );
      }),
      // Failures become an outcome instead of terminating the stream.
      catchError((error) => of({ hasValue: false, error } as const)),
    );

  const triggers$ = merge(
    resubscribeSubject,
    observable.pipe(
      distinctUntilChanged(),
      // Once the input completes, no further resubscribe triggers are needed.
      tap({ complete: () => resubscribeSubject.complete() }),
    ),
  );

  const source$ = triggers$.pipe(map(() => createOperation()));

  // NOTE(review): presumably runs the operations one after another given
  // `strategy: 'concat'` - confirm in blockingOperationHandler.
  const operationHandler = blockingOperationHandler(
    {
      next: (outcome) => (latestValue = outcome),
    },
    { sourceOnly: true, source: source$, strategy: 'concat' },
  );

  const values$ = operationHandler.results.pipe(
    // Handler results that carry no value are dropped.
    switchMap((result) => (result.hasValue ? of(result.value) : EMPTY)),
    // Deliberately a separate stage: a dropped result above emits nothing and
    // therefore does not cancel an `onError` stream still running here.
    switchMap((outcome) =>
      outcome.hasValue ? of(outcome.value) : onError(outcome.error),
    ),
    // No seed: the first value passes through untouched, later ones are merged
    // into their predecessor.
    scan(
      (previous: V, current) =>
        // eslint-disable-next-line deprecation/deprecation
        mergeImmutable(previous, current, { mode: 'unsafe' }).result,
    ),
    share(),
  );

  return values$.pipe(
    distinctUntilChanged(),
    onSubscription({
      afterSubscribe: () => {
        resubscribeSubject.next();
      },
    }),
  );
}
