| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- package main
- import (
- "bytes"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "log"
- "math/rand"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "github.com/pkg/errors"
- "gorm.io/gorm"
- )
- type BiliWsClientConfig struct {
- Name string
- Host string
- Port int64
- AuthBody string
- }
- type BiliWsClient struct {
- *websocket.Conn
- conf *BiliWsClientConfig
- dispather *protoDispather
- decoder *DecodeManager
- bufPool sync.Pool
- sequenceId int32
- closeFlag chan struct{}
- authed bool
- }
- func NewBiliWsClient(conf *BiliWsClientConfig) *BiliWsClient {
- if conf == nil {
- panic("[BiliWsClient | NewBiliWsClient] conf == nil")
- }
- c := &BiliWsClient{
- conf: conf,
- dispather: newMessageDispather(),
- closeFlag: make(chan struct{}),
- decoder: NewDecodeManager(),
- }
- var err error
- wsAddr := fmt.Sprintf("ws://%s:%d/sub", c.conf.Host, c.conf.Port)
- c.Conn, _, err = websocket.DefaultDialer.Dial(wsAddr, nil)
- if err != nil {
- log.Fatal("[BiliWsClient | NewBiliWsClient] connect err", err)
- return nil
- }
- log.Println("[BiliWsClient | NewBiliWsClient] connect success")
- c.bufPool = sync.Pool{
- New: func() interface{} {
- return new(bytes.Buffer)
- },
- }
- c.registerProtoHandler(OP_AUTH_REPLY, c.AuthResp)
- c.registerProtoHandler(OP_HEARTBEAT_REPLY, c.HeartBeatResp)
- c.registerProtoHandler(OP_SEND_SMS_REPLY, c.MsgResp)
- err = c.sendAuth(c.conf.AuthBody)
- if err != nil {
- log.Fatal("[BiliWsClient | NewBiliWsClient] sendAuth err:", err)
- return nil
- }
- return c
- }
- func (c *BiliWsClient) sendAuth(authBody string) (err error) {
- p := &Proto{
- Operation: OP_AUTH,
- Body: []byte(authBody),
- }
- return c.sendMsg(p)
- }
- func (c *BiliWsClient) Run() {
- wg := sync.WaitGroup{}
- wg.Add(2)
- go func() {
- defer wg.Done()
- c.doReadLoop()
- }()
- go func() {
- defer wg.Done()
- c.doEventLoop()
- }()
- wg.Done()
- }
- func (c *BiliWsClient) sendHeartBeat() {
- if !c.authed {
- return
- }
- msg := &Proto{
- Operation: OP_HEARTBEAT,
- SequenceId: c.sequenceId,
- }
- c.sequenceId++
- err := c.sendMsg(msg)
- if err != nil {
- log.Fatal("[BiliWsClient | sendHeartBeat] err", err)
- return
- }
- log.Println("[BiliWsClient | sendHeartBeat] seq:", msg.SequenceId)
- }
- func (c *BiliWsClient) registerProtoHandler(cmd int32, logic ProtoLogic) {
- c.dispather.Register(cmd, logic)
- }
- func (c *BiliWsClient) Close() {
- }
- func (c *BiliWsClient) sendMsg(msg *Proto) (err error) {
- dataBuff := c.bufPool.Get().(*bytes.Buffer)
- packLen := int32(RawHeaderSize + len(msg.Body))
- msg.HeaderLength = RawHeaderSize
- binary.Write(dataBuff, binary.BigEndian, packLen)
- binary.Write(dataBuff, binary.BigEndian, int16(RawHeaderSize))
- binary.Write(dataBuff, binary.BigEndian, msg.Version)
- binary.Write(dataBuff, binary.BigEndian, msg.Operation)
- binary.Write(dataBuff, binary.BigEndian, msg.SequenceId)
- binary.Write(dataBuff, binary.BigEndian, msg.Body)
- err = c.Conn.WriteMessage(websocket.BinaryMessage, dataBuff.Bytes())
- if err != nil {
- err = errors.Wrapf(err, "[BiliWsClient | SendMsg] WriteMessage err")
- return
- }
- return
- }
- func (c *BiliWsClient) ReadMsg() (p *Proto, err error) {
- _, buf, err := c.Conn.ReadMessage()
- if err != nil {
- err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] conn err")
- return
- }
- if len(buf) < RawHeaderSize {
- err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] buf:%d less", len(buf))
- return
- }
- p = &Proto{
- PacketLength: int32(binary.BigEndian.Uint32(buf[PackOffset:HeaderOffset])),
- HeaderLength: int16(binary.BigEndian.Uint16(buf[HeaderOffset:VerOffset])),
- Version: int16(binary.BigEndian.Uint16(buf[VerOffset:OperationOffset])),
- Operation: int32(binary.BigEndian.Uint32(buf[OperationOffset:SeqIdOffset])),
- SequenceId: int32(binary.BigEndian.Uint32(buf[SeqIdOffset:])),
- }
- if p.PacketLength < 0 || p.PacketLength > MaxPackSize {
- err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] PacketLength:%d err", p.PacketLength)
- return
- }
- if p.HeaderLength != RawHeaderSize {
- err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] HeaderLength:%d err", p.PacketLength)
- return
- }
- if bodyLen := int(p.PacketLength - int32(p.HeaderLength)); bodyLen > 0 {
- p.Body = buf[p.HeaderLength:p.PacketLength]
- } else {
- err = errors.Wrapf(err, "[BiliWsClient | ReadMsg] BodyLength:%d err", bodyLen)
- return
- }
- p.BodyMuti, err = c.decoder.Decode(int64(p.Version), p.Body)
- if len(p.BodyMuti) > 0 {
- p.Body = p.BodyMuti[0]
- }
- return
- }
- func (c *BiliWsClient) doEventLoop() {
- ticker := time.NewTicker(time.Second * 5)
- for {
- select {
- case <-c.closeFlag:
- goto exit
- case <-ticker.C:
- c.sendHeartBeat()
- default:
- }
- }
- exit:
- c.Close()
- }
- func (c *BiliWsClient) doReadLoop() {
- for {
- msg, err := c.ReadMsg()
- if err != nil {
- log.Fatal("[BiliWsClient | ReadMsg] err:", err)
- goto exit
- }
- err = c.dispather.Do(msg)
- if err != nil {
- log.Fatal("[BiliWsClient | ReadMsg] dispather err:", err)
- goto exit
- }
- }
- exit:
- c.Close()
- }
- func (c *BiliWsClient) AuthResp(msg *Proto) (err error) {
- resp := &AuthRespParam{}
- if err = json.Unmarshal(msg.Body, resp); err != nil {
- err = errors.Wrapf(err, "[BiliWsClient | AuthResp] Unmarshal err")
- return
- }
- if resp.Code != 0 {
- err = fmt.Errorf("[BiliWsClient | AuthResp] code:%d", resp.Code)
- return
- }
- c.authed = true
- log.Println("[BiliWsClient | AuthResp] auth success")
- return
- }
- func (c *BiliWsClient) HeartBeatResp(msg *Proto) (err error) {
- log.Println("[BiliWsClient | HeartBeatResp] recv HeartBeat resp", msg.Body)
- return
- }
- //MsgResp 可以这里做回调
- func (c *BiliWsClient) MsgResp(msg *Proto) (err error) {
- log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d \n", msg.Version)
- //
- rds, err := RdsConn()
- if err != nil {
- log.Println("err rds", err)
- return
- }
- defer rds.Close()
- //
- mapFansName := map[int64]string{
- 14578426: "战姬众",
- 23225527: "NB人",
- 23303212: "夏韭菜",
- 23221095: "波波派",
- // 24087754: "", //罗兹Blazing
- }
- //
- mapUserKey := "map_user"
- db := DBConn()
- sqlDB, _ := db.DB()
- defer sqlDB.Close()
- // 通过一定的概率检测rds 缓存是否一致
- if rand.Intn(10) >= 0 {
- var count int64
- db.Table("accounts").Where("uid!=0").Count(&count)
- rdsLen := rds.HLen(mapUserKey).Val()
- log.Println("check rds cache", rdsLen, count)
- if rdsLen != count {
- tmpAccount := []Account{}
- db.Where("uid!=0").Select("id", "uid").Find(&tmpAccount)
- // 进行同步
- all := rds.HGetAll(mapUserKey).Val()
- for _, v := range tmpAccount {
- tmpUid := strconv.Itoa(int(v.Uid))
- tmpId := strconv.Itoa(int(v.Id))
- if _, ok := all[tmpUid]; ok {
- delete(all, tmpUid)
- } else {
- rds.HSet(mapUserKey, tmpUid, tmpId)
- }
- }
- // 删除多余的
- for tmpUid := range all {
- rds.HDel(mapUserKey, tmpUid)
- }
- log.Println("modify rds cache ok", len(tmpAccount), rdsLen)
- }
- }
- //
- for _, cmd := range msg.BodyMuti {
- strCmd := string(cmd)
- log.Printf("[BiliWsClient | HeartBeatResp] recv MsgResp ver:%d cmd:%s \n", msg.Version, strCmd)
- // 这里进行具体逻辑的处理
- if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_DM") {
- // 弹幕
- dm := LiveDM{}
- err := json.Unmarshal(cmd, &dm)
- if err != nil {
- log.Println("json error", string(cmd), err)
- continue
- }
- //
- accountId := rds.HGet(mapUserKey, strconv.Itoa(int(dm.Data.Uid))).Val()
- log.Printf("弹幕 %#v accountID: %s\n", dm, accountId)
- if accountId == "" {
- if dm.Data.RoomID == 23225527 {
- // 执行验证码验证
- if len(dm.Data.Msg) == 6 {
- codeUserKey := fmt.Sprintf("auth_code:%s", dm.Data.Msg)
- if aid := rds.Get(codeUserKey).Val(); aid != "" {
- db.Debug().Model(&Account{}).Where("id=?", aid).UpdateColumn("uid", dm.Data.Uid)
- rds.HSet(mapUserKey, strconv.Itoa(int(dm.Data.Uid)), aid)
- rds.Del(codeUserKey)
- // 获取新人礼包的礼劵数量
- tmpCoupon := []string{}
- db.Debug().Table("configs").Where("`key`=?", "new_user_coupon").Pluck("val", &tmpCoupon)
- log.Printf("验证通过 uid: %d accountID: %s new_coupon: %v \n", dm.Data.Uid, aid, tmpCoupon)
- if len(tmpCoupon) == 0 {
- continue
- }
- tmpCouponNum, _ := strconv.Atoi(tmpCoupon[0])
- // 送5个礼劵作为新人礼包
- err := db.Transaction(func(tx *gorm.DB) error {
- insertDB := CouponRecord{
- AccountId: aid,
- AddCnt: tmpCouponNum,
- Type: 6,
- Day: time.Now().Format("20060102"),
- CreatedAt: time.Now(),
- UpdatedAt: time.Now(),
- }
- tx.Debug().Save(&insertDB)
- tx.Debug().Model(&Account{}).Where("id=?", aid).Update("coupon", gorm.Expr("coupon + ?", tmpCouponNum))
- return nil
- })
- if err != nil {
- log.Println("dm error db", err)
- }
- }
- }
- }
- continue
- }
- //
- // 弹幕数+1
- dmCntKey := fmt.Sprintf("dm_cnt:%s:%s", time.Now().Format("20060102"), accountId)
- dmDealKey := fmt.Sprintf("dm_deal:%s:%s", time.Now().Format("20060102"), accountId)
- dmCanExchangeKey := fmt.Sprintf("dm_level:%s:%s:%d", time.Now().Format("20060102"), accountId, dm.Data.RoomID)
- // 弹幕数+1
- cnt := rds.Incr(dmCntKey).Val()
- rds.Expire(dmCntKey, 86400*3*time.Second)
- if cnt >= 30 {
- if rds.Get(dmDealKey).Val() != "1" {
- //
- log.Println("accountId", accountId)
- // 进行处理,入数据库,且更新这个值
- err := db.Transaction(func(tx *gorm.DB) error {
- insertDB := CouponRecord{
- AccountId: accountId,
- AddCnt: 1,
- Type: 2,
- Day: time.Now().Format("20060102"),
- CreatedAt: time.Now(),
- UpdatedAt: time.Now(),
- }
- tx.Debug().Save(&insertDB)
- tx.Debug().Model(&Account{}).Where("id=?", accountId).Update("coupon", gorm.Expr("coupon + ?", 1))
- return nil
- })
- if err != nil {
- log.Println("dm error db", err)
- continue
- }
- // 礼劵数+1
- rds.Set(dmDealKey, "1", 86400*3*time.Second)
- log.Printf("弹幕超过30,增加礼劵成功 uid: %d accountID: %s\n", dm.Data.Uid, accountId)
- }
- }
- // 记录用户的粉丝牌,兑换礼包时候用
- if fansName, ok := mapFansName[dm.Data.RoomID]; ok {
- if fansName == dm.Data.FansMedalName && dm.Data.FansMedalLevel > 1 {
- rds.Set(dmCanExchangeKey, 1, 86400*3*time.Second)
- }
- }
- } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SEND_GIFT") {
- // 打赏
- sg := LiveGift{}
- err := json.Unmarshal(cmd, &sg)
- if err != nil {
- log.Println("error", string(cmd), err)
- continue
- }
- //
- accountId := rds.HGet(mapUserKey, strconv.Itoa(int(sg.Data.Uid))).Val()
- log.Printf("打赏 account:%s %#v\n", accountId, sg)
- if accountId == "" {
- continue
- }
- //
- if sg.Data.Price*sg.Data.GiftNum <= 0 {
- continue
- }
- //
- sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId)
- cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Price*sg.Data.GiftNum).Val())
- rds.Expire(sgPriceKey, 86400*3*time.Second)
- coupons := cnt / 3000
- if coupons > 0 {
- sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId)
- sgDealVal := rds.Get(sgDealKey).Val()
- if sgDealVal == "" {
- sgDealVal = "0"
- }
- //
- nt, _ := strconv.Atoi(sgDealVal)
- if coupons-nt > 0 {
- // 进行处理,入数据库,且更新这个值
- err := db.Transaction(func(tx *gorm.DB) error {
- insertDB := CouponRecord{
- AccountId: accountId,
- AddCnt: coupons - nt,
- Type: 3,
- Day: time.Now().Format("20060102"),
- CreatedAt: time.Now(),
- UpdatedAt: time.Now(),
- }
- tx.Debug().Save(&insertDB)
- tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt))
- return nil
- })
- if err != nil {
- log.Println("sg error db", err)
- continue
- }
- // 礼劵数+1
- rds.Set(sgDealKey, coupons, 86400*3*time.Second)
- log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId)
- }
- }
- } else if strings.Contains(strCmd, "LIVE_OPEN_PLATFORM_SUPER_CHAT") {
- // 打赏
- sg := Liuyan{}
- err := json.Unmarshal(cmd, &sg)
- if err != nil {
- log.Println("error", string(cmd), err)
- continue
- }
- //
- accountId := rds.HGet(mapUserKey, strconv.Itoa(sg.Data.Uid)).Val()
- log.Printf("留言醒目 account:%s %#v\n", accountId, sg)
- if accountId == "" {
- continue
- }
- //
- if sg.Data.Rmb <= 0 {
- continue
- }
- //
- sgPriceKey := fmt.Sprintf("sg_cnt:%s:%s", time.Now().Format("20060102"), accountId)
- cnt := int(rds.IncrBy(sgPriceKey, sg.Data.Rmb*100).Val())
- rds.Expire(sgPriceKey, 86400*3*time.Second)
- coupons := cnt / 3000
- if coupons > 0 {
- sgDealKey := fmt.Sprintf("sg_deal:%s:%s", time.Now().Format("20060102"), accountId)
- sgDealVal := rds.Get(sgDealKey).Val()
- if sgDealVal == "" {
- sgDealVal = "0"
- }
- //
- nt, _ := strconv.Atoi(sgDealVal)
- if coupons-nt > 0 {
- // 进行处理,入数据库,且更新这个值
- err := db.Transaction(func(tx *gorm.DB) error {
- insertDB := CouponRecord{
- AccountId: accountId,
- AddCnt: coupons - nt,
- Type: 3,
- Day: time.Now().Format("20060102"),
- CreatedAt: time.Now(),
- UpdatedAt: time.Now(),
- }
- tx.Debug().Save(&insertDB)
- tx.Debug().Model(&Account{}).Where("id=?", accountId).UpdateColumn("coupon", gorm.Expr("coupon + ?", coupons-nt))
- return nil
- })
- if err != nil {
- log.Println("sg error db", err)
- continue
- }
- // 礼劵数+1
- rds.Set(sgDealKey, coupons, 86400*3*time.Second)
- log.Printf("礼物超过30,增加礼劵成功 uid: %d accountID: %s\n", sg.Data.Uid, accountId)
- }
- }
- }
- //
- }
- return nil
- }
- type ProtoLogic func(p *Proto) (err error)
- type protoDispather struct {
- dispather map[int32]ProtoLogic
- }
- func newMessageDispather() *protoDispather {
- return &protoDispather{
- dispather: map[int32]ProtoLogic{},
- }
- }
- func (m *protoDispather) Register(Op int32, f ProtoLogic) {
- if m.dispather[Op] != nil {
- panic(fmt.Sprintf("[MessageDispather | Register] Op:%d repeated", Op))
- }
- m.dispather[Op] = f
- }
- func (m *protoDispather) Do(p *Proto) (err error) {
- f, exist := m.dispather[p.Operation]
- if exist {
- err = f(p)
- if err != nil {
- errors.Wrapf(err, "[MessageDispather | Do] process err")
- }
- return
- }
- return fmt.Errorf("[MessageDispather | Do] Op:%d not found", p.Operation)
- }
|