并发编程基础
- 并发 vs. 并行
并发(Concurrency)与并行(Parallelism)是两个常常混淆的概念。并发指的是在同一时间段内处理多个任务,而并行则是指在同一时刻同时执行多个任务。Go语言的并发模型更侧重于并发,通过goroutines和channels来管理任务之间的交互和通信。
- Goroutines
Goroutine是Go语言中实现并发的核心。它类似于轻量级的线程,由Go运行时管理。与操作系统线程不同,goroutines的启动和管理成本非常低,可以轻松创建成千上万个goroutines。
- Channels
Channels是Go语言中的一种管道,用于在多个goroutines之间传递数据。它们确保了数据传递的同步性和安全性,使得多个goroutines可以无缝地协作。
Goroutines详解
Goroutines的创建与运行
创建一个goroutine非常简单,只需使用go
关键字即可。
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello, World!")
}
func main() {
go sayHello() // 创建一个新的goroutine
time.Sleep(time.Second)
}
在上面的例子中,sayHello
函数在一个新的goroutine中执行。
Goroutines的调度
Go运行时包含一个调度器,用于管理和调度goroutines的执行。调度器会根据系统资源和goroutines的状态动态调整其执行顺序,以确保高效的资源利用。
Goroutines的最佳实践
- 避免长时间阻塞:Goroutines应尽量避免长时间的阻塞操作,这会影响其他goroutines的执行。
- 使用
sync.WaitGroup
:在需要等待多个goroutines完成任务时,可以使用sync.WaitGroup
来同步它们。
package main
import (
"fmt"
"sync"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d startingn", id)
// 模拟工作
fmt.Printf("Worker %d donen", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i {
wg.Add(1)
go worker(i, &wg)
}
wg.Wait()
}
Channels详解
Channel的基本操作
Channels用于在多个goroutines之间传递数据。创建一个channel非常简单,只需使用make
关键字。
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
ch <- 42 // 发送数据到channel
}()
value := <-ch // 从channel接收数据
fmt.Println(value)
}
Buffered Channels与Unbuffered Channels
Channels可以是有缓冲的(buffered)或无缓冲的(unbuffered)。无缓冲的channel在发送和接收操作完成前会阻塞,而有缓冲的channel则允许一定数量的数据存储在缓冲区中。
代码语言:go复制package main
import "fmt"
func main() {
ch := make(chan int, 2) // 创建一个有缓冲的channel,容量为2
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
Channel的高级用法
- 关闭Channel:当不再需要向channel发送数据时,可以关闭它。
package main
import "fmt"
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i {
ch <- i
}
close(ch)
}()
for value := range ch {
fmt.Println(value)
}
}
- Select语句:Select语句用于处理多个channel的操作,可以同时等待多个channel的发送和接收操作。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "two"
}()
for i := 0; i < 2; i {
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}
}
高级并发模型
Select语句
Select语句用于同时处理多个channel的操作,可以在多个channel之间进行选择,并根据最先完成的操作执行相应的代码。它是处理多个并发操作的强大工具。
Context包的使用
context
包提供了对请求上下文的管理,允许在goroutines之间传递截止时间、取消信号和其他请求范围的数据。它在处理超时和取消操作时非常有用。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("operation successful")
case <-ctx.Done():
fmt.Println("operation timeout")
}
}
Sync包与Mutex
在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync
包中的互斥锁(Mutex)来保护共享资源。
package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter)
}
Sync包与Mutex
在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync
包中的互斥锁(Mutex)来保护共享资源。
go复制代码package main
import (
"fmt"
"sync"
)
var (
counter int
mu sync.Mutex
)
func increment(wg *sync.WaitGroup) {
defer wg.Done()
mu.Lock()
counter
mu.Unlock()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i {
wg.Add(1)
go increment(&wg)
}
wg.Wait()
fmt.Println("Final Counter:", counter)
}
Pipelines
Pipelines是将一系列处理步骤通过channels连接起来,实现数据的流水线处理。每个步骤在一个goroutine中独立运行,并通过channels进行数据传递。
代码语言:go复制go复制代码package main
import "fmt"
// 生成器:产生从0到num的整数
func generator(num int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i <= num; i {
out <- i
}
close(out)
}()
return out
}
// 平方计算器:接收整数并输出它们的平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for num := range in {
out <- num * num
}
close(out)
}()
return out
}
func main() {
gen := generator(10)
sq := square(gen)
for result := range sq {
fmt.Println(result)
}
}
Fan-out和Fan-in
Fan-out和Fan-in是一种常见的并发模式。Fan-out是将任务分发到多个goroutines中执行,Fan-in则是将多个goroutines的结果汇集到一个channel中。
代码语言:go复制go复制代码package main
import (
"fmt"
"sync"
)
// 任务函数:模拟耗时操作
func task(id int, wg *sync.WaitGroup, results chan<- int) {
defer wg.Done()
results <- id * 2 // 返回任务结果
}
func main() {
var wg sync.WaitGroup
results := make(chan int, 10) // 缓冲区大小为10
for i := 1; i <= 10; i {
wg.Add(1)
go task(i, &wg, results) // Fan-out
}
go func() {
wg.Wait()
close(results)
}()
for result := range results { // Fan-in
fmt.Println(result)
}
}
Worker Pools
Worker Pools模式用于限制并发goroutines的数量,防止资源耗尽。它通过一组固定数量的工作goroutines来处理任务队列中的任务。
代码语言:go复制go复制代码package main
import (
"fmt"
"sync"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
results <- job * 2
fmt.Printf("Worker %d processed job %dn", id, job)
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
const numWorkers = 3
for w := 1; w <= numWorkers; w {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
for j := 1; j <= numJobs; j {
jobs <- j
}
close(jobs)
go func() {
wg.Wait()
close(results)
}()
for result := range results {
fmt.Println(result)
}
}
项目实例:并发爬虫
- 项目介绍
实现一个简单的并发爬虫,抓取多个网页的内容,并统计每个网页中的单词数量。通过goroutines实现并发抓取,通过channels实现结果的传递。
- 代码实现
package main
import (
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
)
func fetch(url string, ch chan<- map[string]int, wg *sync.WaitGroup) {
defer wg.Done()
resp, err := http.Get(url)
if err != nil {
fmt.Println("Error fetching:", url, err)
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
words := strings.Fields(string(body))
wordCount := make(map[string]int)
for _, word := range words {
wordCount[word]
}
ch <- wordCount
}
func main() {
urls := []string{
"https://example.com",
"https://golang.org",
"https://golang.org/doc",
}
ch := make
(chan map[string]int)
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go fetch(url, ch, &wg)
}
go func() {
wg.Wait()
close(ch)
}()
totalWordCount := make(map[string]int)
for wordCount := range ch {
for word, count := range wordCount {
totalWordCount[word] = count
}
}
fmt.Println("Total Word Count:", totalWordCount)
}
- 性能优化
通过增加goroutines的数量和使用更高效的算法,可以进一步提升爬虫的性能。例如,使用sync.Map
替代传统的map来提高并发访问的效率。
我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!