8.4 Zinx-V0.8代码实现

好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。

zinx/znet/server.go

//开启网络服务
func (s *Server) Start() {

    //...

    //开启一个go去做服务端Linster业务
    go func() {
        //0 启动worker工作池机制
        s.msgHandler.StartWorkerPool()

        //1 获取一个TCP的Addr
        addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
        if err != nil {
            fmt.Println("resolve tcp addr err: ", err)
            return
        }

        //...
        //...

        }
    }()
}

其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。

所以应该在Connection的StartReader()方法中修改:

zinx/znet/connection.go

/*
    读消息Goroutine,用于从客户端中读取数据
 */
func (c *Connection) StartReader() {
    fmt.Println("Reader Goroutine is  running")
    defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
    defer c.Stop()

    for  {
        // 创建拆包解包的对象...

        //读取客户端的Msg head...

        //拆包,得到msgid 和 datalen 放在msg中...

        //根据 dataLen 读取 data,放在msg.Data中...

        //得到当前客户端请求的Request数据
        req := Request{
            conn:c,
            msg:msg,
        }

        if utils.GlobalObject.WorkerPoolSize > 0 {
            //已经启动工作池机制,将消息交给Worker处理
            c.MsgHandler.SendMsgToTaskQueue(&req)
        } else {
            //从绑定好的消息和对应的处理方法中执行对应的Handle方法
            go c.MsgHandler.DoMsgHandler(&req)
        }
    }
}

这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。

results matching ""

    No results matching ""