GoLang 的并发编程与通信(一) -- goroutine 与通道

2022-06-27 14:11:11 浏览数 (1)

1. 引言

服务端程序每一时刻都在经受着大量并发流量的考验,而如今,CPU 指令运行频率的提升已经面临瓶颈,只能通过核心数的增长来大幅提升其指令的执行能力。 因此,现代程序设计中,并发编程的支持就显得越来越重要。 GoLang 进行并发编程十分轻松,他有两种风格可供选择:

  1. goroutine 和通道
  2. 通过共享内存同步的传统多线程模型

本文,我们就来详细介绍一下 goroutine 与通道机制如何来使用。

2. goroutine

GoLang 中,goroutine 是最为简单的一种并发执行机制,每一个并发执行的活动都被称为 goroutine,每个 goroutine 类似于一个线程,但它与线程只有着非常大的差别,这将在下一篇文章中进行分析和讲述。 当程序启动时,用来执行 main 函数的 goroutine 被称为主 goroutine,此后,只要在调用函数时,前面加上关键词 go,就可以创建一个新的 goroutine:

f() // 调用函数 f(),并等待他返回 go f() // 并发调用函数 f(),不用等待

2.1. 示例

下面的例子展示了主 goroutine 运行的同时,并发执行打点 goroutine,每毫秒更新一次标准输出:

代码语言:javascript复制
package main

import (
    "fmt"
    "time"
)

func fib(n int) int {
    if n < 2 {
        return n
    }
    return fib(n-1)   fib(n-2)
}

func ticking(delay time.Duration) {
    arrows := [...]string{"-", "\", "|", "/"}
    for i := 1; true; i   {
        time.Sleep(delay)
        fmt.Printf("r%s %dms", arrows[i % 4], i)
    }
}

func main() {
    timestamp := time.Now().UnixNano() / 1e6
    go ticking(100 * time.Millisecond)
    const n = 45
    result := fib(n)
    fmt.Printf("nFibonacci(%d) = %dn", n, result)
    fmt.Printf("Use time: %.2fms", float64(time.Now().UnixNano() / 1e6 - timestamp)/100.0)
}

可以看到标准输出伴随着一个字符的直线在旋转的同时,他后面的数字在增加,直到斐波那契数列指定位数的值完成计算并输出:

- 108ms Fibonacci(45) = 1134903170 Use time: 108.24ms

一旦主 goroutine 运行结束,所有 goroutine 都会暴力地直接中止运行,然后程序退出。

3. 通过网络进行 goroutine 间的通信 — 标准库 net 包的使用

和 java 等很多语言中的线程一样,goroutine 也不能被其他 goroutine 中止,但多个 goroutine 之间可以进行通信。 通过网络进行通信是非常常用的并发通信机制,在 golang 中,net 包提供了 TCP、UDP、域套接字 的支持。

3.1. 通过 TCP 实现 goroutine 间通信

TCP 是一种非常常用的网络通信协议,关于 TCP 的详细介绍,可以参看主页君此前的文章: 传输控制协议 — TCP TCP连接的建立和终止

下面的代码展示了使用 net 包提供的方法进行 TCP 通信的示例:

代码语言:javascript复制
package main

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "os"
    "time"
)

func client(conn *net.TCPConn) {
    defer func() {
        _ = conn.Close()
    }()

    reader := bufio.NewReader(conn)
    b := []byte(time.Now().String()   " "   conn.LocalAddr().String()   " say hello to Servern")
    _, _ = conn.Write(b)

    msg, err := reader.ReadString('n')
    if err != nil || err == io.EOF {
        _, _ = fmt.Fprintf(os.Stderr, "client read failed: %v", err)
    }
    fmt.Println("client recieved: "   msg)
}

func server(tcpListener *net.TCPListener) {
    defer tcpListener.Close()
    fmt.Println("Server ready to read ...")
    serverConn, err := tcpListener.AcceptTCP()
    if err != nil {
        _, _ = fmt.Fprintf(os.Stderr, "server accept failed: %v", err)
        return
    }
    defer func() {
        _ = serverConn.Close()
    }()

    reader := bufio.NewReader(serverConn)
    message, err := reader.ReadString('n')
    if nil != err || err == io.EOF {
        _, _ = fmt.Fprintf(os.Stderr, "server read failed: %v", err)
        return
    }
    fmt.Println("server recieved: "   message)
    b := []byte(serverConn.RemoteAddr().String()   " Server say worldn")
    _, _ = serverConn.Write(b)
}

func main() {
    tcpAddrServer, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
    tcpListener, _ := net.ListenTCP("tcp", tcpAddrServer)
    go server(tcpListener)

    tcpAddrCient, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
    clientConn, err := net.DialTCP("tcp", nil, tcpAddrCient)
    if nil != err {
        fmt.Println("Client connect error ! "   err.Error())
        return
    }

    fmt.Println(clientConn.LocalAddr().String()   " : Client connected!")
    go client(clientConn)
    for {}
}

上面的代码由主 goroutine 分别依次启动了两个 goroutine — server 和 client。 server 等待 client 传来的字符串,打印并回传一句字符串。 client 传递一句字符串给 server 后将 server 回传的字符串打印出来。

执行代码,打印出了:

Server ready to read … 127.0.0.1:5777 : Client connected! server recieved: 2019-11-14 18:11:02.8257556 0800 CST m= 3.096348701 127.0.0.1:5777 say hello to Server client recieved: 127.0.0.1:5777 Server say world

上述代码显得较为繁琐,在实际的 goroutine 通信中,如果是在 unix 环境下,选择 unix 域套接字进行 goroutine 间通信是更好的选择。 关于使用 net 包进行网络通信,后续会有文章进行详细介绍,敬请期待。

4. 通道

上述通过 net 包实现的网络通信看上去非常复杂,别急,GoLang 提供了更为好用的连接 goroutine 的工具 — 通道。 通道实现了一个 goroutine 发送特定值到另一个 goroutine 的通信机制,它与 unix 环境中的管道非常类似。 此前,我们已经介绍过 unix 环境中的管道的使用,他是 unix 环境下最为常用的进程间通信方式: 管道

4.1. 通道的类型

通道的类型就是 chan xxx,其中 xxx 可以是任何类型名,例如:

chan int chan struct{}

上述这样类型的通道既可以用来发送也可以用来接收数据。 同样,我们也可以声明只用于发送或接收的单向通道:

chan<- int — 只用于发送的通道 <-chan int — 只用于接收的通道

4.2. 通道的创建和关闭

4.2.1. 通道的创建

和 map 一样,通道通过内置函数 make 就可以实现创建:

代码语言:javascript复制
ch := make(chan int)
ch := make(chan int, 3)

make 的第二个参数是可选的,用来表示创建的缓冲区大小,默认表示无缓冲区。 如果缓冲区已满或没有缓冲区,那么在通道上的发送操作会被阻塞,直到另一个 goroutine 在该通道上接收数据或者发送操作被中止。 同样,当缓冲区为空或没有缓冲区,接收操作也会被阻塞,直到由发送方发送新的数据。

特别的,有时我们并不想通过通道传递任何数据,只是想要通过通道发送一个信号,此时,我们通常使用 chan struct{} 类型的通道,传递一个 struct{}{} 空对象。

4.2.2. 通道的关闭

内置函数同样提供了关闭通道的方法:

代码语言:javascript复制
close(ch)

在通道关闭后,任何发送操作都会产生宕机,而接收操作会读取通道中所有剩余数据。 如果在通道关闭后,所有数据已经被接收,再次执行接收操作会立即返回对应类型的零值。

在 GoLang 中,如果在使用文件后没有执行 close 操作,将会造成无法回收的内存泄漏,但对于通道来说不会,垃圾回收器会根据通道是否可以被访问来决定是否回收相应的资源,无论通道是否进行过 close 操作。

4.3. 通道的发送和接收

代码语言:javascript复制
ch <- x    // 发送语句,将变量 x 的值发送到通道 ch
x = <-ch   // 接收语句,接收通道 ch 中的数据并赋值给变量 x
<-ch       // 接受语句,丢弃接收到的通道中的数据

4.4. 示例

4.4.1. 双向通道

有了通道,我们上面通过 TCP 实现通信的例子就可以十分简化了:

代码语言:javascript复制
package main

import (
    "fmt"
)

func client(ch chan string, sigover chan struct{}) {
    str := "Client say hello"
    ch <- str
    str = <-ch
    fmt.Println("client recieved: "   str)
    sigover <- struct{}{}
}

func server(ch chan string) {
    str := <-ch
    fmt.Println("server recieved: "   str)
    str = "Server say world"
    ch <- str
    close(ch)
}

func main() {
    ch := make(chan string)
    sigover := make(chan struct{})
    go client(ch, sigover)
    go server(ch)
    <-sigover
}

打印出了:

server recieved: Client say hello client recieved: Server say world

这个例子展示了通过 chan struct{} 类型的通道报告 goroutine 运行结束的机制,这是非常常用的一种方法。

4.4.2. 单向通道

下面的例子通过单向通道计算了 1 到 10 的平方:

代码语言:javascript复制
package main

import "fmt"

func counter(out chan<- int) {
    for x := 1; x <= 10; x   {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v*v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    nums := make(chan int)
    squares := make(chan int)

    go counter(nums)
    go squarer(squares, nums)
    printer(squares)
}

打印出了:

1 4 9 16 25 36 49 64 81 100

4.5. 缓冲通道

上面通道的创建操作中,我们已经讲述过具有缓冲的通道的创建和使用。 带有缓冲区的通道可以看作是一个队列,进行先入先出操作。

4.5.1. 获取缓冲通道的缓冲区容量和已缓冲元素数

代码语言:javascript复制
cap(ch)  // 获取缓冲通道缓冲区容量
len(ch)  // 获取缓冲通道已缓冲元素数

通过上述两个方法,我们可以实现非阻塞的通道读写操作。

5. 通道的多路复用 — select

通常,操作系统中的 IO 操作同时只能对一个 fd 执行读取或写入操作,但对于服务端程序来说,多个客户端与服务端建立连接,任何时刻任何连接都有可能有数据到来,那么如果使用传统的阻塞式 IO,我们的进程一旦阻塞等待某个连接,其他连接都将无法被处理,而入股哦使用非阻塞式 IO,那么一遍遍轮询全部连接将大大降低执行效率。 现代操作系统提供了这样问题的理想解决方案 — IO 复用模型。 IO复用 & UNIX下的五种IO模型

GoLang 中,通道的使用也存在同样的问题,那就是按照上面描述的通道的使用,一个 goroutine 同时只能与另一个 goroutine 通信,那么,如果一个 goroutine 要同时接收多个通道中数据的到来,上面的使用方式就显得力不从心了。 GoLang 中提供了与操作系统中的 IO 复用模型类似的通道多路复用模型 — select。

5.1. 使用方式

select 的使用方式与 switch 语句非常相似:

代码语言:javascript复制
select {
case value1 <- ch1:
    // do something
case value2 <- ch2:
    // do something
}

5.2. 示例

下面的例子是基于上面计算数字平方的修改:

代码语言:javascript复制
package main

import (
    "fmt"
    "os"
    "time"
)

func counter(out chan<- int, abort <-chan struct{}) {
    tick := time.Tick(time.Second)
    i := 1
    for {
        select {
        case <-tick:
            out <- i
            i  = 1
        case <-abort:
            close(out)
            return
        }
    }
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v*v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func setabort(abort chan struct{}) {
    _, _ = os.Stdin.Read(make([]byte, 1)) // 等待读取单个字节
    abort <- struct{}{}
}

func main() {
    nums := make(chan int)
    squares := make(chan int)
    abort := make(chan struct{})

    fmt.Println("Calculate the square of each number per second, press any key to abort")
    go setabort(abort)
    go counter(nums, abort)
    go squarer(squares, nums)
    printer(squares)
}

我们将上面十个数的循环改成了无限循环,通过接受一个字符来向 abort 通道发送一个信号,从而实现流程的中止。 counter 函数同时从每秒生成心跳的 tick 通道和随时可能产生中止信号的 abort 通道读取数据,此时,select 多路复用就显得非常有用了。 执行程序并在适当时候输入字符 a 生成中止信号,程序打印出了:

1 4 9 a

5.3. 通过 select 实现非阻塞式通道读写

与 switch 语句一样,select 也可以加入 default 语句,如果所有的 case 条件中的通道均没有数据就绪,那么 select 语句不会阻塞等待,而是会去执行 default 语句,这就实现了通道的非阻塞式读写。 下面的例子展示了通道的非阻塞式读写:

代码语言:javascript复制
package main

import "fmt"

func main() {
    abort := make(chan struct{})
    over := make(chan struct{})
    go func(over chan struct{}) {
        select {
        case <-abort:
            fmt.Println("Launch aborted!")
            return
        default:
            fmt.Println("No abort signal launched")
        }
        over <- struct{}{}
    }(over)
    <-over
}

执行程序,因为没有 goroutine 向这个匿名 goroutine 传递 abort 信号,所以他打印出了:

No abort signal launched

6. 参考资料

《Go 语言程序设计》。

0 人点赞