使用并发来加快大文件处理速度。
如何在 Go 中处理大文件。以下是我们将遵循的步骤:
- 按顺序处理 CSV 数据文件
- 同时处理 CSV 数据文件
- 基准比较
文件
- 用于测试的样本大小文件(40 行)
- 用于测试的样本大小文件(4000 行)
- 完整文件(21,729,970 行)
处理功能
首先,来看看文件处理的核心功能。下面的函数很简单,构造起来有点耗时;它从文件行中提取名字和月份。
代码语言:go复制func processRow(text string) (firstName, fullName, month string) {
row := strings.Split(text, "|")
fullName = strings.Replace(strings.TrimSpace(row[7]), " ", "", -1)
name := strings.TrimSpace(row[7])
if name != "" {
startOfName := strings.Index(name, ", ") 2
if endOfName := strings.Index(name[startOfName:], " "); endOfName < 0 {
firstName = name[startOfName:]
} else {
firstName = name[startOfName : startOfName endOfName]
}
if strings.HasSuffix(firstName, ",") {
firstName = strings.Replace(firstName, ",", "", -1)
}
}
date := strings.TrimSpace(row[13])
if len(date) == 8 {
month = date[:2]
} else {
month = "--"
}
return firstName, fullName, month
}
按顺序处理 CSV 数据文件
首先,让我们按顺序处理此文件。正如 Go (Golang) 中的并发性 — 第 1 部分一文中所讨论的,顺序处理是一种逐行处理方法。预计这会很慢,因为必须从第一行到最后一行处理 n 行。
正如稍后将在基准测试中看到的那样,这大约需要 ~20 秒!让我们看看是否可以通过同时处理文件的某些部分来降低这个数字。
代码语言:go复制func sequential(file string) result {
res := result{donationMonthFreq: map[string]int{}}
f, err := os.Open(file)
if err != nil {
log.Fatal(err)
}
fullNamesRegister := make(map[string]bool)
firstNameMap := make(map[string]int)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
row := scanner.Text()
firstName, fullName, month := processRow(row)
fullNamesRegister[fullName] = true
firstNameMap[firstName]
if firstNameMap[firstName] > res.commonNameCount {
res.commonName = firstName
res.commonNameCount = firstNameMap[firstName]
}
res.donationMonthFreq[month]
res.numRows
res.peopleCount = len(fullNamesRegister)
}
return res
}
同时处理 CSV 数据文件
好了,让我们把其中一些 CPU 内核付诸实践。由于当今大多数计算机都具有多核处理器,因此我们知道我们可以映射-减少(拆分-应用-组合)这个过程。
我们将使用渠道来构建管道!该管道将允许我们将该过程拆分为多个阶段。
我们的管道使用以下组件:
- reader
- worker
- combiner
读取器将数据文件中的行拆分为批次,并将集合发送出去供处理器拾取。
代码语言:go复制reader := func(ctx context.Context, rowsBatch *[]string) <-chan []string {
out := make(chan []string)
scanner := bufio.NewScanner(f)
go func() {
defer close(out)
for {
scanned := scanner.Scan()
select {
case <-ctx.Done():
return
default:
row := scanner.Text()
if len(*rowsBatch) == batchSize || !scanned {
out <- *rowsBatch
*rowsBatch = []string{}
}
*rowsBatch = append(*rowsBatch, row)
}
if !scanned {
return
}
}
}()
return out
}
变量 batchSize 是可配置的,用于确定发送的行批处理的大小。
Workers
workers从读卡器中拿起一批,处理每一批,然后发送处理后的数据。
我们设计这个阶段是为了并行,因为我们的目标是多核架构。我们初始化的工作线程越多,我们可以并行处理的文件部分就越多。
代码语言:go复制worker := func(ctx context.Context, rowBatch <-chan []string) <-chan processed {
out := make(chan processed)
go func() {
defer close(out)
p := processed{}
for rowBatch := range rowBatch {
for _, row := range rowBatch {
firstName, fullName, month := processRow(row)
p.fullNames = append(p.fullNames, fullName)
p.firstNames = append(p.firstNames, firstName)
p.months = append(p.months, month)
p.numRows
}
}
out <- p
}()
return out
}
Combiner
组合器合并来自工作线程的传入处理数据。
代码语言:go复制combiner := func(ctx context.Context, inputs ...<-chan processed) <-chan processed {
out := make(chan processed)
var wg sync.WaitGroup
multiplexer := func(p <-chan processed) {
defer wg.Done()
for in := range p {
select {
case <-ctx.Done():
case out <- in:
}
}
}
wg.Add(len(inputs))
for _, in := range inputs {
go multiplexer(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
reader → processor(s) → combiner
看一下封闭并发处理函数的函数签名。
代码语言:go复制func concurrent_processing(file string, numWorkers, batchSize int) (res result) {...}
请注意两个参数和 batchSize 参数 numWorkers 。这些参数指定工作线程的数量以及每个线程一次应处理的行的大小。
是时候看看我们如何将三个 3 个阶段结合起来了!
代码语言:go复制ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rowsBatch := []string{}
rowsCh := reader(ctx, &rowsBatch)
workersCh := make([]<-chan processed, numWorkers)
for i := 0; i < numWorkers; i {
workersCh[i] = worker(ctx, rowsCh)
}
for processed := range combiner(ctx, workersCh...) {
}
基准比较
让我们对文件处理的顺序版本和并发版本进行基准测试和比较。
并发版本的性能将因工作线程数量和批大小而异。以下是我们的基准测试输出:
代码语言:go复制goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark/Sequential_000_workers_0000_batchSize-16 3 18624068210 ns/op 12646414149 B/op 65274211 allocs/op
Benchmark/Concurrent_001_workers_0001_batchSize-16 3 29715818665 ns/op 19055197437 B/op 87004417 allocs/op
Benchmark/Concurrent_001_workers_1000_batchSize-16 3 18332366416 ns/op 19803063789 B/op 65535242 allocs/op
Benchmark/Concurrent_010_workers_1000_batchSize-16 3 11115996750 ns/op 19833839914 B/op 65536413 allocs/op
Benchmark/Concurrent_010_workers_10000_batchSize-16 3 10157454288 ns/op 20097303218 B/op 65317016 allocs/op
Benchmark/Concurrent_010_workers_100000_batchSize-16 3 9267358169 ns/op 20681588597 B/op 65282218 allocs/op
我们的目标是减少处理时间;让我们回顾一下结果:
代码语言:shell复制| type | numProcessors | batchSize | time |
|------------|---------------|-----------|------|
| Sequential | n/a | n/a | ~19s |
| Concurrent | 1 | 1 | ~30s |
| Concurrent | 1 | 1,000 | ~18s |
| Concurrent | 10 | 1,000 | ~11s |
| Concurrent | 10 | 10,000 | ~10s |
| Concurrent | 10 | 100,000 | ~9s |
我们使用 10 个worker和 100,000 个批量将并发流程的处理时间缩短了约 200%。因此,我们有一个运行速度快 2 倍的程序!
该功能 processRow 现在非常简单;如果我们要增加该操作的复杂性,那么并发过程将更有价值。
我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!