import { useEffect, useMemo, useRef } from 'react';
import {
    Subject,
    catchError,
    concat,
    forkJoin,
    map,
    merge,
    of,
    scan,
    take,
    BehaviorSubject,
    exhaustMap,
    share,
    mergeMap,
    ignoreElements,
    bufferWhen,
    EMPTY,
    from,
    toArray,
} from 'rxjs';

import { useConstCallback } from '@perpay-web/hooks/useConstCallback';

// These states are intended to be generic and similar to the Promise states,
// so they are different than the data module state values even though they correspond.

// No side-effect has run yet, or the state was reset back to its starting point.
export const SIDE_EFFECT_INITIAL_STATE = 'initial';
// A side-effect has been kicked off and its result has not arrived yet.
export const SIDE_EFFECT_PENDING_STATE = 'pending';
// The side-effect finished and its result is available under `value`.
export const SIDE_EFFECT_COMPLETE_STATE = 'complete';
// The side-effect failed; details are available under `errors`.
export const SIDE_EFFECT_ERROR_STATE = 'error';

// The state object used both to seed the state stream and as the value
// emitted whenever a reset is requested.
// NOTE: this single object (including its nested `errors` object) is handed
// out by reference, so it must be treated as read-only by every consumer.
const INITIAL_STATE_VALUE = {
    state: SIDE_EFFECT_INITIAL_STATE,
    errors: {},
    value: null,
};

// Sentinel thrown into the result stream to interrupt a pending result when a
// reset is requested. It is recognised by identity (not by its message) so that
// a genuine side-effect failure whose message happens to be "reset" is still
// reported as an error instead of being silently swallowed.
const RESET_SIGNAL = new Error('reset');

// Message reported when a failure carries no usable description of its own.
const UNKNOWN_ERROR_MESSAGE = 'Unknown error';

/**
 * Derive a displayable message from whatever a side-effect failed with.
 *
 * A failure is not guaranteed to be an Error instance: a Promise can reject
 * with a string, a plain object, or nothing at all. This helper never throws,
 * because an exception raised while building the error state would propagate
 * into the state stream and terminate it.
 *
 * @param {*} error - The value the side-effect errored with.
 * @returns {string} `error` itself when it is a non-empty string, otherwise
 *   `error.message`, then `error.response.message`, then a generic fallback.
 */
const getErrorMessage = (error) => {
    if (typeof error === 'string' && error) {
        return error;
    }

    if (error === null || error === undefined) {
        return UNKNOWN_ERROR_MESSAGE;
    }

    const responseMessage = error.response
        ? error.response.message
        : undefined;

    return error.message || responseMessage || UNKNOWN_ERROR_MESSAGE;
};

/**
 * Wire the side-effect streams into the state stream.
 *
 * Four sources of partial state are merged and folded into one state object,
 * which is pushed into `stateSubject$`:
 *   - the subject's current value (to seed the fold),
 *   - a "pending" state whenever a side-effect is kicked off,
 *   - a "complete" or "error" state whenever a side-effect settles,
 *   - the initial state whenever a reset is requested.
 *
 * @param {BehaviorSubject} stateSubject$ - Receives every folded state.
 * @param {Observable} sideEffect$ - Emits each time a side-effect is requested.
 * @param {Observable} sideEffectResult$ - Emits (or errors with) each side-effect result.
 * @param {Observable} reset$ - Emits each time a reset is requested.
 * @returns {Subscription} The subscription driving `stateSubject$`; unsubscribe to stop updates.
 */
const getSubscription = (
    stateSubject$,
    sideEffect$,
    sideEffectResult$,
    reset$,
) => {
    // First emit the current state value to initialize the scan() below
    const initialState$ = stateSubject$.pipe(take(1));

    // Emit a state change when a new side-effect is kicked off.
    // The inner stream stays open until the next result (or failure) arrives,
    // so exhaustMap ignores further kick-offs while one is still in flight.
    const loadingState$ = sideEffect$.pipe(
        exhaustMap(() =>
            concat(
                of({
                    state: SIDE_EFFECT_PENDING_STATE,
                    errors: {},
                }),
                sideEffectResult$.pipe(
                    take(1),
                    ignoreElements(),
                    // Failures are reported by successOrErrorState$ below;
                    // here we only need to know that the wait is over.
                    catchError(() => EMPTY),
                ),
            ),
        ),
    );

    // Emit a state change back to the initial state whenever reset$ emits.
    const resetState$ = reset$.pipe(map(() => INITIAL_STATE_VALUE));

    // Emit a state change when the side-effect completes or errors.
    const successOrErrorState$ = merge(
        sideEffectResult$,
        // We use Observable error handling here to interrupt the result
        // if the result is pending when the reset is initiated.
        // This prevents the success or error result from overwriting the reset state.
        resetState$.pipe(
            mergeMap(() => {
                throw RESET_SIGNAL;
            }),
        ),
    ).pipe(
        map((results) => ({
            state: SIDE_EFFECT_COMPLETE_STATE,
            errors: {},
            value: results,
        })),
        catchError((e, source$) => {
            // In the reset case, just resubscribe to the original Observable
            // so that we listen for the next side-effect.
            if (e === RESET_SIGNAL) {
                return source$;
            }

            // Report the failure, then resubscribe so later side-effects
            // are still observed.
            return concat(
                of({
                    state: SIDE_EFFECT_ERROR_STATE,
                    errors: { message: [getErrorMessage(e)] },
                }),
                source$,
            );
        }),
    );

    // When each state change occurs and emits the next state,
    // we want to merge it into the previous state and expose
    // it via the stateSubject$ stream.
    // The merge order matters: successOrErrorState$ is subscribed before
    // resetState$, so on a reset the pending result is interrupted first
    // and the initial state is emitted afterwards.
    const subscription = merge(
        initialState$,
        loadingState$,
        successOrErrorState$,
        resetState$,
    )
        .pipe(
            // Store the updated state in a persistent state object.
            // Partial states only overwrite the keys they carry, so e.g. a
            // previous `value` is kept while the next side-effect is pending.
            scan((acc, d) => ({ ...acc, ...d }), {}),
        )
        .subscribe(stateSubject$);

    return subscription;
};

/**
 * React hook that runs a side-effect on demand and tracks its lifecycle
 * (initial -> pending -> complete | error) as a stream of state objects.
 *
 * @param {Function} sideEffect - Called with the argument given to `next`.
 *   Must return something `from()` accepts: a Promise, an Observable or an Array of values.
 * @returns {{
 *   state$: BehaviorSubject,
 *   sideEffect$: Observable,
 *   next: Function,
 *   reset: Function,
 * }} `state$` holds the current state (also readable via `.value`),
 *   `sideEffect$` emits each side-effect result, `next(param)` kicks off a
 *   side-effect and `reset()` returns the state to its initial value.
 */
export const useSideEffect = (sideEffect) => {
    // A side-effect is a function that returns a Promise, an Observable or Array of values.
    // NOTE(review): useConstCallback presumably returns a reference that is stable
    // across renders - confirm against its implementation.
    const sideEffectCb = useConstCallback(sideEffect);

    // sideEffect$ represents a stream of side-effects.
    // Since Observables themselves model side-effects, sideEffect$ is a higher-order Observable.
    // We create it as a Subject so that it is a hot observable, meaning one source can emit to
    // multiple observers (multicasting) and side-effects can be emitted even when
    // no subscriber is listening.
    // The raw Subject is only used inside this hook; callers get `next` instead.
    const { sideEffect$, next } = useMemo(() => {
        const sideEffectSubject$ = new Subject();

        return {
            sideEffect$: sideEffectSubject$,
            next: (nextSideEffectParam) => {
                sideEffectSubject$.next(nextSideEffectParam);
            },
        };
    }, []);

    // sideEffectResult$ represents a stream of completed or errored side-effects.
    // The exhaustMap operator flattens the stream of side-effects to be a first-order stream
    // of results, ignoring new requests while one is still in flight.
    // This is useful to expose to enable multiple streams of side-effects to be composed using
    // standard stream-manipulation tools like forkJoin.
    // A composed side-effect is itself a side-effect, making the pattern highly flexible and
    // capable of elegantly representing a wide variety of changes in the system.
    const sideEffectResult$ = useMemo(
        () =>
            sideEffect$.pipe(
                // Run the side-effect and flatten with an exhaust strategy
                exhaustMap((nextSideEffectParam) =>
                    from(sideEffectCb(nextSideEffectParam)).pipe(
                        // When the side-effect has completed, it could have emitted one or more times.
                        // We collect those emissions so that the complete state isn't triggered
                        // multiple times. toArray() subscribes to the source exactly once, so a
                        // synchronous, cold source is never executed twice.
                        toArray(),
                        // If it has emitted once, extract it from the array and forward that
                        // single value to subscribers.
                        // If it has emitted zero or multiple items, forward the array as-is.
                        map((results) =>
                            results.length === 1 ? results[0] : results,
                        ),
                    ),
                ),
                // The exhaustMap operator subscribes to the sideEffect$ Subject and
                // returns a non-Subject Observable. We need the multicast behavior
                // of a Subject for the sideEffectResult$ stream to work as expected,
                // so we pipe to the share() operator to multicast one execution
                // to every subscriber.
                share(),
            ),
        [sideEffectCb, sideEffect$],
    );

    // reset$ represents a stream of intentions to reset the side-effect state.
    // We do not expose the raw Subject reference to limit the API surface area.
    const { reset$, reset } = useMemo(() => {
        const resetSubject$ = new Subject();

        return {
            reset$: resetSubject$.asObservable(),
            reset: () => resetSubject$.next(),
        };
    }, []);

    // stateSubject$ represents a stream of states that the side-effect moves through.
    // It is a BehaviorSubject so that it exposes a `.value` property enabling consumers
    // to access the current state easily without invoking any Observable machinery.
    const stateSubject$ = useMemo(
        () => new BehaviorSubject(INITIAL_STATE_VALUE),
        [],
    );

    // We use a ref here to create the subscription during render instead of in a useEffect.
    // This is because a child's useEffects will run BEFORE a parent's useEffects.
    // In perpay-web, often a parent component will define a hook and then pass
    // a callback to a child component, like an onMount prop.
    // If we define this listener here in a useEffect, and pass the emit callback to a child,
    // then the listener will start listening in the parent useEffect AFTER the child's
    // has already fired its callback within its useEffect.
    // If we use a ref to initialize the listener only once, then we
    // start listening immediately when the component first renders, before
    // any of its children can call dataRequest.
    const subscriptionRef = useRef(null);
    if (!subscriptionRef.current) {
        subscriptionRef.current = getSubscription(
            stateSubject$,
            sideEffect$,
            sideEffectResult$,
            reset$,
        );
    }
    useEffect(() => {
        // The cleanup below closes the subscription. If React runs this effect
        // again afterwards (e.g. StrictMode's development-only remount), the
        // render-time guard above does not fire because the ref is still set,
        // so we re-create the subscription here to avoid a hook that silently
        // stops updating its state.
        if (!subscriptionRef.current || subscriptionRef.current.closed) {
            subscriptionRef.current = getSubscription(
                stateSubject$,
                sideEffect$,
                sideEffectResult$,
                reset$,
            );
        }

        const subscription = subscriptionRef.current;
        return () => subscription.unsubscribe();
    }, [stateSubject$, sideEffect$, sideEffectResult$, reset$]);

    return {
        state$: stateSubject$,
        sideEffect$: sideEffectResult$,
        next,
        reset,
    };
};

// Composing side-effects must itself yield a side-effect, i.e. a function
// that returns an Observable-like value, so this returns such a function.
// The inner results are combined with forkJoin: the composed side-effect is
// only complete once every inner side-effect has completed, and the error
// states of the inner side-effects are composed as well.
export const composeSideEffects = (...sideEffectObservables) => {
    // forkJoin only emits once its inner Observables complete, and a
    // sideEffect$ stream does not complete on its own. Piping take(1) states
    // that only the first result of each individual side-effect matters and
    // makes each inner stream complete after it.
    const takeFirstResult = (sideEffect$) => sideEffect$.pipe(take(1));

    // NOTE: You may see a false-positive deprecation warning here.
    // The checker is unable to verify that the array of Observables has
    // the correct type.
    // Rest assured, this usage is valid and not deprecated.
    return () => forkJoin(sideEffectObservables.map(takeFirstResult));
};
