bufio包系列之一个误用bufio读取的示例

2023-01-31 16:11:00 浏览数 (3)

本篇是继图解bufio包读取原理和写入原理之后的第三篇实战篇。本想着借用medium上一篇使用bufio的读取操作在25秒内处理完16G文件的具体应用来结束本系列文章的。但仔细阅读了代码后,发现对bufio.Reader的使用是错误的。究其原因猜测是其对bufio读取的内部实现机制并不了解造成的。所以作为一个反面示例来进行讲解。

下面我们来看看其具体的实现机制,是如何将16G的日志文件在25秒内读完的。

如上图所示,实现思路是这样的:利用bufio.Reader每次从文件中读取250KB大小的数据,然后将读取到的数据分配一个协程进行处理,每个协程将字节数组转换成字符串后,并按"n"分隔成多行,然后再按100行一组分配给一个协程进行处理。

具体代码如下:

代码语言:javascript复制
var linesPool = sync.Pool{
  New: func() interface{} {
    lines := make([]byte, 250*1024)
    return lines
  },
}

var stringPool = sync.Pool{
  New: func() interface{} {
    lines := ""
    return lines
  },
}

func main() {
  s := time.Now()
  flag.Parse()
  if *filename == "" {
    panic("please input a filename")
  }
  file, err := os.Open(*filename)

  if err != nil {
    fmt.Println("cannot able to read the file", err)
    return
  }
  defer file.Close() //close after checking err

  Process(file)
  fmt.Println("nTime taken - ", time.Since(s))
}

func Process(f *os.File) error {
  r := bufio.NewReader(f)

  var wg sync.WaitGroup
  for {
    buf := linesPool.Get().([]byte)
    n, err := r.Read(buf)
    buf = buf[:n]

    if n == 0 {
      // 这里做了简化,直接跳出循环
      break
    }
    // 为了确保buf中都是整行整行的读取
    nextUntillNewline, err := r.ReadBytes('n')

    if err != io.EOF {
      buf = append(buf, nextUntillNewline...)
    }

    wg.Add(1)
    go func() {
      ProcessChunk(buf)
      wg.Done()
    }()
  }

  wg.Wait()
  return nil
}

func ProcessChunk(chunk []byte) {
  var wg2 sync.WaitGroup

  logs := stringPool.Get().(string)
  logs = string(chunk)
  linesPool.Put(chunk)

  logsSlice := strings.Split(logs, "n")

  stringPool.Put(logs)

  // 每300行启用一个协程进行处理
  chunkSize := 300
  // 读取到的行数
  lines := len(logsSlice)
  // 总共需要的协程数
  numberOfThreads := lines / chunkSize

  //如果还有余数,则多加1个协程
  if lines%chunkSize != 0 {
    numberOfThreads  
  }

  for i := 0; i < (numberOfThreads); i   {
    wg2.Add(1)
    // startLine是从第几行开始处理
    // endLine是结束的行
    startLine := i * chunkSize
    endLine := int(math.Min(float64((i 1)*chunkSize), float64(len(logsSlice))))
    go func(startLine int, endLine int) {
      defer wg2.Done() //to avaoid deadlocks
      for i := startLine; i < endLine; i   {
        text := logsSlice[i]
        if len(text) == 0 {
          continue
        }
        //进行文本的处理分析
        fmt.Println(text)
      }
    }(startLine, endLine)
  }

  wg2.Wait()
  logsSlice = nil
}

在上面的代码中,我们看在第34行使用bufio.NewReader对缓冲区进行了初始化,该函数初始化时其缓冲区的大小是默认值,即4096字节,也就是4KB。在第39行使用Read函数进行了读取操作,期望读取的字节切片是从第3行的sync.Pool中获取的,大小是250*1024,即250KB。在读取原理篇我们讲到过当期望读取的字节大小大于缓冲区大小,并且缓冲区为空时,那么就会直接从文件中读取,而不经过缓冲区。如下图所示。所以,这里并没有减少系统调用的次数。

所以,大家在使用golang相关的包时,深入了解一下其内部的实现机制 还是很有必要的。

参考资料:

https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2


欢迎关注「Go学堂」,让知识活起来

0 人点赞