package main import ( "bytes" "encoding/binary" "encoding/json" "fmt" "log" "math/rand" "strconv" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/pkg/errors" "gorm.io/gorm" ) type BiliWsClientConfig struct { Name string Host string Port int64 AuthBody string } type BiliWsClient struct { *websocket.Conn conf *BiliWsClientConfig dispather *protoDispather decoder *DecodeManager bufPool sync.Pool sequenceId int32 closeFlag chan struct{} authed bool } func NewBiliWsClient(conf *BiliWsClientConfig) *BiliWsClient { if conf == nil { panic("[BiliWsClient | NewBiliWsClient] conf == nil") } c := &BiliWsClient{ conf: conf, dispather: newMessageDispather(), closeFlag: make(chan struct{}), decoder: NewDecodeManager(), } var err error wsAddr := fmt.Sprintf("ws://%s:%d/sub", c.conf.Host, c.conf.Port) c.Conn, _, err = websocket.DefaultDialer.Dial(wsAddr, nil) if err != nil { log.Fatal("[BiliWsClient | NewBiliWsClient] connect err", err) return nil } log.Println("[BiliWsClient | NewBiliWsClient] connect success") c.bufPool = sync.Pool{ New: func() interface{} { return new(bytes.Buffer) }, } c.registerProtoHandler(OP_AUTH_REPLY, c.AuthResp) c.registerProtoHandler(OP_HEARTBEAT_REPLY, c.HeartBeatResp) c.registerProtoHandler(OP_SEND_SMS_REPLY, c.MsgResp) err = c.sendAuth(c.conf.AuthBody) if err != nil { log.Fatal("[BiliWsClient | NewBiliWsClient] sendAuth err:", err) return nil } return c } func (c *BiliWsClient) sendAuth(authBody string) (err error) { p := &Proto{ Operation: OP_AUTH, Body: []byte(authBody), } return c.sendMsg(p) } func (c *BiliWsClient) Run() { wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() c.doReadLoop() }() go func() { defer wg.Done() c.doEventLoop() }() wg.Done() } func (c *BiliWsClient) sendHeartBeat() { if !c.authed { return } msg := &Proto{ Operation: OP_HEARTBEAT, SequenceId: c.sequenceId, } c.sequenceId++ err := c.sendMsg(msg) if err != nil { log.Fatal("[BiliWsClient | sendHeartBeat] err", err) return } log.Println("[BiliWsClient | sendHeartBeat] seq:", msg.SequenceId) } func (c *BiliWsClient) registerProtoHandler(cmd int32, logic ProtoLogic) { c.dispather.Register(cmd, logic) } func (c *BiliWsClient) Close() { } func (c *BiliWsClient) sendMsg(msg *Proto) (err error) { dataBuff := c.bufPool.Get().(*bytes.Buffer) packLen := int32(RawHeaderSize + len(msg.Body)) msg.HeaderLength = RawHeaderSize binary.Write(dataBuff, binary.BigEndian, packLen) binary.Write(dataBuff, binary.BigEndian, int16(RawHeaderSize)) binary.Write(dataBuff, binary.BigEndian, msg.Version) binary.Write(dataBuff, binary.BigEndian, msg.Operation) binary.Write(dataBuff, binary.BigEndian, msg.SequenceId) binary.Write(dataBuff, binary.BigEndian, msg.Body) err = c.Conn.WriteMessage(websocket.BinaryMessage, dataBuff.Bytes()) if err != nil { err = errors.Wrapf(err, "[BiliWsClient | SendMsg] WriteMessage err") return } return } func (c *BiliWsClient) ReadMsg() (p *Proto, err error) { _, buf, err := c.Conn.ReadMessage() if err != nil { err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] conn err") return } if len(buf) < RawHeaderSize { err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] buf:%d less", len(buf)) return } p = &Proto{ PacketLength: int32(binary.BigEndian.Uint32(buf[PackOffset:HeaderOffset])), HeaderLength: int16(binary.BigEndian.Uint16(buf[HeaderOffset:VerOffset])), Version: int16(binary.BigEndian.Uint16(buf[VerOffset:OperationOffset])), Operation: int32(binary.BigEndian.Uint32(buf[OperationOffset:SeqIdOffset])), SequenceId: int32(binary.BigEndian.Uint32(buf[SeqIdOffset:])), } if p.PacketLength < 0 || p.PacketLength > MaxPackSize { err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] PacketLength:%d err", p.PacketLength) return } if p.HeaderLength != RawHeaderSize { err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] HeaderLength:%d err", p.PacketLength) return } if bodyLen := int(p.PacketLength - int32(p.HeaderLength)); bodyLen > 0 { p.Body = buf[p.HeaderLength:p.PacketLength] } else { err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] BodyLength:%d err", bodyLen) return } p.BodyMuti, err = c.decoder.Decode(int64(p.Version), p.Body) if len(p.BodyMuti) > 0 { p.Body = p.BodyMuti[0] } return } func (c *BiliWsClient) doEventLoop() { ticker := time.NewTicker(time.Second * 5) for { select { case <-c.closeFlag: goto exit case <-ticker.C: c.sendHeartBeat() default: } } exit: c.Close() } func (c *BiliWsClient) doReadLoop() { for { msg, err := c.ReadMsg() if err != nil { log.Fatal("[BiliWsClient | ReadMsg] err:", err) goto exit } err = c.dispather.Do(msg) if err != nil { log.Fatal("[BiliWsClient | ReadMsg] dispather err:", err) goto exit } } exit: c.Close() } func (c *BiliWsClient) AuthResp(msg *Proto) (err error) { resp := &AuthRespParam{} if err = json.Unmarshal(msg.Body, resp); err != nil { err = errors.Wrapf(err, "[BiliWsClient | AuthResp] Unmarshal err") return } if resp.Code != 0 { err = fmt.Errorf("[BiliWsClient | AuthResp] code:%d", resp.Code) return } c.authed = true log.Println("[BiliWsClient | AuthResp] auth success") return } func (c *BiliWsClient) HeartBeatResp(msg *Proto) (err error) { log.Println("[BiliWsClient | HeartBeatResp] recv HeartBeat resp", msg.Body) return } //MsgResp 可以这里做回调 func (c *BiliWsClient) MsgResp(msg *Proto) (err error) { log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d \n", msg.Version) // rds, err := RdsConn() if err != nil { log.Println("err rds", err) return } defer rds.Close() // mapFansName := map[int64]string{ 14578426: "战姬众", 23225527: "NB人", 23303212: "夏韭菜", 23221095: "波波派", // 24087754: "", //罗兹Blazing } // mapUserKey := "map_user" db := DBConn() sqlDB, _ := db.DB() defer sqlDB.Close() // 通过一定的概率检测rds 缓存是否一致 if rand.Intn(10) >= 0 { var count int64 db.Table("accounts").Where("uid!=0").Count(&count) rdsLen := rds.HLen(mapUserKey).Val() log.Println("check rds cache", rdsLen, count) if rdsLen != count { tmpAccount := []Account{} db.Where("uid!=0").Select("id", "uid").Find(&tmpAccount) // 进行同步 all := rds.HGetAll(mapUserKey).Val() for _, v := range tmpAccount { tmpUid := strconv.Itoa(int(v.Uid)) tmpId := strconv.Itoa(int(v.Id)) if _, ok := all[tmpUid]; ok { delete(all, tmpUid) } else { rds.HSet(mapUserKey, tmpUid, tmpId) } } // 删除多余的 for tmpUid := range all { rds.HDel(mapUserKey, tmpUid) } log.Println("modify rds cache ok", len(tmpAccount), rdsLen) } } // for _, cmd := range msg.BodyMuti { strCmd := string(cmd) log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d cmd:%s \n", msg.Version, strCmd) // 这里进行具体逻辑的处理 if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_DM") { // 弹幕 dm := LiveDM{} err := json.Unmarshal(cmd, &dm) if err != nil { log.Println("json error", string(cmd), err) continue } // accountId := rds.HGet(mapUserKey, strconv.Itoa(int(dm.Data.Uid))).Val() log.Printf("弹幕 %#v accountID: %s\n", dm, accountId) if accountId == "" { if dm.Data.RoomID == 23225527 { // 执行验证码验证 if len(dm.Data.Msg) == 6 { codeUserKey := fmt.Sprintf("auth_code:%s", dm.Data.Msg) if aid := rds.Get(codeUserKey).Val(); aid != "" { db.Debug().Model(&Account{}).Where("id=?", aid).UpdateColumn("uid", dm.Data.Uid) rds.HSet(mapUserKey, strconv.Itoa(int(dm.Data.Uid)), aid) rds.Del(codeUserKey) // 获取新人礼包的礼劵数量 tmpCoupon := []string{} db.Debug().Table("configs").Where("`key`=?", "new_user_coupon").Pluck("val", &tmpCoupon) log.Printf("验证通过 uid: %d accountID: %s new_coupon: %v \n", dm.Data.Uid, aid, tmpCoupon) if len(tmpCoupon) == 0 { continue } tmpCouponNum, _ := strconv.Atoi(tmpCoupon[0]) // 送5个礼劵作为新人礼包 err := db.Transaction(func(tx *gorm.DB) error { insertDB := CouponRecord{ AccountId: aid, AddCnt: tmpCouponNum, Type: 6, Day: time.Now().Format("20060102"), CreatedAt: time.Now(), UpdatedAt: time.Now(), } tx.Debug().Save(&insertDB) tx.Debug().Model(&Account{}).Where("id=?", aid).Update("coupon", gorm.Expr("coupon + ?", tmpCouponNum)) return nil }) if err != nil { log.Println("dm error db", err) } } } } continue } // // 弹幕数+1 dmCntKey := fmt.Sprintf("dm_cnt:%s:%s", time.Now().Format("20060102"), accountId) dmDealKey := fmt.Sprintf("dm_deal:%s:%s", time.Now().Format("20060102"), accountId) dmCanExchangeKey := fmt.Sprintf("dm_level:%s:%s:%d", time.Now().Format("20060102"), accountId, dm.Data.RoomID) // 弹幕数+1 cnt := rds.Incr(dmCntKey).Val() rds.Expire(dmCntKey, 86400*3*time.Second) if cnt >= 30 { if rds.Get(dmDealKey).Val() != "1" { // log.Println("accountId", accountId) // 进行处理,入数据库,且更新这个值 err := db.Transaction(func(tx *gorm.DB) error { insertDB := CouponRecord{ AccountId: accountId, AddCnt: 1, Type: 2, Day: time.Now().Format("20060102"), CreatedAt: time.Now(), UpdatedAt: time.Now(), } tx.Debug().Save(&insertDB) tx.Debug().Model(&Account{}).Where("id=?", accountId).Update("coupon", gorm.Expr("coupon + ?", 1)) return nil }) if err != nil { log.Println("dm error db", err) continue } // 礼劵数+1 rds.Set(dmDealKey, "1", 86400*3*time.Second) log.Printf("弹幕超过30,增加礼劵成功 uid: %d accountID: %s\n", dm.Data.Uid, accountId) } } // 记录用户的粉丝牌,兑换礼包时候用 if fansName, ok := mapFansName[dm.Data.RoomID]; ok { if fansName == dm.Data.FansMedalName && dm.Data.FansMedalLevel > 1 { rds.Set(dmCanExchangeKey, 1, 86400*3*time.Second) } } } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SEND_GIFT") { // 打赏 sg := LiveGift{} err := json.Unmarshal(cmd, &sg) if err != nil { log.Println("error", string(cmd), err) continue } // accountId := rds.HGet(mapUserKey, strconv.Itoa(int(sg.Data.Uid))).Val() log.Printf("打赏 account:%s %#v\n", accountId, sg) if accountId == "" { continue } // if sg.Data.Price*sg.Data.GiftNum <= 0 { continue } // sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId) cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Price*sg.Data.GiftNum).Val()) rds.Expire(sgPriceKey, 86400*3*time.Second) coupons := cnt / 3000 if coupons > 0 { sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId) sgDealVal := rds.Get(sgDealKey).Val() if sgDealVal == "" { sgDealVal = "0" } // nt, _ := strconv.Atoi(sgDealVal) if coupons-nt > 0 { // 进行处理,入数据库,且更新这个值 err := db.Transaction(func(tx *gorm.DB) error { insertDB := CouponRecord{ AccountId: accountId, AddCnt: coupons - nt, Type: 3, Day: time.Now().Format("20060102"), CreatedAt: time.Now(), UpdatedAt: time.Now(), } tx.Debug().Save(&insertDB) tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt)) return nil }) if err != nil { log.Println("sg error db", err) continue } // 礼劵数+1 rds.Set(sgDealKey, coupons, 86400*3*time.Second) log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId) } } } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SUPER_CHAT") { // 打赏 sg := Liuyan{} err := json.Unmarshal(cmd, &sg) if err != nil { log.Println("error", string(cmd), err) continue } // accountId := rds.HGet(mapUserKey, strconv.Itoa(sg.Data.Uid)).Val() log.Printf("留言醒目 account:%s %#v\n", accountId, sg) if accountId == "" { continue } // if sg.Data.Rmb <= 0 { continue } // sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId) cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Rmb*100).Val()) rds.Expire(sgPriceKey, 86400*3*time.Second) coupons := cnt / 3000 if coupons > 0 { sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId) sgDealVal := rds.Get(sgDealKey).Val() if sgDealVal == "" { sgDealVal = "0" } // nt, _ := strconv.Atoi(sgDealVal) if coupons-nt > 0 { // 进行处理,入数据库,且更新这个值 err := db.Transaction(func(tx *gorm.DB) error { insertDB := CouponRecord{ AccountId: accountId, AddCnt: coupons - nt, Type: 3, Day: time.Now().Format("20060102"), CreatedAt: time.Now(), UpdatedAt: time.Now(), } tx.Debug().Save(&insertDB) tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt)) return nil }) if err != nil { log.Println("sg error db", err) continue } // 礼劵数+1 rds.Set(sgDealKey, coupons, 86400*3*time.Second) log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId) } } } // } return nil } type ProtoLogic func(p *Proto) (err error) type protoDispather struct { dispather map[int32]ProtoLogic } func newMessageDispather() *protoDispather { return &protoDispather{ dispather: map[int32]ProtoLogic{}, } } func (m *protoDispather) Register(Op int32, f ProtoLogic) { if m.dispather[Op] != nil { panic(fmt.Sprintf("[MessageDispather | Register] Op:%d repeated", Op)) } m.dispather[Op] = f } func (m *protoDispather) Do(p *Proto) (err error) { f, exist := m.dispather[p.Operation] if exist { err = f(p) if err != nil { errors.Wrapf(err, "[MessageDispather | Do] process err") } return } return fmt.Errorf("[MessageDispather | Do] Op:%d not found", p.Operation) }