基于select的多路复用
下面的程序会进行火箭发射的倒计时。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件。每一个事件的值是一个时间戳,不过更有意思的是其传送方式。
package main
import (
"fmt"
"time"
)
func launch() {
fmt.Println("发射!")
}
func main() {
fmt.Println("准备发射火箭...")
//time.Tick会返回一个channel,系统会定时向这个channel发信号
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
//从tick channel中取数据,没阻塞1s 返回一次
<-tick
}
launch()
}
执行结果
准备发射火箭...
10
9
8
7
6
5
4
3
2
1
发射!
现在我们让这个程序支持在倒计时中,用户按下回车键时直接中断发射流程。首先,我们启动一个goroutine,这个goroutine会尝试从标准输入中读入一个单独的byte并且,如果成功了,会向名为abort的channel发送一个值。
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
现在每一次计数循环的迭代都需要等待两个channel中的其中一个返回事件了.
我们无法做到从每一个channel中接收信息,如果我们这么做的话,如果第一个channel中没有事件发过来那么程序就会立刻被阻塞,这样我们就无法收到第二个channel中发过来的事件。这时候我们需要多路复用(multiplex)这些操作了,为了能够多路复用,我们使用了select语句。
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
上面是select语句的一般形式。和switch语句稍微有点相似,也会有几个case和最后的default选择分支。每一个case代表一个通信操作(在某个channel上进行发送或者接收),并且会包含一些语句组成的一个语句块。一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够引用接收到的值。
select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。
如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加前一个例子的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。
下面让我们的发射程序打印倒计时。这里的select语句会使每次循环迭代等待一秒来执行退出操作。
修改代码如下:
package main
import (
"fmt"
"os"
"time"
)
func launch() {
fmt.Println("发射!")
}
func main() {
//创建abort channel
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1))
abort <- struct{}{}
}()
fmt.Println("准备发射火箭...")
//time.Tick会返回一个channel,系统会定时向这个channel发信号
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
//从tick channel中取数据,没阻塞1s 返回一次
//...
case <-abort:
fmt.Println("发射终止...")
return
}
}
launch()
}
有时候我们希望能够从channel中发送或者接收值,并避免因为发送或者接收导致的阻塞,尤其是当channel没有准备好写或者读时。select语句就可以实现这样的功能。select会有一个default来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑。
下面的select语句会在abort channel中有值时,从其中接收值;无值时什么都不做。这是一个非阻塞的接收操作;反复地做这样的操作叫做“轮询channel”。
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。
这使得我们可以用nil来激活或者禁用case,来达成处理其它输入或输出事件时超时和取消的逻辑。我们会在下一节中看到一个例子。
如果将我们并发echo服务器,加上超时机制,如果客户端10秒不发请求,那么服务器就主动断开连接.
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:8001")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
fmt.Println("get new conn...")
//处理客户端业务
go handleConn(conn)
}
}
func echo(c net.Conn, outstr string, delay time.Duration) {
fmt.Fprintln(c, strings.ToUpper(outstr))
time.Sleep(delay)
fmt.Fprintln(c, outstr)
time.Sleep(delay)
fmt.Fprintln(c, strings.ToLower(outstr))
}
func handleConn(c net.Conn) {
timeout := time.NewTicker(time.Second * 10)
recvData := make(chan struct{})
go func() {
//input是一个Scanner类型,该类型可以通过Scan方法依次迭代从io设备中读数据,知道遇到eof为止
input := bufio.NewScanner(c)
for input.Scan() {
//Scan方法 如果缓冲有数据会返回true,否则返回false
//如果有数据 input.Text()可以取出
recvData <- struct{}{}
go echo(c, input.Text(), 1*time.Second)
}
}()
for {
select {
case <-recvData:
//有数据发送过来
fmt.Println("get Data reset timeout to 5s")
timeout.Stop()
timeout = time.NewTicker(time.Second * 5)
case <-timeout.C:
//客户端超时,需要断开连接
fmt.Println("time out channel...")
c.Close()
timeout.Stop()
return
}
}
}