import { timer, merge, NEVER, race, forkJoin, Observable } from 'rxjs';
import {
    filter,
    mergeMap,
    switchMap,
    take,
    withLatestFrom,
} from 'rxjs/operators';
import { ofType } from '@perpay-web/observable/operators/ofType';
import { latest } from '@perpay-web/observable/latest';

const DEFAULT_TIMEOUT_MS = 15000;

const toArray = (value) => {
    if (value === null || typeof value === 'undefined') {
        return [];
    }

    return Array.isArray(value) ? value : [value];
};

const normalizeArg = (func) =>
    typeof func === 'function' ? func : () => toArray(func);

const count = (list, value) => list.filter((item) => item === value).length;

const unique = (list) =>
    list.filter((item, index) => list.indexOf(item) === index);

/**
 * Main use-case: An epic needs to dispatch actions, wait for a future action emitted by
 * another epic, and respond with more actions. Additionally, error actions must be handled,
 * and it must stop listening if the awaited actions do not arrive within a given
 * time period.
 *
 * This encapsulates and streamlines observable manipulations that would otherwise be duplicated
 * inconsistently across various epics.
 *
 * BEHAVIORS:
 * 1. Accept a collection(*) of actionTypes to wait for
 * 2. Accept a collection(*) of actions that returns to dispatch immediately
 * 3. Accept a collection(*) of actions to dispatch after waiting
 * 4. Accept a collection(*) of error actionTypes to make us stop waiting and dispatch more actions
 * 5. Accept a collection(*) of actions to dispatch if an error action type is found
 * 6. Stop waiting if too much time passes and if so, dispatch a collection(*) of timeout actions
 *
 * (*) Any collection argument may also be passed as a function that returns a collection.
 *
 * These functions may accept the following arguments:
 * If a function is passed to `initialDispatch`, it may accept a `state` parameter.
 * If a function is passed to `waitFor`, it may accept a `state` parameter.
 * If a function is passed to `errorDispatch`, it may accept an `errorAction` and `state` parameter
 * If a function is passed to `waitForDispatch` it may accept a `state` and `results` parameter.
 *   `results` represents the actions from the `waitFor` collection.
 * If a function is passed to `timeoutDispatch` it may accept a `state` parameter.
 */
export function dispatchObservable({
    action$,
    state$,
    initialDispatch,
    waitFor,
    waitForDispatch,
    errors,
    errorDispatch,
    timeout = DEFAULT_TIMEOUT_MS,
    timeoutDispatch,
}) {
    // Allow these arguments to be passed as arrays
    // or as functions that return arrays or observables.
    const initialDispatchFn = normalizeArg(initialDispatch);
    const waitForFn = normalizeArg(waitFor);
    const waitForDispatchFn = normalizeArg(waitForDispatch);
    const errorFn = normalizeArg(errorDispatch);
    const timeoutFn = normalizeArg(timeoutDispatch);

    // This observable will immediately dispatch the actions returned by the initialDispatchFn
    const initialDispatch$ = latest(state$).pipe(
        mergeMap((state) => initialDispatchFn(state)),
    );

    // This observable will wait for the action$ observable to emit the N action types
    // returned by the waitForFn. Duplicate action types are handled by creating observables
    // that wait for each unique action type, and counting the number of action types that match
    // in the original array.
    // The waiting mechanism is provided by forkJoin, which is like Promise.all for observables.
    const waitFor$ = latest(state$).pipe(
        switchMap((state) => {
            const waitForArray = waitForFn(state);
            const uniqueWaitForArray = unique(waitForArray);

            return forkJoin(
                uniqueWaitForArray.map((type) =>
                    action$.pipe(
                        filter((action) => {
                            if (!type) {
                                throw new Error(
                                    'missing or undefined type in dispatchObservable waitFor',
                                );
                            }

                            if (typeof type === 'function') {
                                return type(action);
                            }
                            return action.type === type;
                        }),
                        take(count(waitForArray, type)),
                    ),
                ),
            );
        }),
        withLatestFrom(state$),
        mergeMap(([results, state]) => waitForDispatchFn(state, results)),
    );

    // This observable will wait for any given action types to be emitted and take only the first.
    // If an error action type is received, the actions returned by the errorFn will be dispatched.
    // If no errors are passed, we assign this a reference to NEVER since no action will be emitted.
    const errors$ =
        errors && errors.length
            ? action$.pipe(
                  ofType(...errors),
                  take(1),
                  withLatestFrom(state$),
                  mergeMap(([errorAction, state]) =>
                      errorFn(errorAction, state),
                  ),
              )
            : NEVER;

    // This observable will wait until the timeout has elapsed and then emit the actions
    // returned by the timeoutFn.
    const timeout$ = timer(timeout).pipe(
        withLatestFrom(state$),
        mergeMap(([, state]) => timeoutFn(state)),
    );

    // Create an observable that immediately subscribes to initialDispatch$ and
    // continues with the results of only one of error$, waitFor$, or timeout$,
    // whichever is first.
    return merge(initialDispatch$, race(errors$, waitFor$, timeout$));
}
