1. 引言
服务端程序每一时刻都在经受着大量并发流量的考验,而如今,CPU 指令运行频率的提升已经面临瓶颈,只能通过核心数的增长来大幅提升其指令的执行能力。 因此,现代程序设计中,并发编程的支持就显得越来越重要。 GoLang 进行并发编程十分轻松,他有两种风格可供选择:
- goroutine 和通道
- 通过共享内存同步的传统多线程模型
本文,我们就来详细介绍一下 goroutine 与通道机制如何来使用。
2. goroutine
GoLang 中,goroutine 是最为简单的一种并发执行机制,每一个并发执行的活动都被称为 goroutine,每个 goroutine 类似于一个线程,但它与线程只有着非常大的差别,这将在下一篇文章中进行分析和讲述。 当程序启动时,用来执行 main 函数的 goroutine 被称为主 goroutine,此后,只要在调用函数时,前面加上关键词 go,就可以创建一个新的 goroutine:
f() // 调用函数 f(),并等待他返回 go f() // 并发调用函数 f(),不用等待
2.1. 示例
下面的例子展示了主 goroutine 运行的同时,并发执行打点 goroutine,每毫秒更新一次标准输出:
代码语言:javascript复制package main
import (
"fmt"
"time"
)
func fib(n int) int {
if n < 2 {
return n
}
return fib(n-1) fib(n-2)
}
func ticking(delay time.Duration) {
arrows := [...]string{"-", "\", "|", "/"}
for i := 1; true; i {
time.Sleep(delay)
fmt.Printf("r%s %dms", arrows[i % 4], i)
}
}
func main() {
timestamp := time.Now().UnixNano() / 1e6
go ticking(100 * time.Millisecond)
const n = 45
result := fib(n)
fmt.Printf("nFibonacci(%d) = %dn", n, result)
fmt.Printf("Use time: %.2fms", float64(time.Now().UnixNano() / 1e6 - timestamp)/100.0)
}
可以看到标准输出伴随着一个字符的直线在旋转的同时,他后面的数字在增加,直到斐波那契数列指定位数的值完成计算并输出:
- 108ms Fibonacci(45) = 1134903170 Use time: 108.24ms
一旦主 goroutine 运行结束,所有 goroutine 都会暴力地直接中止运行,然后程序退出。
3. 通过网络进行 goroutine 间的通信 — 标准库 net 包的使用
和 java 等很多语言中的线程一样,goroutine 也不能被其他 goroutine 中止,但多个 goroutine 之间可以进行通信。 通过网络进行通信是非常常用的并发通信机制,在 golang 中,net 包提供了 TCP、UDP、域套接字 的支持。
3.1. 通过 TCP 实现 goroutine 间通信
TCP 是一种非常常用的网络通信协议,关于 TCP 的详细介绍,可以参看主页君此前的文章: 传输控制协议 — TCP TCP连接的建立和终止
下面的代码展示了使用 net 包提供的方法进行 TCP 通信的示例:
代码语言:javascript复制package main
import (
"bufio"
"fmt"
"io"
"net"
"os"
"time"
)
func client(conn *net.TCPConn) {
defer func() {
_ = conn.Close()
}()
reader := bufio.NewReader(conn)
b := []byte(time.Now().String() " " conn.LocalAddr().String() " say hello to Servern")
_, _ = conn.Write(b)
msg, err := reader.ReadString('n')
if err != nil || err == io.EOF {
_, _ = fmt.Fprintf(os.Stderr, "client read failed: %v", err)
}
fmt.Println("client recieved: " msg)
}
func server(tcpListener *net.TCPListener) {
defer tcpListener.Close()
fmt.Println("Server ready to read ...")
serverConn, err := tcpListener.AcceptTCP()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "server accept failed: %v", err)
return
}
defer func() {
_ = serverConn.Close()
}()
reader := bufio.NewReader(serverConn)
message, err := reader.ReadString('n')
if nil != err || err == io.EOF {
_, _ = fmt.Fprintf(os.Stderr, "server read failed: %v", err)
return
}
fmt.Println("server recieved: " message)
b := []byte(serverConn.RemoteAddr().String() " Server say worldn")
_, _ = serverConn.Write(b)
}
func main() {
tcpAddrServer, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
tcpListener, _ := net.ListenTCP("tcp", tcpAddrServer)
go server(tcpListener)
tcpAddrCient, _ := net.ResolveTCPAddr("tcp", "localhost:8461")
clientConn, err := net.DialTCP("tcp", nil, tcpAddrCient)
if nil != err {
fmt.Println("Client connect error ! " err.Error())
return
}
fmt.Println(clientConn.LocalAddr().String() " : Client connected!")
go client(clientConn)
for {}
}
上面的代码由主 goroutine 分别依次启动了两个 goroutine — server 和 client。 server 等待 client 传来的字符串,打印并回传一句字符串。 client 传递一句字符串给 server 后将 server 回传的字符串打印出来。
执行代码,打印出了:
Server ready to read … 127.0.0.1:5777 : Client connected! server recieved: 2019-11-14 18:11:02.8257556 0800 CST m= 3.096348701 127.0.0.1:5777 say hello to Server client recieved: 127.0.0.1:5777 Server say world
上述代码显得较为繁琐,在实际的 goroutine 通信中,如果是在 unix 环境下,选择 unix 域套接字进行 goroutine 间通信是更好的选择。 关于使用 net 包进行网络通信,后续会有文章进行详细介绍,敬请期待。
4. 通道
上述通过 net 包实现的网络通信看上去非常复杂,别急,GoLang 提供了更为好用的连接 goroutine 的工具 — 通道。 通道实现了一个 goroutine 发送特定值到另一个 goroutine 的通信机制,它与 unix 环境中的管道非常类似。 此前,我们已经介绍过 unix 环境中的管道的使用,他是 unix 环境下最为常用的进程间通信方式: 管道
4.1. 通道的类型
通道的类型就是 chan xxx,其中 xxx 可以是任何类型名,例如:
chan int chan struct{}
上述这样类型的通道既可以用来发送也可以用来接收数据。 同样,我们也可以声明只用于发送或接收的单向通道:
chan<- int — 只用于发送的通道 <-chan int — 只用于接收的通道
4.2. 通道的创建和关闭
4.2.1. 通道的创建
和 map 一样,通道通过内置函数 make 就可以实现创建:
代码语言:javascript复制ch := make(chan int)
ch := make(chan int, 3)
make 的第二个参数是可选的,用来表示创建的缓冲区大小,默认表示无缓冲区。 如果缓冲区已满或没有缓冲区,那么在通道上的发送操作会被阻塞,直到另一个 goroutine 在该通道上接收数据或者发送操作被中止。 同样,当缓冲区为空或没有缓冲区,接收操作也会被阻塞,直到由发送方发送新的数据。
特别的,有时我们并不想通过通道传递任何数据,只是想要通过通道发送一个信号,此时,我们通常使用 chan struct{} 类型的通道,传递一个 struct{}{} 空对象。
4.2.2. 通道的关闭
内置函数同样提供了关闭通道的方法:
代码语言:javascript复制close(ch)
在通道关闭后,任何发送操作都会产生宕机,而接收操作会读取通道中所有剩余数据。 如果在通道关闭后,所有数据已经被接收,再次执行接收操作会立即返回对应类型的零值。
在 GoLang 中,如果在使用文件后没有执行 close 操作,将会造成无法回收的内存泄漏,但对于通道来说不会,垃圾回收器会根据通道是否可以被访问来决定是否回收相应的资源,无论通道是否进行过 close 操作。
4.3. 通道的发送和接收
代码语言:javascript复制ch <- x // 发送语句,将变量 x 的值发送到通道 ch
x = <-ch // 接收语句,接收通道 ch 中的数据并赋值给变量 x
<-ch // 接受语句,丢弃接收到的通道中的数据
4.4. 示例
4.4.1. 双向通道
有了通道,我们上面通过 TCP 实现通信的例子就可以十分简化了:
代码语言:javascript复制package main
import (
"fmt"
)
func client(ch chan string, sigover chan struct{}) {
str := "Client say hello"
ch <- str
str = <-ch
fmt.Println("client recieved: " str)
sigover <- struct{}{}
}
func server(ch chan string) {
str := <-ch
fmt.Println("server recieved: " str)
str = "Server say world"
ch <- str
close(ch)
}
func main() {
ch := make(chan string)
sigover := make(chan struct{})
go client(ch, sigover)
go server(ch)
<-sigover
}
打印出了:
server recieved: Client say hello client recieved: Server say world
这个例子展示了通过 chan struct{} 类型的通道报告 goroutine 运行结束的机制,这是非常常用的一种方法。
4.4.2. 单向通道
下面的例子通过单向通道计算了 1 到 10 的平方:
代码语言:javascript复制package main
import "fmt"
func counter(out chan<- int) {
for x := 1; x <= 10; x {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v*v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
nums := make(chan int)
squares := make(chan int)
go counter(nums)
go squarer(squares, nums)
printer(squares)
}
打印出了:
1 4 9 16 25 36 49 64 81 100
4.5. 缓冲通道
上面通道的创建操作中,我们已经讲述过具有缓冲的通道的创建和使用。 带有缓冲区的通道可以看作是一个队列,进行先入先出操作。
4.5.1. 获取缓冲通道的缓冲区容量和已缓冲元素数
代码语言:javascript复制cap(ch) // 获取缓冲通道缓冲区容量
len(ch) // 获取缓冲通道已缓冲元素数
通过上述两个方法,我们可以实现非阻塞的通道读写操作。
5. 通道的多路复用 — select
通常,操作系统中的 IO 操作同时只能对一个 fd 执行读取或写入操作,但对于服务端程序来说,多个客户端与服务端建立连接,任何时刻任何连接都有可能有数据到来,那么如果使用传统的阻塞式 IO,我们的进程一旦阻塞等待某个连接,其他连接都将无法被处理,而入股哦使用非阻塞式 IO,那么一遍遍轮询全部连接将大大降低执行效率。 现代操作系统提供了这样问题的理想解决方案 — IO 复用模型。 IO复用 & UNIX下的五种IO模型
GoLang 中,通道的使用也存在同样的问题,那就是按照上面描述的通道的使用,一个 goroutine 同时只能与另一个 goroutine 通信,那么,如果一个 goroutine 要同时接收多个通道中数据的到来,上面的使用方式就显得力不从心了。 GoLang 中提供了与操作系统中的 IO 复用模型类似的通道多路复用模型 — select。
5.1. 使用方式
select 的使用方式与 switch 语句非常相似:
代码语言:javascript复制select {
case value1 <- ch1:
// do something
case value2 <- ch2:
// do something
}
5.2. 示例
下面的例子是基于上面计算数字平方的修改:
代码语言:javascript复制package main
import (
"fmt"
"os"
"time"
)
func counter(out chan<- int, abort <-chan struct{}) {
tick := time.Tick(time.Second)
i := 1
for {
select {
case <-tick:
out <- i
i = 1
case <-abort:
close(out)
return
}
}
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v*v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func setabort(abort chan struct{}) {
_, _ = os.Stdin.Read(make([]byte, 1)) // 等待读取单个字节
abort <- struct{}{}
}
func main() {
nums := make(chan int)
squares := make(chan int)
abort := make(chan struct{})
fmt.Println("Calculate the square of each number per second, press any key to abort")
go setabort(abort)
go counter(nums, abort)
go squarer(squares, nums)
printer(squares)
}
我们将上面十个数的循环改成了无限循环,通过接受一个字符来向 abort 通道发送一个信号,从而实现流程的中止。 counter 函数同时从每秒生成心跳的 tick 通道和随时可能产生中止信号的 abort 通道读取数据,此时,select 多路复用就显得非常有用了。 执行程序并在适当时候输入字符 a 生成中止信号,程序打印出了:
1 4 9 a
5.3. 通过 select 实现非阻塞式通道读写
与 switch 语句一样,select 也可以加入 default 语句,如果所有的 case 条件中的通道均没有数据就绪,那么 select 语句不会阻塞等待,而是会去执行 default 语句,这就实现了通道的非阻塞式读写。 下面的例子展示了通道的非阻塞式读写:
代码语言:javascript复制package main
import "fmt"
func main() {
abort := make(chan struct{})
over := make(chan struct{})
go func(over chan struct{}) {
select {
case <-abort:
fmt.Println("Launch aborted!")
return
default:
fmt.Println("No abort signal launched")
}
over <- struct{}{}
}(over)
<-over
}
执行程序,因为没有 goroutine 向这个匿名 goroutine 传递 abort 信号,所以他打印出了:
No abort signal launched
6. 参考资料
《Go 语言程序设计》。