channel练习题

2024-09-17 22:56:48 浏览数 (1)

例1

编写一个程序,其中两个 goroutine 来回传递一个整数十次。当每个 goroutine 接收到整数时打印。每次通过整数都增加。一旦整数等于 10,立刻终止程序。

代码语言:javascript复制
func main() {

	share := make(chan int)

	// Create the WaitGroup and add a count
	// of two, one for each goroutine.
	var wg sync.WaitGroup
	wg.Add(2)

	// Launch two goroutines.
	go func() {
		goroutine("Bill", share)
		wg.Done()
	}()

	go func() {
		goroutine("Joan", share)
		wg.Done()
	}()

	// Start the sharing.
	share <- 1

	// Wait for the program to finish.
	wg.Wait()
}

// goroutine simulates sharing a value.
func goroutine(name string, share chan int) {
	for {

		// Wait to receive a value.
		value, ok := <-share
		if !ok {

			// If the channel was closed, return.
			fmt.Printf("Goroutine %s Downn", name)
			return
		}

		// Display the value.
		fmt.Printf("Goroutine %s Inc %dn", name, value)

		// Terminate when the value is 10.
		if value == 10 {
			close(share)
			fmt.Printf("Goroutine %s Downn", name)
			return
		}

		// Increment the value and send it
		// over the channel.
		share <- value   1
	}
}

例2

编写一个程序,使用扇出模式同时生成 100 个随机数。让每个 goroutine 生成一个随机数,并通过缓冲channel将该数字返回给主 goroutine。设置缓冲区channel的大小,以便永远不会发送阻塞。不要分配比您需要的更多的缓冲区。让主 goroutine 显示它收到的每个随机数,然后终止程序。

代码语言:javascript复制
package main

import (
 "fmt"
 "math/rand"
 "time"
)

const (
 goroutines = 100
)

func init() {
 rand.Seed(time.Now().UnixNano())
}

func main() {

 // Create the buffer channel with a buffer for
 // each goroutine to be created.
 values := make(chan int, goroutines)

 // Iterate and launch each goroutine.
 for gr := 0; gr < goroutines; gr   {

  // Create an anonymous function for each goroutine that
  // generates a random number and sends it on the channel.
  go func() {
   values <- rand.Intn(1000)
  }()
 }

 // Create a variable to be used to track received messages.
 // Set the value to the number of goroutines created.
 wait := goroutines

 // Iterate receiving each value until they are all received.
 // Store them in a slice of ints.
 var nums []int
 for wait > 0 {
  nums = append(nums, <-values)
  wait--
 }

 // Print the values in our slice.
 fmt.Println(nums)
}

例3

编写一个程序,最多同时生成 100 个随机数。不要发送所有的 100 个值,因为发送/接收的数量是未知的。

代码语言:javascript复制
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	goroutines = 100
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {

	// Create the channel for sharing results.
	values := make(chan int)

	// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
	var wg sync.WaitGroup
	wg.Add(goroutines)

	// Iterate and launch each goroutine.
	for gr := 0; gr < goroutines; gr   {

		// Create an anonymous function for each goroutine.
		go func() {

			// Ensure the waitgroup is decremented when this function returns.
			defer wg.Done()

			// Generate a random number up to 1000.
			n := rand.Intn(1000)

			// Return early if the number is divisible by 2. n%2 == 0
			if n%2 == 0 {
				return
			}

			// Send the odd values through the channel.
			values <- n
		}()
	}

	// Create a goroutine that waits for the other goroutines to finish then
	// closes the channel.
	go func() {
		wg.Wait()
		close(values)
	}()

	// Receive from the channel until it is closed.
	// Store values in a slice of ints.
	var nums []int
	for n := range values {
		nums = append(nums, n)
	}

	// Print the values in our slice.
	fmt.Printf("Result count: %dn", len(nums))
	fmt.Println(nums)
}

例4

编写一个程序,使用work pool同时生成最多 100 个随机数。拒绝偶数值。如果已收集到 100 个奇数,就让协程停止运行。

代码语言:javascript复制
package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
)

func main() {

	// Create the channel for sharing results.
	values := make(chan int)

	// Create a channel "shutdown" to tell goroutines when to terminate.
	shutdown := make(chan struct{})

	// Define the size of the worker pool. Use runtime.GOMAXPROCS(0) to size the pool based on number of processors.
	poolSize := runtime.GOMAXPROCS(0)

	// Create a sync.WaitGroup to monitor the Goroutine pool. Add the count.
	var wg sync.WaitGroup
	wg.Add(poolSize)

	// Create a fixed size pool of goroutines to generate random numbers.
	for i := 0; i < poolSize; i   {
		go func(id int) {

			// Start an infinite loop.
			for {

				// Generate a random number up to 1000.
				n := rand.Intn(1000)

				// Use a select to either send the number or receive the shutdown signal.
				select {

				// In one case send the random number.
				case values <- n:
					fmt.Printf("Worker %d sent %dn", id, n)

				// In another case receive from the shutdown channel.
				case <-shutdown:
					fmt.Printf("Worker %d shutting downn", id)
					wg.Done()
					return
				}
			}
		}(i)
	}

	// Create a slice to hold the random numbers.
	var nums []int
	for i := range values {

		// continue the loop if the value was even.
		if i%2 == 0 {
			fmt.Println("Discarding", i)
			continue
		}

		// Store the odd number.
		fmt.Println("Keeping", i)
		nums = append(nums, i)

		// break the loop once we have 100 results.
		if len(nums) == 100 {
			break
		}
	}

	// Send the shutdown signal by closing the channel.
	fmt.Println("Receiver sending shutdown signal")
	close(shutdown)

	// Wait for the Goroutines to finish.
	wg.Wait()

	// Print the values in our slice.
	fmt.Printf("Result count: %dn", len(nums))
	fmt.Println(nums)
}

例5

使用2个goroutine往n大小的通道中模拟任务生产。select中的case哪个可以读取则打印出数据,每隔5秒我们来看一下生产的消息还有多少没有被打印过。

代码语言:javascript复制
func main() {
    var t1 = makeTask("adoJob", 1000)
    var t2 = makeTask("xs25Job", 500)
    var tick = time.Tick(time.Second * 5)
    for {
        select {
        case task:=<-t1:
            log.Println(task)
        case task:=<-t2:
            log.Println(task)
        case <-tick:
            log.Println(fmt.Sprintf("队列挤压数量t1:%v个,t2:%v个", len(t1), len(t2)))
        }
        time.Sleep(time.Second * 1)
    }
}

// 生产数据
func makeTask(queueName string, n int) chan string {
    ch := make(chan string, n)
    go func() {
        i := 1
        for {
            // 假设生产任务占用时间
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) 
            ch <- fmt.Sprintf("%s,生产数据 %d", queueName, i)
            i  
        }
    }()
    return ch
}

每隔5秒钟有可能执行不到这个case。原因是多个case都满足时随机执行其中一个。

代码语言:javascript复制
2023/03/22 17:22:34 adoJob,生产数据 21
2023/03/22 17:22:35 xs25Job,生产数据 22
2023/03/22 17:22:36 队列挤压数量t1:29个,t2:33个
2023/03/22 17:22:37 xs25Job,生产数据 23
2023/03/22 17:22:38 xs25Job,生产数据 24
2023/03/22 17:22:39 adoJob,生产数据 22
2023/03/22 17:22:40 xs25Job,生产数据 25
2023/03/22 17:22:41 adoJob,生产数据 23
2023/03/22 17:22:42 xs25Job,生产数据 26
2023/03/22 17:22:43 xs25Job,生产数据 27
2023/03/22 17:22:44 队列挤压数量t1:35个,t2:34个
2023/03/22 17:22:45 xs25Job,生产数据 28
2023/03/22 17:22:46 adoJob,生产数据 24
2023/03/22 17:22:47 队列挤压数量t1:37个,t2:37个

例6

使用goroutine与无缓冲通道做一个消费端,将代码再改进一下。

代码语言:javascript复制
func main() {
    var t1 = makeTask("adoJob", 1000)
    var t2 = makeTask("xs25Job", 1000)
    var allTask []string                  //因为我想只做一个消费端,将2个生产端生产出来的消费都扔到一起
    var tick = time.Tick(time.Second * 5) //每隔一段时间报告队列积压情况
    var workerCh = worker()

    for {
        var taskInfo string //具体任务
        var ch chan<- string
        if len(allTask) > 0 {
            taskInfo = allTask[0] //从所有任务中取出每一个
            ch = workerCh
        }
        select {
        case task := <-t1:
            allTask = append(allTask, task)
        case task := <-t2:
            allTask = append(allTask, task)
        case ch <- taskInfo: //任务详情写入到要消费工作中
            allTask = allTask[1:]
        case <-tick:
            log.Println("队列挤压数量", len(allTask))
        }
    }
}

//生产数据
func makeTask(queueName string, n int) chan string {
    ch := make(chan string, n)
    go func() {
        i := 1
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000))) //假设生产任务占用时间
            ch <- fmt.Sprintf("%s,生产数据 %d", queueName, i)
            i  
        }
    }()
    return ch
}

//消费数据
func worker() chan<- string {
    ch := make(chan string) //无缓冲通道
    go func(tasks chan string) {
        for t := range tasks {
            time.Sleep(time.Second * 1) //假设我们每次消费任务需要花费1秒钟
            log.Printf("消费任务: %s n", t)
        }
    }(ch)
    return ch
}

例7

解耦生产方和消费方

服务启动时,启动 n 个 worker,作为工作协程池,这些协程工作在一个 for {} 无限循环里,从某个 channel 消费工作任务并执行:

代码语言:javascript复制
func main() {
    taskCh := make(chan int, 100)
    go worker(taskCh)

    // 塞任务
    for i := 0; i < 10; i   {
        taskCh <- i
    }

    // 等待 1 小时 
    select {
    case <-time.After(time.Hour):
    }
}

func worker(taskCh <-chan int) {
    const N = 5
    // 启动 5 个工作协程
    for i := 0; i < N; i   {
        go func(id int) {
            for {
                task := <- taskCh
                fmt.Printf("finish task: %d by worker %dn", task, id)
                time.Sleep(time.Second)
            }
        }(i)
    }
}

5 个工作协程在不断地从工作队列里取任务,生产方只管往 channel 发送任务即可,解耦生产方和消费方。

例8

控制并发数

代码语言:javascript复制
var limit = make(chan int, 3)

func main() {
    // …………
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    // …………
}

构建一个缓冲型的 channel,容量为 3。接着遍历任务列表,每个任务启动一个 goroutine 去完成。真正执行任务,访问第三方的动作在 w() 中完成,在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,才能执行 w(),并且在执行完任务,要将“许可证”归还。这样就可以控制同时运行的 goroutine 数。

一点要注意的是,如果 w() 发生 panic,那“许可证”可能就还不回去了,因此需要使用 defer 来保证。

Go Channel 应用模式

0 人点赞