Step-by-Step Tutorial: From Pulsar to TDMQ

2020-12-04 09:27:35

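Introduction: TDMQ does not yet offer public network access, and one can hardly buy a CVM just to install Windows on it. The VPN only supports the IKE/IPsec protocol types, which in practice means only enterprise users can use it, so individuals have to find another way. Fortunately, there are always more solutions than problems. The approach here is to use standalone Pulsar for the local development and test environment and TDMQ for production. Let's see how to configure it.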

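I. Install standalone Pulsar on a CVM

1. Install JDK 1.8

Download JDK 1.8: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

a. Copy the unpacked JDK (the archive that needs no installer) to a directory on the Linux machine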

/etc/jdk1.8.0_271

b. Update the environment variables

vim /etc/profile

export JAVA_HOME=/etc/jdk1.8.0_271
export PATH=.:$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

c. Reload the configuration

source /etc/profile

That is all the JDK setup requires.
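To confirm that the JDK picked up after `source /etc/profile` really is 1.8, a small check such as the following can be compiled and run. This is a minimal sketch; the class name `JavaVersionCheck` is hypothetical and not part of Pulsar or the TDMQ demo.

```java
// Hypothetical helper for illustration only.
class JavaVersionCheck {

    // Java 8 reports java.version in the old "1.8.0_271" style;
    // Java 9 and later report "9", "11.0.2", and so on.
    static boolean isJava8(String version) {
        return version != null && version.startsWith("1.8");
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        String home = System.getProperty("java.home");
        System.out.println("java.version = " + version);
        System.out.println("java.home    = " + home);
        System.out.println(isJava8(version)
                ? "OK: running on JDK 1.8"
                : "WARNING: not running on JDK 1.8");
    }
}
```

If `java.home` does not point under /etc/jdk1.8.0_271, another JDK earlier on the PATH is probably being used.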

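2. Install Pulsar

Download and install the binary release of Pulsar: http://pulsar.apache.org/docs/zh-CN/next/standalone/

A faster download source:

After extracting the archive, start Pulsar: bin/pulsar-daemon start standalone

Startup log (abbreviated):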

14:09:02.061 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting ZK server
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Server UP
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - ZooKeeper server up: true
14:09:02.467 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Instantiate ZK Client
14:09:02.586 [main] INFO  org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Starting Bookie(s)
14:09:02.980 [main] INFO  org.apache.bookkeeper.meta.MetadataDrivers - BookKeeper metadata driver manager initialized
14:09:02.984 [main] INFO  org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase - Initialize zookeeper metadata driver at metadata service uri zk+null://127.0.0.1:2181/ledgers : zkServers = 127.0.0.1:2181, ledgersRootPath = /ledgers.
14:09:02.987 [main] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=10000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@51751e5f
14:09:02.997 [main-EventThread] INFO  org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase - ZooKeeper client is connected now.
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Started Db Ledger Storage
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Number of directories: 1
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Write cache size: 1024 MB
14:09:03.140 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage -  - Read Cache: 1024 MB
14:09:03.143 [main] INFO  org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage - Creating single directory db ledger storage on data/standalone/bookkeeper0/current

Note: if the service was started in the background with pulsar-daemon start standalone, it can be stopped with pulsar-daemon stop standalone
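Before writing any client code, it can help to confirm from the development machine that the broker port is reachable (the client code in this article connects to port 6650). The following is a minimal sketch using only the JDK; `PortProbe` is a hypothetical name, and the default host is a placeholder to be replaced with the CVM's address. If the probe fails even though Pulsar is running, the CVM's security group rules are one thing worth checking.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration only.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host: pass the CVM address as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = 6650; // port used by the serviceUrl in this article
        System.out.println(host + ":" + port + " reachable = "
                + isReachable(host, port, 3000));
    }
}
```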

3. Test producing and consuming with pulsar-client

Produce a message:

Send a simple message, hello-pulsar, to the topic named my-topic:

bin/pulsar-client produce my-topic --messages "hello-pulsar"

Consume messages:

Consume messages from my-topic using the subscription first-subscription:

bin/pulsar-client consume my-topic -s "first-subscription"

The message was consumed, so the standalone Pulsar is working.

II. Set up the local development environment

1. Download the TDMQ demo:

https://github.com/TencentCloud/tdmq-java-client

2. Download the Pulsar Java SDK:

The complete pom.xml for your Java project:

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.tencent</groupId>
	<artifactId>tdmq-demo-cloud</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>tdmq-demo-cloud</name>
	<url>http://maven.apache.org</url>

	<properties>
		<pulsar.version>2.6.0</pulsar.version>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.pulsar</groupId>
			<artifactId>pulsar-client</artifactId>
			<version>${pulsar.version}</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<!--scope>test</scope-->
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<archive>
						<addMavenDescriptor>false</addMavenDescriptor>
						<manifest>
							<addClasspath>true</addClasspath>
							<classpathPrefix>lib/</classpathPrefix>
						</manifest>
					</archive>
					<excludes>
						<exclude>**/assembly/</exclude>
					</excludes>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				  <version>2.3.2</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<encoding>UTF-8</encoding>
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.2.1</version>
				<configuration>
					<descriptors>
						<descriptor>assembly.xml</descriptor>
					</descriptors>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Run the following command in the directory that contains pom.xml to download the Pulsar SDK.

mvn clean package

3. Test code for connecting to the standalone Pulsar: SimpleProducerAndCosnumer.java

package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

//import org.apache.pulsar.client.api.NetModel;

/**
 * A simple produce-and-consume example
 *
 */

public class SimpleProducerAndCosnumer {

	public static void invork() throws PulsarClientException {
		Map<String, String> authParams = new HashMap<>();
		//authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
		//authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
		//authParams.put("region", "ap-guangzhou");
		PulsarClient client = PulsarClient.builder()
				//.serviceUrl("pulsar://10.0.0.4:6000")// address of the Tencent Cloud TDMQ access point
		        //.listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")
				//.authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))
				.serviceUrl("pulsar://110.29.14.63:6650")// address of the test-environment Pulsar
				.build();
		// Create the consumer
		Consumer<byte[]> consumer = client.newConsumer()
				//.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// naming rule: appid/namespace/topic
				.topic("my-topic")
				.subscriptionName("sub-sfhuang")
				.subscribe();
		// Create the producer
		Producer<byte[]> producer = client.newProducer()
				//.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// this is the Tencent Cloud topic
				.topic("my-topic")
				.create();

		for (int i = 0; i < 5; i++) {
			String value = "my-sync-message-" + i;
			System.out.println("");
			MessageId msgId = producer.newMessage().value(value.getBytes()).send();
			System.out.println("produce sync msg id:" + msgId + ", value:" + value);
		}
		producer.close();
		for (int i = 0; i < 5; i++) {
			Message<byte[]> msg = consumer.receive();
			String msgId = msg.getMessageId().toString();
			String value = new String(msg.getValue());
			System.out.println("receive msg " + msgId + ",value:" + value);
			consumer.acknowledge(msg);
		}
		// Close the consumer and the client
		consumer.close();
		client.close();
	}

	public static void main(String[] args) throws JoranException, PulsarClientException {
		String logbackFile = "D:\\programming\\tdmq-java-client-master\\conf\\logback.xml";
		//String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
		if (logbackFile != null) {
			LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
			JoranConfigurator configurator = new JoranConfigurator();
			configurator.setContext(lc);
			lc.reset();
			configurator.doConfigure(logbackFile);
		}
		invork();

	}

}
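The hard-coded logback path in `main` has to be edited every time the code moves between the local Windows machine and the CVM. One possible alternative, sketched below, is to take the path from the first command-line argument and fall back to `conf/logback.xml` under the working directory. `LogbackPathResolver` and the fallback location are assumptions for illustration; they are not part of the TDMQ demo, and whether the demo's start script forwards arguments is not something this sketch relies on.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration only.
class LogbackPathResolver {

    // Uses args[0] when given; otherwise <workingDir>/conf/logback.xml.
    static Path resolve(String[] args, String workingDir) {
        if (args != null && args.length > 0 && !args[0].isEmpty()) {
            return Paths.get(args[0]);
        }
        return Paths.get(workingDir, "conf", "logback.xml");
    }

    public static void main(String[] args) {
        Path path = resolve(args, System.getProperty("user.dir"));
        System.out.println("logback config: " + path.toAbsolutePath());
        // In the demo, path.toString() could then be passed to
        // configurator.doConfigure(...) in place of the hard-coded string.
    }
}
```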

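The logback.xml file also needs to be modified:

Result of the local run:

Producing to and consuming from the Pulsar instance deployed on the CVM also works from the local machine.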
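One detail worth noting when the same code runs on a local Windows machine and on a Linux CVM: `value.getBytes()` and `new String(bytes)` use the platform default charset, which may differ between the two machines. A minimal sketch of an explicit UTF-8 round trip follows; `MessageCodec` is a hypothetical helper, not part of the Pulsar SDK.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
class MessageCodec {

    // Encode a message body with a fixed charset instead of the platform default.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a received payload with the same fixed charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("my-sync-message-0 消息");
        System.out.println(payload.length + " bytes, decoded: " + decode(payload));
    }
}
```

In the demo, this would mean passing `MessageCodec.encode(value)` to the message builder's `value(...)` call and using `MessageCodec.decode(msg.getValue())` on the consumer side.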

III. Test connecting to TDMQ, then produce and consume

1. Create a TDMQ topic

By default, the topic is created in the default environment.
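Pulsar topic names have the full form `persistent://tenant/namespace/topic`, and on standalone Pulsar a short name such as `my-topic` resolves to `persistent://public/default/my-topic`. The TDMQ code in this article uses `persistent://pulsar-geookmr4pz/default/sfhuang`, which a comment in the demo describes as appid/namespace/topic, with `default` being the environment chosen here. The helper below is a hypothetical sketch (`TopicNames` is not an SDK class) that builds such names so the two forms are easy to compare.

```java
// Hypothetical helper for illustration only.
class TopicNames {

    // Builds a full persistent topic name from its three parts.
    static String persistent(String tenant, String namespace, String topic) {
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Expands a bare short name the way standalone Pulsar resolves it.
    // Simplification: names that already contain "://" are returned unchanged,
    // and other short forms are not handled.
    static String expand(String name) {
        return name.contains("://") ? name : persistent("public", "default", name);
    }

    public static void main(String[] args) {
        System.out.println(expand("my-topic"));
        System.out.println(persistent("pulsar-geookmr4pz", "default", "sfhuang"));
    }
}
```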

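2. Create an access point

Select the VPC subnet that the CVM is in; otherwise the connection will not work. The address and the route ID are both used in the code.

3. Create a role

The secret key is shown here; it is also used in the code.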
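The key passed to `AuthenticationFactory.token(...)` in the code has the three-part, dot-separated shape of a JSON Web Token. Assuming it is a standard JWT, its payload can be inspected locally with the JDK's Base64 decoder, for example to check which role (`sub`) a key belongs to. The sketch below uses a made-up token, and `TokenInspector` is a hypothetical helper; it only decodes the payload and does not verify the signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only.
class TokenInspector {

    // Returns the decoded JSON payload (middle part) of a JWT-shaped token.
    static String payloadJson(String token) {
        String[] parts = token.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        // JWT parts are Base64URL-encoded, usually without padding.
        byte[] decoded = Base64.getUrlDecoder().decode(parts[1]);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up token for illustration; pass a real one as the first argument.
        String sample = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJkZW1vLXJvbGUifQ.c2lnbmF0dXJl";
        String token = args.length > 0 ? args[0] : sample;
        System.out.println(payloadJson(token));
    }
}
```

Since the key grants access to the environment, it is safer to pass it in at runtime than to commit it to source control.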

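4. Configure permissions for the environment

Grant read and write permissions:

5. Create a subscriber

The subscriber has to be created in the console. The system then generates two topics: one retry queue and one dead-letter queue.

Subscriber: sub-sfhuang

6. Modify the local code: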

package com.tencent.tdmq.demo.cloud;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import org.apache.pulsar.client.api.*;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

//import org.apache.pulsar.client.api.NetModel;

/**
 * A simple produce-and-consume example
 *
 */

public class SimpleProducerAndCosnumer {

	public static void invork() throws PulsarClientException {
		Map<String, String> authParams = new HashMap<>();
		authParams.put("secretId", "AKID6VfaFeTq12nJadkMnOhMFveVR7XjfMnQ");
		authParams.put("secretKey", "aS2MbkLafbiXO2RM1nTv51h8uW9XQb1t");
		authParams.put("region", "ap-guangzhou");
		PulsarClient client = PulsarClient.builder()
				.serviceUrl("pulsar://10.0.0.4:6000")// address of the Tencent Cloud TDMQ access point
		        .listenerName("custom:pulsar-geookmr4pz/vpc-paywuhyf/subnet-6jummj9y")// route ID
				.authentication(AuthenticationFactory.token("eyJrZXlJZCI6In9va21yNHB6IiB1bHNhci1nZWwiYWxnIjoiSFMyNTYifQ.eyJzdWIiOiJkZGRkIn0.otXNHuw3FJhQ0l4msNQe_zAH2Bh7lB8kVrfoU4XRTqs"))// secret key
				//.serviceUrl("pulsar://110.29.14.63:6650")// address of the test-environment Pulsar
				.build();
		// Create the consumer
		Consumer<byte[]> consumer = client.newConsumer()
				.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// this is the Tencent Cloud TDMQ topic
				//.topic("my-topic")
				.subscriptionName("sub-sfhuang")// this must be created in the console
				.subscribe();
		// Create the producer
		Producer<byte[]> producer = client.newProducer()
				.topic("persistent://pulsar-geookmr4pz/default/sfhuang")// this is the Tencent Cloud topic
				//.topic("my-topic")
				.create();

		for (int i = 0; i < 5; i++) {
			String value = "my-sync-message-" + i;
			System.out.println("");
			MessageId msgId = producer.newMessage().value(value.getBytes()).send();
			System.out.println("produce sync msg id:" + msgId + ", value:" + value);
		}
		producer.close();
		for (int i = 0; i < 5; i++) {
			Message<byte[]> msg = consumer.receive();
			String msgId = msg.getMessageId().toString();
			String value = new String(msg.getValue());
			System.out.println("receive msg " + msgId + ",value:" + value);
			consumer.acknowledge(msg);
		}
		// Close the consumer and the client
		consumer.close();
		client.close();
	}

	public static void main(String[] args) throws JoranException, PulsarClientException {
		//String logbackFile = "D:\programming\tdmq-java-client-master\conf\logback.xml";
		String logbackFile = "/root/tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml";
		if (logbackFile != null) {
			LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
			JoranConfigurator configurator = new JoranConfigurator();
			configurator.setContext(lc);
			lc.reset();
			configurator.doConfigure(logbackFile);
		}
		invork();

	}

}
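The local and TDMQ versions of this class differ only in the service URL, listener name, token, and topic, which are swapped by commenting lines in and out. One way to avoid editing the source for each environment, sketched here under assumptions, is to load those four values from a properties file chosen at startup. `EndpointConfig` and the property keys (`pulsar.serviceUrl` and so on) are hypothetical names, not part of the Pulsar or TDMQ SDK.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical configuration holder for illustration only.
class EndpointConfig {
    final String serviceUrl;
    final String listenerName; // route ID; null for standalone Pulsar
    final String token;        // role key; null for standalone Pulsar
    final String topic;

    EndpointConfig(String serviceUrl, String listenerName, String token, String topic) {
        this.serviceUrl = serviceUrl;
        this.listenerName = listenerName;
        this.token = token;
        this.topic = topic;
    }

    // Reads the four settings; only the service URL is mandatory.
    static EndpointConfig load(Reader reader) throws IOException {
        Properties p = new Properties();
        p.load(reader);
        String url = p.getProperty("pulsar.serviceUrl");
        if (url == null || url.isEmpty()) {
            throw new IllegalArgumentException("pulsar.serviceUrl is required");
        }
        return new EndpointConfig(url,
                p.getProperty("pulsar.listenerName"),
                p.getProperty("pulsar.token"),
                p.getProperty("pulsar.topic", "my-topic"));
    }

    // True when a token is configured, as in the TDMQ case.
    boolean requiresAuth() {
        return token != null && !token.isEmpty();
    }

    public static void main(String[] args) throws IOException {
        // Inline sample standing in for a local properties file.
        String local = "pulsar.serviceUrl=pulsar://127.0.0.1:6650\n";
        EndpointConfig cfg = load(new StringReader(local));
        System.out.println(cfg.serviceUrl + " topic=" + cfg.topic
                + " auth=" + cfg.requiresAuth());
    }
}
```

With this in place, the client builder would call `listenerName(...)` and `authentication(...)` only when the corresponding values are present, so the same jar could run against either standalone Pulsar or TDMQ.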

The logback.xml file also needs to be modified:

7. Run the following command in the directory that contains pom.xml to build the zip package.

mvn clean package

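Location of the zip package on my local machine:

IV. Package, upload, and test

The production environment also needs JDK 1.8; the machine running standalone Pulsar can be used for this test.

1. Upload the zip package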

[root@VM-0-9-centos ~]# rz

Sent - tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip 5.25 MB/s Spend: 5 seconds

2. Extract the package

[root@VM-0-9-centos ~]# unzip tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip

Archive: tdmq-demo-cloud-0.0.1-SNAPSHOT-bin.zip

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/conf/logback.xml

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/bin/runserver.sh

creating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-client-api-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/pulsar-transaction-common-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-3.5.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-java-util-3.5.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/guava-19.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/gson-2.7.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/protobuf-shaded-2.1.0-incubating.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jsr305-3.0.2.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/checker-qual-2.0.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/error_prone_annotations-2.1.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/j2objc-annotations-1.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/animal-sniffer-annotations-1.14.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/aircompressor-0.16.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.ws.rs-api-2.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/bouncy-castle-bc-shaded-2.6.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/javax.activation-1.2.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/slf4j-api-1.7.25.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/validation-api-1.1.0.Final.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/jcip-annotations-1.0.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-classic-1.2.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/logback-core-1.2.3.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/junit-3.8.1.jar

inflating: tdmq-demo-cloud-0.0.1-SNAPSHOT/lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar

[root@VM-0-9-centos ~]# cd tdmq-demo-cloud-0.0.1-SNAPSHOT/

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# ll

total 12

drwxr-xr-x 2 root root 4096 Nov 19 11:34 bin

drwxr-xr-x 2 root root 4096 Dec 3 11:16 conf

drwxrwxrwx 2 root root 4096 Dec 3 12:49 lib

3. Grant execute permission

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# chmod 755 bin/runserver.sh

4. Run the code

[root@VM-0-9-centos tdmq-demo-cloud-0.0.1-SNAPSHOT]# bin/runserver.sh -classpath lib/tdmq-demo-cloud-0.0.1-SNAPSHOT.jar com.tencent.tdmq.demo.cloud.SimpleProducerAndCosnumer

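Result: success!

Summary: using the hosted TDMQ service removes the operations burden and scales better, and the configuration is not complicated. It is also free during the public beta, so give it a try.

More on using TDMQ will be shared in later posts.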
