go中间件:基于mc协议写kafka

2022-09-17 17:56:36 浏览数 (2)

概论

有没有一种场景,你需要写kafka,但是你不想使用kafka的包,你想要一种简单的方式写入kafka

整体的结构

整体的结构是这样的

  1. 客户端调用域名发起请求
  2. 通过nginx stream代理到指定服务器
  3. 代理服务器写入kafka

如何写入代理服务器呢?

  1. 需要监听tcp协议
  2. 写入kafka

使用golang去做io密集性的事情真是简单又方便啊

具体实现

  1. 使用cobra 处理命令行模式
代码语言:txt复制
package cmd

import (

 "ho/pkg/global"

 "ho/pkg/kafka"

 "ho/pkg/memcache"

 "log"

 "os"

 "os/signal"

 "syscall"

 "github.com/spf13/cobra"

)

var mcCmd = &cobra.Command{

 Use:   "mc",

 Short: "运行服务",

 Long:  "运行服务",

 Run:   func(cmd *cobra.Command, args []string) {},

}

var mcKafkaCmd = &cobra.Command{

 Use:   "kafka",

 Short: "配置文件",

 Long:  "配置文件地址",

 Run: func(cmd *cobra.Command, args []string) {

 log.Println("mc to kafka")

 log.Println("configFilePath:"   configFilePath   " configFileName:"   configFileName)

 //init config

 global.InitConfig(configFilePath, configFileName)

 //init kafka

 kafka.InitKafka()

 //init server

 mcServer := memcache.GetMemcacheServer()

 mcServer.RegisterFunc("set", memcache.McSendToKafka)

 mcServer.Start()

 //sign

 signChan := make(chan os.Signal, 1)

 signal.Notify(signChan, syscall.SIGINT, syscall.SIGTERM)

 //wait

 for {

 select {

 case <-signChan: //stop sign

 global.LOGGER.Info("end")

 mcServer.Stop()

 kafka.StopAll()

 return

            }

        }

    },

}

var configFilePath string

var configFileName string

func init() {

 mcCmd.AddCommand(mcKafkaCmd)

 mcKafkaCmd.Flags().StringVarP(&configFilePath, "path", "p", "", `配置文件路径`)

 mcKafkaCmd.Flags().StringVarP(&configFileName, "filename", "f", "", `配置文件名称`)

}
  1. 使用rpcxio/gomemcached 包解析tcp协议,解包和压缩包
代码语言:txt复制
func McSendToKafka(ctx context.Context, req *mc.Request, res *mc.Response) error {

 key := req.Key

 value := req.Data

 sendData := string(value)

 global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))

 kafka.SendTokafka(key, sendData)

 res.Response = mc.RespStored

 return nil

}
  1. 使用https://github.com/NetEase-Media/ngo 的kafka库写入kafka

不过这个库我做了改造

  1. 需要支持安全认证
  2. 他的kafka必须有消费者这个是不合理的。
代码语言:txt复制
func McSendToKafka(ctx context.Context, req *mc.Request, res *mc.Response) error {

 key := req.Key

 value := req.Data

 sendData := string(value)

 global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))

 kafka.SendTokafka(key, sendData)

 res.Response = mc.RespStored

 return nil

}





func SendTokafka(key, value string) {

 kafkaProduce := GetProducer(key)

 if kafkaProduce != nil {

 kafkaProduce.Send(key, value, func(meta *RecordMetadata, err error) {

 if err != nil {

 global.LOGGER.Sugar().Errorf("error:%v", err)

            } else {

 global.LOGGER.Sugar().Info(meta)

            }

        })

    } else {

 global.LOGGER.Sugar().Errorf("topic %s not exist", key)

    }

}

源码地址: https://github.com/beckbikang/ho

0 人点赞