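Introduction: TDMQ does not yet offer public network access, and buying a CVM just to install Windows on it is hardly practical. The VPN supports only the IKE/IPsec protocol types, which in practice means it is meant for enterprise users, so individual developers have to find another way. Fortunately, there are always more solutions than problems. One option is to use standalone Pulsar for the local development and test environment and TDMQ for production. Let's walk through how to configure that.
I. Install standalone Pulsar on a CVM
1. Install JDK 1.8
Download JDK 1.8: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
a. Copy the unpacked (no-installer) JDK to a directory on the Linux machine: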
/etc/jdk1.8.0_271
b. Edit the environment variables:
vim /etc/profile
export JAVA_HOME=/etc/jdk1.8.0_271
export PATH=.:$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
c. Reload the configuration:
source /etc/profile
That is all that is needed.
2. Install Pulsar
Download the Pulsar binary release: http://pulsar.apache.org/docs/zh-CN/next/standalone/
A faster download location:
After unpacking, start it: bin/pulsar-daemon start standalone
Startup log (abridged):
14:09:02.061 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting ZK server
14:09:02.467 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Server UP
14:09:02.467 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - ZooKeeper server up: true
14:09:02.467 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Instantiate ZK Client
14:09:02.586 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting Bookie(s)
14:09:02.980 [main] INFO org.apache.bookkeeper.meta.MetadataDrivers - BookKeeper metadata driver manager initialized
14:09:02.984 [main] INFO org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase - Initialize zookeeper metadata driver at metadata service uri zk+null://127.0.0.1:2181/ledgers : zkServers = 127.0.0.1:2181, ledgersRootPath = /ledgers.
14:09:02.987 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=10000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@51751e5f
14:09:02.997 [main-EventThread] INFO org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase - ZooKeeper client is connected now.
14:09:03.140 [main] INFO org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Started Db Ledger Storage
14:09:03.140 [main] INFO org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - - Number of directories: 1
14:09:03.140 [main] INFO org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - - Write cache size: 1024 MB
14:09:03.140 [main] INFO org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - - Read Cache: 1024 MB
14:09:03.143 [main] INFO org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Creating single directory db ledger storage on data/standalone/bookkeeper0/current
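Note: if the service was started in the background with pulsar-daemon start standalone, it can be stopped with: pulsar-daemon stop standalone
3. Test producing and consuming with pulsar-client
Produce a message:
Send a simple message, hello-pulsar, to the topic named my-topic: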
bin/pulsar-client produce my-topic --messages "hello-pulsar"
Consume messages:
Consume messages from my-topic through the subscription first-subscription:
bin/pulsar-client consume my-topic -s "first-subscription"
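Before pointing a Java client at the broker from a development machine, it can be worth confirming that the Pulsar port is reachable at all, since a blocked port and a stopped broker look the same from the client side. The sketch below uses only the JDK. The class name PortCheck is made up for this illustration, the host is a placeholder to replace with the CVM's address, and 6650 is the port used by the service URL later in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Pulsar or the TDMQ demo.
final class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolved host: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host; pass the CVM address as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(host + ":6650 reachable = " + isReachable(host, 6650, 3000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; the pulsar-client commands above remain the real functional test.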
II. Set up the local development environment
1. Download the TDMQ demo:
https://github.com/TencentCloud/tdmq-java-client
2. Download the Pulsar Java SDK
The complete pom.xml for your Java project:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tencent</groupId>
    <artifactId>tdmq-demo-cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>tdmq-demo-cloud</name>
    <url>http://maven.apache.org</url>
    <properties>
        <pulsar.version>2.6.0</pulsar.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <!--scope>test</scope-->
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <addMavenDescriptor>false</addMavenDescriptor>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                    <excludes>
                        <exclude>**/assembly/</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.1</version>
                <configuration>
                    <descriptors>
                        <descriptor>assembly.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Run the following in the directory that contains pom.xml to download the Pulsar SDK:
mvn clean package
3. Test code for connecting to standalone Pulsar: SimpleProducerAndCosnumer.java
package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
//import org.apache.pulsar.client.api.NetModel;

/**
 * Simple produce-and-consume example.
 */
public class SimpleProducerAndCosnumer {

    public static void invork() throws PulsarClientException {
        // Only relevant for TDMQ; left empty (and unused) for the standalone test.
        Map<String, String> authParams = new HashMap<>();
        //authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
        //authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
        //authParams.put("region", "ap-guangzhou");

        PulsarClient client = PulsarClient.builder()
                //.serviceUrl("pulsar://10.0.0.4:6000")// address of the Tencent Cloud TDMQ access point
                //.listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")
                //.authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))
                .serviceUrl("pulsar://110.29.14.63:6650")// address of the test-environment Pulsar
                .build();

        // Create the consumer
        Consumer<byte[]> consumer = client.newConsumer()
                //.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// naming rule: appid/namespace/topic
                .topic("my-topic")
                .subscriptionName("sub-sfhuang")
                .subscribe();

        // Create the producer
        Producer<byte[]> producer = client.newProducer()
                //.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// the Tencent Cloud topic
                .topic("my-topic")
                .create();

        // Send five messages synchronously
        for (int i = 0; i < 5; i++) {
            String value = "my-sync-message-" + i;
            System.out.println("");
            MessageId msgId = producer.newMessage().value(value.getBytes()).send();
            System.out.println("produce sync msg id:" + msgId + ", value:" + value);
        }
        producer.close();

        // Receive and acknowledge the five messages
        for (int i = 0; i < 5; i++) {
            Message<byte[]> msg = consumer.receive();
            String msgId = msg.getMessageId().toString();
            String value = new String(msg.getValue());
            System.out.println("receive msg " + msgId + ",value:" + value);
            consumer.acknowledge(msg);
        }

        // Close
        consumer.close();
        client.close();
    }

    public static void main(String[] args) throws JoranException, PulsarClientException {
        String logbackFile = "D:\\programming\\tdmq-java-client-master\\conf\\logback.xml";
        //String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
        if (logbackFile != null) {
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(logbackFile);
        }
        invork();
    }
}
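The commented-out TDMQ topic in the code above follows the pattern persistent://appid/namespace/topic, while the standalone test uses the short name my-topic, which Pulsar resolves to persistent://public/default/my-topic. A small helper can make that difference explicit. This is only a sketch: TopicNames and its method names are invented for this example, the expansion is simplified to bare names and full URLs, and the TDMQ values printed are the ones from the article's own listing.

```java
// Hypothetical helper for illustration; not part of the Pulsar client API.
final class TopicNames {

    // Builds a fully qualified persistent topic name.
    // On TDMQ the first segment is what the article calls the appid.
    static String persistent(String tenant, String namespace, String topic) {
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Expands a bare topic name using Pulsar's defaults (tenant "public", namespace "default").
    // Simplified: names that already contain "://" are returned unchanged.
    static String expandShortName(String name) {
        return name.contains("://") ? name : persistent("public", "default", name);
    }

    public static void main(String[] args) {
        System.out.println(expandShortName("my-topic"));
        System.out.println(persistent("pulsar-geookmr4pz", "default", "sfhuang"));
    }
}
```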
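The logback.xml file also needs to be modified:
The result of the local run:
III. Test connecting to TDMQ and producing/consuming
1. Create a TDMQ topic
2. Create an access point
3. Create a role
4. Configure permissions for the environment
The role has both read and write permissions:
5. Create a subscriber
Subscriber: sub-sfhuang
6. Modify the local code: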
package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
//import org.apache.pulsar.client.api.NetModel;

/**
 * Simple produce-and-consume example.
 */
public class SimpleProducerAndCosnumer {

    public static void invork() throws PulsarClientException {
        // Populated as in the demo, but not used by the token authentication below.
        Map<String, String> authParams = new HashMap<>();
        authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
        authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
        authParams.put("region", "ap-guangzhou");

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://10.0.0.4:6000")// address of the Tencent Cloud TDMQ access point
                .listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")// route ID
                .authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))// secret key (token)
                //.serviceUrl("pulsar://110.29.14.63:6650")// address of the test-environment Pulsar
                .build();

        // Create the consumer
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://pulsar-geookmr4pz/default/sfhuang")// the Tencent Cloud TDMQ topic
                //.topic("my-topic")
                .subscriptionName("sub-sfhuang")// must be created in the console
                .subscribe();

        // Create the producer
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://pulsar-geookmr4pz/default/sfhuang")// the Tencent Cloud topic
                //.topic("my-topic")
                .create();

        // Send five messages synchronously
        for (int i = 0; i < 5; i++) {
            String value = "my-sync-message-" + i;
            System.out.println("");
            MessageId msgId = producer.newMessage().value(value.getBytes()).send();
            System.out.println("produce sync msg id:" + msgId + ", value:" + value);
        }
        producer.close();

        // Receive and acknowledge the five messages
        for (int i = 0; i < 5; i++) {
            Message<byte[]> msg = consumer.receive();
            String msgId = msg.getMessageId().toString();
            String value = new String(msg.getValue());
            System.out.println("receive msg " + msgId + ",value:" + value);
            consumer.acknowledge(msg);
        }

        // Close
        consumer.close();
        client.close();
    }

    public static void main(String[] args) throws JoranException, PulsarClientException {
        //String logbackFile = "D:\\programming\\tdmq-java-client-master\\conf\\logback.xml";
        String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
        if (logbackFile != null) {
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(logbackFile);
        }
        invork();
    }
}
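Switching between the standalone broker and TDMQ in the listing above is done by commenting lines in and out. One alternative is to choose the settings from an environment variable, so that the same jar can run in both places. The sketch below uses only the JDK and returns plain strings that could be passed to the builder calls shown above. The class name EndpointConfig, the PULSAR_ENV and TDMQ_TOKEN variable names, and the "local"/"tdmq" values are all assumptions made for this example; the addresses and topic names are copied from the article's listings.

```java
import java.util.Map;

// Hypothetical settings holder; names and environment variables are illustrative only.
final class EndpointConfig {
    final String serviceUrl;
    final String topic;
    final String token; // null for the unauthenticated standalone broker

    private EndpointConfig(String serviceUrl, String topic, String token) {
        this.serviceUrl = serviceUrl;
        this.topic = topic;
        this.token = token;
    }

    // env is typically System.getenv(); accepting a map keeps the method easy to test.
    static EndpointConfig fromEnv(Map<String, String> env) {
        String mode = env.getOrDefault("PULSAR_ENV", "local");
        if ("tdmq".equals(mode)) {
            String token = env.get("TDMQ_TOKEN");
            if (token == null || token.isEmpty()) {
                throw new IllegalStateException("TDMQ_TOKEN must be set when PULSAR_ENV=tdmq");
            }
            // Access point and topic from the article's TDMQ listing.
            return new EndpointConfig("pulsar://10.0.0.4:6000",
                    "persistent://pulsar-geookmr4pz/default/sfhuang", token);
        }
        // Standalone test broker from the article's local listing.
        return new EndpointConfig("pulsar://110.29.14.63:6650", "my-topic", null);
    }

    public static void main(String[] args) {
        EndpointConfig cfg = fromEnv(System.getenv());
        System.out.println(cfg.serviceUrl + " -> " + cfg.topic);
    }
}
```

Reading the token from the environment also keeps it out of the source file, which matters once the code is shared or committed.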
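The logback.xml file also needs to be modified:
7. Run the following in the directory that contains pom.xml to build the zip package:
mvn clean package
The directory on my local machine:
IV. Package, upload, and test
The production environment also needs JDK 1.8; the machine running standalone Pulsar can be used for this test.
1. Upload the zip package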
[root@VM-0-9-centos ~]# rz
Sent - tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip 5.25 MB/s Spend: 5 seconds
2. Unzip
[root@VM-0-9-centos ~]# unzip tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip
Archive: tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip
creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/
creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml
creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/runserver.sh
creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-2.6.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-api-2.6.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-transaction-common-2.6.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-3.5.1.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-util-3.5.1.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/guava-19.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/gson-2.7.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-shaded-2.1.0-incubating.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jsr305-3.0.2.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/checker-qual-2.0.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/error_prone_annotations-2.1.3.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/j2objc-annotations-1.1.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/animal-sniffer-annotations-1.14.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/aircompressor-0.16.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.ws.rs-api-2.1.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/bouncy-castle-bc-shaded-2.6.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.activation-1.2.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/slf4j-api-1.7.25.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/validation-api-1.1.0.Final.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jcip-annotations-1.0.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-classic-1.2.3.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-core-1.2.3.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/junit-3.8.1.jar
inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar
[root@VM-0-9-centos ~]# cd tdmq-demo-cloud-0.0.1-SNAPSHOT/
[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# ll
total 12
drwxr-xr-x 2 root root 4096 Nov 19 11:34 bin
drwxr-xr-x 2 root root 4096 Dec 3 11:16 conf
drwxrwxrwx 2 root root 4096 Dec 3 12:49 lib
3. Grant execute permission
[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# chmod 755 bin/runserver.sh
4. Run the code
[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# bin/runserver.sh -classpath lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar com.tencent.tdmq.demo.cloud.SimpleProducerAndCosnumer
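Result: success!
Summary: using the hosted TDMQ removes the operations burden and scales better, the configuration is not complicated, and it is free during the public beta, so give it a try.
Later posts will cover other ways to use TDMQ.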