串联的Channels(Pipeline)

Channels也可以用于将多个goroutine连接在一起,一个Channel的输出作为下一个Channel的输入。这种串联的Channels就是所谓的管道(pipeline)。下面的程序用两个channels将三个goroutine串联起来

第一个goroutine是一个计数器,用于生成0、1、2、……形式的整数序列,然后通过channel将该整数序列发送给第二个goroutine;第二个goroutine是一个求平方的程序,对收到的每个整数求平方,然后将平方后的结果通过第二个channel发送给第三个goroutine;第三个goroutine是一个打印程序,打印收到的每个整数。为了保持例子清晰,我们有意选择了非常简单的函数,当然三个goroutine的计算很简单,在现实中确实没有必要为如此简单的运算构建三个goroutine。

package main

import (
        "fmt"
        "time"
)

//Counter ---naturals---> Squarer ---squares---> Printer

func main() {
        naturals := make(chan int)
        squares := make(chan int)

        //Counter
        go func() {
                for x := 0; ; x++ {
                        naturals <- x
                        time.Sleep(1 * time.Second)
                }
        }()

        //Squarer
        go func() {
                for {
                        x := <-naturals
                        squares <- x * x
                }
        }()

        //Printer
        for {
                fmt.Println(<-squares)
        }
}

如您所料,上面的程序将生成0、1、4、9、……形式的无穷数列。像这样的串联Channels的管道(Pipelines)可以用在需要长时间运行的服务中,每个长时间运行的goroutine可能会包含一个死循环,在不同goroutine的死循环内部使用串联的Channels来通信。但是,如果我们希望通过Channels只发送有限的数列该如何处理呢?

如果发送者知道,没有更多的值需要发送到channel的话,那么让接收者也能及时知道没有多余的值可接收将是有用的,因为接收者可以停止不必要的接收等待。这可以通过内置的close函数来关闭channel实现:

close(naturals)

当一个channel被关闭后,再向该channel发送数据将导致panic异常。当一个被关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。关闭上面例子中的naturals变量对应的channel并不能终止循环,它依然会收到一个永无休止的零值序列,然后将它们发送给打印者goroutine。

没有办法直接测试一个channel是否被关闭,但是接收操作有一个变体形式:它多接收一个结果,多接收的第二个结果是一个布尔值ok,ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。使用这个特性,我们可以修改squarer函数中的循环代码,当naturals对应的channel被关闭并没有值可接收时跳出循环,并且也关闭squares对应的channel.

修改代码暂时如下:

package main

import (
        "fmt"
        "time"
)

//Counter ---naturals---> Squarer ---squares---> Printer

func main() {
        naturals := make(chan int)
        squares := make(chan int)

        //Counter
        go func() {
                for x := 0; x < 5; x++ {
                        naturals <- x
                        time.Sleep(1 * time.Second)
                }
                close(naturals)
        }()

        //Squarer
        go func() {
                for {
                        x, ok := <-naturals
                        if !ok {
                                //Counter 已经关闭了naturals
                                break
                        }
                        squares <- x * x
                }
                close(squares)
        }()

        //Printer
        for {
                x, ok := <-squares
                if !ok {
                        //Squarer 已经关闭了naturals
                        break
                }
                fmt.Println(x)
        }
        fmt.Println("done!")
}

因为上面的语法是笨拙的,而且这种处理模式很常见,因此Go语言的range循环可直接在channels上面迭代。使用range循环是上面处理模式的简洁语法,它依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环。

最终修改代码如下:

package main

import (
        "fmt"
        "time"
)

//Counter ---naturals---> Squarer ---squares---> Printer

func main() {
        naturals := make(chan int)
        squares := make(chan int)

        //Counter
        go func() {
                for x := 0; x < 5; x++ {
                        naturals <- x
                        time.Sleep(1 * time.Second)
                }
                close(naturals)
        }()

        //Squarer
        go func() {

                for x := range naturals {
                        squares <- x * x
                }
                //如果for循环退出,代表naturals已经关闭
                close(squares)
        }()

        //Printer
        for x := range squares {
                fmt.Println(x)
        }
        fmt.Println("done!")
}

其实你并不需要关闭每一个channel。只有当需要告诉接收者goroutine,所有的数据已经全部发送时才需要关闭channel。不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。(不要将关闭一个打开文件的操作和关闭一个channel操作混淆。对于每个打开的文件,都需要在不使用的时候调用对应的Close方法来关闭文件。)

试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。关闭一个channels还会触发一个广播机制,后续讨论。

results matching ""

    No results matching ""