基于select的多路复用

下面的程序会进行火箭发射的倒计时。time.Tick函数返回一个channel,程序会周期性地像一个节拍器一样向这个channel发送事件。每一个事件的值是一个时间戳,不过更有意思的是其传送方式。

package main

import (
        "fmt"
        "time"
)

func launch() {
        fmt.Println("发射!")
}

func main() {

        fmt.Println("准备发射火箭...")

        //time.Tick会返回一个channel,系统会定时向这个channel发信号
        tick := time.Tick(1 * time.Second)
        for countdown := 10; countdown > 0; countdown-- {
                fmt.Println(countdown)
                //从tick channel中取数据,没阻塞1s 返回一次
                <-tick
        }

        launch()

}

执行结果

准备发射火箭...
10
9
8
7
6
5
4
3
2
1
发射!

现在我们让这个程序支持在倒计时中,用户按下回车键时直接中断发射流程。首先,我们启动一个goroutine,这个goroutine会尝试从标准输入中读入一个单独的byte并且,如果成功了,会向名为abort的channel发送一个值。

abort := make(chan struct{})
go func() {
    os.Stdin.Read(make([]byte, 1)) // read a single byte
    abort <- struct{}{}
}()

现在每一次计数循环的迭代都需要等待两个channel中的其中一个返回事件了.

我们无法做到从每一个channel中接收信息,如果我们这么做的话,如果第一个channel中没有事件发过来那么程序就会立刻被阻塞,这样我们就无法收到第二个channel中发过来的事件。这时候我们需要多路复用(multiplex)这些操作了,为了能够多路复用,我们使用了select语句。

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

上面是select语句的一般形式。和switch语句稍微有点相似,也会有几个case和最后的default选择分支。每一个case代表一个通信操作(在某个channel上进行发送或者接收),并且会包含一些语句组成的一个语句块。一个接收表达式可能只包含接收表达式自身(译注:不把接收到的值赋值给变量什么的),就像上面的第一个case,或者包含在一个简短的变量声明中,像第二个case里一样;第二种形式让你能够引用接收到的值。

select会等待case中有能够执行的case时去执行。当条件满足时,select才会去通信并执行case之后的语句;这时候其它通信是不会执行的。一个没有任何case的select语句写作select{},会永远地等待下去。

如果多个case同时就绪时,select会随机地选择一个执行,这样来保证每一个channel都有平等的被select的机会。增加前一个例子的buffer大小会使其输出变得不确定,因为当buffer既不为满也不为空时,select语句的执行情况就像是抛硬币的行为一样是随机的。

下面让我们的发射程序打印倒计时。这里的select语句会使每次循环迭代等待一秒来执行退出操作。

修改代码如下:

package main

import (
        "fmt"
        "os"
        "time"
)

func launch() {
        fmt.Println("发射!")
}

func main() {

        //创建abort channel
        abort := make(chan struct{})

        go func() {
                os.Stdin.Read(make([]byte, 1))
                abort <- struct{}{}
        }()

        fmt.Println("准备发射火箭...")

        //time.Tick会返回一个channel,系统会定时向这个channel发信号
        tick := time.Tick(1 * time.Second)
        for countdown := 10; countdown > 0; countdown-- {
                fmt.Println(countdown)
                select {
                case <-tick:
                        //从tick channel中取数据,没阻塞1s 返回一次
                        //...
                case <-abort:
                        fmt.Println("发射终止...")
                        return
                }
        }

        launch()

}

有时候我们希望能够从channel中发送或者接收值,并避免因为发送或者接收导致的阻塞,尤其是当channel没有准备好写或者读时。select语句就可以实现这样的功能。select会有一个default来设置当其它的操作都不能够马上被处理时程序需要执行哪些逻辑。

下面的select语句会在abort channel中有值时,从其中接收值;无值时什么都不做。这是一个非阻塞的接收操作;反复地做这样的操作叫做“轮询channel”。

select {
case <-abort:
    fmt.Printf("Launch aborted!\n")
    return
default:
    // do nothing
}

channel的零值是nil。也许会让你觉得比较奇怪,nil的channel有时候也是有一些用处的。因为对一个nil的channel发送和接收操作会永远阻塞,在select语句中操作nil的channel永远都不会被select到。

这使得我们可以用nil来激活或者禁用case,来达成处理其它输入或输出事件时超时和取消的逻辑。我们会在下一节中看到一个例子。

如果将我们并发echo服务器,加上超时机制,如果客户端10秒不发请求,那么服务器就主动断开连接.

package main

import (
        "bufio"
        "fmt"
        "log"
        "net"
        "strings"
        "time"
)

func main() {
        listener, err := net.Listen("tcp", "127.0.0.1:8001")
        if err != nil {
                log.Fatal(err)
        }

        for {
                conn, err := listener.Accept()
                if err != nil {
                        log.Print(err)
                        continue
                }
                fmt.Println("get new conn...")

                //处理客户端业务
                go handleConn(conn)
        }

}

func echo(c net.Conn, outstr string, delay time.Duration) {
        fmt.Fprintln(c, strings.ToUpper(outstr))
        time.Sleep(delay)
        fmt.Fprintln(c, outstr)
        time.Sleep(delay)
        fmt.Fprintln(c, strings.ToLower(outstr))
}

func handleConn(c net.Conn) {
        timeout := time.NewTicker(time.Second * 10)
        recvData := make(chan struct{})

        go func() {
                //input是一个Scanner类型,该类型可以通过Scan方法依次迭代从io设备中读数据,知道遇到eof为止
                input := bufio.NewScanner(c)
                for input.Scan() {
                        //Scan方法 如果缓冲有数据会返回true,否则返回false
                        //如果有数据 input.Text()可以取出
                        recvData <- struct{}{}
                        go echo(c, input.Text(), 1*time.Second)
                }

        }()

        for {
                select {
                case <-recvData:
                        //有数据发送过来
                        fmt.Println("get Data reset timeout to 5s")
                        timeout.Stop()
                        timeout = time.NewTicker(time.Second * 5)
                case <-timeout.C:
                        //客户端超时,需要断开连接
                        fmt.Println("time out channel...")
                        c.Close()
                        timeout.Stop()
                        return
                }
        }
}

results matching ""

    No results matching ""