Learning k8s on a Mac (51): delayed queues with Pulsar

2022-12-17 16:26:18

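Kafka does not support delayed messages, and RocketMQ supports them only at a limited granularity. Pulsar (https://github.com/apache/pulsar) implements them with a priority queue and supports delays of arbitrary granularity; however, when there are many messages with long delays, memory consumption can become significant. This article walks through setting up Pulsar on a Mac and publishing and subscribing to messages with the Go SDK.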

docker pull apachepulsar/pulsar:latest
latest: Pulling from apachepulsar/pulsar
d7bfe07ed847: Pull complete
75dfa6203d6c: Pull complete
797c4fa83169: Pull complete
5f506e0917d3: Pull complete
2bf3a856127c: Pull complete
378ca9dc24a7: Pull complete
281b3f6d1348: Pull complete
c69f798d8aaf: Pull complete
7b05439db137: Pull complete
Digest: sha256:4a952b3c662b94247ffc4ff17be16ef176c293baaf346db13a095970f43adfd6
Status: Downloaded newer image for apachepulsar/pulsar:latest
docker.io/apachepulsar/pulsar:latest
Start a standalone instance, then copy its conf and data directories to the host so that they can be mounted later:
docker run -d -it -p 6650:6650 -p 8080:8080 --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone
935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67
docker cp 935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67:/pulsar/conf learn/pulsar 
docker cp 935d2e372d0cf924967c6c6b9851b51a12b684e6c50acf9e273f7f8ea1f0de67:/pulsar/data learn/pulsar

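Then remove that first container (its name would otherwise conflict; for example, docker rm -f pulsar-standalone) and start Pulsar again with the copied directories mounted: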

docker run -d -it -p 6650:6650 -p 8080:8080 -v ~/learn/pulsar/data:/pulsar/data -v ~/learn/pulsar/conf:/pulsar/conf   --add-host=host.docker.internal:host-gateway --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone

Produce a test message:

docker exec -it pulsar-standalone bash bin/pulsar-client produce my-topic --messages "hello-pulsar"
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.pulsar.common.util.netty.DnsResolverUtil (file:/pulsar/lib/org.apache.pulsar-pulsar-common-2.10.1.jar) to method sun.net.InetAddressCachePolicy.get()
WARNING: Please consider reporting this to the maintainers of org.apache.pulsar.common.util.netty.DnsResolverUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2022-11-06T09:24:09,639+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]] Connected to server
2022-11-06T09:24:13,141+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"my-topic","producerName":null,"sendTimeoutMs":30000,"blockIfQueueFull":false,"maxPendingMessages":1000,"maxPendingMessagesAcrossPartitions":50000,"messageRoutingMode":"RoundRobinPartition","hashingScheme":"JavaStringHash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":1000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"NONE","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{},"initialSubscriptionName":null}
2022-11-06T09:24:13,425+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"*****","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-11-06T09:24:13,515+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [null] Creating producer on cnx [id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]
2022-11-06T09:24:13,625+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-1-3] Created producer on cnx [id: 0xe306c336, L:/127.0.0.1:40538 - R:localhost/127.0.0.1:6650]
2022-11-06T09:24:14,118+0000 [main] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
2022-11-06T09:24:14,258+0000 [main] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
2022-11-06T09:24:14,299+0000 [main] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [my-topic] [standalone-1-3] Pending messages: 0 --- Publish throughput: 1.20 msg/s --- 0.00 Mbit/s --- Latency: med: 0.000 ms - 95pct: 0.000 ms - 99pct: 0.000 ms - 99.9pct: 0.000 ms - max: -? ms --- BatchSize: med: 1.000 - 95pct: 1.000 - 99pct: 1.000 - 99.9pct: 1.000 - max: 1.000 --- MsgSize: med: 12.000 bytes - 95pct: 12.000 bytes - 99pct: 12.000 bytes - 99.9pct: 12.000 bytes - max: 12.000 bytes --- Ack received rate: 0.00 ack/s --- Failed messages: 0 --- Pending messages: 0
2022-11-06T09:24:14,342+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [my-topic] [standalone-1-3] Closed Producer
2022-11-06T09:24:14,434+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xe306c336, L:/127.0.0.1:40538 ! R:localhost/127.0.0.1:6650] Disconnected
2022-11-06T09:24:16,530+0000 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced

You can also deploy the management console (pulsar-manager):

docker run -it \
  -p 9527:9527 -p 7750:7750 \
  -e SPRING_CONFIGURATION_FILE=~/learn/pulsar/pulsar-manager/application.properties \
  apachepulsar/pulsar-manager

Next, start our consumer:

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
  client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:               "pulsar://localhost:6650", // multiple brokers are also supported: "pulsar://localhost:6650,localhost:6651,localhost:6652"
    OperationTimeout:  60 * time.Second,
    ConnectionTimeout: 60 * time.Second,
  })
  if err != nil {
    log.Fatalf("Could not instantiate Pulsar client: %v", err)
  }
  // Defer Close only after the error check, so a nil client is never closed.
  defer client.Close()

  consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "my-topic",
    SubscriptionName: "my-sub",
    Type:             pulsar.Shared,
  })
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()

  for i := 0; i < 10; i++ {
    msg, err := consumer.Receive(context.Background())
    if err != nil {
      log.Fatal(err)
    }

    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
      msg.ID(), string(msg.Payload()))

    consumer.Ack(msg)
  }

  if err := consumer.Unsubscribe(); err != nil {
    log.Fatal(err)
  }
}
Run the consumer:
% go run learn/pulsar/consumer/main.go
INFO[0000] Connecting to broker                          remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established                    local_addr="[::1]:55467" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready                           local_addr="[::1]:55467" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected consumer                            consumerID=1 name=tskmj subscription=my-sub topic="persistent://public/default/my-topic"
INFO[0000] Created consumer                              consumerID=1 name=tskmj subscription=my-sub topic="persistent://public/default/my-topic"

Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:25, entryID:1, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0002b8000), receivedTime:time.Time{wall:0xc0d1eb16ee415ab8, ext:120175243016, loc:(*time.Location)(0x4c06960)}} -- content: 'hello'

Produce a message:

package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
  client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL:               "pulsar://localhost:6650", // multiple brokers are also supported: "pulsar://localhost:6650,localhost:6651,localhost:6652"
    OperationTimeout:  60 * time.Second,
    ConnectionTimeout: 60 * time.Second,
  })
  if err != nil {
    log.Fatalf("Could not instantiate Pulsar client: %v", err)
  }
  // Defer Close only after the error check, so a nil client is never closed.
  defer client.Close()

  fmt.Println(client.TopicPartitions("my-topic"))

  producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-topic",
  })

  if err != nil {
    log.Fatal(err)
  }

  // Register cleanup as soon as the producer exists.
  defer producer.Close()

  _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("hello"),
  })
  if err != nil {
    fmt.Println("Failed to publish message", err)
    return // do not report success after a failed send
  }
  fmt.Println("Published message")

}
Run the producer:
go run learn/pulsar/producer/main.go
INFO[0000] Connecting to broker                          remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established                    local_addr="[::1]:55454" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready                           local_addr="[::1]:55454" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected producer                            cnx="[::1]:55454 -> [::1]:6650" epoch=0 topic="persistent://public/default/my-topic"
INFO[0000] Created producer                              cnx="[::1]:55454 -> [::1]:6650" producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"
Published message
INFO[0000] Closing producer                              producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"
INFO[0000] Closed producer                               producerID=1 producer_name=standalone-2-0 topic="persistent://public/default/my-topic"
