深入理解Go语言的并发模型

2024-06-28 22:25:57 浏览数 (1)

并发编程基础

    1. 并发 vs. 并行

并发(Concurrency)与并行(Parallelism)是两个常常混淆的概念。并发指的是在同一时间段内处理多个任务,而并行则是指在同一时刻同时执行多个任务。Go语言的并发模型更侧重于并发,通过goroutines和channels来管理任务之间的交互和通信。

    1. Goroutines

Goroutine是Go语言中实现并发的核心。它类似于轻量级的线程,由Go运行时管理。与操作系统线程不同,goroutines的启动和管理成本非常低,可以轻松创建成千上万个goroutines。

    1. Channels

Channels是Go语言中的一种管道,用于在多个goroutines之间传递数据。它们确保了数据传递的同步性和安全性,使得多个goroutines可以无缝地协作。


Goroutines详解

Goroutines的创建与运行

创建一个goroutine非常简单,只需使用go关键字即可。

代码语言:go复制
package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello, World!")
}

func main() {
    go sayHello()  // 创建一个新的goroutine
    time.Sleep(time.Second)
}

在上面的例子中,sayHello函数在一个新的goroutine中执行。

Goroutines的调度

Go运行时包含一个调度器,用于管理和调度goroutines的执行。调度器会根据系统资源和goroutines的状态动态调整其执行顺序,以确保高效的资源利用。

Goroutines的最佳实践

  • 避免长时间阻塞:Goroutines应尽量避免长时间的阻塞操作,这会影响其他goroutines的执行。
  • 使用sync.WaitGroup:在需要等待多个goroutines完成任务时,可以使用sync.WaitGroup来同步它们。
代码语言:go复制
package main

import (
    "fmt"
    "sync"
)

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d startingn", id)
    // 模拟工作
    fmt.Printf("Worker %d donen", id)
}

func main() {
    var wg sync.WaitGroup
    for i := 1; i <= 5; i   {
        wg.Add(1)
        go worker(i, &wg)
    }
    wg.Wait()
}

Channels详解

Channel的基本操作

Channels用于在多个goroutines之间传递数据。创建一个channel非常简单,只需使用make关键字。

代码语言:go复制
package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 42  // 发送数据到channel
    }()
    value := <-ch  // 从channel接收数据
    fmt.Println(value)
}

Buffered Channels与Unbuffered Channels

Channels可以是有缓冲的(buffered)或无缓冲的(unbuffered)。无缓冲的channel在发送和接收操作完成前会阻塞,而有缓冲的channel则允许一定数量的数据存储在缓冲区中。

代码语言:go复制
package main

import "fmt"

func main() {
    ch := make(chan int, 2)  // 创建一个有缓冲的channel,容量为2
    ch <- 1
    ch <- 2
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Channel的高级用法

  • 关闭Channel:当不再需要向channel发送数据时,可以关闭它。
代码语言:go复制
  package main
  
  import "fmt"
  
  func main() {
      ch := make(chan int)
      go func() {
          for i := 0; i < 5; i   {
              ch <- i
          }
          close(ch)
      }()
      for value := range ch {
          fmt.Println(value)
      }
  }
  • Select语句:Select语句用于处理多个channel的操作,可以同时等待多个channel的发送和接收操作。
代码语言:go复制
  package main
  
  import (
      "fmt"
      "time"
  )
  
  func main() {
      ch1 := make(chan string)
      ch2 := make(chan string)
      go func() {
          time.Sleep(1 * time.Second)
          ch1 <- "one"
      }()
      go func() {
          time.Sleep(2 * time.Second)
          ch2 <- "two"
      }()
      for i := 0; i < 2; i   {
          select {
          case msg1 := <-ch1:
              fmt.Println("Received", msg1)
          case msg2 := <-ch2:
              fmt.Println("Received", msg2)
          }
      }
  }

高级并发模型

Select语句

Select语句用于同时处理多个channel的操作,可以在多个channel之间进行选择,并根据最先完成的操作执行相应的代码。它是处理多个并发操作的强大工具。

Context包的使用

context包提供了对请求上下文的管理,允许在goroutines之间传递截止时间、取消信号和其他请求范围的数据。它在处理超时和取消操作时非常有用。

代码语言:go复制
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    select {
    case <-time.After(1 * time.Second):
        fmt.Println("operation successful")
    case <-ctx.Done():
        fmt.Println("operation timeout")
    }
}

Sync包与Mutex

在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync包中的互斥锁(Mutex)来保护共享资源。

代码语言:go复制
package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter  
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i   {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

Sync包与Mutex

在多goroutines访问共享资源时,为了避免竞态条件,可以使用sync包中的互斥锁(Mutex)来保护共享资源。

代码语言:go复制
go复制代码package main

import (
    "fmt"
    "sync"
)

var (
    counter int
    mu      sync.Mutex
)

func increment(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    counter  
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i   {
        wg.Add(1)
        go increment(&wg)
    }
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

Pipelines

Pipelines是将一系列处理步骤通过channels连接起来,实现数据的流水线处理。每个步骤在一个goroutine中独立运行,并通过channels进行数据传递。

代码语言:go复制
go复制代码package main

import "fmt"

// 生成器:产生从0到num的整数
func generator(num int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i <= num; i   {
            out <- i
        }
        close(out)
    }()
    return out
}

// 平方计算器:接收整数并输出它们的平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for num := range in {
            out <- num * num
        }
        close(out)
    }()
    return out
}

func main() {
    gen := generator(10)
    sq := square(gen)
    for result := range sq {
        fmt.Println(result)
    }
}

Fan-out和Fan-in

Fan-out和Fan-in是一种常见的并发模式。Fan-out是将任务分发到多个goroutines中执行,Fan-in则是将多个goroutines的结果汇集到一个channel中。

代码语言:go复制
go复制代码package main

import (
    "fmt"
    "sync"
)

// 任务函数:模拟耗时操作
func task(id int, wg *sync.WaitGroup, results chan<- int) {
    defer wg.Done()
    results <- id * 2  // 返回任务结果
}

func main() {
    var wg sync.WaitGroup
    results := make(chan int, 10)  // 缓冲区大小为10
    for i := 1; i <= 10; i   {
        wg.Add(1)
        go task(i, &wg, results)  // Fan-out
    }
    go func() {
        wg.Wait()
        close(results)
    }()
    for result := range results {  // Fan-in
        fmt.Println(result)
    }
}

Worker Pools

Worker Pools模式用于限制并发goroutines的数量,防止资源耗尽。它通过一组固定数量的工作goroutines来处理任务队列中的任务。

代码语言:go复制
go复制代码package main

import (
    "fmt"
    "sync"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        results <- job * 2
        fmt.Printf("Worker %d processed job %dn", id, job)
    }
}

func main() {
    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    var wg sync.WaitGroup
    const numWorkers = 3
    for w := 1; w <= numWorkers; w   {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    for j := 1; j <= numJobs; j   {
        jobs <- j
    }
    close(jobs)
    
    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println(result)
    }
}

项目实例:并发爬虫

  • 项目介绍

实现一个简单的并发爬虫,抓取多个网页的内容,并统计每个网页中的单词数量。通过goroutines实现并发抓取,通过channels实现结果的传递。

  • 代码实现
代码语言:go复制
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strings"
    "sync"
)

func fetch(url string, ch chan<- map[string]int, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get(url)
    if err != nil {
        fmt.Println("Error fetching:", url, err)
        return
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    words := strings.Fields(string(body))
    wordCount := make(map[string]int)
    for _, word := range words {
        wordCount[word]  
    }
    ch <- wordCount
}

func main() {
    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://golang.org/doc",
    }

    ch := make

(chan map[string]int)
    var wg sync.WaitGroup

    for _, url := range urls {
        wg.Add(1)
        go fetch(url, ch, &wg)
    }

    go func() {
        wg.Wait()
        close(ch)
    }()

    totalWordCount := make(map[string]int)
    for wordCount := range ch {
        for word, count := range wordCount {
            totalWordCount[word]  = count
        }
    }

    fmt.Println("Total Word Count:", totalWordCount)
}
  • 性能优化

通过增加goroutines的数量和使用更高效的算法,可以进一步提升爬虫的性能。例如,使用sync.Map替代传统的map来提高并发访问的效率。


我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

0 人点赞