使用 Go 处理大文件

2023-11-30 16:31:03 浏览数 (1)

使用并发来加快大文件处理速度。

如何在 Go 中处理大文件。以下是我们将遵循的步骤:

  • 按顺序处理 CSV 数据文件
  • 同时处理 CSV 数据文件
  • 基准比较

文件

  • 用于测试的样本大小文件(40 行)
  • 用于测试的样本大小文件(4000 行)
  • 完整文件(21,729,970 行)

处理功能

首先,来看看文件处理的核心功能。下面的函数很简单,构造起来有点耗时;它从文件行中提取名字和月份。

代码语言:go复制
func processRow(text string) (firstName, fullName, month string) {
	row := strings.Split(text, "|")

	fullName = strings.Replace(strings.TrimSpace(row[7]), " ", "", -1)

	name := strings.TrimSpace(row[7])
	if name != "" {
		startOfName := strings.Index(name, ", ")   2
		if endOfName := strings.Index(name[startOfName:], " "); endOfName < 0 {
			firstName = name[startOfName:]
		} else {
			firstName = name[startOfName : startOfName endOfName]
		}
		if strings.HasSuffix(firstName, ",") {
			firstName = strings.Replace(firstName, ",", "", -1)
		}
	}

	date := strings.TrimSpace(row[13])
	if len(date) == 8 {
		month = date[:2]
	} else {
		month = "--"
	}

	return firstName, fullName, month
}

按顺序处理 CSV 数据文件

首先,让我们按顺序处理此文件。正如 Go (Golang) 中的并发性 — 第 1 部分一文中所讨论的,顺序处理是一种逐行处理方法。预计这会很慢,因为必须从第一行到最后一行处理 n 行。

正如稍后将在基准测试中看到的那样,这大约需要 ~20 秒!让我们看看是否可以通过同时处理文件的某些部分来降低这个数字。

代码语言:go复制
func sequential(file string) result {
	res := result{donationMonthFreq: map[string]int{}}

	f, err := os.Open(file)
	if err != nil {
		log.Fatal(err)
	}

	fullNamesRegister := make(map[string]bool)

	firstNameMap := make(map[string]int)

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		row := scanner.Text()
		firstName, fullName, month := processRow(row)

		fullNamesRegister[fullName] = true

		firstNameMap[firstName]  
		if firstNameMap[firstName] > res.commonNameCount {
			res.commonName = firstName
			res.commonNameCount = firstNameMap[firstName]
		}
		res.donationMonthFreq[month]  
		res.numRows  
		res.peopleCount = len(fullNamesRegister)
	}

	return res
}

同时处理 CSV 数据文件

好了,让我们把其中一些 CPU 内核付诸实践。由于当今大多数计算机都具有多核处理器,因此我们知道我们可以映射-减少(拆分-应用-组合)这个过程。

数据文件 →Reader →处理器 →组合器→结果数据文件 →Reader →处理器 →组合器→结果

我们将使用渠道来构建管道!该管道将允许我们将该过程拆分为多个阶段。

我们的管道使用以下组件:

  • reader
  • worker
  • combiner

读取器将数据文件中的行拆分为批次,并将集合发送出去供处理器拾取。

代码语言:go复制
reader := func(ctx context.Context, rowsBatch *[]string) <-chan []string {
  out := make(chan []string)

  scanner := bufio.NewScanner(f)

  go func() {
    defer close(out) 

    for {
      scanned := scanner.Scan()

      select {
      case <-ctx.Done():
        return
      default:
        row := scanner.Text()
        if len(*rowsBatch) == batchSize || !scanned {
          out <- *rowsBatch
          *rowsBatch = []string{} 
        }
        *rowsBatch = append(*rowsBatch, row) 
      }

      if !scanned {
        return
      }
    }
  }()

  return out
}

变量 batchSize 是可配置的,用于确定发送的行批处理的大小。

Workers

workers从读卡器中拿起一批,处理每一批,然后发送处理后的数据。

我们设计这个阶段是为了并行,因为我们的目标是多核架构。我们初始化的工作线程越多,我们可以并行处理的文件部分就越多。

代码语言:go复制
worker := func(ctx context.Context, rowBatch <-chan []string) <-chan processed {
  out := make(chan processed)

  go func() {
    defer close(out)

    p := processed{}
    for rowBatch := range rowBatch {
      for _, row := range rowBatch {
        firstName, fullName, month := processRow(row)
        p.fullNames = append(p.fullNames, fullName)
        p.firstNames = append(p.firstNames, firstName)
        p.months = append(p.months, month)
        p.numRows  
      }
    }
    out <- p
  }()

  return out
}

Combiner

组合器合并来自工作线程的传入处理数据。

代码语言:go复制
combiner := func(ctx context.Context, inputs ...<-chan processed) <-chan processed {
  out := make(chan processed)

  var wg sync.WaitGroup
  multiplexer := func(p <-chan processed) {
    defer wg.Done()

    for in := range p {
      select {
      case <-ctx.Done():
      case out <- in:
      }
    }
  }

  wg.Add(len(inputs))
  for _, in := range inputs {
    go multiplexer(in)
  }

  go func() {
    wg.Wait()
    close(out)
  }()

  return out
}

reader → processor(s) → combiner

看一下封闭并发处理函数的函数签名。

代码语言:go复制
func concurrent_processing(file string, numWorkers, batchSize int) (res result) {...}

请注意两个参数和 batchSize 参数 numWorkers 。这些参数指定工作线程的数量以及每个线程一次应处理的行的大小。

是时候看看我们如何将三个 3 个阶段结合起来了!

代码语言:go复制
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rowsBatch := []string{}
rowsCh := reader(ctx, &rowsBatch)

workersCh := make([]<-chan processed, numWorkers)
for i := 0; i < numWorkers; i   {
  workersCh[i] = worker(ctx, rowsCh)
}

for processed := range combiner(ctx, workersCh...) {
}

基准比较

让我们对文件处理的顺序版本和并发版本进行基准测试和比较。

并发版本的性能将因工作线程数量和批大小而异。以下是我们的基准测试输出:

代码语言:go复制
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark/Sequential_000_workers_0000_batchSize-16   3   18624068210 ns/op       12646414149 B/op        65274211 allocs/op
Benchmark/Concurrent_001_workers_0001_batchSize-16   3   29715818665 ns/op       19055197437 B/op        87004417 allocs/op
Benchmark/Concurrent_001_workers_1000_batchSize-16   3   18332366416 ns/op       19803063789 B/op        65535242 allocs/op
Benchmark/Concurrent_010_workers_1000_batchSize-16   3   11115996750 ns/op       19833839914 B/op        65536413 allocs/op
Benchmark/Concurrent_010_workers_10000_batchSize-16  3   10157454288 ns/op       20097303218 B/op        65317016 allocs/op
Benchmark/Concurrent_010_workers_100000_batchSize-16 3   9267358169 ns/op        20681588597 B/op        65282218 allocs/op

我们的目标是减少处理时间;让我们回顾一下结果:

代码语言:shell复制
| type       | numProcessors | batchSize | time |
|------------|---------------|-----------|------|
| Sequential | n/a           | n/a       | ~19s |
| Concurrent | 1             | 1         | ~30s |
| Concurrent | 1             | 1,000     | ~18s |
| Concurrent | 10            | 1,000     | ~11s |
| Concurrent | 10            | 10,000    | ~10s |
| Concurrent | 10            | 100,000   | ~9s  |

我们使用 10 个worker和 100,000 个批量将并发流程的处理时间缩短了约 200%。因此,我们有一个运行速度快 2 倍的程序!

该功能 processRow 现在非常简单;如果我们要增加该操作的复杂性,那么并发过程将更有价值。

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

0 人点赞