kratos源码分析系列(5)

2023-09-06 19:13:52 浏览数 (1)

10.middleware

接口定义在:middleware/middleware.go

代码语言:javascript复制
type Handler func(ctx context.Context, req interface{}) (interface{}, error)
func Chain(m ...Middleware) Middleware {
  return func(next Handler) Handler {
    for i := len(m) - 1; i >= 0; i-- {
      next = m[i](next)
    }
    return next
  }
}

基于jwt开放标准(RFC 7519)实现的auth,https://github.com/golang-jwt/jwt,包括客户端的生成和服务端的验证:

代码语言:javascript复制
func Server(keyFunc jwt.Keyfunc, opts ...Option) middleware.Middleware {
    return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
      tokenInfo, err = jwt.ParseWithClaims(jwtToken, o.claims(), keyFunc)
      tokenInfo, err = jwt.Parse(jwtToken, keyFunc)
代码语言:javascript复制
func Client(keyProvider jwt.Keyfunc, opts ...Option) middleware.Middleware {
        return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        token := jwt.NewWithClaims(o.signingMethod, o.claims())
          if clientContext, ok := transport.FromClientContext(ctx); ok {
        clientContext.RequestHeader().Set(authorizationKey, fmt.Sprintf(bearerFormat, tokenStr))

circuitbreaker断路器:https://github.com/go-kratos/aegis

代码语言:javascript复制
func Client(opts ...Option) middleware.Middleware {
    return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
      breaker := opt.group.Get(info.Operation()).(circuitbreaker.CircuitBreaker)

logging,服务端和客户端、日志的必选字段:

代码语言:javascript复制
func Server(logger log.Logger) middleware.Middleware {
  return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
    _ = log.WithContext(ctx, logger).Log(level,
        "kind", "server",
        "component", kind,
        "operation", operation,
        "args", extractArgs(req),
        "code", code,
        "reason", reason,
        "stack", stack,
        "latency", time.Since(startTime).Seconds(),
      )
func Client(logger log.Logger) middleware.Middleware {
  return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {

metadata的解析和传递

代码语言:javascript复制
func Server(opts ...Option) middleware.Middleware {
      return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
      tr, ok := transport.FromServerContext(ctx)
      if !ok {
        return handler(ctx, req)
      }
      ctx = metadata.NewServerContext(ctx, md)
      return handler(ctx, req)
func Client(opts ...Option) middleware.Middleware {
      return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
      tr, ok := transport.FromClientContext(ctx)
        if md, ok := metadata.FromClientContext(ctx); ok {
        for k, v := range md {
          header.Set(k, v)

metrics

代码语言:javascript复制
func Server(opts ...Option) middleware.Middleware {
      return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        op.requests.With(kind, operation, strconv.Itoa(code), reason).Inc()
        op.seconds.With(kind, operation).Observe(time.Since(startTime).Seconds())
代码语言:javascript复制
type options struct {
  // counter: <client/server>_requests_code_total{kind, operation, code, reason}
  requests metrics.Counter
  // histogram: <client/server>_requests_seconds_bucket{kind, operation}
  seconds metrics.Observer
}
代码语言:javascript复制
func Client(opts ...Option) middleware.Middleware {
        return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        op.requests.With(kind, operation, strconv.Itoa(code), reason).Inc()
        op.seconds.With(kind, operation).Observe(time.Since(startTime).Seconds())

ratelimit

代码语言:javascript复制
func Server(opts ...Option) middleware.Middleware {
        return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
        done, e := options.limiter.Allow()
        reply, err = handler(ctx, req)

revovery,并打印调用栈

代码语言:javascript复制
func Recovery(opts ...Option) middleware.Middleware {
      return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
        defer func() {
        if rerr := recover(); rerr != nil {
          buf := make([]byte, 64<<10) //nolint:gomnd
          n := runtime.Stack(buf, false)

selector

代码语言:javascript复制
func Server(ms ...middleware.Middleware) *Builder {
  return &Builder{ms: ms}
}
 func Client(ms ...middleware.Middleware) *Builder {
  return &Builder{client: true, ms: ms}
}
func (b *Builder) Build() middleware.Middleware {
      return selector(transporter, b.matches, b.ms...)
func selector(transporter transporter, match func(context.Context, transporter) bool, ms ...middleware.Middleware) middleware.Middleware {
      return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
        if !match(ctx, transporter) {
        return handler(ctx, req)
        return middleware.Chain(ms...)(handler)(ctx, req)

tracing:https://github.com/open-telemetry/opentelemetry-go

代码语言:javascript复制
func (b Metadata) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {    
func (b Metadata) Extract(parent context.Context, carrier propagation.TextMapCarrier) context.Context {
func setClientSpan(ctx context.Context, span trace.Span, m interface{}) {
          case transport.KindHTTP:
      if ht, ok := tr.(http.Transporter); ok {
        method := ht.Request().Method
        route := ht.PathTemplate()
        path := ht.Request().URL.Path
func setServerSpan(ctx context.Context, span trace.Span, m interface{}) {
          case transport.KindHTTP:
      if ht, ok := tr.(http.Transporter); ok {
func (c *ClientHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
  return ctx
}
func (t *Tracer) Start(ctx context.Context, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) {
func Server(opts ...Option) middleware.Middleware {
  tracer := NewTracer(trace.SpanKindServer, opts...)
  return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
      ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader())
        setServerSpan(ctx, span, req)
func Client(opts ...Option) middleware.Middleware {
  tracer := NewTracer(trace.SpanKindClient, opts...)
  return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
      ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader())
        setClientSpan(ctx, span, req)
func TraceID() log.Valuer {

validate

代码语言:javascript复制
func Validator() middleware.Middleware {
  return func(handler middleware.Handler) middleware.Handler {
    return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
      if v, ok := req.(validator); ok {
        if err := v.Validate(); err != nil {

11,registry

代码语言:javascript复制
type Registrar interface {
  // Register the registration.
  Register(ctx context.Context, service *ServiceInstance) error
  // Deregister the registration.
  Deregister(ctx context.Context, service *ServiceInstance) error
}
代码语言:javascript复制
type Discovery interface {
  // GetService return the service instances in memory according to the service name.
  GetService(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
  // Watch creates a watcher according to the service name.
  Watch(ctx context.Context, serviceName string) (Watcher, error)
}
代码语言:javascript复制
type ServiceInstance struct {
  // ID is the unique instance ID as registered.
  ID string `json:"id"`
  // Name is the service name as registered.
  Name string `json:"name"`
  // Version is the version of the compiled.
  Version string `json:"version"`
  // Metadata is the kv pair metadata associated with the service instance.
  Metadata map[string]string `json:"metadata"`
  // Endpoints are endpoint addresses of the service instance.
  // schema:
  //   http://127.0.0.1:8000?isSecure=false
  //   grpc://127.0.0.1:9000?isSecure=false
  Endpoints []string `json:"endpoints"`
}

0 人点赞