import {
  BehaviorSubject,
  combineLatest,
  EMPTY,
  merge,
  Observable,
  Subject,
} from 'rxjs';
import {
  distinctUntilChanged,
  filter,
  map,
  share,
  shareReplay,
  switchMap,
  takeUntil,
  tap,
} from 'rxjs/operators';
import { waitUntil } from '../observable/wait-until';
import { mapToVoid } from '../operators/map-to-void';
import { onSubscription } from '../operators/on-subscription';
import {
  blockingOperation,
  BlockingOperationResult,
  createExecutionStrategy,
} from './blocking-operation';
import { CallableOperationFn } from './blocking-operation.type';
import {
  DisableAutoConnectResultOption,
  EnableAutoConnectResultOption,
  SourceOnlyOption,
  StrategyType,
} from './options.type';

export interface BlockingOperationHandler<T> {
  next: (operation: CallableOperationFn<T>) => void;
  results: Observable<BlockingOperationResult<T>>;
  running: Observable<boolean>;
  queueSize: Observable<number>;
}

export function blockingOperationHandler<T>(
  handler: {
    next?: (value: T) => void;
    error?: (error: any) => void;
    complete?: (value: T | undefined) => void;
  },
  options: SourceOnlyOption<T>,
): Pick<BlockingOperationHandler<T>, 'results'>;
export function blockingOperationHandler<T>(
  handler: {
    next?: (value: T) => void;
    error?: (error: any) => void;
    complete?: (value: T | undefined) => void;
  },
  options?: EnableAutoConnectResultOption<T>,
): Omit<BlockingOperationHandler<T>, 'results'>;
export function blockingOperationHandler<T>(
  handler: {
    next?: (value: T) => void;
    error?: (error: any) => void;
    complete?: (value: T | undefined) => void;
  },
  options: DisableAutoConnectResultOption<T>,
): BlockingOperationHandler<T>;
export function blockingOperationHandler<T>(
  handler: {
    next?: (value: T) => void;
    error?: (error: any) => void;
    complete?: (value: T | undefined) => void;
  },
  {
    sourceOnly = false,
    source = EMPTY,
    strategy = 'single',
    isRunning = false,
    aggregator = (_, next) => next,
    maxRunningSize = Number.MAX_SAFE_INTEGER,
    onUnsubscribe = () => {},
    autoConnectResult = true,
  }:
    | SourceOnlyOption<T>
    | EnableAutoConnectResultOption<T>
    | DisableAutoConnectResultOption<T> = {},
): BlockingOperationHandler<T> {
  const queueSizeSubject = new BehaviorSubject(0);
  const runningSizeSubject = new BehaviorSubject(0);
  const taskSourceCompletedSubject = new Subject<boolean>();

  const subject = new Subject<CallableOperationFn<T>>();
  const sourceOrEmpty: Observable<CallableOperationFn<T>> = sourceOnly
    ? EMPTY
    : subject;

  const tasksSource$ = merge(
    sourceOrEmpty,
    source.pipe(
      tap({
        next: () => {
          const queueSize = calculateQueueSize(
            strategy,
            queueSizeSubject.value,
          );
          queueSizeSubject.next(queueSize);
        },
        complete: () => taskSourceCompletedSubject.next(sourceOnly),
      }),
    ),
  );

  const results$: Observable<BlockingOperationResult<T>> = blockingOperation(
    waitUntil(
      tasksSource$,
      runningSizeSubject
        .pipe(filter((size) => size < maxRunningSize))
        .pipe(mapToVoid()),
    ).pipe(
      tap<CallableOperationFn<T>>(() => {
        if (strategy === 'switch' || strategy === 'exhaust') {
          runningSizeSubject.next(1);
        } else {
          runningSizeSubject.next(runningSizeSubject.value + 1);
        }
      }),
    ),
    handler,
    {
      isRunning,
      aggregator,
      strategy: createExecutionStrategy(strategy),
      onUnsubscribe,
    },
  )
    .pipe(
      tap((result) => {
        if (result.completed) {
          if (strategy === 'switch' || strategy === 'exhaust') {
            queueSizeSubject.next(0);
            runningSizeSubject.next(0);
          } else {
            queueSizeSubject.next(queueSizeSubject.value - 1);
            runningSizeSubject.next(runningSizeSubject.value - 1);
          }
        }
      }),
      map((result, index) => {
        // for the initial frame on index 0, we will always respect the value of isRunning
        if (index > 0) {
          return {
            ...result,
            running: queueSizeSubject.value > 0,
          } as BlockingOperationResult<T>;
        } else {
          return result;
        }
      }),
    )
    .pipe(
      onSubscription({
        onUnsubscribe: () => {
          queueSizeSubject.next(0);
          runningSizeSubject.next(0);
        },
      }),
    );

  const queueSize$ = queueSizeSubject
    .pipe(distinctUntilChanged())
    .pipe(shareReplay({ refCount: true, bufferSize: 1 }));
  const isQueueEmpty$ = queueSize$
    .pipe(map((queueSize) => queueSize === 0))
    .pipe(distinctUntilChanged());

  const running$ = runningSizeSubject
    .pipe(map((runningSize) => runningSize > 0))
    .pipe(distinctUntilChanged())
    .pipe(shareReplay({ refCount: true, bufferSize: 1 }));

  const sharedResult$ = results$
    .pipe(
      takeUntil(
        combineLatest([
          taskSourceCompletedSubject,
          isQueueEmpty$,
          running$,
        ]).pipe(
          filter(
            ([isSourceCompleted, isQueueEmpty, running]) =>
              isSourceCompleted && isQueueEmpty && !running,
          ),
        ),
      ),
    )
    .pipe(share());

  return {
    next: (operation: CallableOperationFn<T>) => {
      const queueSize = calculateQueueSize(strategy, queueSizeSubject.value);
      queueSizeSubject.next(queueSize);
      subject.next(operation);
    },
    results: sharedResult$,
    running: autoConnectResult
      ? merge(sharedResult$.pipe(switchMap(() => EMPTY)), running$)
      : running$,
    queueSize: autoConnectResult
      ? merge(sharedResult$.pipe(switchMap(() => EMPTY)), queueSize$)
      : queueSize$,
  };
}

export class SingleStrategyError extends Error {
  constructor() {
    super(
      'Using the single strategy, only one operation may run at the same time.',
    );
  }
}

function calculateQueueSize(strategy: StrategyType, queueSize: number) {
  if (strategy === 'single' && queueSize > 0) {
    throw new SingleStrategyError();
  }

  if (strategy === 'switch' || strategy === 'exhaust') {
    return 1;
  } else {
    return queueSize + 1;
  }
}
