MetaQ Cluster Installation and Testing

2022-06-29 21:44:19

1. Install the ZooKeeper cluster. For reference, see "ZooKeeper Cluster Environment Setup in Practice": http://www.linuxidc.com/Linux/2013-04/83562.htm

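2. Download the source from https://github.com/killme2008/Metamorphosis/tree/metamorphosis-all-1.4.6.2. If you do not want to build it yourself, you can download a prebuilt package from http://fnil.net/downloads/index.html. I chose to build it myself, mainly so that if problems come up later I can modify the source and rebuild.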

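3. Build with Maven (set up the Maven environment yourself beforehand). After downloading the all project, you need to build its subproject metamorphosis-server-wrapper. In a command prompt, change into that directory and run mvn eclipse:eclipse; when it finishes, import the project into Eclipse and build it with the Eclipse Maven plugin. Alternatively, run mvn clean install -Dmaven.test.skip=true directly in that directory. When the build finishes, the jar is generated under the target directory.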

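You can also create a lib folder in the project and run: mvn dependency:copy-dependencies -DoutputDirectory=lib (without -DoutputDirectory, the dependencies are copied to target/dependency by default). Then copy the jar produced by install into lib as well.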

4. After the build completes, upload the package to the server.

The conf/server.ini file needs to be modified:

[system]

brokerId=2

numPartitions=1

serverPort=8123

dashboardHttpPort=8120

unflushThreshold=0

unflushInterval=10000

maxSegmentSize=1073741824

maxTransferSize=1048576

deletePolicy=delete,168

deleteWhen=0 0 6,18 * * ?

flushTxLogAtCommit=1

stat=true

dataPath=/data1/metaq/data

dataLogPath=/data1/metaq/log

[zookeeper]

zk.zkConnect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181

zk.zkSessionTimeoutMs=30000

zk.zkConnectionTimeoutMs=30000

zk.zkSyncTimeMs=5000

;; Topics section

[topic=test]

[topic=meta-test]

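For a cluster, the per-node settings above (highlighted in red in the original post) need to be changed. brokerId only has to be different on every server node.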
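Since a duplicated brokerId is easy to introduce when copying server.ini between nodes, a small check before distribution may help. The following is only a sketch: the class BrokerIdCheck, its methods, and the host names node-a, node-b, node-c are hypothetical, and the parsing assumes the key is written exactly as brokerId=N with no spaces around the equals sign. It is not MetaQ's own configuration parser.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of MetaQ: checks that every node's
// server.ini declares a different brokerId in its [system] section.
final class BrokerIdCheck {

    /** Extracts brokerId from the [system] section of a server.ini text; -1 if absent. */
    static int brokerIdOf(String iniText) {
        String section = "";
        for (String raw : iniText.split("\\R")) {
            String line = raw.trim();
            if (line.startsWith("[") && line.endsWith("]")) {
                // Remember which section the following keys belong to.
                section = line.substring(1, line.length() - 1);
            } else if (section.equals("system") && line.startsWith("brokerId=")) {
                return Integer.parseInt(line.substring("brokerId=".length()).trim());
            }
        }
        return -1;
    }

    /** Returns a description of the first clash, or null when every host has its own brokerId. */
    static String findClash(Map<String, String> iniTextByHost) {
        Map<Integer, String> ownerById = new HashMap<>();
        for (Map.Entry<String, String> entry : iniTextByHost.entrySet()) {
            int id = brokerIdOf(entry.getValue());
            String previous = ownerById.put(id, entry.getKey());
            if (previous != null) {
                return "brokerId " + id + " is used by both " + previous + " and " + entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Inline sample configs; in practice the text would be read from each node's conf/server.ini.
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("node-a", "[system]\nbrokerId=1\n");
        configs.put("node-b", "[system]\nbrokerId=2\n");
        configs.put("node-c", "[system]\nbrokerId=2\n");
        System.out.println(findClash(configs));
    }
}
```

With the sample data, node-b and node-c share an id, so findClash returns a message naming both hosts; with distinct ids it returns null.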

If you set dataPath and dataLogPath to custom locations, create those directories with mkdir on every server.
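If you would rather create the directories from a small program than run mkdir by hand, something like the following could work. It is a sketch only: the class DataDirs and its method ensure are hypothetical names, and it assumes both directories live under one common root, as /data1/metaq/data and /data1/metaq/log do in the configuration above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: creates the data and log directories under a common root.
final class DataDirs {

    /** Creates root/data and root/log, including missing parents; existing directories are left alone. */
    static void ensure(Path root) throws IOException {
        Files.createDirectories(root.resolve("data"));
        Files.createDirectories(root.resolve("log"));
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the root used in the sample server.ini; pass another root as the first argument.
        Path root = Paths.get(args.length > 0 ? args[0] : "/data1/metaq");
        ensure(root);
        System.out.println("Created directories under " + root);
    }
}
```

Files.createDirectories does not fail when a directory already exists, so the program can be run again safely on a node that was set up earlier.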

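Distribute the package to every node, then run metaServer.sh start in the bin directory on each node.

To stop a broker, run metaServer.sh stop.

To check its status, run sh metaServer.sh status.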
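As a complementary check from another machine, you can test whether a broker accepts TCP connections on the serverPort from server.ini (8123 in the sample above). This sketch does not reproduce what metaServer.sh status does; the class PortProbe and the method isListening are hypothetical names, and a successful connection only shows that something is listening on that port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: probes whether a TCP port accepts connections.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unreachable host.
            return false;
        }
    }

    public static void main(String[] args) {
        // The host defaults to the local machine; 8123 is the serverPort from the sample server.ini.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println(host + ":8123 listening = " + isListening(host, 8123, 2000));
    }
}
```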

5. Sample application

package com.test.metaq;

import java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class AsyncConsum {

    public static void main(String[] args) {
        // Point the client at the ZooKeeper ensemble used by the brokers.
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = "10.168.140.48:2181";
        metaClientConfig.setZkConfig(zkConfig);

        final String topic = "test";
        final String group = "meta-example";
        try {
            // Creating the factory inside the try block avoids using a null factory if it fails.
            MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
            MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
            // Subscribe with a maximum fetch size of 1 MB per request.
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                // The method name is spelled "recieveMessages" in the MetaQ API.
                public void recieveMessages(Message message) {
                    System.out.println("Receive message " + new String(message.getData()));
                }

                // Returning null lets the client handle messages on its own threads.
                public Executor getExecutor() {
                    return null;
                }
            });
            consumer.completeSubscribe();
        } catch (MetaClientException e) {
            e.printStackTrace();
        }
    }

}

package com.test.metaq;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class Products {

    public static void main(String[] args) {
        // Point the client at the ZooKeeper ensemble used by the brokers.
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = "10.168.140.48:2181";
        metaClientConfig.setZkConfig(zkConfig);

        final String topic = "test";
        try {
            // Creating the factory inside the try block avoids using a null factory if it fails.
            MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
            MessageProducer producer = sessionFactory.createProducer();
            // A topic must be published before messages can be sent to it.
            producer.publish(topic);

            // Send every line typed on standard input as one message.
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = reader.readLine()) != null) {
                SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
                if (!sendResult.isSuccess()) {
                    System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
                } else {
                    System.out.println("Send message successfully,sent to " + sendResult.getPartition());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (MetaClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

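After packaging test.jar, upload it to the server and run java -cp test.jar com.test.metaq.Products, then type messages on the command line. Unless test.jar bundles its dependencies, the MetaQ client jars must also be on the classpath.

In the same way, run java -cp test.jar com.test.metaq.AsyncConsum; the messages typed into the producer are printed on its command line.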
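One detail worth watching when the producer and consumer run on different machines: line.getBytes() and new String(message.getData()) both use the JVM's platform default charset, so non-ASCII messages can arrive garbled if the two defaults differ. A minimal sketch of a fix, assuming both sides agree on UTF-8 (the class PayloadCodec and its methods are hypothetical, not part of MetaQ):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes and decodes message payloads with a fixed charset.
final class PayloadCodec {

    /** Encodes text as UTF-8 bytes, independent of the platform default charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes back into text. */
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("\u4f60\u597d, MetaQ");
        System.out.println(payload.length + " bytes -> " + decode(payload));
    }
}
```

In the producer this would replace line.getBytes() with PayloadCodec.encode(line), and in the consumer new String(message.getData()) with PayloadCodec.decode(message.getData()).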
