本文是一个Kafka使用过程中的常见错误的总结。希望对你有帮助。
1、UnknownTopicOrPartitionException
代码语言:javascript复制org.apache.kafka.common.errors.UnknownTopicOrPartitionException:
This server does not host this topic-partition
报错内容:分区数据不在
原因分析:producer向不存在的topic发送消息,用户可以检查topic是否存在 或者设置auto.create.topics.enable参数
2、LEADER_NOT_AVAILABLE
代码语言:javascript复制WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien
报错内容:leader不可用
原因分析:原因很多 topic正在被删除 正在进行leader选举 使用kafka-topics脚本检查leader信息
进而检查broker的存活情况 尝试重启解决。
3、NotLeaderForPartitionException
代码语言:javascript复制org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition
报错内容:broker已经不是对应分区的leader了
原因分析:发生在leader变更时 当leader从一个broker切换到另一个broker时,要分析什么原因引起了leader的切换。
4、TimeoutException
代码语言:javascript复制org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-0: 30040 ms has passe
报错内容:请求超时
原因分析:观察哪里抛出的 观察网络是否能通 如果可以通 可以考虑增加request.timeout.ms的值
5、RecordTooLargeException
代码语言:javascript复制WARN async.DefaultEventHandler: Produce request with correlation id 92548048 failed due to [TopicName,1]: org.apache.kafka.common.errors.RecordTooLargeException
报错内容:消息过大
原因分析:生产者端 消息处理不过来了 可以增加 request.timeout.ms 减少 batch.size
6、Closing socket connection
代码语言:javascript复制Closing socket connection to/127,0,0,1.(kafka.network.Processor)
报错内容:连接关闭
原因分析:如果javaApi producer版本高,想在客户端consumer启动低版本验证,会不停的报错
无法识别客户端消息。
7、ConcurrentModificationException
代码语言:javascript复制java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
报错内容:线程不安全
原因分析:Kafka consumer是非线程安全的
8、NetWorkException
代码语言:javascript复制[kafka-producer-network-thread | producer-1] o.apache.kafka.common.network.Selector : [Producer clientId=producer-1] Connection with / disconnected
报错内容:网络异常
原因分析:网络连接中断 检查broker的网络情况
9、ILLEGAL_GENERATION
代码语言:javascript复制ILLEGAL_GENERATION occurred while committing offsets for group
报错内容:无效的“代”
原因分析:consumer错过了 rebalance 原因是consumer花了大量时间处理数据。
需要适当减少 max.poll.records值 增加 max.poll.interval.ms 或者想办法增加消息处理的速度
10、启动advertised.listeners配置异常
代码语言:javascript复制java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:277)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
解决方法:修改server.properties
代码语言:javascript复制advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以内网、外网ip、127.0.0.1 或域名
解析:
server.properties中有两个listeners。listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致。
11、启动PrintGCDateStamps异常
代码语言:javascript复制[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
解决方法: 更换jdk1.8.x版本或者使用>=kafka1.0.x的版本。
解析:
只有在jdk1.9并且kafka版本在1.0.x之前的版本才会出现。
12、生成者发送message失败或消费者不能消费(kafka1.0.1)
代码语言:javascript复制#(java)org.apache.kafka警告
Connection to node 0 could not be established. Broker may not be available.
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsTimeoutError.js:6:9)
at Timeout.setTimeout [as _onTimeout] (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:737:14)
at ontimeout (timers.js:466:11)
at tryOnTimeout (timers.js:304:5)
at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }
解决方法: 检查advertised.listeners的配置(如果有多个Broker可根据java版本的对应的node号检查配置),判断当前的网络是否可以连接到地址(telnet等)
13、partitions配置的值过小造成错误(kafka1.0.1)
代码语言:javascript复制#(java)org.apache.kafka(执行producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
at com.wenshao.dal.TestProducer.main(TestProducer.java:36)
# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ BrokerNotAvailableError: Could not find the leader
at new BrokerNotAvailableError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsBrokerNotAvailableError.js:11:9)
at refreshMetadata.error (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:831:16)
at D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:514:9
at KafkaClient.wrappedFn (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:379:14)
at KafkaClient.Client.handleReceivedData (D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:770:60)
at Socket.<anonymous> (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:618:10)
at Socket.emit (events.js:159:13)
at addChunk (_stream_readable.js:265:12)
at readableAddChunk (_stream_readable.js:252:11)
at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }
解决方法: 修改num.partitions的值,partitions在是在创建topic的时候默认创建的partitions节点的个数,只对新创建的topic生效,所有尽量在项目规划时候定一个合理的值。也可以通过命令行动态扩容()
代码语言:javascript复制./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 2 --topic foo
14、Kafka-Topic操作
- 添加:新增一个Kafka topic:“mobilePhone”,为它设置一个分区并且设置一个副本,并且创建在本地的zookeeper上,可以为master:2181,slave1:2181,slave3:2181/kafka 也可以直接为localhost:2181/kafka
cd /usr/kafka/bin
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave3:2181/kafka --replication-factor 1 --partitions 1 --topic mobilePhone
如果创建时,报以下错误:Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
解决方法:很可能是之前在server.properties配置文件夹里面和执行命令的zookeeper目录不一致。--zookeeper的值需要带上根目录,否则就会报这样的错误。例如配置文件里面写的连接目录是zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka,但是在执行命令时少写了kafka目录,写成一下
--zookeeper master:2181,slave1:2181,slave3:2181。就会报上述的错误,因此,务必要保证zookeeper的目录一致。
当Topic成功创建时,会输出Created topic “mobilePhone”,如上图。
注意:replication-factor不能大于broker数。
- 查询:查询某一个Topic的信息,mobilePhone的信息
cd /usr/kafka/bin
./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone
可以不指定具体的topic名称,即不带--topic参数的执行命令,即可查询所有的topic及其信息。
- 修改:修改某个Topic参数,例如修改mobilePhone为5个分区, alter修改partition数(只能增加)
cd /usr/kafka/bin
./kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave3:2181/kafka --partitions 5 --topic mobilePhone
注意:刚刚在输入命令时报了以下的错误:
Error while executing topic command : Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 [2018-11-29 16:14:02,100] ERROR java.lang.IllegalArgumentException: Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 at kafka.admin.TopicCommand.alterTopic(TopicCommand.scala:124) at kafka.admin.TopicCommand.main(TopicCommand.scala:65) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand
造成这个错误的原因也是 在执行命令时,忘记输入配置zookeeper时的根目录hostname:port/kafak,直接写成了主机名加端口号,从而zookeeper找不到topic的路径。
- 删除:删除某些不需要的Topic
cd /usr/kafka/bin
./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone
在执行删除topic命令时,会提示无法删除,这是因为在server.properties的配置文件中,kafka默认为无法删除即false,因此需要去各个节点的配置文件中修改 delete.topic.enable=true。之后便可正常删除。如下图所示标记
但是,这样删除只是将刚刚的topic标记为删除状态,并没有真正意义上的删除,当重新创建一个同名的topic时,依然会报错,该topic已存在。因此,为了能够彻底的删除topic,我们进入zookeeper的bin目录下,输入./zkCli.sh进入zookeeper的命令行,删除三个目录:1、rmr /kafka/brokers/topics/mobilePhone; 2、rmr /kafka/admin/delete_topics/mobliePhone; 3、rmr /kafka/config/topics/mobilePhone
此时,便可以完全删除该topic,如果重新创建同名topic依然报已存在错误,需要重启kafka服务即可解决。
15、Kafka-Producer操作
在执行生产者和消费者命令之前,我们按照上面的创建方法,创建一个topic为newPhone,并更改它的分区为2。此处我们设置的zookeeper地址都为localhost:2181/kafak,这与上面并无不同,只不过是在当前机器上创建。推荐此种做法,不仅简洁,而且节约空间。
- 创建生产者
cd /usr/kafka/config
./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
broker-list:kafka的服务地址(用多个逗号隔开),端口号默认为9092,如果不想使用该端口号,可以更改config下的server.properties配置文件进行修改,如下图:
--topic newPhone:代表生产者绑定了这个topic,并且会向此主题里面生产数据,正确执行命令之后,可得如下图所示,并可以开始输入数据。
16、kafka-consumer操作
- 创建消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone [--from-beginning]
--bootstrap-server:kafka的服务地址, --topic newPhone:绑定主题,开始从指定topic里面消费(取出)数据,[--from-beginning]:从头开始读数据,并不是从consumer连上之后开始读。
整个运行流程,首先我们在使用producer生产几条数据:
此时,我们在ssh工具上(小厨用的是SecureCRT,蛮好用的嘞),clone一个Session。执行./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone,可以看到如下图一样,如果不加from-beginning参数选项,consumer则读不到Topic里面之前Producer生产的四条数据。
添加from-beginning参数选项之后,consumer便可以从头开始消费Topic里面的数据。如下图所示:
提示:如果在生产者生产数据时,输入message出现以下错误:
代码语言:javascript复制[root@master bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
>hisdhodsa
[2018-11-29 17:28:16,926] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {newPhone=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2018-11-29 17:28:17,080] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {newPhone=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
解决方法:由于9092端口没有开,所以在server.properties配置文件里,将listeners=PLAINTEXT://:9092的注释删除,如下图所示
17、kafka-启动报错
第一种错误
代码语言:javascript复制2017-02-17 17:25:29,224] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /var/log/kafka-logs. A Kafka instance in another process or thread is using this directory.
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:100)
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:97)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.log.LogManager.lockLogDirs(LogManager.scala:97)
at kafka.log.LogManager.<init>(LogManager.scala:59)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)
at kafka.server.KafkaServer.startup(KafkaServer.scala:183)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:100)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:49)
解决方法:Failed to acquire lock on file .lock in /var/log/kafka-logs.--问题原因是有其他的进程在使用kafka,ps -ef|grep kafka,杀掉使用该目录的进程即可;
第二种错误:对index文件无权限
把文件的权限更改为正确的用户名和用户组即可;
目录/var/log/kafka-logs/,其中__consumer_offsets-29是偏移量;
第三种生产消费报错:jaas连接有问题
代码语言:javascript复制kafka_client_jaas.conf文件配置有问题
16环境上
/opt/dataload/filesource_wangjuan/conf下kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/client/keytabs/client.keytab"
serviceName="kafka"
principal="client/dcp@DCP.COM";
};
18、kafka-生产报错
第一种:生产者向topic发送消息失败:
代码语言:javascript复制
[2017-03-09 09:16:00,982] [ERROR] [startJob_Worker-10] [DCPKafkaProducer.java line:62] produceR向topicdf02211发送信息出现异常
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
原因是配置文件:kafka_client_jaas.conf中配置有问题,keyTab的路径不对,导致的;
第二种:生产消费报错:Failed to construct kafka producer
报错关键信息:Failed to construct kafka producer
解决方法:配置文件问题:KafkaClient中serviceName应该是kafka,之前配置成了zookeeper;重启后,就好了;
配置文件如下:
代码语言:javascript复制
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=kafka
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
serviceName=kafka
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=zookeeper
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/dcp16@DCP.COM";
};
问题描述:
代码语言:javascript复制[kafka@DCP16 bin]$ ./kafka-console-producer --broker-list DCP16:9092 --topic topicin050511 --producer.config ../etc/kafka/producer.properties
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)
... 4 more
Caused by: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:305)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 7 more
[kafka@DCP16 bin]$ ./kafka-console-producer --broker-list DCP16:9092 --topic topicin050511 --producer.config ../etc/kafka/producer.properties
消费时报错: ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
[root@DCP16 bin]# ./kafka-console-consumer --zookeeper dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --from-beginning --topic topicout050511 --new-consumer --consumer.config ../etc/kafka/consumer.properties --bootstrap-server DCP16:9092
[2017-05-07 22:24:37,479] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:53)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
... 6 more
Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)
at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
衍生问题:
kafka生产消息就会报错:
代码语言:javascript复制[2017-05-07 23:17:16,240] ERROR Error when sending message to topic topicin050511 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
把KafkaClient更改为如下的配置,就可以 了:
代码语言:javascript复制KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true;
};
19、kafka-消费报错
第一种错误:
代码语言:javascript复制replication factor: 1 larger than available brokers: 0
消费时报错:
代码语言:javascript复制Error while executing topic command : replication factor: 1 larger than available brokers: 0
解决办法:
代码语言:javascript复制/confluent-3.0.0/bin 下重启daemon
./kafka-server-stop -daemon ../etc/kafka/server.properties
./kafka-server-start -daemon ../etc/kafka/server.properties
然后zk重启;sh zkCli.sh -server ai186;
/usr/hdp/2.4.2.0-258/zookeeper/bin/zkCli.sh --脚本的目录
如果还报错,可以查看配置文件中下面的配置:
zookeeper.connect=dcp18:2181/kafkakerberos;--是group名称
第二种错误:TOPIC_AUTHORIZATION_FAILED
代码语言:javascript复制./bin/kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic wangjuan_topic1 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092
[2017-03-02 13:44:38,398] WARN The configuration zookeeper.connection.timeout.ms = 6000 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2017-03-02 13:44:38,575] WARN Error while fetching metadata with correlation id 1 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-03-02 13:44:38,677] WARN Error while fetching metadata with correlation id 2 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-03-02 13:44:38,780] WARN Error while fetching metadata with correlation id 3 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
解决方法:配置文件中下面的参数中的User的U必须是大写;
super.users=User:kafka
或者有可能是server.properties中的adver.listen的IP是不对的,有可能是代码中写死的IP;
第三种错误的可能的解决方法:
无法消费,则查看kafka的启动日志中的报错信息:日志文件的所属组不对,应该是hadoop;
或者,查看kafka对应的zookeeper的配置后缀,是否已经更改,如果更改了,则topic需要重新生成才行;
第三种错误:消费的tomcat报错:
代码语言:javascript复制[2017-04-01 06:37:21,823] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group
[2017-04-01 06:37:21,825] [WARN] [Thread-5] [ConsumerCoordinator.java line:476] Auto offset commit failed for group test-consumer-group: Commit offsets failed with retriable exception. You should retry committing offsets.
更改代码中,tomcat的心跳超时时间如下:
没有改之前的:;
代码语言:javascript复制./webapps/web/WEB-INF/classes/com/ai/bdx/dcp/hadoop/service/impl/DCPKafkaConsumer.class;
重启后,日志中显示:
代码语言:javascript复制[2017-04-01 10:14:56,167] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group
[2017-04-01 10:14:56,286] [INFO] [Thread-5] [AbstractCoordinator.java line:505] Discovered coordinator DCP187:9092 (id: 2147483647 rack: null) for group test-consumer-group.
20、kafka-创建topic时错误
创建topic时报错:
代码语言:javascript复制[2017-04-10 10:32:23,776] WARN SASL configuration failed: javax.security.auth.login.LoginException: Checksum failed Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:946)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:923)
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
问题定位:是jaas文件有问题:
解决方法:server.properties文件中的super.user要和jaas文件中的keytab的principle一致;
server.properties:super.users=User:client
kafka_server_jaas.conf文件改为:
代码语言:javascript复制KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=kafka
keyTab="/data/data1/confluent-3.0.0/kafka.keytab"
principal="kafka@DCP.COM";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/home/client/client.keytab"
principal="client/DCP187@DCP.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName=zookeeper
keyTab="/home/client/client.keytab"
principal="client/DCP187@DCP.COM";
};
21、DataCaptain--->报错(kafka)组件原因
报错提示
Not has broker can connection metadataBrokerList
代码语言:javascript复制java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:277)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
解决方法:修改server.properties
代码语言:javascript复制advertised.listeners=PLAINTEXT://{ip}:9092 # ip可以内网、外网ip、127.0.0.1 或域名
解析: server.properties中有两个listeners。
listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。
advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致。
22、报错“TimeoutException(Java)” 或“run out of brokers(Go)” 或 “Authentication failed for user(Python)”
首先,请确保 servers 配置正确,然后通过 ping 以及 telnet 排除网络问题。假设网络运行正常,云上 Kafka 在建立连接时,会对客户端进行鉴权。鉴权方式(sasl_mechanism)有两种:
- ONS: 仅限 Java 语言使用;需要配置自己的 AccessKey,SecretKey。
- PLAIN: 所有语言可用;需要配置 AccessKey,SecretKey 的后 10 位。
如果鉴权失败,云上 Kafka 会掐掉连接。 另外,请仔细参考各个 demo 的 readme 以配置正确。
23、报错“leader is not available”或“leader is in election”
首先,检查 Topic 是否有创建;其次,检查 Topic 类型是否为“Kafka消息”。
24、报错“TOPIC_AUTHORIZATION_FAILED”或“Topic or group not authorized” 的类似字眼
此类报错通常代表权限不对,即您的 AccessKey 没有访问对应 Topic 或者 Consumer ID(又称 group 或 consumer group)的权限。
Topic 和 Consumer ID 的权限规则如下:
- Topic 必须由主账号创建;使用时,Topic 可以由主账号自己使用,也可以由主账号授权给子账号使用。
- Consumer ID 的使用权只属于创建者;主账号创建的 Consumer ID 不能给子账号使用,反之亦然。
注意:请仔细检查 AccessKey、SecretKey 来自哪个账号,避免用错。
25、Java 客户端(包括 Spring 框架)报错“Failed to send SSL close message”
该错误后面通常还会跟“connection reset by peer”或“broken pipe”。该错误主要是因为,服务端是VIP网络环境,会主动掐掉空闲连接。建议在遇到此类错误时,重试发送一次。Java客户端内部有重试机制,可以参考 Producer 最佳实践 进行配置。其它语言客户端,请参考相关文档。你可以通过修改日志级别来避免该错误,以log4j为例,加上下面这行配置:
log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR
26、Spring Cloud Stream 消费信息时报错"arrayindexoutofboundexception"
该错误的产生是因为 Spring Cloud 会按自己的格式解析消息内容。如果您同时使用 Spring Cloud 发送和消费,则不会有问题,这也是推荐的使用方式。
如果您使用其他方式发送,例如,调用 Kafka 原生的 Java 客户端发送,那么用 Spring Cloud 消费时,则需要设置 headerMode 为 raw,即禁用解析消息内容。具体信息参见 Spring Cloud 官网。
27、 报错“No worthy mechs found”
C 客户端或者包装C 的客户端会报这个错。这个错说明缺少一个系统库:cyrus-sasl-plain。针对yum管理的系统的安装方法为:yum install cyrus-sasl{,-plain}。
28、什么是 CID, Consumer ID, Consumer Group 和 Group ID
这几个名称指代的都是同一个概念:Kafka 的消费组 (Consumer Group)。CID 是 Consumer ID 的缩写,也等同于 Group ID (Consumer Group ID 的简称,指代一个特定的消费组)。每个消费组可以包含多个消费实例,一起负载均衡消费订阅的 topic,所以 CID 与 topic 的关系可以总结为:每个 CID 可以订阅多个 topic,每个 topic 也可以被多个 CID 订阅;各个 CID 之间相对独立,互不影响。
假设 CID 1 订阅了 Topic 1,则 Topic 1 的每条消息会发给 CID 1 的某个实例,且只会发给其中一个实例。如果 CID 2 也订阅了 Topic 1,Topic 1 的每条消息也会发给 CID 2 里的某个实例,且只发给其中一个实例。
[backcolor=transparent]注意:控制台里看到的 Producer ID 是 Aliware MQ 里的概念,Kafka 不会用到, 后续会改进优化。
29、如何查看消费进度
如需查看某个特定订阅消费者的消费进度,请按照如下步骤操作:
- 在ONS控制台左侧点击[backcolor=transparent]发布订阅管理-订阅管理。
- 在搜索框中输入topic 或者 Cosumer ID,点击[backcolor=transparent]搜索,查找你想查看消费进度的 Consumer ID。
- 找到该 Consumer ID后,点击操作列中的[backcolor=transparent]消费者状态,在跳出的页面中可查看[backcolor=transparent]堆积总量。
堆积总量 = 所有的消息数 - 已经消费的消息数 [backcolor=transparent]注意:目前消费者状态都会显示不在线,未来会进行优化。除了堆积总量,其它信息仅供参考。
30、消息堆积了怎么办
消息堆积,一般都是消费速度过慢或者消费线程阻塞造成的。建议打印出消费线程的堆栈情况查看线程执行情况。
注意:Java 进程可以用 jstack。
31、java.lang.OutOfMemoryError:Map failed
发生上述问题,原因是发生OOM啦,会导致kafka进程直接崩溃掉!因此只能重新启动broker节点了,但是为了让broker节点启动成功快一点的话,可以将一个参数的之调大:“num.recovery.threads.per.data.dir=30”,没错就是他,将他的值越调大越好。这个线程数主要是负责停止和启动broker的。因为是32core的服务器,给他分配了30个,可以尽量的把这个参数调大,便于该broker节点更快的加入到ISR列表当中。
首先,根据上面的提示恢复服务是第一件要做的事情,接下来,得分析分析为什么会出这个事情,给kafka集群分配了20G内存,如下图:
查看了近2个星期的监控图,发现可用内存在持续减少,初步怀疑可能发生了内存泄漏。
这也只是怀疑,因为出错之前没有监控JVM的情况,吃一堑,长一智,赶紧用zabbix将kafka的jvm监控起来。
之后,调整了下面的参数,先观察一段时间。
sysctl vm.max_map_count=262144 #进程中内存影视区域的最大数量。在调用malloc,直接调用mmap和mprotect和加载共享库时产生内存映射区域。虽然大多数程序需要的内存映射区域不超过1000个,但是特定的程序,特别是malloc调试器,可能需要很多,例如每次分破都会产生一道两个内存映射区域,默认值是65536。
解决方案:
第一:kafka的heap内存分配不要大于6G,我们知道kafka并不吃堆内存,如果设置默认的1G的话也并不太合理。推荐设置配置为6G即可。服务器是32G内存,然后给kafka就分配了22G的heap内存。经过参考《kafka权威指南》和《Apache kafka实战》两位大佬的笔记,他们推荐设置kafka的heap大小为5G或者6G。最后我将kafka的heap内存配置成了6G。
第二:调整“vm.max_map_count”参数,没错,就是上面提到的那个参数。在实际生产环境中,如果单台broker上topic数过多,用户可能碰到“java.lang.OutOfMemoryError:Map failed”的严重错误以导致kafka节点崩溃。这是因为大量创建topic将极大的消耗操作系统内存,用于内存映射操作。在这种情况下,用户需要调整“vm.max_map_count”参数。具体方法可以使用命令:“sysctl vm.max_map_count=N”来设置。该参数默认值是65535,可以考虑线上环境设置更大的值,比如262144甚至更大。