/**
 * @author 贝才[beica1@outook.com]
 * @date 2021/2/8
 * @description
 *   feed.ts of essential
 */
import * as R from 'ramda'
import Socket from '../net/websocket/WebSocket'
import queue from '../task/task.queue'
import { on, off, emit, Handler } from '../tools/event'
import { getSpeedAc } from './../../src/common/goSpeed'

/**
 * Random base-36 token (at most five characters) generated once at module
 * load; it is embedded in every watch event name built by this module.
 */
const STREAM_ID = Math.random().toString(36).substring(2, 7)

/** Optional settings accepted by `MarketFeed.watch`. */
interface WatchOption<T> {
  /** When true, the handler is called right away with the cached quote for the watched code, if one exists. */
  immediate?: boolean
  /** Quote offered to the feed's cache before watching; it is stored only if it passes the cache's freshness check. */
  from?: T
}

/** How the codes passed to `subscribe` are combined with the codes a feed already tracks. */
export enum SubscribeMode {
  /** Codes passed to `subscribe` are merged into the current list; `unSubscribe` removes them from it. */
  UNION,
  /** Codes passed to `subscribe` overwrite the current list instead of being merged into it. */
  REPLACE
}

/** Connection lifecycle events a feed reports through the `on` callbacks in its options. */
type MarketFeedEvents = 'connecting' | 'open' | 'closed'

/** Configuration for a `MarketFeed`. */
interface Options<T> {
  /** How `subscribe` combines new codes with existing ones; anything other than `REPLACE` merges them. */
  subscribeMode?: SubscribeMode

  /** Server address handed to the websocket constructor. */
  server: string

  /** Serialises the list of codes into the string placed in the subscription payload. */
  generateSubscribeCodes(codes: Array<string>): string

  /** Converts a raw socket message into a quote; a null result means the message is ignored. */
  parseQuoteMessage(message: string | Buffer | Blob): T | null

  /**
   * Optional lifecycle callbacks. Any subset may be supplied: the feed checks
   * that a callback is a function before invoking it, so missing entries are
   * simply skipped.
   */
  on?: Partial<Record<MarketFeedEvents, () => void>>
}

/** Minimum fields every quote message must carry so the feed can route and cache it. */
type TMessageEssential = {
  /** Symbol code; used as the cache key and as part of the watch event name. */
  code: string
  /** Update timestamp; compared as a string with `<` to decide whether a quote is newer than the cached one. */
  updatetime: string
}

/**
 * Quote feed backed by a single websocket connection.
 *
 * The feed tracks two lists of codes (persistent and regular), sends their
 * union to the server as one subscription message, caches the newest quote
 * per code, and re-emits every parsed quote on the shared event bus so that
 * handlers registered through `watch` receive it.
 */
class MarketFeed<TMessage extends TMessageEssential> {
  private readonly options: Options<TMessage>
  // Newest quote seen for each code, keyed by `code`.
  private readonly cache: Record<string, TMessage> = {}
  // Wrapper returned by `queue`, constructed with `send` as its callback.
  // NOTE(review): presumably runs the queued task and then that callback — confirm against task.queue.
  private readonly queue
  private socket: Socket | null = null
  // Codes registered with `persist = true`.
  private persistCodes: Array<string> = []
  // Codes registered with `persist = false`.
  private subscribedCodes: Array<string> = []
  // True from the moment a connection attempt starts until it opens or closes.
  private connecting = false

  /**
   * @param options server address, message codecs and optional lifecycle callbacks
   */
  constructor(options: Options<TMessage>) {
    this.options = options
    this.queue = queue(() => this.send())
  }

  /** Whether a socket is currently held and reports itself as open. */
  private isOpened() {
    return this.socket?.isOpen()
  }

  /** Re-sends the current subscription list (no-op while the socket is not open). */
  restoreSubscribe() {
    this.send()
  }

  /**
   * Builds the socket 'open' listener for one connection attempt.
   * @param resolve settles the promise returned by `connectSocket`
   */
  private onSocketOpen(resolve: (value: unknown) => void) {
    return (socket: Socket) => {
      this.emit('open')
      this.socket = socket
      this.connecting = false
      resolve(true)
      // Push the codes collected so far to the freshly opened socket.
      this.restoreSubscribe()
    }
  }

  /**
   * Builds the socket 'closed' listener for one connection attempt.
   * @param reject rejects the promise returned by `connectSocket`
   */
  private onSocketClosed(reject: () => void) {
    return () => {
      // release and reset
      this.emit('closed')
      const socket = this.socket
      if (socket?.isOpen()) {
        socket.release()
      }
      this.socket = null
      this.connecting = false
      reject()
    }
  }

  /** Invokes the user-supplied lifecycle callback for `method`, if one was provided. */
  private emit(method: MarketFeedEvents) {
    const fn = this.options.on?.[method]
    if (typeof fn === 'function') {
      fn()
    }
  }

  /**
   * Creates a socket, wires its listeners and starts connecting.
   * @returns a promise resolved with `true` on 'open' and rejected on 'closed'
   */
  private connectSocket() {
    this.connecting = true
    return new Promise((resolve, reject) => {
      // The open/closed listeners are arrow functions, so they need no explicit binding.
      new Socket(this.options.server, { binaryType: 'arraybuffer' })
        .on('message', this.handleMessage.bind(this))
        .on('open', this.onSocketOpen(resolve))
        .on('closed', this.onSocketClosed(reject))
        .on('connecting', () => {
          this.emit('connecting')
        })
        .connect()
    })
  }

  /**
   * Ensures a connection exists or is being established.
   * @returns `true` when already open or connecting, otherwise the connection promise
   */
  private start() {
    if (this.isOpened()) return true
    if (this.connecting) return true
    return this.connectSocket()
  }

  /**
   * Builds the subscription message for the given serialised codes.
   *
   * The message is produced with `JSON.stringify` so that quotes or
   * backslashes inside the codes or the uuid cannot yield malformed JSON. Key
   * order matches the wire format: codes, device, optional uuid, then type.
   */
  private static payload(codes: string): string {
    const ac = getSpeedAc('uuid')
    const p: Record<string, string> = {
      codes: codes.replaceAll('EX|', 'XTREND|'),
      device: '5'
    }
    if (ac) {
      p.uuid = String(ac)
    }
    return JSON.stringify({ p, type: 'rtc' })
  }

  /**
   * Sends the union of persistent and regular codes to the server.
   * @returns `true` when the message was handed to the socket, `false` when no open socket exists
   */
  private send() {
    const socket = this.socket
    if (!socket?.isOpen()) return false
    const codes = R.union(this.persistCodes, this.subscribedCodes)
    socket.send(MarketFeed.payload(this.options.generateSubscribeCodes(codes)))
    return true
  }

  /** Stores `codes` as the persistent or the regular list, depending on `persist`. */
  private updateExist(codes: Array<string>, persist: boolean) {
    if (persist) {
      this.persistCodes = codes
    } else {
      this.subscribedCodes = codes
    }
  }

  /**
   * Applies a subscribe ('union') or unsubscribe ('diff') request to one of the code lists.
   *
   * In REPLACE mode a subscribe overwrites the list. An unsubscribe always
   * removes the given codes, whatever the mode; replacing on 'diff' would
   * install the very codes the caller asked to drop.
   */
  private update(codes: Array<string>, mode: 'union' | 'diff' = 'union', persist: boolean = false) {
    if (mode === 'union' && this.options.subscribeMode === SubscribeMode.REPLACE) {
      this.updateExist(codes, persist)
      return
    }
    const pre = persist ? this.persistCodes : this.subscribedCodes
    const next = mode === 'union' ? R.union(pre, codes) : R.difference(pre, codes)
    this.updateExist(next, persist)
  }

  /** Asks the current socket, if any, to retry its connection. */
  reConnect() {
    this.socket?.retry(true)
  }

  /**
   * Adds codes to the subscription, connecting first if necessary.
   * @param codes codes to subscribe to
   * @param persist store the codes in the persistent list instead of the regular one
   * @returns whatever the task queue returns for the queued update
   */
  subscribe(codes: Array<string>, persist: boolean = false) {
    // The connection promise rejects when the socket closes before opening.
    // That failure is already reported through the 'closed' callback, so
    // swallow it here to avoid an unhandled promise rejection.
    Promise.resolve(this.start()).catch(() => undefined)
    return this.queue(() => this.update(codes, 'union', persist))
  }

  /**
   * Removes codes from the subscription.
   * @param codes codes to unsubscribe from
   * @param persist remove from the persistent list instead of the regular one
   * @returns whatever the task queue returns for the queued update
   */
  unSubscribe(codes: Array<string>, persist: boolean = false) {
    return this.queue(() => this.update(codes, 'diff', persist))
  }

  /**
   * Generates the event-bus name used for a code's quote updates.
   * Logs an error (but still returns a name) when `code` is empty.
   * @param code symbol code, or '*' for the catch-all event
   */
  private static getWatchEvent(code: string) {
    if (!code) console.error("MONITOR SYMBOL'S CODE IS EMPTY")
    return `stream.${STREAM_ID}.${code}`
  }

  /** Emits a quote on its own code's event and on the catch-all '*' event. */
  private static broadcast<T extends TMessageEssential>(message: T) {
    emit(MarketFeed.getWatchEvent(message.code), message)
    emit(MarketFeed.getWatchEvent('*'), message)
  }

  /** Socket 'message' listener: parses the raw message, then caches and broadcasts the quote if parsing produced one. */
  private handleMessage(message: string | Blob | Buffer) {
    const quoteMessage = this.options.parseQuoteMessage(message)
    if (quoteMessage) {
      this.updateLatestQuote(quoteMessage)
      MarketFeed.broadcast(quoteMessage)
    }
  }

  /**
   * Stores `quote` in the cache when it is newer than the cached entry.
   * @param quote candidate quote
   * @param force store the quote even if the cached one is not older
   */
  updateLatestQuote(quote: TMessage, force?: boolean) {
    const latest = this.getLatestQuote(quote.code)
    // `updatetime` values are compared as strings.
    if (force || !latest || latest.updatetime < quote.updatetime) {
      this.cache[quote.code] = quote
    }
  }

  /** Returns the cached quote for `code`, or undefined when none has been stored. */
  getLatestQuote(code: string): TMessage | undefined {
    return this.cache[code]
  }

  /**
   * Watches quote updates for a specific code.
   *
   * Registering the same handler twice for one code leaves a single
   * registration, because the handler is removed before being added again.
   *
   * @param code symbol code, or '*' to receive quotes for every code
   * @param handler callback invoked with each quote emitted for the code
   * @param options optional `from` quote to offer to the cache and `immediate` replay of the cached quote
   * @returns function that removes the handler again
   */
  watch(
    code: string | '*', // * represent any kind of code
    handler: Handler,
    options?: WatchOption<TMessage>
  ): Noop {
    const event = MarketFeed.getWatchEvent(code)

    // remove last subscribe
    off(event, handler)

    // subscribe new
    on(event, handler)

    if (options?.from) {
      this.updateLatestQuote(options.from)
    }

    if (options?.immediate) {
      const quote = this.getLatestQuote(code)
      if (quote) handler(quote)
    }

    return () => off(event, handler)
  }
}

/**
 * Factory for a market feed.
 * @param options configuration forwarded unchanged to the `MarketFeed` constructor
 * @returns a new `MarketFeed` typed to the caller's quote message
 */
const makeFeed = <T extends TMessageEssential>(options: Options<T>): MarketFeed<T> => {
  return new MarketFeed<T>(options)
}

export default makeFeed
