Go Source Code Analysis: ghz and Implementing a MySQL Load-Testing Tool

2022-08-03 13:51:02

ghz is a gRPC load-testing tool implemented in Go:

https://github.com/bojand/ghz

After analyzing its source code, I implemented a MySQL load-testing tool similar to mysqlslap:

https://github.com/xiazemin/mysqlslap-go

While reading the source I also found a bug in ghz:

https://github.com/bojand/ghz/pull/323/files

Using ghz:

./ghz --skipTLS --insecure --protoset ./bundle.protoset \
  -B ./grpc_payload --call tensorflow.serving.PredictionService/Predict \
  127.0.0.1:8500

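For detailed usage, see the documentation at https://ghz.sh/docs/examples. You can also use the -D flag to load a JSON file containing a list of request payloads; ghz then cycles through the entries in round-robin order, one per request. Now let's look at the source code.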

The entry point is cmd/ghz/main.go, which starts by parsing the command-line flags:

isCallSet = false
call      = kingpin.Flag("call", `A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.`).
		PlaceHolder(" ").IsSetByUser(&isCallSet).String()

// Concurrency
isCSet = false
c      = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50.").
		Short('c').Default("50").IsSetByUser(&isCSet).Uint()

isCPUSet = false
cpus     = kingpin.Flag("cpus", "Number of cpu cores to use.").
		Default(strconv.FormatUint(uint64(nCPUs), 10)).IsSetByUser(&isCPUSet).Uint()
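The IsSetByUser calls above let ghz distinguish a flag that was passed explicitly from one that still holds its default. The following is a minimal sketch of the same idea using only the standard library flag package instead of kingpin; parseFlags and the flag set name are hypothetical and not part of ghz.

```go
package main

import (
	"flag"
	"fmt"
)

// parseFlags is a hypothetical helper: it parses args and reports which
// flags were set explicitly. flag.Visit only visits flags that were set,
// which approximates kingpin's IsSetByUser.
func parseFlags(args []string) (call string, concurrency uint, set map[string]bool, err error) {
	fs := flag.NewFlagSet("ghz-sketch", flag.ContinueOnError)
	fs.StringVar(&call, "call", "", "fully-qualified method name")
	fs.UintVar(&concurrency, "concurrency", 50, "number of concurrent workers")
	if err = fs.Parse(args); err != nil {
		return "", 0, nil, err
	}
	set = map[string]bool{}
	fs.Visit(func(f *flag.Flag) { set[f.Name] = true })
	return call, concurrency, set, nil
}

func main() {
	call, c, set, err := parseFlags([]string{"-call", "helloworld.Greeter/SayHello"})
	if err != nil {
		panic(err)
	}
	// concurrency keeps its default and is reported as not set by the user
	fmt.Println(call, c, set["call"], set["concurrency"])
}
```

Knowing whether a value came from the user matters when command-line flags are merged with a config file: only explicitly set flags should override it.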

Once the command-line arguments are parsed, the test is run:

report, err := runner.Run(cfg.Call, cfg.Host, options...)

The implementation is in runner/run.go:

func Run(call, host string, options ...Option) (*Report, error) {
    c, err := NewConfig(call, host, options...)

    // limit the number of CPUs for the duration of the run
    oldCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(c.cpus)
    defer runtime.GOMAXPROCS(oldCPUs)

    reqr, err := NewRequester(c)

    // resolve the method descriptor from a proto file, a protoset, or server reflection
    mtd, err = protodesc.GetMethodDescFromProto(c.call, c.proto, c.importPaths)
    mtd, err = protodesc.GetMethodDescFromProtoSet(c.call, c.protoset)
    cc, err = reqr.newClientConn(false)
    refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
    mtd, err = protodesc.GetMethodDescFromReflect(c.call, refClient)

    // stop when the run is cancelled
    go func() {
        <-cancel
        reqr.Stop(ReasonCancel)
    }()

    // stop once the duration c.z has elapsed
    if c.z > 0 {
        go func() {
            time.Sleep(c.z)
            reqr.Stop(ReasonTimeout)
        }()
    }

    rep, err := reqr.Run()

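NewConfig builds the run configuration from the options, and NewRequester prepares the requests. If no proto file or protoset is specified, the descriptor of the target method is fetched through gRPC server reflection.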
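The two goroutines in Run race to stop the requester, one on cancellation and one on timeout. The sketch below illustrates that pattern in isolation, assuming a hypothetical stopper type that records only the first stop reason; it is not ghz's Requester.Stop implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// stopper is a hypothetical helper that records the first stop reason
// and closes its done channel exactly once.
type stopper struct {
	once   sync.Once
	done   chan struct{}
	reason string
}

func newStopper() *stopper { return &stopper{done: make(chan struct{})} }

// Stop is safe to call from several goroutines; only the first reason wins.
func (s *stopper) Stop(reason string) {
	s.once.Do(func() {
		s.reason = reason
		close(s.done)
	})
}

// runWithLimits blocks until either the cancel channel fires or the
// duration z elapses, and returns the reason the run was stopped.
// With z == 0 it waits for cancellation only.
func runWithLimits(cancel <-chan struct{}, z time.Duration) string {
	s := newStopper()
	go func() {
		select {
		case <-cancel:
			s.Stop("cancel")
		case <-s.done:
		}
	}()
	if z > 0 {
		go func() {
			select {
			case <-time.After(z):
				s.Stop("timeout")
			case <-s.done:
			}
		}()
	}
	<-s.done
	return s.reason
}

func main() {
	cancel := make(chan struct{}) // never closed here, so the timeout wins
	fmt.Println(runWithLimits(cancel, 20*time.Millisecond))
}
```

Selecting on s.done in both goroutines lets the loser of the race exit instead of leaking.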

runner/options.go defines the options for a load test:

func NewConfig(call, host string, options ...Option) (*RunConfig, error)

type RunConfig struct {
  // call settings
  call              string
  host              string
  proto             string
  importPaths       []string
  protoset          string
  enableCompression bool


  // security settings
  creds      credentials.TransportCredentials
  cacert     string
  cert       string
  key        string
  cname      string
  skipVerify bool
  insecure   bool
  authority  string


  // load
  rps              int
  loadStart        uint
  loadEnd          uint
  loadStep         int
  loadSchedule     string
  loadDuration     time.Duration
  loadStepDuration time.Duration


  pacer load.Pacer


  // concurrency
  c             int
  cStart        uint
  cEnd          uint
  cStep         int
  cSchedule     string
  cMaxDuration  time.Duration
  cStepDuration time.Duration


  workerTicker load.WorkerTicker


  // test
  n     int
  async bool


  // number of connections
  nConns int


  // timeouts
  z             time.Duration
  timeout       time.Duration
  dialTimeout   time.Duration
  keepaliveTime time.Duration


  zstop string


  streamInterval        time.Duration
  streamCallDuration    time.Duration
  streamCallCount       uint
  streamDynamicMessages bool


  // lbStrategy
  lbStrategy string


  // TODO consolidate these actual value fields to be implemented via provider funcs
  // data & metadata
  data     []byte
  metadata []byte
  binary   bool


  dataFunc         BinaryDataFunc
  dataProviderFunc DataProviderFunc
  dataStreamFunc   StreamMessageProviderFunc
  mdProviderFunc   MetadataProviderFunc


  funcs template.FuncMap


  // reflection metadata
  rmd map[string]string


  // debug
  hasLog bool
  log    Logger


  // misc
  name        string
  cpus        int
  tags        []byte
  skipFirst   int
  countErrors bool
  recvMsgFunc StreamRecvMsgInterceptFunc
}
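NewConfig takes a variadic list of Option values, which is the functional options pattern. Here is a minimal sketch of that pattern under the assumption that an option is a function mutating the config; runConfig, withConcurrency, withInsecure and the default values are illustrative stand-ins, not ghz's actual definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// runConfig is a trimmed-down, hypothetical stand-in for RunConfig.
type runConfig struct {
	call, host string
	c          int
	n          int
	insecure   bool
}

// option mutates the config and may reject invalid values.
type option func(*runConfig) error

func withConcurrency(c int) option {
	return func(cfg *runConfig) error {
		if c <= 0 {
			return errors.New("concurrency must be positive")
		}
		cfg.c = c
		return nil
	}
}

func withInsecure(v bool) option {
	return func(cfg *runConfig) error {
		cfg.insecure = v
		return nil
	}
}

// newConfig starts from (assumed) defaults and applies the options in order.
func newConfig(call, host string, opts ...option) (*runConfig, error) {
	cfg := &runConfig{call: call, host: host, c: 50, n: 200}
	for _, o := range opts {
		if err := o(cfg); err != nil {
			return nil, err
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig("helloworld.Greeter/SayHello", "127.0.0.1:50051",
		withConcurrency(8), withInsecure(true))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.c, cfg.n, cfg.insecure)
}
```

The pattern keeps the struct fields unexported while still letting callers set any subset of them, with validation in one place per option.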

runner/requester.go assembles the request data and sends the requests:

func NewRequester(c *RunConfig) (*Requester, error)
      md := mtd.GetInputType()
      payloadMessage := dynamic.NewMessage(md)
      reqr.dataProvider = c.dataProviderFunc
      defaultDataProvider, err := newDataProvider(reqr.mtd, c.binary, c.dataFunc, c.data, c.funcs)
      reqr.metadataProvider = c.mdProviderFunc
      defaultMDProvider, err := newMetadataProvider(reqr.mtd, c.metadata, c.funcs)

The Requester type is defined as follows:

type Requester struct {

  conns    []*grpc.ClientConn
  stubs    []grpcdynamic.Stub
  handlers []*statsHandler


  mtd      *desc.MethodDescriptor
  reporter *Reporter


  config *RunConfig


  results chan *callResult
  stopCh  chan bool
  start   time.Time


  dataProvider     DataProviderFunc
  metadataProvider MetadataProviderFunc


  lock       sync.Mutex
  stopReason StopReason
  workers    []*Worker
}

Run is the core function: it sends the requests, records the results, and finally returns a report:

func (b *Requester) Run() (*Report, error)
      cc, err := b.openClientConns()
      start := time.Now()
      stub := grpcdynamic.NewStub(cc[n])
      b.reporter = newReporter(b.results, b.config)
      go func() {
        b.reporter.Run()
      }()
      wt := createWorkerTicker(b.config)
      p := createPacer(b.config)
      err = b.runWorkers(wt, p)
      report := b.Finish()
      b.closeClientConns()

The gRPC connections are opened through the following call chain:

func (b *Requester) openClientConns() ([]*grpc.ClientConn, error)

      c, err := b.newClientConn(true)

func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, error)

      return grpc.DialContext(ctx, b.config.host, opts...)

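Next, the worker ticker and the pacer are created. Going by the interfaces in the load package, the worker ticker sends TickValues on a channel that tell the requester how many workers to add or remove, and the pacer decides how long to wait before the next request. The results produced by the workers are written to a channel that the reporter consumes:

func createWorkerTicker(config *RunConfig) load.WorkerTicker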
func createPacer(config *RunConfig) load.Pacer 

runWorkers does the actual work and ultimately calls w.runWorker():

func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error
      wct := wt.Ticker()
      go func() {
        wt.Run()
      }()

      go func() {
        for tv := range wct {
          // ...
          b.workers = append(b.workers, &w)
          go func() {
            errC <- w.runWorker()
          }()
          // ...
          for _, wrk := range b.workers {
            wrk.Stop()
          // ...

      go func() {
        // ...
        b.workers[i].Stop()
        for {
          // ask the pacer how long to wait and whether to stop
          wait, stop := p.Pace(time.Since(began), counter.Get())
Finish produces the load-test report:

func (b *Requester) Finish() *Report

      total := time.Since(b.start)
      <-b.reporter.done
      return b.reporter.Finalize(r, total)
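Finish waits on b.reporter.done, so the reporter must signal when it has drained the results channel. The sketch below shows one way this hand-off can work, assuming the results channel is closed before waiting; the reporter type, its fields and the maxDetail cap are simplified, hypothetical versions of what ghz uses.

```go
package main

import (
	"fmt"
	"time"
)

// callResult is a simplified stand-in for ghz's callResult.
type callResult struct {
	duration time.Duration
}

// reporter is a hypothetical, reduced version of a results consumer.
type reporter struct {
	results   chan *callResult
	done      chan bool
	maxDetail int // cap on stored details, like a hard-coded maxResult
	count     uint64
	totalSec  float64
	details   []time.Duration
}

func newReporter(results chan *callResult, maxDetail int) *reporter {
	return &reporter{results: results, done: make(chan bool, 1), maxDetail: maxDetail}
}

// run drains results until the channel is closed, then signals done.
func (r *reporter) run() {
	for res := range r.results {
		r.count++
		r.totalSec += res.duration.Seconds()
		if len(r.details) < r.maxDetail {
			r.details = append(r.details, res.duration)
		}
	}
	r.done <- true
}

// finish closes the results channel, waits for the reporter to drain it,
// and returns the request count and the average latency.
func (r *reporter) finish() (uint64, time.Duration) {
	close(r.results)
	<-r.done
	if r.count == 0 {
		return 0, 0
	}
	avg := r.totalSec / float64(r.count)
	return r.count, time.Duration(avg * float64(time.Second))
}

func main() {
	results := make(chan *callResult, 16)
	rep := newReporter(results, 2)
	go rep.run()
	for i := 1; i <= 3; i++ {
		results <- &callResult{duration: time.Duration(i) * time.Second}
	}
	count, avg := rep.finish()
	fmt.Println(count, avg, len(rep.details))
}
```

Counters are updated for every result, while the detail slice stops growing at the cap, so totals stay correct even when details are truncated.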

Along the way the JSON payload has to be converted to protobuf, which requires the method descriptor. That code is in protodesc/protodesc.go:

func GetMethodDescFromReflect(call string, client *grpcreflect.Client) (*desc.MethodDescriptor, error)
      call = strings.Replace(call, "/", ".", -1)
      file, err := client.FileContainingSymbol(call)
      return getMethodDesc(call, files)

func getMethodDesc(call string, files map[string]*desc.FileDescriptor) (*desc.MethodDescriptor, error)

      svc, mth, err := parseServiceMethod(call)
      dsc, err := findServiceSymbol(files, svc)
      mtd := sd.FindMethodByName(mth)

func GetMethodDescFromProto(call, proto string, imports []string) (*desc.MethodDescriptor, error)

      fds, err := p.ParseFiles(filename)
      return getMethodDesc(call, files)

The data provider, which holds the final request data, is defined in runner/data.go:

func newDataProvider(mtd *desc.MethodDescriptor,
  binary bool, dataFunc BinaryDataFunc, data []byte,
  funcs template.FuncMap) (*dataProvider, error)
      ctd := newCallData(mtd, funcs, "", 0)
      ha, err := ctd.hasAction(string(dp.data))

type dataProvider struct {

  binary   bool
  data     []byte
  mtd      *desc.MethodDescriptor
  dataFunc BinaryDataFunc


  arrayJSONData []string
  hasActions    bool


  // cached messages only for binary
  mutex          sync.RWMutex
  cachedMessages []*dynamic.Message
}
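The arrayJSONData field suggests how a JSON list of payloads can be served round-robin, as described for the -D flag. The following is a rough sketch of that idea, assuming the request number is used as the index; splitPayloads and pick are hypothetical helpers, not functions from ghz.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// splitPayloads splits a JSON array into one raw JSON string per element.
// A single JSON value is treated as a one-element list.
func splitPayloads(data []byte) ([]string, error) {
	var items []json.RawMessage
	if err := json.Unmarshal(data, &items); err != nil {
		// Not an array: fall back to a single JSON value.
		var one json.RawMessage
		if err := json.Unmarshal(data, &one); err != nil {
			return nil, err
		}
		items = []json.RawMessage{one}
	}
	if len(items) == 0 {
		return nil, errors.New("no payloads")
	}
	out := make([]string, len(items))
	for i, it := range items {
		out[i] = string(it)
	}
	return out, nil
}

// pick returns the payload for a request number, wrapping around
// at the end of the list (round robin).
func pick(items []string, reqNum int64) string {
	return items[int(reqNum%int64(len(items)))]
}

func main() {
	items, err := splitPayloads([]byte(`[{"name":"a"},{"name":"b"},{"name":"c"}]`))
	if err != nil {
		panic(err)
	}
	for req := int64(0); req < 5; req++ {
		fmt.Println(req, pick(items, req))
	}
}
```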

runner/calldata.go assembles the context information for each request:

type CallData struct {
  WorkerID           string // unique worker ID
  RequestNumber      int64  // unique incremented request number for each request
  FullyQualifiedName string // fully-qualified name of the method call
  MethodName         string // shorter call method name
  ServiceName        string // the service name
  InputName          string // name of the input message type
  OutputName         string // name of the output message type
  IsClientStreaming  bool   // whether this call is client streaming
  IsServerStreaming  bool   // whether this call is server streaming
  Timestamp          string // timestamp of the call in RFC3339 format
  TimestampUnix      int64  // timestamp of the call as unix time in seconds
  TimestampUnixMilli int64  // timestamp of the call as unix time in milliseconds
  TimestampUnixNano  int64  // timestamp of the call as unix time in nanoseconds
  UUID               string // generated UUIDv4 for each call


  t *template.Template
}

func newCallData(

  mtd *desc.MethodDescriptor,
  funcs template.FuncMap,
  workerID string, reqNum int64) *CallData 
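CallData carries a *template.Template, which hints at how per-request values such as RequestNumber end up in the payload. The sketch below shows the general technique with the standard text/template package; the callData struct here is a reduced, hypothetical copy and render is not a ghz function.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// callData is a reduced, hypothetical copy of the CallData fields.
type callData struct {
	WorkerID      string
	RequestNumber int64
	MethodName    string
}

// render executes the payload as a template against the call data.
func render(payload string, cd callData) (string, error) {
	t, err := template.New("call").Parse(payload)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, cd); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := render(`{"name":"{{.WorkerID}}-{{.RequestNumber}}"}`,
		callData{WorkerID: "g0c0", RequestNumber: 7})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because the template is evaluated per call, every request can carry a distinct payload even when a single data string was supplied.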

The worker ticker is implemented in load/worker_ticker.go:

type WorkerTicker interface {
  // Ticker returns a channel which sends TickValues
  // When a value is received the number of workers should be appropriately
  // increased or decreased given by the delta property.
  Ticker() <-chan TickValue


  // Run starts the worker ticker
  Run()


  // Finish closes the channel
  Finish()
}
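To make the interface more concrete, here is a small sketch of a step-style ticker that ramps the worker count from a start value to an end value. The tickValue struct (with Delta and Done fields), stepTicker and collect are assumptions made for illustration, and the waiting between steps that a real ticker would do is left out; step is assumed to be positive.

```go
package main

import "fmt"

// tickValue is a hypothetical stand-in for load.TickValue.
type tickValue struct {
	Delta int  // workers to add (positive) or remove (negative)
	Done  bool // true on the final tick
}

// stepTicker emits the deltas needed to move from start to end workers
// in increments of step (step must be > 0).
type stepTicker struct {
	c                chan tickValue
	start, end, step int
}

func newStepTicker(start, end, step int) *stepTicker {
	return &stepTicker{c: make(chan tickValue), start: start, end: end, step: step}
}

// Ticker returns the channel on which tick values are sent.
func (t *stepTicker) Ticker() <-chan tickValue { return t.c }

// Run sends the initial worker count, then one delta per step.
func (t *stepTicker) Run() {
	t.c <- tickValue{Delta: t.start}
	for cur := t.start; cur < t.end; {
		d := t.step
		if cur+d > t.end {
			d = t.end - cur
		}
		cur += d
		t.c <- tickValue{Delta: d, Done: cur == t.end}
	}
}

// Finish closes the channel so that receivers stop ranging over it.
func (t *stepTicker) Finish() { close(t.c) }

// collect runs the ticker and returns the worker count after every tick.
func collect(t *stepTicker) []int {
	go func() {
		t.Run()
		t.Finish()
	}()
	var counts []int
	workers := 0
	for tv := range t.Ticker() {
		workers += tv.Delta
		counts = append(counts, workers)
	}
	return counts
}

func main() {
	fmt.Println(collect(newStepTicker(2, 10, 4)))
}
```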

load/pacer.go defines the interface that paces the requests:

type Pacer interface {
  // Pace returns the duration the attacker should wait until
  // making next hit, given an already elapsed duration and
  // completed hits. If the second return value is true, an attacker
  // should stop sending hits.
  Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)


  // Rate returns a Pacer's instantaneous hit rate (per seconds)
  // at the given elapsed duration of an attack.
  Rate(elapsed time.Duration) float64
}
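A constant-rate pacer is probably the simplest implementation of this interface. The sketch below is an assumption-labeled example, not the pacer shipped with ghz: constPacer, its freq and max fields, and the scheduling rule (hit number k is due at k intervals after the start) are all illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// constPacer is a hypothetical pacer that sends freq hits per second
// and stops after max hits (max == 0 means no limit).
type constPacer struct {
	freq uint64
	max  uint64
}

// Pace returns how long to wait before the next hit and whether to stop.
func (p constPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
	if p.max > 0 && hits >= p.max {
		return 0, true
	}
	if p.freq == 0 {
		return 0, false // no rate limit: send immediately
	}
	interval := time.Second / time.Duration(p.freq)
	due := time.Duration(hits) * interval // when the next hit is scheduled
	if due <= elapsed {
		return 0, false // behind schedule: send immediately
	}
	return due - elapsed, false
}

// Rate returns the constant hit rate per second.
func (p constPacer) Rate(time.Duration) float64 { return float64(p.freq) }

func main() {
	p := constPacer{freq: 100, max: 3}
	fmt.Println(p.Pace(0, 1))                   // one hit done, next one is due later
	fmt.Println(p.Pace(50*time.Millisecond, 2)) // behind schedule
	fmt.Println(p.Pace(time.Second, 3))         // max reached
}
```

Computing the wait from elapsed time and completed hits, rather than sleeping a fixed interval, lets the sender catch up after a slow request.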

The requests themselves are made in runner/worker.go:

type Worker struct {
  stub grpcdynamic.Stub
  mtd  *desc.MethodDescriptor


  config   *RunConfig
  workerID string
  active   bool
  stopCh   chan bool
  ticks    <-chan TickValue


  dataProvider     DataProviderFunc
  metadataProvider MetadataProviderFunc
  msgProvider      StreamMessageProviderFunc


  streamRecv StreamRecvMsgInterceptFunc
}

Sending requests:

func (w *Worker) runWorker() error

      g := new(errgroup.Group)
      for {
        // ...
        return g.Wait()
        // ...
        g.Go(func() error {
          return w.makeRequest(tv)
        })
        // ...
        rErr := w.makeRequest(tv)

Assembling the request parameters:

func (w *Worker) makeRequest(tv TickValue) error

      ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)
      inputs, err := w.dataProvider(ctd)
      mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)
      mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)
      _ = w.makeBidiRequest(&ctx, ctd, msgProvider)
      _ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider)
      _ = w.makeServerStreamingRequest(&ctx, inputs[0])
      _ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])

The RPC itself is invoked through InvokeRpc:

func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error

      res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)

The Group used in runWorker is errgroup.Group, a thin wrapper around sync.WaitGroup:

pkg/mod/golang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go

func (g *Group) Go(f func() error)
      g.wg.Add(1)
      defer g.wg.Done()
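To show what such a wrapper amounts to, here is a minimal re-implementation of the idea using only the standard library: a WaitGroup plus a sync.Once that keeps the first error. It is a simplified sketch and omits the context cancellation that the real errgroup package also supports; the group type and errFailed are names made up for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFailed is a sample error used by the example.
var errFailed = errors.New("request failed")

// group is a minimal sketch of the errgroup idea.
type group struct {
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// Go runs f in a new goroutine and records the first non-nil error.
func (g *group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() { g.err = err })
		}
	}()
}

// Wait blocks until all functions have returned, then returns the first error.
func (g *group) Wait() error {
	g.wg.Wait()
	return g.err
}

func main() {
	var g group
	for i := 0; i < 3; i++ {
		i := i // capture the loop variable for the closure
		g.Go(func() error {
			if i == 1 {
				return errFailed
			}
			return nil
		})
	}
	fmt.Println(g.Wait())
}
```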

The load-test report is implemented in runner/reporter.go:

func newReporter(results chan *callResult, c *RunConfig) *Reporter

type Reporter struct {

  config *RunConfig


  results chan *callResult
  done    chan bool


  totalLatenciesSec float64


  details []ResultDetail


  errorDist      map[string]int
  statusCodeDist map[string]int
  totalCount     uint64
}

The maximum number of stored results is hard-coded:

const maxResult = 1000000

func (r *Reporter) Run()

      for res := range r.results {
        r.totalCount++
        r.totalLatenciesSec += res.duration.Seconds()
        // ...
        r.details = append(r.details, ResultDetail{
          Latency:   res.duration,
          Timestamp: res.timestamp,
          Status:    res.status,
          Error:     errStr,
        })

type ResultDetail struct {

  Timestamp time.Time     `json:"timestamp"`
  Latency   time.Duration `json:"latency"`
  Error     string        `json:"error"`
  Status    string        `json:"status"`
}

Generating the final report:

func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report

      rep := &Report
      rep.Options = Options
      if len(r.details) > 0 {
        average := r.totalLatenciesSec / float64(r.totalCount)
        rep.Average = time.Duration(average * float64(time.Second))
        rep.Rps = float64(r.totalCount) / total.Seconds()
        for _, d := range r.details {
          okLats = append(okLats, d.Latency.Seconds())
        }
        sort.Float64s(okLats)
        fastestNum = okLats[0]
        slowestNum = okLats[len(okLats)-1]
        rep.Histogram = histogram(okLats, slowestNum, fastestNum)
        rep.LatencyDistribution = latencies(okLats)

The contents of the report:

type Report struct {

  Name      string     `json:"name,omitempty"`
  EndReason StopReason `json:"endReason,omitempty"`


  Options Options   `json:"options,omitempty"`
  Date    time.Time `json:"date"`


  Count   uint64        `json:"count"`
  Total   time.Duration `json:"total"`
  Average time.Duration `json:"average"`
  Fastest time.Duration `json:"fastest"`
  Slowest time.Duration `json:"slowest"`
  Rps     float64       `json:"rps"`


  ErrorDist      map[string]int `json:"errorDistribution"`
  StatusCodeDist map[string]int `json:"statusCodeDistribution"`


  LatencyDistribution []LatencyDistribution `json:"latencyDistribution"`
  Histogram           []Bucket              `json:"histogram"`
  Details             []ResultDetail        `json:"details"`


  Tags map[string]string `json:"tags,omitempty"`
}

Computing the histogram:

func histogram(latencies []float64, slowest, fastest float64) []Bucket
  bc := 10
  buckets := make([]float64, bc+1)
  counts := make([]int, bc+1)
  bs := (slowest - fastest) / float64(bc)
  for i := 0; i < bc; i++ {
    buckets[i] = fastest + bs*float64(i)
  }
  buckets[bc] = slowest
  var bi int
  var max int
  for i := 0; i < len(latencies); {
    if latencies[i] <= buckets[bi] {
      i++
      counts[bi]++
      if max < counts[bi] {
        max = counts[bi]
      }
    } else if bi < len(buckets)-1 {
      bi++
    }
  }
  // ...
      Mark:      buckets[i],
      Count:     counts[i],
      Frequency: float64(counts[i]) / float64(len(latencies)),
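For experimenting with the bucketing logic outside ghz, the excerpt above can be completed into a standalone function. This is a sketch under two assumptions: the input is sorted in ascending order, and slowest is at least the largest latency (otherwise the loop would not terminate). The bucket type is a simplified stand-in for ghz's Bucket.

```go
package main

import "fmt"

// bucket is a simplified stand-in for ghz's Bucket type.
type bucket struct {
	Mark      float64 // upper bound of the bucket, in seconds
	Count     int     // number of latencies that fell into the bucket
	Frequency float64 // Count divided by the total number of latencies
}

// histogram splits [fastest, slowest] into 10 equal steps (11 marks) and
// counts the latencies at or below each mark. latencies must be sorted
// ascending and slowest must be >= the largest latency.
func histogram(latencies []float64, slowest, fastest float64) []bucket {
	bc := 10
	buckets := make([]float64, bc+1)
	counts := make([]int, bc+1)
	bs := (slowest - fastest) / float64(bc)
	for i := 0; i < bc; i++ {
		buckets[i] = fastest + bs*float64(i)
	}
	buckets[bc] = slowest
	var bi int
	for i := 0; i < len(latencies); {
		if latencies[i] <= buckets[bi] {
			i++
			counts[bi]++
		} else if bi < len(buckets)-1 {
			bi++
		}
	}
	res := make([]bucket, len(buckets))
	for i := range buckets {
		res[i] = bucket{
			Mark:      buckets[i],
			Count:     counts[i],
			Frequency: float64(counts[i]) / float64(len(latencies)),
		}
	}
	return res
}

func main() {
	lats := []float64{0.10, 0.12, 0.15, 0.30} // sorted, in seconds
	for _, b := range histogram(lats, lats[len(lats)-1], lats[0]) {
		fmt.Printf("%.3f [%d]\n", b.Mark, b.Count)
	}
}
```

Because the input is sorted, a single pass with two indices is enough: the bucket index only ever moves forward.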

Computing the latency distribution:

func latencies(latencies []float64) []LatencyDistribution

  pctls := []int{10, 25, 50, 75, 90, 95, 99}
  data := make([]float64, len(pctls))
  lt := float64(len(latencies))
  for i, p := range pctls {
    ip := (float64(p) / 100.0) * lt
    di := int(ip)
    // a whole-number rank maps to the previous zero-based index
    if ip == float64(di) {
      di = di - 1
    }

    if di < 0 {
      di = 0
    }
    data[i] = latencies[di]
  }
  res := make([]LatencyDistribution, len(pctls))
  for i := 0; i < len(pctls); i++ {
    if data[i] > 0 {
      lat := time.Duration(data[i] * float64(time.Second))
      res[i] = LatencyDistribution{Percentage: pctls[i], Latency: lat}

In other words, the sorted latencies are simply cut at the given percentiles.
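The percentile cut can be tried out in isolation with the sketch below. It follows the same index rule as the excerpt, but as assumptions of this example it sorts a copy of the input itself, returns nil for empty input, and keeps entries whose latency is zero; percentiles and latencyDistribution are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// latencyDistribution is a simplified stand-in for ghz's LatencyDistribution.
type latencyDistribution struct {
	Percentage int
	Latency    time.Duration
}

// percentiles returns the latency at each fixed percentile.
// Latencies are given in seconds and may be unsorted.
func percentiles(latencies []float64) []latencyDistribution {
	if len(latencies) == 0 {
		return nil
	}
	sorted := append([]float64(nil), latencies...)
	sort.Float64s(sorted)

	pctls := []int{10, 25, 50, 75, 90, 95, 99}
	lt := float64(len(sorted))
	res := make([]latencyDistribution, 0, len(pctls))
	for _, p := range pctls {
		ip := (float64(p) / 100.0) * lt
		di := int(ip)
		// A whole-number rank is 1-based, so step back to the 0-based index.
		if ip == float64(di) {
			di--
		}
		if di < 0 {
			di = 0
		}
		res = append(res, latencyDistribution{
			Percentage: p,
			Latency:    time.Duration(sorted[di] * float64(time.Second)),
		})
	}
	return res
}

func main() {
	lats := []float64{0.30, 0.10, 0.15, 0.12, 0.11, 0.40, 0.13, 0.14, 0.20, 0.25}
	for _, d := range percentiles(lats) {
		fmt.Printf("%d%% in %v\n", d.Percentage, d.Latency)
	}
}
```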

If you are interested in the MySQL load-testing tool, you can try it out:

https://github.com/xiazemin/mysqlslap-go

Usage:

mysqlslap  -Hhost -uuser -ppassword -P3306 -ddatabase -q"select id from deviceattr where name='attr10' or name='attr20' group by id;" -ffilename -c 50 -i 10

Output report:

Summary:
 Name:         mysqlslap
 Count:        40
 Total:        1.01 s
 Slowest:      1.00 s
 Fastest:      1.01 s
 Average:      1.00 s
 Requests/sec: 39.76

Response time histogram(ms):
 1003.000 [1]  |∎∎
 1003.300 [0]  |
 1003.600 [0]  |
 1003.900 [0]  |
 1004.200 [24] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
 1004.500 [0]  |
 1004.800 [0]  |
 1005.100 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
 1005.400 [0]  |
 1005.700 [0]  |
 1006.000 [3]  |∎∎∎∎∎

Latency distribution:
 10 % in 1.00 s 
 25 % in 1.00 s 
 50 % in 1.00 s 
 75 % in 1.00 s 
 90 % in 1.00 s 
 95 % in 1.01 s 
 99 % in 1.01 s 

Status code distribution:
 [success]   40 responses   
 [failed]    0 responses 
