client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "math/rand"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/gorilla/websocket"
  14. "github.com/pkg/errors"
  15. "gorm.io/gorm"
  16. )
// BiliWsClientConfig holds the settings NewBiliWsClient needs to connect and
// authenticate.
type BiliWsClientConfig struct {
	// Name is not read anywhere in this file; presumably a label for the
	// client — TODO confirm against callers.
	Name string
	// Host and Port are combined into the dial address "ws://Host:Port/sub".
	Host string
	Port int64
	// AuthBody is sent verbatim as the body of the OP_AUTH packet right after
	// the connection is established.
	AuthBody string
}
// BiliWsClient is a websocket client that authenticates with AuthBody, sends
// periodic heartbeats and dispatches incoming packets to the handlers
// registered per operation code.
type BiliWsClient struct {
	// Embedded connection; its methods are promoted onto the client, except
	// Close, which BiliWsClient redefines.
	*websocket.Conn
	conf *BiliWsClientConfig
	// dispather maps operation codes to their handlers.
	dispather *protoDispather
	// decoder turns a raw packet body into one or more message bodies,
	// selected by the packet's Version field (see ReadMsg).
	decoder *DecodeManager
	// bufPool supplies the scratch buffers sendMsg encodes packets into.
	bufPool sync.Pool
	// sequenceId is stamped on each heartbeat and then incremented.
	sequenceId int32
	// closeFlag makes doEventLoop exit once it becomes readable.
	// NOTE(review): nothing in this file closes or sends on it.
	closeFlag chan struct{}
	// authed is set by AuthResp on a successful auth reply; heartbeats are
	// suppressed until then.
	// NOTE(review): written by the read loop and read by the event loop
	// without synchronization.
	authed bool
}
  33. func NewBiliWsClient(conf *BiliWsClientConfig) *BiliWsClient {
  34. if conf == nil {
  35. panic("[BiliWsClient | NewBiliWsClient] conf == nil")
  36. }
  37. c := &BiliWsClient{
  38. conf: conf,
  39. dispather: newMessageDispather(),
  40. closeFlag: make(chan struct{}),
  41. decoder: NewDecodeManager(),
  42. }
  43. var err error
  44. wsAddr := fmt.Sprintf("ws://%s:%d/sub", c.conf.Host, c.conf.Port)
  45. c.Conn, _, err = websocket.DefaultDialer.Dial(wsAddr, nil)
  46. if err != nil {
  47. log.Fatal("[BiliWsClient | NewBiliWsClient] connect err", err)
  48. return nil
  49. }
  50. log.Println("[BiliWsClient | NewBiliWsClient] connect success")
  51. c.bufPool = sync.Pool{
  52. New: func() interface{} {
  53. return new(bytes.Buffer)
  54. },
  55. }
  56. c.registerProtoHandler(OP_AUTH_REPLY, c.AuthResp)
  57. c.registerProtoHandler(OP_HEARTBEAT_REPLY, c.HeartBeatResp)
  58. c.registerProtoHandler(OP_SEND_SMS_REPLY, c.MsgResp)
  59. err = c.sendAuth(c.conf.AuthBody)
  60. if err != nil {
  61. log.Fatal("[BiliWsClient | NewBiliWsClient] sendAuth err:", err)
  62. return nil
  63. }
  64. return c
  65. }
  66. func (c *BiliWsClient) sendAuth(authBody string) (err error) {
  67. p := &Proto{
  68. Operation: OP_AUTH,
  69. Body: []byte(authBody),
  70. }
  71. return c.sendMsg(p)
  72. }
  73. func (c *BiliWsClient) Run() {
  74. wg := sync.WaitGroup{}
  75. wg.Add(2)
  76. go func() {
  77. defer wg.Done()
  78. c.doReadLoop()
  79. }()
  80. go func() {
  81. defer wg.Done()
  82. c.doEventLoop()
  83. }()
  84. wg.Done()
  85. }
  86. func (c *BiliWsClient) sendHeartBeat() {
  87. if !c.authed {
  88. return
  89. }
  90. msg := &Proto{
  91. Operation: OP_HEARTBEAT,
  92. SequenceId: c.sequenceId,
  93. }
  94. c.sequenceId++
  95. err := c.sendMsg(msg)
  96. if err != nil {
  97. log.Fatal("[BiliWsClient | sendHeartBeat] err", err)
  98. return
  99. }
  100. log.Println("[BiliWsClient | sendHeartBeat] seq:", msg.SequenceId)
  101. }
  102. func (c *BiliWsClient) registerProtoHandler(cmd int32, logic ProtoLogic) {
  103. c.dispather.Register(cmd, logic)
  104. }
// Close is currently a no-op.
//
// NOTE(review): BiliWsClient embeds *websocket.Conn, so this method shadows
// the promoted Conn.Close. Calling c.Close() therefore neither closes the
// socket nor signals closeFlag; callers that need the connection released
// must call c.Conn.Close() explicitly. Closing the connection here would make
// doReadLoop's ReadMsg fail, which that loop treats as fatal, so the shutdown
// path has to be designed together with the read loop.
func (c *BiliWsClient) Close() {
}
  107. func (c *BiliWsClient) sendMsg(msg *Proto) (err error) {
  108. dataBuff := c.bufPool.Get().(*bytes.Buffer)
  109. packLen := int32(RawHeaderSize + len(msg.Body))
  110. msg.HeaderLength = RawHeaderSize
  111. binary.Write(dataBuff, binary.BigEndian, packLen)
  112. binary.Write(dataBuff, binary.BigEndian, int16(RawHeaderSize))
  113. binary.Write(dataBuff, binary.BigEndian, msg.Version)
  114. binary.Write(dataBuff, binary.BigEndian, msg.Operation)
  115. binary.Write(dataBuff, binary.BigEndian, msg.SequenceId)
  116. binary.Write(dataBuff, binary.BigEndian, msg.Body)
  117. err = c.Conn.WriteMessage(websocket.BinaryMessage, dataBuff.Bytes())
  118. if err != nil {
  119. err = errors.Wrapf(err, "[BiliWsClient | SendMsg] WriteMessage err")
  120. return
  121. }
  122. return
  123. }
  124. func (c *BiliWsClient) ReadMsg() (p *Proto, err error) {
  125. _, buf, err := c.Conn.ReadMessage()
  126. if err != nil {
  127. err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] conn err")
  128. return
  129. }
  130. if len(buf) < RawHeaderSize {
  131. err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] buf:%d less", len(buf))
  132. return
  133. }
  134. p = &Proto{
  135. PacketLength: int32(binary.BigEndian.Uint32(buf[PackOffset:HeaderOffset])),
  136. HeaderLength: int16(binary.BigEndian.Uint16(buf[HeaderOffset:VerOffset])),
  137. Version: int16(binary.BigEndian.Uint16(buf[VerOffset:OperationOffset])),
  138. Operation: int32(binary.BigEndian.Uint32(buf[OperationOffset:SeqIdOffset])),
  139. SequenceId: int32(binary.BigEndian.Uint32(buf[SeqIdOffset:])),
  140. }
  141. if p.PacketLength < 0 || p.PacketLength > MaxPackSize {
  142. err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] PacketLength:%d err", p.PacketLength)
  143. return
  144. }
  145. if p.HeaderLength != RawHeaderSize {
  146. err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] HeaderLength:%d err", p.PacketLength)
  147. return
  148. }
  149. if bodyLen := int(p.PacketLength - int32(p.HeaderLength)); bodyLen > 0 {
  150. p.Body = buf[p.HeaderLength:p.PacketLength]
  151. } else {
  152. err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] BodyLength:%d err", bodyLen)
  153. return
  154. }
  155. p.BodyMuti, err = c.decoder.Decode(int64(p.Version), p.Body)
  156. if len(p.BodyMuti) > 0 {
  157. p.Body = p.BodyMuti[0]
  158. }
  159. return
  160. }
  161. func (c *BiliWsClient) doEventLoop() {
  162. ticker := time.NewTicker(time.Second * 5)
  163. for {
  164. select {
  165. case <-c.closeFlag:
  166. goto exit
  167. case <-ticker.C:
  168. c.sendHeartBeat()
  169. default:
  170. }
  171. }
  172. exit:
  173. c.Close()
  174. }
  175. func (c *BiliWsClient) doReadLoop() {
  176. for {
  177. msg, err := c.ReadMsg()
  178. if err != nil {
  179. log.Fatal("[BiliWsClient | ReadMsg] err:", err)
  180. goto exit
  181. }
  182. err = c.dispather.Do(msg)
  183. if err != nil {
  184. log.Fatal("[BiliWsClient | ReadMsg] dispather err:", err)
  185. goto exit
  186. }
  187. }
  188. exit:
  189. c.Close()
  190. }
  191. func (c *BiliWsClient) AuthResp(msg *Proto) (err error) {
  192. resp := &AuthRespParam{}
  193. if err = json.Unmarshal(msg.Body, resp); err != nil {
  194. err = errors.Wrapf(err, "[BiliWsClient | AuthResp] Unmarshal err")
  195. return
  196. }
  197. if resp.Code != 0 {
  198. err = fmt.Errorf("[BiliWsClient | AuthResp] code:%d", resp.Code)
  199. return
  200. }
  201. c.authed = true
  202. log.Println("[BiliWsClient | AuthResp] auth success")
  203. return
  204. }
  205. func (c *BiliWsClient) HeartBeatResp(msg *Proto) (err error) {
  206. log.Println("[BiliWsClient | HeartBeatResp] recv HeartBeat resp", msg.Body)
  207. return
  208. }
  209. //MsgResp 可以这里做回调
  210. func (c *BiliWsClient) MsgResp(msg *Proto) (err error) {
  211. log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d \n", msg.Version)
  212. //
  213. rds, err := RdsConn()
  214. if err != nil {
  215. log.Println("err rds", err)
  216. return
  217. }
  218. defer rds.Close()
  219. //
  220. mapFansName := map[int64]string{
  221. 14578426: "战姬众",
  222. 23225527: "NB人",
  223. 23303212: "夏韭菜",
  224. 23221095: "波波派",
  225. // 24087754: "", //罗兹Blazing
  226. }
  227. //
  228. mapUserKey := "map_user"
  229. db := DBConn()
  230. sqlDB, _ := db.DB()
  231. defer sqlDB.Close()
  232. // 通过一定的概率检测rds 缓存是否一致
  233. if rand.Intn(10) >= 0 {
  234. var count int64
  235. db.Table("accounts").Where("uid!=0").Count(&count)
  236. rdsLen := rds.HLen(mapUserKey).Val()
  237. log.Println("check rds cache", rdsLen, count)
  238. if rdsLen != count {
  239. tmpAccount := []Account{}
  240. db.Where("uid!=0").Select("id", "uid").Find(&tmpAccount)
  241. // 进行同步
  242. all := rds.HGetAll(mapUserKey).Val()
  243. for _, v := range tmpAccount {
  244. tmpUid := strconv.Itoa(int(v.Uid))
  245. tmpId := strconv.Itoa(int(v.Id))
  246. if _, ok := all[tmpUid]; ok {
  247. delete(all, tmpUid)
  248. } else {
  249. rds.HSet(mapUserKey, tmpUid, tmpId)
  250. }
  251. }
  252. // 删除多余的
  253. for tmpUid := range all {
  254. rds.HDel(mapUserKey, tmpUid)
  255. }
  256. log.Println("modify rds cache ok", len(tmpAccount), rdsLen)
  257. }
  258. }
  259. //
  260. for _, cmd := range msg.BodyMuti {
  261. strCmd := string(cmd)
  262. log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d cmd:%s \n", msg.Version, strCmd)
  263. // 这里进行具体逻辑的处理
  264. if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_DM") {
  265. // 弹幕
  266. dm := LiveDM{}
  267. err := json.Unmarshal(cmd, &dm)
  268. if err != nil {
  269. log.Println("json error", string(cmd), err)
  270. continue
  271. }
  272. //
  273. accountId := rds.HGet(mapUserKey, strconv.Itoa(int(dm.Data.Uid))).Val()
  274. log.Printf("弹幕 %#v accountID: %s\n", dm, accountId)
  275. if accountId == "" {
  276. if dm.Data.RoomID == 23225527 {
  277. // 执行验证码验证
  278. if len(dm.Data.Msg) == 6 {
  279. codeUserKey := fmt.Sprintf("auth_code:%s", dm.Data.Msg)
  280. if aid := rds.Get(codeUserKey).Val(); aid != "" {
  281. db.Debug().Model(&Account{}).Where("id=?", aid).UpdateColumn("uid", dm.Data.Uid)
  282. rds.HSet(mapUserKey, strconv.Itoa(int(dm.Data.Uid)), aid)
  283. rds.Del(codeUserKey)
  284. // 获取新人礼包的礼劵数量
  285. tmpCoupon := []string{}
  286. db.Debug().Table("configs").Where("`key`=?", "new_user_coupon").Pluck("val", &tmpCoupon)
  287. log.Printf("验证通过 uid: %d accountID: %s new_coupon: %v \n", dm.Data.Uid, aid, tmpCoupon)
  288. if len(tmpCoupon) == 0 {
  289. continue
  290. }
  291. tmpCouponNum, _ := strconv.Atoi(tmpCoupon[0])
  292. // 送5个礼劵作为新人礼包
  293. err := db.Transaction(func(tx *gorm.DB) error {
  294. insertDB := CouponRecord{
  295. AccountId: aid,
  296. AddCnt: tmpCouponNum,
  297. Type: 6,
  298. Day: time.Now().Format("20060102"),
  299. CreatedAt: time.Now(),
  300. UpdatedAt: time.Now(),
  301. }
  302. tx.Debug().Save(&insertDB)
  303. tx.Debug().Model(&Account{}).Where("id=?", aid).Update("coupon", gorm.Expr("coupon + ?", tmpCouponNum))
  304. return nil
  305. })
  306. if err != nil {
  307. log.Println("dm error db", err)
  308. }
  309. }
  310. }
  311. }
  312. continue
  313. }
  314. //
  315. // 弹幕数+1
  316. dmCntKey := fmt.Sprintf("dm_cnt:%s:%s", time.Now().Format("20060102"), accountId)
  317. dmDealKey := fmt.Sprintf("dm_deal:%s:%s", time.Now().Format("20060102"), accountId)
  318. dmCanExchangeKey := fmt.Sprintf("dm_level:%s:%s:%d", time.Now().Format("20060102"), accountId, dm.Data.RoomID)
  319. // 弹幕数+1
  320. cnt := rds.Incr(dmCntKey).Val()
  321. rds.Expire(dmCntKey, 86400*3*time.Second)
  322. if cnt >= 30 {
  323. if rds.Get(dmDealKey).Val() != "1" {
  324. //
  325. log.Println("accountId", accountId)
  326. // 进行处理,入数据库,且更新这个值
  327. err := db.Transaction(func(tx *gorm.DB) error {
  328. insertDB := CouponRecord{
  329. AccountId: accountId,
  330. AddCnt: 1,
  331. Type: 2,
  332. Day: time.Now().Format("20060102"),
  333. CreatedAt: time.Now(),
  334. UpdatedAt: time.Now(),
  335. }
  336. tx.Debug().Save(&insertDB)
  337. tx.Debug().Model(&Account{}).Where("id=?", accountId).Update("coupon", gorm.Expr("coupon + ?", 1))
  338. return nil
  339. })
  340. if err != nil {
  341. log.Println("dm error db", err)
  342. continue
  343. }
  344. // 礼劵数+1
  345. rds.Set(dmDealKey, "1", 86400*3*time.Second)
  346. log.Printf("弹幕超过30,增加礼劵成功 uid: %d accountID: %s\n", dm.Data.Uid, accountId)
  347. }
  348. }
  349. // 记录用户的粉丝牌,兑换礼包时候用
  350. if fansName, ok := mapFansName[dm.Data.RoomID]; ok {
  351. if fansName == dm.Data.FansMedalName && dm.Data.FansMedalLevel > 1 {
  352. rds.Set(dmCanExchangeKey, 1, 86400*3*time.Second)
  353. }
  354. }
  355. } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SEND_GIFT") {
  356. // 打赏
  357. sg := LiveGift{}
  358. err := json.Unmarshal(cmd, &sg)
  359. if err != nil {
  360. log.Println("error", string(cmd), err)
  361. continue
  362. }
  363. //
  364. accountId := rds.HGet(mapUserKey, strconv.Itoa(int(sg.Data.Uid))).Val()
  365. log.Printf("打赏 account:%s %#v\n", accountId, sg)
  366. if accountId == "" {
  367. continue
  368. }
  369. //
  370. if sg.Data.Price*sg.Data.GiftNum <= 0 {
  371. continue
  372. }
  373. //
  374. sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId)
  375. cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Price*sg.Data.GiftNum).Val())
  376. rds.Expire(sgPriceKey, 86400*3*time.Second)
  377. coupons := cnt / 3000
  378. if coupons > 0 {
  379. sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId)
  380. sgDealVal := rds.Get(sgDealKey).Val()
  381. if sgDealVal == "" {
  382. sgDealVal = "0"
  383. }
  384. //
  385. nt, _ := strconv.Atoi(sgDealVal)
  386. if coupons-nt > 0 {
  387. // 进行处理,入数据库,且更新这个值
  388. err := db.Transaction(func(tx *gorm.DB) error {
  389. insertDB := CouponRecord{
  390. AccountId: accountId,
  391. AddCnt: coupons - nt,
  392. Type: 3,
  393. Day: time.Now().Format("20060102"),
  394. CreatedAt: time.Now(),
  395. UpdatedAt: time.Now(),
  396. }
  397. tx.Debug().Save(&insertDB)
  398. tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt))
  399. return nil
  400. })
  401. if err != nil {
  402. log.Println("sg error db", err)
  403. continue
  404. }
  405. // 礼劵数+1
  406. rds.Set(sgDealKey, coupons, 86400*3*time.Second)
  407. log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId)
  408. }
  409. }
  410. } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SUPER_CHAT") {
  411. // 打赏
  412. sg := Liuyan{}
  413. err := json.Unmarshal(cmd, &sg)
  414. if err != nil {
  415. log.Println("error", string(cmd), err)
  416. continue
  417. }
  418. //
  419. accountId := rds.HGet(mapUserKey, strconv.Itoa(sg.Data.Uid)).Val()
  420. log.Printf("留言醒目 account:%s %#v\n", accountId, sg)
  421. if accountId == "" {
  422. continue
  423. }
  424. //
  425. if sg.Data.Rmb <= 0 {
  426. continue
  427. }
  428. //
  429. sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId)
  430. cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Rmb*100).Val())
  431. rds.Expire(sgPriceKey, 86400*3*time.Second)
  432. coupons := cnt / 3000
  433. if coupons > 0 {
  434. sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId)
  435. sgDealVal := rds.Get(sgDealKey).Val()
  436. if sgDealVal == "" {
  437. sgDealVal = "0"
  438. }
  439. //
  440. nt, _ := strconv.Atoi(sgDealVal)
  441. if coupons-nt > 0 {
  442. // 进行处理,入数据库,且更新这个值
  443. err := db.Transaction(func(tx *gorm.DB) error {
  444. insertDB := CouponRecord{
  445. AccountId: accountId,
  446. AddCnt: coupons - nt,
  447. Type: 3,
  448. Day: time.Now().Format("20060102"),
  449. CreatedAt: time.Now(),
  450. UpdatedAt: time.Now(),
  451. }
  452. tx.Debug().Save(&insertDB)
  453. tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt))
  454. return nil
  455. })
  456. if err != nil {
  457. log.Println("sg error db", err)
  458. continue
  459. }
  460. // 礼劵数+1
  461. rds.Set(sgDealKey, coupons, 86400*3*time.Second)
  462. log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId)
  463. }
  464. }
  465. }
  466. //
  467. }
  468. return nil
  469. }
// ProtoLogic is a handler for one decoded packet; a non-nil error is
// propagated back through protoDispather.Do to the caller.
type ProtoLogic func(p *Proto) (err error)
// protoDispather routes packets to handlers by operation code.
type protoDispather struct {
	// dispather maps a packet's Operation to its handler.
	// NOTE(review): a plain map with no locking; registration in this file
	// happens in NewBiliWsClient before the read loop is started.
	dispather map[int32]ProtoLogic
}
  474. func newMessageDispather() *protoDispather {
  475. return &protoDispather{
  476. dispather: map[int32]ProtoLogic{},
  477. }
  478. }
  479. func (m *protoDispather) Register(Op int32, f ProtoLogic) {
  480. if m.dispather[Op] != nil {
  481. panic(fmt.Sprintf("[MessageDispather | Register] Op:%d repeated", Op))
  482. }
  483. m.dispather[Op] = f
  484. }
  485. func (m *protoDispather) Do(p *Proto) (err error) {
  486. f, exist := m.dispather[p.Operation]
  487. if exist {
  488. err = f(p)
  489. if err != nil {
  490. errors.Wrapf(err, "[MessageDispather | Do] process err")
  491. }
  492. return
  493. }
  494. return fmt.Errorf("[MessageDispather | Do] Op:%d not found", p.Operation)
  495. }