goreplay(https://github.com/buger/goreplay)是基于libpcap的流量录制工具,它依赖包https://github.com/google/gopacket,而gopacket是对libpcap和npcap的go封装。可见其流量复制原理和tcpdump一样。我们通过一个简单的go服务器学习下如何使用。
代码语言:javascript复制git clone https://github.com/buger/goreplay
cd goreplay
go build -o goreplay .
代码语言:javascript复制package main
import (
"flag"
"fmt"
"net/http"
)
func main() {
var port int
flag.IntVar(&port, "port", 8080, "Input your port")
flag.Parse()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.Host)
fmt.Fprintf(w, "request:%s", r.RequestURI)
})
http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}
启动俩服务器
代码语言:javascript复制go run learn/goreplay/http/server.go -port 8081
go run learn/goreplay/http/server.go -port 8082
代码语言:javascript复制sudo ./goreplay/goreplay --input-raw :8082 --output-http "http://127.0.0.1:8081"
我们请求8082服务器,发现流量被复制到了8081
代码语言:javascript复制% ab -n 100 -c 10 http://127.0.0.1:8082/
goreplay 同时也支持将流量录制到文件,或者es
代码语言:javascript复制sudo ./goreplay/goreplay --input-raw :8082 --output-file %Y%m%d.log --output-file-append
首先我们启动es
代码语言:javascript复制docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --add-host=host.docker.internal:host-gateway elasticsearch:7.17.6
测试下es的链接和写入
代码语言:javascript复制package main
import (
"context"
"log"
"os"
"time"
"github.com/buger/goreplay/proto"
"github.com/olivere/elastic"
)
type ESRequestResponse struct {
ReqHost string `json:"Req_Host"`
ReqMethod string `json:"Req_Method"`
ReqURL string `json:"Req_URL"`
ReqBody string `json:"Req_Body"`
ReqUserAgent string `json:"Req_User-Agent"`
ReqXRealIP string `json:"Req_X-Real-IP"`
ReqXForwardedFor string `json:"Req_X-Forwarded-For"`
ReqConnection string `json:"Req_Connection,omitempty"`
ReqCookies string `json:"Req_Cookies,omitempty"`
RespStatusCode string `json:"Resp_Status-Code"`
RespBody string `json:"Resp_Body"`
RespProto string `json:"Resp_Proto,omitempty"`
RespContentLength string `json:"Resp_Content-Length,omitempty"`
RespContentType string `json:"Resp_Content-Type,omitempty"`
RespSetCookie string `json:"Resp_Set-Cookie,omitempty"`
Rtt int64 `json:"RTT"`
Timestamp time.Time
}
func main() {
client, err := elastic.NewSimpleClient(
elastic.SetURL("http://127.0.0.1:9200"),
// 设置错误日志
elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)),
// 设置info日志
elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)),
)
if err != nil {
log.Println(err)
}
exists, err := client.IndexExists("index1").Do(context.Background())
if err != nil {
log.Println(err)
}
if !exists {
_, err := client.CreateIndex("index1").Do(context.Background())
if err != nil {
log.Println(err)
}
}
log.Println("Initialized Elasticsearch Plugin")
req := make([]byte, 10240)
resp := make([]byte, 10240)
t := time.Now()
host := ESRequestResponse{
ReqHost: string(proto.Header(req, []byte("Host"))),
ReqMethod: string(proto.Method(req)),
ReqURL: string(proto.Path(req)),
ReqBody: string(proto.Body(req)),
ReqUserAgent: string(proto.Header(req, []byte("User-Agent"))),
ReqXRealIP: string(proto.Header(req, []byte("X-Real-IP"))),
ReqXForwardedFor: string(proto.Header(req, []byte("X-Forwarded-For"))),
ReqConnection: string(proto.Header(req, []byte("Connection"))),
ReqCookies: string(proto.Header(req, []byte("Cookie"))),
RespStatusCode: string(proto.Status(resp)),
RespProto: string(proto.Method(resp)),
RespBody: string(proto.Body(resp)),
RespContentLength: string(proto.Header(resp, []byte("Content-Length"))),
RespContentType: string(proto.Header(resp, []byte("Content-Type"))),
RespSetCookie: string(proto.Header(resp, []byte("Set-Cookie"))),
Timestamp: t,
Rtt: 0,
}
h, err := client.Index().Index("index1").Type("ESRequestResponse").BodyJson(host).Do(context.Background())
if err != nil {
log.Println(err)
}
log.Printf("Indexed data with ID %s to index %s, type %sn", h.Id, h.Index, h.Type)
return
}
代码语言:javascript复制go run learn/goreplay/es/main.go
代码语言:javascript复制 curl -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/_cat/indices
green open .geoip_databases 3pp6kUf0Td6wJ0AvFrm-tQ 1 0 41 0 39.2mb 39.2mb
yellow open index1 mVl0z97WQvqvATHmT_vXnQ 1 1 0 0 226b 226b
发现我们能够创建成功。goreplay录制流量入es非常简单
代码语言:javascript复制sudo ./goreplay/goreplay --input-raw-track-response --output-http-track-response --input-raw :8082 --output-http "http://127.0.0.1:8081/" --output-http-elasticsearch 'http://127.0.0.1:9200/gor'
实际测试的时候发现不能成功,原因是使用的写入es的插件,客户端版本太低了,针对es7,笔者实现了一个版本,可以用来替换下goreplay/elasticsearch.go
代码语言:javascript复制package main
import (
"context"
"log"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/buger/goreplay/proto"
elastigo "github.com/mattbaird/elastigo/lib"
"github.com/olivere/elastic"
)
type ESUriErorr struct{}
func (e *ESUriErorr) Error() string {
return "Wrong ElasticSearch URL format. Expected to be: scheme://host/index_name"
}
type ESPlugin struct {
Url string
Active bool
ApiPort string
eConn *elastigo.Conn
Host string
Index string
indexor *elastigo.BulkIndexer
done chan bool
client *elastic.Client
}
func (p *ESPlugin) RttDurationToMs(d time.Duration) int64 {
sec := d / time.Second
nsec := d % time.Second
fl := float64(sec) float64(nsec)*1e-6
return int64(fl)
}
type ESRequestResponse struct {
ReqHost string `json:"Req_Host"`
ReqMethod string `json:"Req_Method"`
ReqURL string `json:"Req_URL"`
ReqBody string `json:"Req_Body"`
ReqUserAgent string `json:"Req_User-Agent"`
ReqXRealIP string `json:"Req_X-Real-IP"`
ReqXForwardedFor string `json:"Req_X-Forwarded-For"`
ReqConnection string `json:"Req_Connection,omitempty"`
ReqCookies string `json:"Req_Cookies,omitempty"`
RespStatusCode string `json:"Resp_Status-Code"`
RespBody string `json:"Resp_Body"`
RespProto string `json:"Resp_Proto,omitempty"`
RespContentLength string `json:"Resp_Content-Length,omitempty"`
RespContentType string `json:"Resp_Content-Type,omitempty"`
RespSetCookie string `json:"Resp_Set-Cookie,omitempty"`
Rtt int64 `json:"RTT"`
Timestamp time.Time
}
// Parse ElasticSearch URI
//
// Proper format is: scheme://[userinfo@]host/index_name
// userinfo is: user[:password]
// net/url.Parse() does not fail if scheme is not provided but actually does not
// handle URI properly.
// So we must 'validate' URI format to match requirements to use net/url.Parse()
func parseURI(URI string) (err error, host, index string) {
parsedUrl, parseErr := url.Parse(URI)
if parseErr != nil {
err = new(ESUriErorr)
return
}
// check URL validity by extracting host and index values.
host = parsedUrl.Host
urlPathParts := strings.Split(parsedUrl.Path, "/")
index = urlPathParts[len(urlPathParts)-1]
// force index specification in uri : ie no implicit index
if host == "" || index == "" {
err = new(ESUriErorr)
}
return
}
var initOnce sync.Once
func (p *ESPlugin) Init(URI string) {
p.Url = URI
var err error
err, p.Host, p.Index = parseURI(URI)
log.Println("Initializing Elasticsearch Plugin", p.Index, p.Host)
t := time.Now()
if p.Index == "" {
p.Index = "gor-" t.Format("2006-01-02")
}
if err != nil {
log.Fatal("Can't initialize ElasticSearch plugin.", err)
}
initOnce.Do(func() {
p.client, err = elastic.NewSimpleClient(
elastic.SetURL("http://" p.Host),
// 设置错误日志
elastic.SetErrorLog(log.New(os.Stderr, "ES-ERROR ", log.LstdFlags)),
elastic.SetBasicAuth("elastic", "elastic"), // 账号密码
// 设置info日志
elastic.SetInfoLog(log.New(os.Stdout, "ES-INFO ", log.LstdFlags)),
)
if err != nil {
log.Println(err)
}
})
exists, err := p.client.IndexExists(p.Index).Do(context.Background())
if err != nil {
log.Println(err)
}
if !exists {
_, err := p.client.CreateIndex(p.Index).Do(context.Background())
if err != nil {
log.Println(err)
}
}
log.Println("Initialized Elasticsearch Plugin")
return
}
func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {
if len(resp) == 0 && len(req) == 0 {
// nil http response - skipped elasticsearch export for this request
log.Println("ResponseAnalyze ", resp, req)
return
}
t := time.Now()
rtt := p.RttDurationToMs(stop.Sub(start))
req = payloadBody(req)
host := ESRequestResponse{
ReqHost: string(proto.Header(req, []byte("Host"))),
ReqMethod: string(proto.Method(req)),
ReqURL: string(proto.Path(req)),
ReqBody: string(proto.Body(req)),
ReqUserAgent: string(proto.Header(req, []byte("User-Agent"))),
ReqXRealIP: string(proto.Header(req, []byte("X-Real-IP"))),
ReqXForwardedFor: string(proto.Header(req, []byte("X-Forwarded-For"))),
ReqConnection: string(proto.Header(req, []byte("Connection"))),
ReqCookies: string(proto.Header(req, []byte("Cookie"))),
RespStatusCode: string(proto.Status(resp)),
RespProto: string(proto.Method(resp)),
RespBody: string(proto.Body(resp)),
RespContentLength: string(proto.Header(resp, []byte("Content-Length"))),
RespContentType: string(proto.Header(resp, []byte("Content-Type"))),
RespSetCookie: string(proto.Header(resp, []byte("Set-Cookie"))),
Timestamp: t,
Rtt: rtt,
}
h, err := p.client.Index().Index(p.Index).BodyJson(host).Type("_doc").Do(context.Background()) //Type("ESRequestResponse").
if err != nil {
log.Println(err)
return
}
log.Printf("Indexed data with ID %s to index %s, type %sn", h.Id, h.Index, h.Type)
return
}
替换后重新编译下,开始测试:
代码语言:javascript复制 % ab -n 100 -c 10 http://127.0.0.1:8082/data/$RANDOM
查询下流量
代码语言:javascript复制 % curl -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/gor/_doc/fzsUr4QB4C7FYlos41fc
{"_index":"gor","_type":"_doc","_id":"fzsUr4QB4C7FYlos41fc","_version":1,"_seq_no":2198,"_primary_term":1,"found":true,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"","Req_User-Agent":"ApacheBench/2.3","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":18,"Timestamp":"2022-11-25T21:58:10.711494 08:00"}}
代码语言:javascript复制% curl -H "Content-Type:application/json" -XGET http://elastic:elastic@127.0.0.1:9200/gor/_search -d '{"query":{"match":{"Req_Body":"1213"}}}'
{"took":13,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":4,"relation":"eq"},"max_score":0.4084168,"hits":[{"_index":"gor","_type":"ESRequestResponse","_id":"gjsXr4QB4C7FYlosPVfB","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:17225,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":3,"Timestamp":"2022-11-25T22:00:46.769234 08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"gzsXr4QB4C7FYlosXVcM","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:10453,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":1,"Timestamp":"2022-11-25T22:00:54.769936 08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"hDsXr4QB4C7FYlosZFfd","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:16202,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":0,"Timestamp":"2022-11-25T22:00:56.769345 08:00"}},{"_index":"gor","_type":"ESRequestResponse","_id":"hTsZr4QB4C7FYlosQVe0","_score":0.4084168,"_source":{"Req_Host":"127.0.0.1:8082","Req_Method":"Host:","Req_URL":"","Req_Body":"{data:23396,qe:1213}","Req_User-Agent":"curl/7.79.1","Req_X-Real-IP":"","Req_X-Forwarded-For":"","Resp_Status-Code":"200","Resp_Body":"request:/","Resp_Proto":"HTTP/1.1","Resp_Content-Length":"9","Resp_Content-Type":"text/plain; charset=utf-8","RTT":2,"Timestamp":"2022-11-25T22:02:58.83474 08:00"}}]}}
我们可以通过es快速查询我们需要的流量,用来回放或者测试。