本篇是继图解bufio包读取原理和写入原理之后的第三篇实战篇。本想着借用medium上一篇使用bufio的读取操作在25秒内处理完16G文件的具体应用来结束本系列文章的。但仔细阅读了代码后,发现对bufio.Reader的使用是错误的。究其原因猜测是其对bufio读取的内部实现机制并不了解造成的。所以作为一个反面示例来进行讲解。
下面我们来看看其具体的实现机制,是如何将16G的日志文件在25秒内读完的。
如上图所示,实现思路是这样的:利用bufio.Reader每次从文件中读取250KB大小的数据,然后将读取到的数据分配一个协程进行处理,每个协程将字节数组转换成字符串后,并按"n"分隔成多行,然后再按100行一组分配给一个协程进行处理。
具体代码如下:
代码语言:javascript复制var linesPool = sync.Pool{
New: func() interface{} {
lines := make([]byte, 250*1024)
return lines
},
}
var stringPool = sync.Pool{
New: func() interface{} {
lines := ""
return lines
},
}
func main() {
s := time.Now()
flag.Parse()
if *filename == "" {
panic("please input a filename")
}
file, err := os.Open(*filename)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
defer file.Close() //close after checking err
Process(file)
fmt.Println("nTime taken - ", time.Since(s))
}
func Process(f *os.File) error {
r := bufio.NewReader(f)
var wg sync.WaitGroup
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
// 这里做了简化,直接跳出循环
break
}
// 为了确保buf中都是整行整行的读取
nextUntillNewline, err := r.ReadBytes('n')
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
ProcessChunk(buf)
wg.Done()
}()
}
wg.Wait()
return nil
}
func ProcessChunk(chunk []byte) {
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk)
logsSlice := strings.Split(logs, "n")
stringPool.Put(logs)
// 每300行启用一个协程进行处理
chunkSize := 300
// 读取到的行数
lines := len(logsSlice)
// 总共需要的协程数
numberOfThreads := lines / chunkSize
//如果还有余数,则多加1个协程
if lines%chunkSize != 0 {
numberOfThreads
}
for i := 0; i < (numberOfThreads); i {
wg2.Add(1)
// startLine是从第几行开始处理
// endLine是结束的行
startLine := i * chunkSize
endLine := int(math.Min(float64((i 1)*chunkSize), float64(len(logsSlice))))
go func(startLine int, endLine int) {
defer wg2.Done() //to avaoid deadlocks
for i := startLine; i < endLine; i {
text := logsSlice[i]
if len(text) == 0 {
continue
}
//进行文本的处理分析
fmt.Println(text)
}
}(startLine, endLine)
}
wg2.Wait()
logsSlice = nil
}
在上面的代码中,我们看在第34行使用bufio.NewReader对缓冲区进行了初始化,该函数初始化时其缓冲区的大小是默认值,即4096字节,也就是4KB。在第39行使用Read函数进行了读取操作,期望读取的字节切片是从第3行的sync.Pool中获取的,大小是250*1024,即250KB。在读取原理篇我们讲到过当期望读取的字节大小大于缓冲区大小,并且缓冲区为空时,那么就会直接从文件中读取,而不经过缓冲区。如下图所示。所以,这里并没有减少系统调用的次数。
所以,大家在使用golang相关的包时,深入了解一下其内部的实现机制 还是很有必要的。
参考资料:
https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2
欢迎关注「Go学堂」,让知识活起来