9.3 链接的带缓冲的发包方法

我们之前给Connection提供了一个发消息的方法SendMsg(),这个是将数据发送到一个无缓冲的channel中msgChan。但是如果客户端链接比较多的话,如果对方处理不及时,可能会出现短暂的阻塞现象,我们可以做一个提供一定缓冲的发消息方法,做一些非阻塞的发送体验。

zinx/ziface/iconnection.go

//定义连接接口
type IConnection interface {
    //启动连接,让当前连接开始工作
    Start()
    //停止连接,结束当前连接状态M
    Stop()
    //从当前连接获取原始的socket TCPConn
    GetTCPConnection() *net.TCPConn
    //获取当前连接ID
    GetConnID() uint32
    //获取远程客户端地址信息
    RemoteAddr() net.Addr
    //直接将Message数据发送数据给远程的TCP客户端(无缓冲)
    SendMsg(msgId uint32, data []byte) error
    //直接将Message数据发送给远程的TCP客户端(有缓冲)
    SendBuffMsg(msgId uint32, data []byte) error   //添加带缓冲发送消息接口
}

zinx/znet/connection.go

type Connection struct {
    //当前Conn属于哪个Server
    TcpServer    ziface.IServer
    //当前连接的socket TCP套接字
    Conn *net.TCPConn
    //当前连接的ID 也可以称作为SessionID,ID全局唯一
    ConnID uint32
    //当前连接的关闭状态
    isClosed bool
    //消息管理MsgId和对应处理方法的消息管理模块
    MsgHandler ziface.IMsgHandle
    //告知该链接已经退出/停止的channel
    ExitBuffChan chan bool
    //无缓冲管道,用于读、写两个goroutine之间的消息通信
    msgChan        chan []byte
    //有关冲管道,用于读、写两个goroutine之间的消息通信
    msgBuffChan chan []byte                          //定义channel成员
}

//创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
    //初始化Conn属性
    c := &Connection{
        TcpServer:server,
        Conn:     conn,
        ConnID:   connID,
        isClosed: false,
        MsgHandler: msgHandler,
        ExitBuffChan: make(chan bool, 1),
        msgChan:make(chan []byte),
        msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化
    }

    //将新创建的Conn添加到链接管理中
    c.TcpServer.GetConnMgr().Add(c)
    return c
}

然后将SendBuffMsg()方法实现一下:

func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
    if c.isClosed == true {
        return errors.New("Connection closed when send buff msg")
    }
    //将data封包,并且发送
    dp := NewDataPack()
    msg, err := dp.Pack(NewMsgPackage(msgId, data))
    if err != nil {
        fmt.Println("Pack error msg id = ", msgId)
        return  errors.New("Pack error msg ")
    }

    //写回客户端
    c.msgBuffChan <- msg

    return nil
}

我们在Writer中也要有对msgBuffChan的数据监控:

/*
    写消息Goroutine, 用户将数据发送给客户端
 */
 func (c *Connection) StartWriter() {
     fmt.Println("[Writer Goroutine is running]")
     defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")

     for {
         select {
             case data := <-c.msgChan:
                 //有数据要写给客户端
                 if _, err := c.Conn.Write(data); err != nil {
                     fmt.Println("Send Data error:, ", err, " Conn Writer exit")
                     return
                }
            //针对有缓冲channel需要些的数据处理
             case data, ok:= <-c.msgBuffChan:
                 if ok {
                    //有数据要写给客户端
                    if _, err := c.Conn.Write(data); err != nil {
                        fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
                        return
                    }
                } else {
                    break
                    fmt.Println("msgBuffChan is Closed")
                }
             case <-c.ExitBuffChan:
                 return
        }
    }
 }

results matching ""

    No results matching ""