概论
有没有一种场景,你需要写kafka,但是你不想使用kafka的包,你想要一种简单的方式写入kafka
整体的结构
整体的结构是这样的
- 客户端调用域名发起请求
- 通过nginx stream代理到指定服务器
- 代理服务器写入kafka
如何写入代理服务器呢?
- 需要监听tcp协议
- 写入kafka
使用golang去做io密集性的事情真是简单又方便啊
具体实现
- 使用cobra 处理命令行模式
package cmd
import (
"ho/pkg/global"
"ho/pkg/kafka"
"ho/pkg/memcache"
"log"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
)
var mcCmd = &cobra.Command{
Use: "mc",
Short: "运行服务",
Long: "运行服务",
Run: func(cmd *cobra.Command, args []string) {},
}
var mcKafkaCmd = &cobra.Command{
Use: "kafka",
Short: "配置文件",
Long: "配置文件地址",
Run: func(cmd *cobra.Command, args []string) {
log.Println("mc to kafka")
log.Println("configFilePath:" configFilePath " configFileName:" configFileName)
//init config
global.InitConfig(configFilePath, configFileName)
//init kafka
kafka.InitKafka()
//init server
mcServer := memcache.GetMemcacheServer()
mcServer.RegisterFunc("set", memcache.McSendToKafka)
mcServer.Start()
//sign
signChan := make(chan os.Signal, 1)
signal.Notify(signChan, syscall.SIGINT, syscall.SIGTERM)
//wait
for {
select {
case <-signChan: //stop sign
global.LOGGER.Info("end")
mcServer.Stop()
kafka.StopAll()
return
}
}
},
}
var configFilePath string
var configFileName string
func init() {
mcCmd.AddCommand(mcKafkaCmd)
mcKafkaCmd.Flags().StringVarP(&configFilePath, "path", "p", "", `配置文件路径`)
mcKafkaCmd.Flags().StringVarP(&configFileName, "filename", "f", "", `配置文件名称`)
}
- 使用rpcxio/gomemcached 包解析tcp协议,解包和压缩包
func McSendToKafka(ctx context.Context, req *mc.Request, res *mc.Response) error {
key := req.Key
value := req.Data
sendData := string(value)
global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))
kafka.SendTokafka(key, sendData)
res.Response = mc.RespStored
return nil
}
- 使用https://github.com/NetEase-Media/ngo 的kafka库写入kafka
不过这个库我做了改造
- 需要支持安全认证
- 他的kafka必须有消费者这个是不合理的。
func McSendToKafka(ctx context.Context, req *mc.Request, res *mc.Response) error {
key := req.Key
value := req.Data
sendData := string(value)
global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))
kafka.SendTokafka(key, sendData)
res.Response = mc.RespStored
return nil
}
func SendTokafka(key, value string) {
kafkaProduce := GetProducer(key)
if kafkaProduce != nil {
kafkaProduce.Send(key, value, func(meta *RecordMetadata, err error) {
if err != nil {
global.LOGGER.Sugar().Errorf("error:%v", err)
} else {
global.LOGGER.Sugar().Info(meta)
}
})
} else {
global.LOGGER.Sugar().Errorf("topic %s not exist", key)
}
}
源码地址: https://github.com/beckbikang/ho