import { IClientOptions, IConnectPacket, MqttClient, Packet, QoS } from 'mqtt'
import { mittError } from '@/common/mitt/mitt'
import MqttException from '@/common/errors/MqttException'
// @ts-ignore
import * as mqttConnect from 'mqtt/dist/mqtt.min.js'
import { IPublishPacket } from 'mqtt-packet'
import { DATA_COORDINATE, MAP_COORDINATE } from '@/common/global/JlinkValues'
import JlinkTask from '@/common/global/JlinkTask'
import CoordinateUtil from '@/common/global/util/CoordinateUtil'

/**
 * Wrapper around a single MQTT.js client connection.
 *
 * Responsibilities visible in this class:
 * - open / close the connection (`connection`, `disconnection`);
 * - keep a per-topic list of message handlers (`register`, `unregister`) and
 *   subscribe / unsubscribe on the broker as the first / last handler of a
 *   topic comes and goes;
 * - decode incoming JSON payloads, convert coordinates of the known OSD
 *   message types from `DATA_COORDINATE` to `MAP_COORDINATE`, and fan the
 *   message out to the registered handlers;
 * - cache the last decoded message per topic so that a handler registered
 *   later on an already-subscribed topic immediately receives it;
 * - publish messages (`send`) and emit a periodic heartbeat while connected.
 */
export default class BasicMqtt {
  /** Registered handlers per topic, together with the QoS used to subscribe the topic. */
  listener: Map<string, { qos: 0 | 1 | 2, handler: Array<MqttMsgHandler<any>> }> = new Map()
  /** Last decoded message per topic; replayed to handlers that register later. */
  private lastMessage: Map<string, any> = new Map()
  private coordinate = new CoordinateUtil()
  private mqtt?: MqttClient
  private connecting?: boolean
  /** Handle of the heartbeat timer; `undefined` while no heartbeat is running. */
  private heartInterval: ReturnType<typeof setInterval> | undefined

  /** @returns `true` when a client exists and reports itself as connected. */
  getConnected () {
    return this.mqtt?.connected || false
  }

  /** @returns `true` while a `connection()` call is still in progress. */
  getConnecting () {
    return this.connecting || false
  }

  /**
   * Creates the client and resolves once the first `connect` event arrives.
   *
   * The returned promise does not reject on connection errors: errors are
   * reported through `mittError` and the client keeps retrying
   * (`reconnectPeriod: 1000`), so the promise stays pending until a connect
   * eventually succeeds.
   *
   * @param res Connection parameters (protocol, host, port, path, credentials).
   */
  private async connectMqtt (res: ConnectionOptions): Promise<void> {
    const option = {
      clientId: res.clientId,
      username: res.username,
      password: res.password,
      keepalive: 60,
      clean: true,
      reconnectPeriod: 1000
    } as IClientOptions
    await new Promise<void>((resolve) => {
      const client: MqttClient = mqttConnect.connect(`${res.protocol}://${res.host}:${res.port}/${res.path}`, option)
      this.mqtt = client

      client.on('message', (topic: string, message: Buffer) => {
        const messageStr = String(message)
        if (messageStr) {
          JlinkTask.catchAwait(async () => {
            await this.dispatch(topic, messageStr)
          })
        }
      })

      let firstConnect = true
      client.on('connect', () => {
        const isFirst = firstConnect
        firstConnect = false
        if (isFirst) {
          console.log('mqtt connect', this.listener)
          // Subscribe everything that was registered before the connection existed.
          // Later reconnects are presumably covered by MQTT.js' own `resubscribe`
          // behaviour (default on) — confirm for the bundled mqtt version.
          for (const topic of Array.from(this.listener.keys())) {
            void this.subscribe(topic, this.listener.get(topic)!.qos)
          }
        }
        // Restart the heartbeat on every (re)connect: the close handler below
        // stops it, so starting it only once would leave reconnected sessions
        // without a heartbeat.
        this.startHeartbeat()
        if (isFirst) {
          resolve()
        }
      })

      // Keep a listener attached for the whole client lifetime: an 'error' event
      // without any listener makes the EventEmitter throw. Only the first error
      // is forwarded to mittError (as before); later ones are just logged.
      let errorReported = false
      client.on('error', (error: Error) => {
        if (errorReported) {
          console.error('mqtt error', error)
          return
        }
        errorReported = true
        mittError(new MqttException(error.message))
      })

      // Stop the heartbeat on every close/disconnect, but only when this client
      // is still the active one, so a stale client cannot stop a newer
      // connection's heartbeat.
      const stopIfCurrent = () => {
        if (this.mqtt === client) {
          this.stopHeartbeat()
        }
      }
      client.on('close', stopIfCurrent)
      client.on('disconnect', stopIfCurrent)

      // The "connection lost" notification is emitted once per client and event
      // type, which avoids one notification per retry while the broker is down.
      client.once('close', () => {
        mittError(new MqttException('mqtt连接已断开'))
      })
      client.once('disconnect', () => {
        mittError(new MqttException('mqtt连接已断开'))
      })
    })
  }

  /**
   * Tears down any existing client and opens a new connection.
   * `getConnecting()` reports `true` for the duration of the call.
   *
   * @param res Connection parameters.
   * @throws Whatever `disconnection()` or the client factory throws.
   */
  async connection (res: ConnectionOptions): Promise<void> {
    this.connecting = true
    try {
      await this.disconnection()
      await this.connectMqtt(res)
    } finally {
      this.connecting = false
    }
  }

  /**
   * Unsubscribes all topics (only while connected), force-closes the client and
   * stops the heartbeat. Registered handlers are kept so that a later
   * `connection()` subscribes them again.
   *
   * Unsubscribing is skipped while the client is offline because the
   * unsubscribe callback may then never fire, which would block this method
   * (and therefore `connection()`) indefinitely. Cleanup runs in `finally`, so
   * the client is closed even when an unsubscribe fails.
   *
   * @throws MqttException when the broker rejects an unsubscribe.
   */
  async disconnection (): Promise<void> {
    try {
      if (this.mqtt?.connected) {
        for (const topic of Array.from(this.listener.keys())) {
          await this.unsubscribe(topic)
        }
      }
    } finally {
      this.mqtt?.end(true)
      this.mqtt = undefined
      this.stopHeartbeat()
    }
  }

  /**
   * Publishes a message once the client is connected.
   *
   * @param topic Topic to publish to.
   * @param mess  Payload.
   * @param opts  Publish options passed through to the client.
   * @returns `0` when the publish callback reports success.
   * @throws MqttException on publish error or when no callback arrives within 5 s;
   *         also whatever `JlinkTask.asyncGet` throws while waiting for the connection.
   */
  async send (topic: string, mess: string, opts: IClientPublishOptions): Promise<number> {
    await JlinkTask.asyncGet(() => this.mqtt?.connected, 1000, 5, '连接已断开')
    return new Promise<number>((resolve, reject) => {
      let received = false
      let timer: ReturnType<typeof setTimeout> | undefined
      this.mqtt?.publish(topic, mess, opts, (err?: Error, tas?: Packet) => {
        console.log(mess)
        received = true
        // Release the timeout timer instead of letting it run for the full 5 s.
        if (timer !== undefined) {
          clearTimeout(timer)
        }
        if (err) {
          reject(new MqttException(err.message))
        } else {
          resolve(0)
        }
      })
      if (!received) {
        timer = setTimeout(() => {
          reject(new MqttException('发送超时'))
        }, 5000)
      }
    })
  }

  /**
   * Adds a handler for a topic. The first handler of a topic triggers the
   * broker subscription; a handler added to an already-known topic immediately
   * receives the cached last message of that topic, if there is one.
   *
   * @param topic   Topic to listen on.
   * @param qos     QoS used when the topic is subscribed for the first time.
   * @param handler Callback; may carry a weak `contextThis` reference used as `this`.
   * @returns `false` when this exact handler is already registered, otherwise `true`.
   */
  async register (topic: string, qos: QoS, handler: MqttMsgHandler<any>): Promise<boolean> {
    console.log(`register ${topic}`)
    const entry = this.listener.get(topic)
    if (!entry) {
      this.listener.set(topic, { qos, handler: [handler] })
      await this.subscribe(topic, qos)
      return true
    }
    if (entry.handler.indexOf(handler) >= 0) {
      return false
    }
    entry.handler.push(handler)
    if (this.lastMessage.has(topic)) {
      const last = this.lastMessage.get(topic)
      // Same invocation path as live dispatch: a dead weak reference is logged
      // instead of calling the handler with the wrong `this`, and handler
      // failures are routed through JlinkTask.catchAwait like live messages.
      JlinkTask.catchAwait(async () => {
        await this.invokeHandler(handler, last, topic)
      })
    }
    return true
  }

  /**
   * Removes a handler. When the last handler of a topic is removed the topic's
   * bookkeeping (handler entry and cached last message) is dropped first and
   * the broker subscription is cancelled afterwards, so a `register()` that
   * runs while the unsubscribe is still pending starts from a clean state.
   *
   * @param topic   Topic the handler was registered on.
   * @param handler The handler to remove.
   * @returns `true` when the handler was found and removed, otherwise `false`.
   * @throws MqttException when the broker rejects the unsubscribe.
   */
  async unregister (topic: string, handler: MqttMsgHandler<any>): Promise<boolean> {
    console.log(`unregister ${topic}`)
    const entry = this.listener.get(topic)
    if (!entry) {
      return false
    }
    const index = entry.handler.indexOf(handler)
    if (index < 0) {
      return false
    }
    entry.handler.splice(index, 1)
    if (entry.handler.length === 0) {
      this.listener.delete(topic)
      // Drop the cache as well, otherwise a later registration could be handed
      // a message from before the topic was unsubscribed.
      this.lastMessage.delete(topic)
      await this.unsubscribe(topic)
    }
    return true
  }

  /**
   * Delivers one raw message to every handler currently registered for `topic`.
   * Handlers are awaited one after another, in registration order.
   */
  private async dispatch (topic: string, messageStr: string): Promise<void> {
    const handlers = this.listener.get(topic)?.handler
    if (!handlers) {
      return
    }
    // Iterate over a snapshot: handlers may unregister while an earlier handler
    // is awaited, and splicing the live array would make the loop skip entries.
    for (const handler of handlers.slice()) {
      if (handlers.indexOf(handler) < 0) {
        continue // removed while an earlier handler was running
      }
      const msg = this.decodeMessage(handler, messageStr)
      this.lastMessage.set(topic, msg)
      await this.invokeHandler(handler, msg, topic)
    }
  }

  /**
   * Parses the JSON payload and, for the OSD message types, converts the
   * contained coordinates from `DATA_COORDINATE` to `MAP_COORDINATE`.
   * The message kind is taken from `handler.type`.
   */
  private decodeMessage (handler: MqttMsgHandler<any>, messageStr: string): any {
    if (handler.type === 'AIRCRAFT_OSD') {
      const message = JSON.parse(messageStr) as MqttAircraftOsdResult
      if (message.lng && message.lat) {
        const converted = this.coordinate.coordinateTransform({ lng: message.lng, lat: message.lat, height: message.height }, DATA_COORDINATE, MAP_COORDINATE)
        message.lng = converted.lng
        message.lat = converted.lat
      }
      // Note: conversion of the home point (homeLongitude / homeLatitude) was
      // commented out in the original code and is intentionally not performed.
      message.payloads?.forEach((it) => {
        const converted = this.coordinate.coordinateTransform({ lng: it.measure_target_longitude, lat: it.measure_target_latitude, height: 0 }, DATA_COORDINATE, MAP_COORDINATE)
        it.measure_target_longitude = converted.lng
        it.measure_target_latitude = converted.lat
      })
      return message
    }
    if (handler.type === 'DOCK_OSD') {
      const message = JSON.parse(messageStr) as MqttDockOsdResult
      if (message.lng && message.lat) {
        const converted = this.coordinate.coordinateTransform({ lng: message.lng, lat: message.lat, height: message.height }, DATA_COORDINATE, MAP_COORDINATE)
        message.lng = converted.lng
        message.lat = converted.lat
      }
      if (message.alternateLandPointLongitude && message.alternateLandPointLatitude) {
        const converted = this.coordinate.coordinateTransform({ lng: message.alternateLandPointLongitude, lat: message.alternateLandPointLatitude, height: message.alternateLandPointSafeLandHeight }, DATA_COORDINATE, MAP_COORDINATE)
        message.alternateLandPointLongitude = converted.lng
        message.alternateLandPointLatitude = converted.lat
      }
      return message
    }
    if (handler.type === 'RC_OSD') {
      const message = JSON.parse(messageStr) as MqttRcOsdResult
      if (message.lng && message.lat) {
        const converted = this.coordinate.coordinateTransform({ lng: message.lng, lat: message.lat, height: 0 }, DATA_COORDINATE, MAP_COORDINATE)
        message.lng = converted.lng
        message.lat = converted.lat
      }
      return message
    }
    return JSON.parse(messageStr)
  }

  /**
   * Calls one handler with `(msg, topic)`.
   * When the handler carries a `contextThis` weak reference, the referenced
   * object is used as `this`; if that object is gone the call is skipped and an
   * error is logged. A truthy result of the handler is logged.
   */
  private async invokeHandler (handler: MqttMsgHandler<any>, msg: any, topic: string): Promise<void> {
    let res: any
    if (handler.contextThis) {
      const target = handler.contextThis.deref()
      if (target) {
        res = await handler.apply(target, [msg, topic])
      } else {
        console.error('引用对象已经被未收,方法却未注销，请检查')
      }
    } else {
      res = await handler(msg, topic)
    }
    if (res) {
      console.log(' mqtt callback', res)
    }
  }

  /**
   * (Re)starts the heartbeat: publishes `{ heart: true }` to `heartTopic` every 50 s.
   * Per the original author's note, WebSocket connections time out after 60 s
   * without traffic, hence the 50 s period.
   */
  private startHeartbeat (): void {
    this.stopHeartbeat()
    this.heartInterval = setInterval(() => {
      JlinkTask.catchAwait(async () => {
        await this.send('heartTopic', JSON.stringify({ heart: true }), {})
      })
    }, 50000)
  }

  /** Stops the heartbeat timer, if one is running. */
  private stopHeartbeat (): void {
    clearInterval(this.heartInterval)
    this.heartInterval = undefined
  }

  /** Requests a subscription on the current client; does nothing when there is no client. */
  private async subscribe (topic: string, qos: QoS): Promise<void> {
    this.mqtt?.subscribe(topic, { qos })
  }

  /**
   * Cancels a subscription on the current client.
   * There is no timeout: the promise settles only when the client invokes the callback.
   *
   * @returns `true` once the client confirms, `false` immediately when there is no client.
   * @throws MqttException when the client reports an error.
   */
  private unsubscribe (topic: string): Promise<boolean> {
    const client = this.mqtt
    if (!client) {
      return Promise.resolve(false)
    }
    return new Promise<boolean>((resolve, reject) => {
      client.unsubscribe(topic, undefined, function (error, p) {
        if (error) {
          reject(new MqttException(error.message))
        } else {
          resolve(true)
        }
      })
    })
  }
}
