import {Observable, Subject} from 'rxjs';

import {RrpcMethodPath} from '../rrpc-both/rrpc-method-path';

import {RrpcClientConnection} from './rrpc-client-connection';
import {RrpcClientSocketFactory} from './rrpc-client-socket-factory';

/**
 * @see {@link https://github.com/azarusio/aza-workspace/wiki/RRPC-gen-2 RRPC protocol wiki}
 */
export class RrpcClient {
  private _connection: RrpcClientConnection | null = null;

  private readonly _drainingSignal$ = new Subject<void>();

  public readonly draining$: Observable<void>;

  public constructor(
    private readonly _deadTimeout: number,
    private readonly _socketFactory: RrpcClientSocketFactory,
  ) {
    this.draining$ = this._drainingSignal$.asObservable();
  }

  public unsafeRawLiteralCall(
    path: RrpcMethodPath,
    params: unknown,
  ): Observable<unknown> {
    return new Observable<unknown>((subscriber) => {
      if (this._connection === null || this._connection.useless) {
        this._connection = new RrpcClientConnection(
          this._deadTimeout,
          this._drainingSignal$,
          this._socketFactory(),
        );
      }

      return this._connection.attach(subscriber, path, params);
    });
  }
}
