import { Observable, of } from 'rxjs';
import type { ObservedValueOf, OperatorFunction } from 'rxjs';
import {
  catchError,
  concatMap,
  endWith,
  exhaustMap,
  map,
  mergeMap,
  scan,
  startWith,
  switchMap,
  tap,
} from 'rxjs/operators';
import { withSubscriptionHandlers } from '../observable/with-subscription-handlers';
import { isDefined } from '../operators/is-defined';
import { CallableOperationFn } from './blocking-operation.type';
import { StrategyType } from './options.type';

export interface BlockingOperationStatus<T> {
  type: string;
  running: boolean;
  hasValue: false;
  error?: any;
  completed: boolean;
  accumulatedValue?: T;
}

export interface BlockingOperationResultWithValue<T> {
  type: string;
  running: true;
  hasValue: true;
  error: undefined;
  value: T;
  accumulatedValue: T | undefined;
  completed: boolean;
}

export function hasValue<T>(
  result: BlockingOperationResult<T>,
): result is BlockingOperationResultWithValue<T> {
  return result.hasValue;
}

export function hasValueOfType<T, TValue extends T>(
  isOfType: (value: T) => value is TValue,
): (
  result: BlockingOperationResult<T>,
) => result is BlockingOperationResultWithValue<TValue> {
  return (
    result: BlockingOperationResult<T>,
  ): result is BlockingOperationResultWithValue<TValue> =>
    hasValue(result) && isOfType(result.value);
}

export type BlockingOperationResult<T> =
  | BlockingOperationStatus<T>
  | BlockingOperationResultWithValue<T>;

function createAccumulator<T>(
  aggregator: (acc: T | undefined, next: T) => T,
): (
  acc: BlockingOperationResult<T> | undefined,
  value: BlockingOperationResult<T>,
) => BlockingOperationResult<T> {
  return (
    acc: BlockingOperationResult<T> | undefined,
    value: BlockingOperationResult<T>,
  ) => ({
    ...value,
    // prettier-ignore
    accumulatedValue: value.hasValue
      ? aggregator(acc ? acc.accumulatedValue : undefined, value.value)
      : acc
        ? acc.accumulatedValue
        : undefined,
  });
}

export type ExecutionStrategy<T> = (
  project: (
    value: CallableOperationFn<T>,
  ) => Observable<BlockingOperationResult<T>>,
) => OperatorFunction<
  CallableOperationFn<T>,
  ObservedValueOf<Observable<BlockingOperationResult<T>>>
>;

export function createExhaustMapExecutionStrategy<T>(): ExecutionStrategy<T> {
  return (project) => exhaustMap(project);
}

export function createConcatMapExecutionStrategy<T>(): ExecutionStrategy<T> {
  return (project) => concatMap(project);
}

export function createMergeMapExecutionStrategy<T>(
  concurrent?: number,
): ExecutionStrategy<T> {
  return (project) => mergeMap(project, concurrent);
}

export function createSwitchMapExecutionStrategy<T>(): ExecutionStrategy<T> {
  return (project) => switchMap(project);
}

export function createExecutionStrategy<T>(
  type: StrategyType,
): ExecutionStrategy<T> {
  switch (type) {
    case 'merge':
      return createMergeMapExecutionStrategy();
    case 'switch':
      return createSwitchMapExecutionStrategy();
    case 'exhaust':
      return createExhaustMapExecutionStrategy();
    case 'concat':
    case 'single':
    default:
      return createConcatMapExecutionStrategy();
  }
}

export function blockingOperation<T>(
  source: Observable<CallableOperationFn<T>>,
  handler: {
    next?: (value: T) => void;
    error?: (error: any) => void;
    complete?: (value: T | undefined) => void;
  },
  {
    strategy,
    isRunning = false,
    aggregator = (_, next) => next,
    onUnsubscribe = () => {},
  }: {
    strategy: ExecutionStrategy<T>;
    isRunning?: boolean;
    aggregator?: (acc: T | undefined, next: T) => T;
    onUnsubscribe?: () => void;
  },
): Observable<BlockingOperationResult<T>> {
  const initialFrameBase = {
    type: 'initial',
    error: false,
    hasValue: false as const,
    completed: false,
    accumulatedValue: undefined,
  };

  const startRunningFrame = {
    type: 'start',
    running: true,
    error: false,
    hasValue: false as const,
    completed: false,
    accumulatedValue: undefined,
  };

  const clearFrame = {
    type: 'clear',
    running: false,
    error: false,
    hasValue: false as const,
    completed: false,
    accumulatedValue: undefined,
  };

  const resultFrameBase = {
    type: 'result',
    running: true as const,
    hasValue: true as const,
    error: undefined,
    completed: false,
    accumulatedValue: undefined,
  };

  const endRunningFrame = {
    type: 'end',
    running: false,
    error: false,
    hasValue: false as const,
    completed: true,
    accumulatedValue: undefined,
  };

  const errorFrameBase = {
    type: 'error',
    running: false,
    hasValue: false as const,
    completed: true,
    accumulatedValue: undefined,
  };

  const results$ = withSubscriptionHandlers(source, { onUnsubscribe })
    .pipe(
      strategy((fn: CallableOperationFn<T>) =>
        fn()
          .pipe(map((v) => ({ ...resultFrameBase, value: v })))
          .pipe(tap((result) => handler.next && handler.next(result.value)))
          .pipe(
            scan<
              BlockingOperationResult<T>,
              BlockingOperationResult<T> | undefined
            >(createAccumulator(aggregator), undefined),
          )
          .pipe(isDefined())
          .pipe(
            startWith(startRunningFrame),
            endWith(endRunningFrame),
            scan<
              BlockingOperationResult<T>,
              BlockingOperationResult<T> | undefined
            >(
              (
                lastValue: BlockingOperationResult<T> | undefined,
                next: BlockingOperationResult<T>,
              ) => {
                if (next.completed && handler.complete) {
                  handler.complete(
                    lastValue ? lastValue.accumulatedValue : undefined,
                  );
                }
                return next;
              },
              undefined,
            ),
            isDefined(),
            endWith(clearFrame),
            catchError((error) => of({ ...errorFrameBase, error })),
            tap({
              next: (next) => {
                if (next.error && handler.error) {
                  handler.error(next.error);
                }
              },
              error: (error) => handler.error && handler.error(error),
            }),
          ),
      ),
    )
    .pipe(startWith({ ...initialFrameBase, running: isRunning }));

  return results$;
}
