8.4 Zinx-V0.8代码实现
好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()
方法中,在服务端Accept之前,启动Worker工作池。
zinx/znet/server.go
//开启网络服务
func (s *Server) Start() {
//...
//开启一个go去做服务端Linster业务
go func() {
//0 启动worker工作池机制
s.msgHandler.StartWorkerPool()
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//...
//...
}
}()
}
其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。
所以应该在Connection的StartReader()
方法中修改:
zinx/znet/connection.go
/*
读消息Goroutine,用于从客户端中读取数据
*/
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
// 创建拆包解包的对象...
//读取客户端的Msg head...
//拆包,得到msgid 和 datalen 放在msg中...
//根据 dataLen 读取 data,放在msg.Data中...
//得到当前客户端请求的Request数据
req := Request{
conn:c,
msg:msg,
}
if utils.GlobalObject.WorkerPoolSize > 0 {
//已经启动工作池机制,将消息交给Worker处理
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req)
}
}
}
这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize
的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。