import {NextObserver, Subscriber, Subscription, TeardownLogic} from 'rxjs';

import {RrpcClientCode} from '../rrpc-both/rrpc-client-code';
import {RrpcClientFrame} from '../rrpc-both/rrpc-client-frame';
import {RrpcClientFrameType} from '../rrpc-both/rrpc-client-frame-type';
import {RrpcMethodPath} from '../rrpc-both/rrpc-method-path';
import {RrpcPanicError} from '../rrpc-both/rrpc-panic.error';
import {RrpcServerCode} from '../rrpc-both/rrpc-server-code';
import {RrpcServerFrame} from '../rrpc-both/rrpc-server-frame';
import {RrpcServerFrameType} from '../rrpc-both/rrpc-server-frame-type';
import {RrpcSubscriptionId} from '../rrpc-both/rrpc-subscription-id';
import {isEnumValue} from '../utils/is-enum-value';

import {ClosedUnexpectedlyRrpcClientError} from './error/closed-unexpectedly-rrpc-client.error';
import {ConnectionTimeoutRrpcClientError} from './error/connection-timeout-rrpc-client.error';
import {UnexpectedConnectionRrpcClientError} from './error/unexpected-connection-rrpc-client.error';
import {RrpcClientSocket} from './rrpc-client-socket';
import {RrpcClientSocketEventMap} from './rrpc-client-socket-event-map';

const GOING_AWAY = 1001;

class FutureSubscription extends Subscription {
  public constructor(
    public readonly subscriber: Subscriber<unknown>,
    public readonly path: RrpcMethodPath,
    public readonly params: unknown,
    unsubscribe: () => void,
  ) {
    super(unsubscribe);
  }
}

// TODO add timeToLive
// TODO add subscription queue (prevent send already unsubscribed subscription)
export class RrpcClientConnection {
  private _draining = false;

  private _newSubscribeId = 0;

  private _futureSubscription = new Set<FutureSubscription>();

  private _currentSubscribers = new Map<
    RrpcSubscriptionId,
    Subscriber<unknown>
  >();

  private _deadTimeoutId: ReturnType<typeof setTimeout> | null = null;

  public constructor(
    private readonly _deadTimeout: number,
    private readonly _drainingObserver: NextObserver<void>,
    private readonly _socket: RrpcClientSocket,
  ) {
    this._socket.addEventListener('close', this._onSocketClose.bind(this));
    this._socket.addEventListener('error', this._onSocketError.bind(this));
    this._socket.addEventListener('message', this._onSocketMessage.bind(this));
    this._socket.addEventListener('open', this._onSocketOpen.bind(this));
    this._restartDeadTimeout();
  }

  private _sendFrame(frame: RrpcClientFrame): void {
    this._socket.send(JSON.stringify(frame));
  }

  private _killAllActiveSubscriptions(errorFactory: () => unknown): void {
    const {_futureSubscription, _currentSubscribers} = this;

    // fill new to prevent sending unsubscribe after close
    this._futureSubscription = new Set<FutureSubscription>();
    this._currentSubscribers = new Map<
      RrpcSubscriptionId,
      Subscriber<unknown>
    >();

    for (const subscriber of _futureSubscription) {
      subscriber.subscriber.error(errorFactory());
    }

    for (const subscriber of _currentSubscribers.values()) {
      subscriber.error(errorFactory());
    }
  }

  private _closeSocketAndClear(): void {
    if (this._deadTimeoutId !== null) {
      clearTimeout(this._deadTimeoutId);
    }

    this._socket.close(RrpcClientCode.NO_ACTIVE_SUBSCRIPTIONS);
  }

  private _sendSubscribeFrame(
    subscriber: Subscriber<unknown>,
    path: RrpcMethodPath,
    params: unknown,
  ): TeardownLogic {
    const id = this._newSubscribeId++ as RrpcSubscriptionId;

    this._sendFrame([RrpcClientFrameType.SUBSCRIBE, id, path, params]);
    this._currentSubscribers.set(id, subscriber);
    return this._onSubscriptionTeardown.bind(this, id);
  }

  private _onSubscriptionTeardown(id: RrpcSubscriptionId): void {
    // prevent send unsubscribe if server send complete/error
    if (!this._currentSubscribers.has(id)) {
      return;
    }

    this._currentSubscribers.delete(id);

    if (this._currentSubscribers.size === 0) {
      this._closeSocketAndClear();
      return;
    }

    this._sendFrame([RrpcClientFrameType.UNSUBSCRIBE, id]);
  }

  private _onNextFrame(id: RrpcSubscriptionId, next: unknown): void {
    this._currentSubscribers.get(id)?.next(next);
  }

  private _onCompleteFrame(id: RrpcSubscriptionId): void {
    const subscriber: Subscriber<unknown> | undefined =
      this._currentSubscribers.get(id);

    if (subscriber === undefined) {
      return;
    }

    this._currentSubscribers.delete(id);
    subscriber.complete();

    if (this._currentSubscribers.size === 0) {
      this._closeSocketAndClear();
    }
  }

  private _onErrorFrame(id: RrpcSubscriptionId, error: unknown): void {
    const subscriber: Subscriber<unknown> | undefined =
      this._currentSubscribers.get(id);

    if (subscriber === undefined) {
      return;
    }

    this._currentSubscribers.delete(id);
    subscriber.error(error); // TODO deserialize error

    if (this._currentSubscribers.size === 0) {
      this._closeSocketAndClear();
    }
  }

  private _onPingFrame(): void {
    this._sendFrame([RrpcClientFrameType.PONG]);
  }

  private _onDrainingFrame(): void {
    this._draining = true;
    this._drainingObserver.next();
  }

  private _onSocketClose({code}: RrpcClientSocketEventMap['close']): void {
    if (code === GOING_AWAY) {
      // GOING_AWAY sends Firefox when a tab is closed.
      // There is no point in trying to do something.
      return;
    }

    this._killAllActiveSubscriptions(() =>
      isEnumValue(code, RrpcServerCode)
        ? new RrpcPanicError(code)
        : new ClosedUnexpectedlyRrpcClientError(code),
    );
  }

  private _onSocketError(): void {
    this._killAllActiveSubscriptions(
      () => new UnexpectedConnectionRrpcClientError(),
    );
  }

  private _onSocketMessage({data}: RrpcClientSocketEventMap['message']): void {
    this._restartDeadTimeout();

    if (typeof data === 'string') {
      const frame: RrpcServerFrame = JSON.parse(data);

      switch (frame[0]) {
        case RrpcServerFrameType.NEXT:
          this._onNextFrame(frame[1], frame[2]);
          break;
        case RrpcServerFrameType.COMPLETE:
          this._onCompleteFrame(frame[1]);
          break;
        case RrpcServerFrameType.ERROR:
          this._onErrorFrame(frame[1], frame[2]);
          break;
        case RrpcServerFrameType.PING:
          this._onPingFrame();
          break;
        case RrpcServerFrameType.DRAINING:
          this._onDrainingFrame();
          break;
      }
    }
  }

  private _onSocketOpen(): void {
    const {_futureSubscription} = this;

    if (_futureSubscription.size === 0) {
      this._socket.close(RrpcClientCode.NO_FUTURE_SUBSCRIPTIONS);
      return;
    }

    this._futureSubscription = new Set<FutureSubscription>();

    for (const item of _futureSubscription) {
      item.add(
        this._sendSubscribeFrame(item.subscriber, item.path, item.params),
      );
    }
  }

  private _onDead(): void {
    this._deadTimeoutId = null;
    this._socket.close(RrpcClientCode.CONNECTION_TIMEOUT);
    this._killAllActiveSubscriptions(
      () => new ConnectionTimeoutRrpcClientError(),
    );
  }

  private _restartDeadTimeout(): void {
    if (this._deadTimeoutId !== null) {
      clearTimeout(this._deadTimeoutId);
    }

    this._deadTimeoutId = setTimeout(
      this._onDead.bind(this),
      this._deadTimeout,
    );
  }

  public get useless(): boolean {
    return (
      this._draining ||
      this._socket.readyState === this._socket.CLOSING ||
      this._socket.readyState === this._socket.CLOSED
    );
  }

  public attach(
    subscriber: Subscriber<unknown>,
    path: RrpcMethodPath,
    params: unknown,
  ): TeardownLogic {
    if (this._socket.readyState !== this._socket.OPEN) {
      const early = new FutureSubscription(subscriber, path, params, () => {
        this._futureSubscription.delete(early);
      });

      this._futureSubscription.add(early);

      return early;
    }

    return this._sendSubscribeFrame(subscriber, path, params);
  }
}
