import {
  first,
  merge,
  Observable,
  retryWhen,
  switchMap,
  throwError,
  timer,
} from 'rxjs';

import {RrpcMethodPath} from '../rrpc-both/rrpc-method-path';
import {RrpcClientError} from '../rrpc-client/error/rrpc-client.error';
import {RrpcClient} from '../rrpc-client/rrpc-client';
import {refreshWhen} from '../rx/refresh-when';
import {ValueTransformer} from '../transformer/value-transformer';

import {AzaRrpcRef} from './aza-rrpc-ref';

/**
 * An {@link AzaRrpcRef} whose observable automatically re-subscribes when the
 * client is draining or a connection error occurs.
 *
 * Only errors that are instances of {@link RrpcClientError} trigger a retry;
 * every other error is re-thrown to the subscriber unchanged.
 */
export class AzaRrpcPersistentRef<
  P extends object,
  A extends readonly unknown[],
  N,
> extends AzaRrpcRef<P, A, N> {
  /**
   * @param path - Forwarded unchanged to the {@link AzaRrpcRef} constructor.
   * @param dto - Forwarded unchanged to the {@link AzaRrpcRef} constructor.
   * @param next - Forwarded unchanged to the {@link AzaRrpcRef} constructor.
   * @param _retryPeriod - Longest time, in milliseconds, to wait before
   *   re-subscribing after an {@link RrpcClientError}. An emission of the
   *   client's `draining$` cuts the wait short. Defaults to 10000.
   */
  public constructor(
    path: RrpcMethodPath,
    dto: new (...params: A) => P,
    next: ValueTransformer<N>,
    private readonly _retryPeriod = 10000,
  ) {
    super(path, dto, next);
  }

  /**
   * Wraps the base observable with refresh and retry behaviour.
   *
   * @param client - Client whose `draining$` stream drives refreshes and
   *   early retries.
   * @param data - Request payload, passed through to the base implementation.
   * @returns The base observable piped through `refreshWhen` and `retryWhen`.
   */
  protected override _createObservable(
    client: RrpcClient,
    data: P,
  ): Observable<N> {
    return super._createObservable(client, data).pipe(
      // NOTE(review): presumably re-subscribes to the source each time
      // `draining$` emits — confirm against `refreshWhen`.
      refreshWhen(client.draining$),
      retryWhen((errors$) =>
        errors$.pipe(
          switchMap((error: unknown) =>
            error instanceof RrpcClientError
              ? // Retry on whichever comes first: the next `draining$`
                // emission or the retry timer. `first()` guarantees a single
                // retry notification per error.
                merge(client.draining$, timer(this._retryPeriod)).pipe(first())
              : // Factory form: the value form is deprecated in RxJS 7 and
                // would call a function-valued error instead of throwing it.
                throwError(() => error),
          ),
        ),
      ),
    );
  }
}
