CKafka系列学习文章 - 用java调用Ckafka实例相关接口(十二)

2019-09-16 11:13:08 浏览数 (1)

导语:创建后的实例信息,后期想进行调整,可以调用API进行管理。

一、获取实例列表

1、接口描述

接口请求域名:ckafka.api.zijiebao.com 本接口(ListInstance)用于在用户账户下获取消息队列 CKafka 实例列表。

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤。

searchWord

String

(过滤条件)按照实例名称过滤,支持模糊查询。

status.n

Int

(过滤条件)实例的状态。0:创建中,1:运行中,2:删除中,不填默认返回全部。

offset

Int

偏移量,不填默认为0。

limit

Int

返回数量,不填则默认为10,最大值为20。

3、java实现

代码语言:javascript复制
 /* 获取实例列表 */
 public static String getListIinstance(String nonce,String timestamp) {
  Map<String, Object> param = new HashMap<>();
 param.put("Action", "ListInstance");
 param.put("Nonce", nonce);
 param.put("Timestamp",timestamp);
 param.put("SignatureMethod", signatureMethod);
 param.put("Region", "ap-guangzhou");
 param.put("SecretId", secretId);
  return getResult(param,url);
 }

4、调用

代码语言:javascript复制
 //获取实例列表
  String str3=getListIinstance(nonce,timestamp); 

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"totalCount":2,"instanceList":[{"instanceId":"ckafka-vekvsdp","instanceName":"bowenqiu_ckafka","status":1,"ifCommunity":true},{"instanceId":"ckafka-qwkshhnx","instanceName":"bearsi_cdn_log_test","status":1,"ifCommunity":true}]}}

二、获取实例属性

1、接口描述

接口请求域名:ckafka.api.zijiebao.com 本接口(GetInstanceAttributes)用于在用户账户下获取消息队列 CKafka 实例属性。

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

实例 ID

3、java实现

代码语言:javascript复制
 /* 获取实例属性 */
 public static String GetInstanceAttributes(String nonce,String timestamp) {
  Map<String, Object> param = new HashMap<>();
 param.put("Action", " GetInstanceAttributes ");
 param.put("Nonce", nonce);
 param.put("instanceId",instanceId);
 param.put("Timestamp",timestamp);
 param.put("SignatureMethod", signatureMethod);
 param.put("Region", "ap-guangzhou");
 param.put("SecretId", secretId);
  return getResult(param,url);
 }

4、调用

代码语言:javascript复制
//获取实例属性
  String str4=GetInstanceAttributes(nonce,timestamp);  

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"tags":[],"instanceId":"ckafka-3vekvsdp","instanceName":"Jensen_ckafka","vipList":[{"vip":"10.66.249.112","vport":"9092"}],"vip":"10.66.249.112","vport":"9092","status":1,"bandwidth":320,"diskSize":300,"zoneId":100003,"vpcId":"","subnetId":"","healthy":1,"healthyMessage":"","createTime":1566388895,"expireTime":1569067295,"msgRetentionTime":2880,"config":{"auto.create.topics.enable":"true","num.partitions":2,"default.replication.factor":2},"remainderPartitions":43,"remainderTopics":22,"createdPartitions":17,"createdTopics":3}}

三、设置实例属性

1、接口描述

接口请求域名:ckafka.api.zijiebao.com 本接口(SetInstanceAttributes)用于设置消息队列 CKafka 实例属性。

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 文档。

参数名称

是否必选

类型

描述

instanceId

String

实例 ID。

instanceName

String

待修改的实例名称,实例名称长度不超过64字节。

msgRetentionTime

Int

实例日志的最长保留时间,单位:分钟,最小为1min,最大为30天。

autoCreateTopicEnable

Int

是否开启自动创建 topic,1:开启 ,0:不开启。

defaultNumPartitions

Int

自动创建主题分区个数,如果 autoCreateTopicEnable 设置为 true, 没有设置该值时,默认设置为3。

defaultReplicationFactor

Int

自动创建主题副本数,如果 autoCreateTopicEnable 设置为 true, 没有设置该值时,默认设置为2。

3、java实现

代码语言:javascript复制
/* 设置实例属性 */
    public static String SetInstanceAttributes(String nonce,String timestamp) {

    	Map<String, Object> param = new HashMap<>();
        param.put("Action", " SetInstanceAttributes");
        param.put("Nonce", nonce);
        param.put("instanceId",instanceId);
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
        //将实例的名称修改为Jensen_ckafka
        param.put("instanceName", "Jensen_ckafka");
        //将实例日志的最长保留时间(单位:分钟),2天:60*24*2
        param.put("msgRetentionTime", 2880);
        //是否开启自动创建 topic,1:开启 ,0:不开启
        param.put("autoCreateTopicEnable ", 1); //这个参数没设置时,下面的两个参数是不生效的。
        //自动创建主题分区个数,如果 autoCreateTopicEnable 设置为 true, 没有设置该值时,默认设置为3。
        param.put("defaultNumPartitions", 2);
        //自动创建主题副本数,如果 autoCreateTopicEnable 设置为 true, 没有设置该值时,默认设置为2。
        param.put("defaultReplicationFactor",2);
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//设置实例属性
    	String str5=SetInstanceAttributes(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success"}

四、查询消费分组信息

1、接口描述

本接口(ListConsumerGroup)用于在用户账户下获取 CKafka 消费分组信息。

接口请求域名:ckafka.api.zijiebao.com

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤

groupName

String

消费分组名称(精确匹配)

topicName

String

主题名称,若不填 groupName,则该参数会被忽略

offset

Int

偏移量,不填默认为0

limit

Int

返回数量,不填则默认50,最大值50

3、java实现

代码语言:javascript复制
/* 
	 *  查询消费分组信息 
	 * 查询所有的消费分组信息
	 */
    public static String GetListConsumerGroup(String nonce,String timestamp) {

    	Map<String, Object> param = new HashMap<>();
        param.put("Action", "ListConsumerGroup");
        param.put("Nonce", nonce);
        param.put("instanceId",instanceId);
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//查询消费分组信息 
    	String str6=GetListConsumerGroup(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"totalCount":11,"topicList":
{"topic_test1":"topic-7p2pzlo4","bowenqiu_test1":"inter-topic-kfjiqvgo","bowenqiu_topic1":"topic-3yueh1ge"},"groupList":{
"console-consumer-17873":{"topic_test1":[349333]},
"console-consumer-18492":{"topic_test1":[349358]},
"console-consumer-2529":{"bowenqiu_test1":[1,0,0],"topic_test1":[349344]},
"console-consumer-31005":{"topic_test1":[349333]},
"console-consumer-39207":{"topic_test1":[349335]},
"console-consumer-44775":{"topic_test1":[349390]},
"console-consumer-54312":{"topic_test1":[349353]},
"console-consumer-56107":{"topic_test1":[349341]},
"console-consumer-56480":{"topic_test1":[349362]},
"console-consumer-92575":{"topic_test1":[349368]},
"console-consumer-92728":{"topic_test1":[349374]}},
"totalPartition":0,"partitionListForMonitor":[],"totalTopic":2,"topicListForMonitor":[{"topicName":"topic_test1","topicId":"topic-7p2pzlo4"},{"topicName":"bowenqiu_test1","topicId":"inter-topic-kfjiqvgo"}],"groupListForMonitor":[{"groupName":"console-consumer-17873"},{"groupName":"console-consumer-18492"},{"groupName":"console-consumer-2529"},{"groupName":"console-consumer-31005"},{"groupName":"console-consumer-39207"},{"groupName":"console-consumer-44775"},{"groupName":"console-consumer-54312"},{"groupName":"console-consumer-56107"},{"groupName":"console-consumer-56480"},{"groupName":"console-consumer-92575"},{"groupName":"console-consumer-92728"}]}}

五、查询消费分组信息(精简版)

1、接口描述

本接口 (ListGroup) 用于在用户账户下获取 CKafka 消费分组信息,该接口为精简版接口,返回消费分组信息较少,可以调用 GetGroupInfo 查询消费分组详细信息。

接口请求域名:ckafka.api.zijiebao.com

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤。

group

String

消费分组(精确匹配)。

searchWord

String

模糊匹配 group。

offset

Int

偏移量,不填默认为 0。

limit

Int

返回数量,不填则默认 20,最大值 50。

3、java实现

代码语言:javascript复制
	/* 查询消费分组信息(精简版) */
    public static String GetListGroup(String nonce,String timestamp) {

    	Map<String, Object> param = new HashMap<>();
        param.put("Action", "ListGroup");
        param.put("Nonce", nonce);
        param.put("instanceId",instanceId);//不加,会报4000错误
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//查询消费分组信息(精简版)
    	String str7=GetListGroup(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"totalCount":11,"groupList":[{"group":"console-consumer-92728","protocol":"consumer"},{"group":"console-consumer-92575","protocol":"consumer"},{"group":"console-consumer-2529","protocol":"consumer"},{"group":"console-consumer-56107","protocol":"consumer"},{"group":"console-consumer-56480","protocol":"consumer"},{"group":"console-consumer-17873","protocol":"consumer"},{"group":"console-consumer-18492","protocol":"consumer"},{"group":"console-consumer-44775","protocol":"consumer"},{"group":"console-consumer-54312","protocol":"consumer"},{"group":"console-consumer-31005","protocol":"consumer"},{"group":"console-consumer-39207","protocol":"consumer"}]}}

六、获取消费分组信息

1、接口描述

本接口(GetGroupInfo)用于在用户账户下获取 CKafka 消费分组详细信息。 接口请求域名:ckafka.api.zijiebao.com

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤。

group.N

String Array

Kafka 消费分组,Consumer-group,这里是数组形式,格式:group.0=xxx&group.1=yyy。

3、java实现

代码语言:javascript复制
	/* 获取消费分组信息 */
    public static String GetGroupInfo(String nonce,String timestamp) {

    	Map<String, Object> param = new HashMap<>();
        param.put("Action", "GetGroupInfo");
        param.put("Nonce", nonce);
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
        param.put("instanceId",instanceId);
        param.put("group.0","console-consumer-2529");
        param.put("group.1","console-consumer-56107");
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//获取消费分组信息
    	String str8=GetGroupInfo(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":[
{"errCode":0,"state":"Stable","protocolType":"consumer","protocol":"range","group":"console-consumer-2529","members":[{"member_id":"consumer-1-/10.1.1.250-2019-09-09 20:18:16:801-6fdd1719-ff19-4bc1-804b-b833a0925523","client_id":"consumer-1","client_host":"/10.1.1.250","assignment":{"version":0,"topics":[{"topic":"topic_test1","partitions":[0]}]}}]},
{"errCode":0,"state":"Empty","protocolType":"consumer","protocol":"","group":"console-consumer-56107","members":[]}]}

七、获取消费分组offset

1、接口描述

接口请求域名:ckafka.api.zijiebao.com 本接口(GetGroupOffsets)用于在用户账户下获取 CKafka 消息分组 offset。

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤

group

String

Kafka 消费分组

topics

String Array

group 订阅的主题名称数组,如果没有该数组,则表示指定的 group 下所有 topic 信息

searchWord

String

模糊匹配 topicName

offset

Int

本次查询的偏移位置,默认为0

limit

Int

本次返回结果的最大个数,默认为50,最大值为50

3、java实现

代码语言:javascript复制
    /* 获取消费分组 offset接口 */
    public static String GetGroupOffsets(String nonce,String timestamp) {
    	
    	Map<String, Object> param = new HashMap<>();
        param.put("Action", "GetGroupOffsets");
        param.put("Nonce", nonce);
        param.put("instanceId",instanceId);
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
        param.put("group", "console-consumer-2529");      
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//获取消费分组 offset接口 
    	String str1=GetGroupOffsets(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"totalCount":2,"topicList":[
{"topic":"bowenqiu_test1","partitions":
[{"partition":0,"offset":1,"meta":null,"errCode":0,"logEndOffset":1,"lag":0},
{"partition":1,"offset":0,"meta":null,"errCode":0,"logEndOffset":0,"lag":0},
{"partition":2,"offset":0,"meta":null,"errCode":0,"logEndOffset":0,"lag":0}]},
{"topic":"topic_test1","partitions":
[{"partition":0,"offset":349416,"meta":null,"errCode":0,"logEndOffset":349416,"lag":0}]}]}}

八、设置消费分组offset

1、接口描述

本接口 (SetGroupOffsets) 用于在用户账户下设置 CKafka 实例某个消费分组 offset。

接口请求域名:ckafka.api.zijiebao.com

2、输入参数

以下请求参数列表仅列出了接口请求参数,其它参数见 公共请求参数 页面。

参数名称

是否必选

类型

描述

instanceId

String

(过滤条件)按照实例 ID 过滤。

group

String

kafka 消费分组。

topics

String Array

表示需要重置 offset 的 topic 数组,不填表示全部 topic。

strategy

Int

重置 offset 的策略,入参含义: 0:对齐 shift-by 参数,代表把 offset 向前或向后移动 shift 条。 1:对齐参考(by-duration,to-datetime,to-earliest,to-latest),代表把 offset 移动到指定timestamp的位置。 2:对齐参考(to-offset),代表把 offset 移动到指定的 offset 位置。

shift

Int

当 strategy 为 0 时,必须包含该字段,可以大于零代表会把 offset 向后移动 shift 条,小于零则将 offset 向前回溯 shift 条数。正确重置后新的 offset 应该是(old_offset shift),如果新的 offset 小于 partition 的 earliest 则会设置为 earliest,如果大于partition 的 latest 则会设置为 latest。

timestamp

Int

单位:ms。当 strategy 为 1 时,必须包含该字段,其中 -2 表示重置 offset 到最开始的位置,-1 表示重置到最新的位置(相当于清空),其它值则代表指定的时间,会获取 topic 中指定时间的 offset 然后进行重置,如果指定的时间不存在消息,则获取最末尾的 offset。

offset

Int

需要重新设置的 offset 位置。当 strategy 为 2 时,必须包含该字段。

3、java实现

代码语言:javascript复制
    /* 设置消费分组 offset */
    public static String SetGroupOffsets(String nonce,String timestamp) {
    	Map<String, Object> param = new HashMap<>();
        param.put("Action", "SetGroupOffsets");
        param.put("Nonce", nonce);
        param.put("Timestamp",timestamp);
        param.put("SignatureMethod", signatureMethod);
        param.put("Region", "ap-guangzhou");
        param.put("SecretId", secretId);
        param.put("instanceId",instanceId);
        param.put("group", "console-consumer-2529");  
        param.put("topics.0","topic_test1");//String Array的表示方式
        param.put("topics.1","bowenqiu_test1");
        param.put("strategy",0);
        param.put("shift",2);
    	return getResult(param,url);
    }

4、调用

代码语言:javascript复制
    	//设置消费分组 offset
    	String str9=SetGroupOffsets(nonce,timestamp);

5、结果

代码语言:javascript复制
{"code":0,"message":"","codeDesc":"Success","data":{"succ":[{"topic":"topic_test1","error_code":0,"partitions":[{"partition":0,"offset":349416,"error_code":0}]},{"topic":"bowenqiu_test1","error_code":0,"partitions":[{"partition":2,"offset":0,"error_code":0},{"partition":1,"offset":0,"error_code":0},{"partition":0,"offset":1,"error_code":0}]}],"failed":[]}}

0 人点赞