30个Kafka常见错误小集合

2021-09-22 11:37:51 浏览数 (1)

本文是一个Kafka使用过程中的常见错误的总结。希望对你有帮助。

1、UnknownTopicOrPartitionException
代码语言:javascript复制
org.apache.kafka.common.errors.UnknownTopicOrPartitionException:
This server does not host this topic-partition

报错内容:分区数据不在

原因分析:producer向不存在的topic发送消息,用户可以检查topic是否存在 或者设置auto.create.topics.enable参数

2、LEADER_NOT_AVAILABLE
代码语言:javascript复制
WARN Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClien

报错内容:leader不可用

原因分析:原因很多 topic正在被删除 正在进行leader选举 使用kafka-topics脚本检查leader信息

进而检查broker的存活情况 尝试重启解决。

3、NotLeaderForPartitionException
代码语言:javascript复制
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition

报错内容:broker已经不是对应分区的leader了

原因分析:发生在leader变更时 当leader从一个broker切换到另一个broker时,要分析什么原因引起了leader的切换。

4、TimeoutException
代码语言:javascript复制
org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for test-0: 30040 ms has passe

报错内容:请求超时

原因分析:观察哪里抛出的 观察网络是否能通 如果可以通 可以考虑增加request.timeout.ms的值

5、RecordTooLargeException
代码语言:javascript复制
WARN async.DefaultEventHandler: Produce request with correlation id 92548048 failed due to [TopicName,1]: org.apache.kafka.common.errors.RecordTooLargeException

报错内容:消息过大

原因分析:生产者端 消息处理不过来了 可以增加 request.timeout.ms 减少 batch.size

6、Closing socket connection
代码语言:javascript复制
Closing socket connection to/127,0,0,1.(kafka.network.Processor)

报错内容:连接关闭

原因分析:如果javaApi producer版本高,想在客户端consumer启动低版本验证,会不停的报错

无法识别客户端消息。

7、ConcurrentModificationException
代码语言:javascript复制
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

报错内容:线程不安全

原因分析:Kafka consumer是非线程安全的

8、NetWorkException
代码语言:javascript复制
[kafka-producer-network-thread | producer-1] o.apache.kafka.common.network.Selector : [Producer clientId=producer-1] Connection with / disconnected

报错内容:网络异常

原因分析:网络连接中断 检查broker的网络情况

9、ILLEGAL_GENERATION
代码语言:javascript复制
ILLEGAL_GENERATION occurred while committing offsets for group

报错内容:无效的“代”

原因分析:consumer错过了 rebalance 原因是consumer花了大量时间处理数据。

需要适当减少 max.poll.records值 增加 max.poll.interval.ms 或者想办法增加消息处理的速度

10、启动advertised.listeners配置异常
代码语言:javascript复制
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:277)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

解决方法:修改server.properties

代码语言:javascript复制
advertised.listeners=PLAINTEXT://{ip}:9092  # ip可以内网、外网ip、127.0.0.1 或域名

解析:

server.properties中有两个listeners。listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致。

11、启动PrintGCDateStamps异常
代码语言:javascript复制
[0.004s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/data/service/kafka_2.11-0.11.0.2/bin/../logs/kafkaServer-gc.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

解决方法: 更换jdk1.8.x版本或者使用>=kafka1.0.x的版本。

解析:

只有在jdk1.9并且kafka版本在1.0.x之前的版本才会出现。

12、生成者发送message失败或消费者不能消费(kafka1.0.1)
代码语言:javascript复制
#(java)org.apache.kafka警告
Connection to node 0 could not be established. Broker may not be available.


# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ TimeoutError: Request timed out after 30000ms
    at new TimeoutError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsTimeoutError.js:6:9)
    at Timeout.setTimeout [as _onTimeout] (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:737:14)
    at ontimeout (timers.js:466:11)
    at tryOnTimeout (timers.js:304:5)
    at Timer.listOnTimeout (timers.js:264:5) message: 'Request timed out after 30000ms' }

解决方法: 检查advertised.listeners的配置(如果有多个Broker可根据java版本的对应的node号检查配置),判断当前的网络是否可以连接到地址(telnet等)

13、partitions配置的值过小造成错误(kafka1.0.1)
代码语言:javascript复制
#(java)org.apache.kafka(执行producer.send)
Exception in thread "main" org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:908)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:778)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at com.wenshao.dal.TestProducer.main(TestProducer.java:36)


# (nodejs) kafka-node异常 (执行producer.send后的异常)
{ BrokerNotAvailableError: Could not find the leader
    at new BrokerNotAvailableError (D:projectnodekafka-testsrcnode_moduleskafka-nodeliberrorsBrokerNotAvailableError.js:11:9)
    at refreshMetadata.error (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:831:16)
    at D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:514:9
    at KafkaClient.wrappedFn (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:379:14)
    at KafkaClient.Client.handleReceivedData (D:projectnodekafka-testsrcnode_moduleskafka-nodelibclient.js:770:60)
    at Socket.<anonymous> (D:projectnodekafka-testsrcnode_moduleskafka-nodelibkafkaClient.js:618:10)
    at Socket.emit (events.js:159:13)
    at addChunk (_stream_readable.js:265:12)
    at readableAddChunk (_stream_readable.js:252:11)
    at Socket.Readable.push (_stream_readable.js:209:10) message: 'Could not find the leader' }

解决方法: 修改num.partitions的值,partitions在是在创建topic的时候默认创建的partitions节点的个数,只对新创建的topic生效,所有尽量在项目规划时候定一个合理的值。也可以通过命令行动态扩容()

代码语言:javascript复制
./bin/kafka-topics.sh --zookeeper  localhost:2181 --alter --partitions 2 --topic  foo
14、Kafka-Topic操作
  • 添加:新增一个Kafka topic:“mobilePhone”,为它设置一个分区并且设置一个副本,并且创建在本地的zookeeper上,可以为master:2181,slave1:2181,slave3:2181/kafka 也可以直接为localhost:2181/kafka
代码语言:javascript复制
cd /usr/kafka/bin
./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave3:2181/kafka --replication-factor 1 --partitions 1 --topic mobilePhone

如果创建时,报以下错误:Error while executing topic command : Replication factor: 1 larger than available brokers: 0.

解决方法:很可能是之前在server.properties配置文件夹里面和执行命令的zookeeper目录不一致。--zookeeper的值需要带上根目录,否则就会报这样的错误。例如配置文件里面写的连接目录是zookeeper.connect=master:2181,slave1:2181,slave3:2181/kafka,但是在执行命令时少写了kafka目录,写成一下

--zookeeper master:2181,slave1:2181,slave3:2181。就会报上述的错误,因此,务必要保证zookeeper的目录一致。

当Topic成功创建时,会输出Created topic “mobilePhone”,如上图。

注意:replication-factor不能大于broker数。

  • 查询:查询某一个Topic的信息,mobilePhone的信息
代码语言:javascript复制
cd /usr/kafka/bin
./kafka-topics.sh --describe --zookeeper master:2181,slave1:2181,slave3:2181/kafka --topic mobilePhone

可以不指定具体的topic名称,即不带--topic参数的执行命令,即可查询所有的topic及其信息。

  • 修改:修改某个Topic参数,例如修改mobilePhone为5个分区, alter修改partition数(只能增加)
代码语言:javascript复制
cd /usr/kafka/bin
./kafka-topics.sh --alter --zookeeper master:2181,slave1:2181,slave3:2181/kafka  --partitions 5 --topic mobilePhone
注意:刚刚在输入命令时报了以下的错误:

Error while executing topic command : Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 [2018-11-29 16:14:02,100] ERROR java.lang.IllegalArgumentException: Topic mobilePhone does not exist on ZK path master:2181,slave1:2181,slave3:2181 at kafka.admin.TopicCommand.alterTopic(TopicCommand.scala:124) at kafka.admin.TopicCommand.main(TopicCommand.scala:65) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand

造成这个错误的原因也是 在执行命令时,忘记输入配置zookeeper时的根目录hostname:port/kafak,直接写成了主机名加端口号,从而zookeeper找不到topic的路径。

  • 删除:删除某些不需要的Topic
代码语言:javascript复制
cd /usr/kafka/bin
./kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave3:2181/kafka  --topic mobilePhone

在执行删除topic命令时,会提示无法删除,这是因为在server.properties的配置文件中,kafka默认为无法删除即false,因此需要去各个节点的配置文件中修改 delete.topic.enable=true。之后便可正常删除。如下图所示标记

但是,这样删除只是将刚刚的topic标记为删除状态,并没有真正意义上的删除,当重新创建一个同名的topic时,依然会报错,该topic已存在。因此,为了能够彻底的删除topic,我们进入zookeeper的bin目录下,输入./zkCli.sh进入zookeeper的命令行,删除三个目录:1、rmr /kafka/brokers/topics/mobilePhone; 2、rmr /kafka/admin/delete_topics/mobliePhone; 3、rmr /kafka/config/topics/mobilePhone

此时,便可以完全删除该topic,如果重新创建同名topic依然报已存在错误,需要重启kafka服务即可解决。

15、Kafka-Producer操作

在执行生产者和消费者命令之前,我们按照上面的创建方法,创建一个topic为newPhone,并更改它的分区为2。此处我们设置的zookeeper地址都为localhost:2181/kafak,这与上面并无不同,只不过是在当前机器上创建。推荐此种做法,不仅简洁,而且节约空间。

  • 创建生产者
代码语言:javascript复制
cd /usr/kafka/config
./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone

broker-list:kafka的服务地址(用多个逗号隔开),端口号默认为9092,如果不想使用该端口号,可以更改config下的server.properties配置文件进行修改,如下图:

--topic newPhone:代表生产者绑定了这个topic,并且会向此主题里面生产数据,正确执行命令之后,可得如下图所示,并可以开始输入数据。

16、kafka-consumer操作
  • 创建消费者
代码语言:javascript复制
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone [--from-beginning]

--bootstrap-server:kafka的服务地址, --topic newPhone:绑定主题,开始从指定topic里面消费(取出)数据,[--from-beginning]:从头开始读数据,并不是从consumer连上之后开始读。

整个运行流程,首先我们在使用producer生产几条数据:

此时,我们在ssh工具上(小厨用的是SecureCRT,蛮好用的嘞),clone一个Session。执行./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic newPhone,可以看到如下图一样,如果不加from-beginning参数选项,consumer则读不到Topic里面之前Producer生产的四条数据。

添加from-beginning参数选项之后,consumer便可以从头开始消费Topic里面的数据。如下图所示:

提示:如果在生产者生产数据时,输入message出现以下错误:

代码语言:javascript复制
[root@master bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic newPhone
>hisdhodsa        
[2018-11-29 17:28:16,926] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {newPhone=LEADER_NOT_AVAILABLE}          
(org.apache.kafka.clients.NetworkClient)         
[2018-11-29 17:28:17,080] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {newPhone=LEADER_NOT_AVAILABLE}           
(org.apache.kafka.clients.NetworkClient)

解决方法:由于9092端口没有开,所以在server.properties配置文件里,将listeners=PLAINTEXT://:9092的注释删除,如下图所示

17、kafka-启动报错

第一种错误

代码语言:javascript复制
2017-02-17 17:25:29,224] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kafka.common.KafkaException: Failed to acquire lock on file .lock in /var/log/kafka-logs. A Kafka instance in another process or thread is using this directory.

    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:100)

    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:97)

    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)

    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

    at scala.collection.AbstractTraversable.map(Traversable.scala:104)

    at kafka.log.LogManager.lockLogDirs(LogManager.scala:97)

    at kafka.log.LogManager.<init>(LogManager.scala:59)

    at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)

    at kafka.server.KafkaServer.startup(KafkaServer.scala:183)

    at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:100)

    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:49)

解决方法:Failed to acquire lock on file .lock in /var/log/kafka-logs.--问题原因是有其他的进程在使用kafka,ps -ef|grep kafka,杀掉使用该目录的进程即可;

第二种错误:对index文件无权限

把文件的权限更改为正确的用户名和用户组即可;

目录/var/log/kafka-logs/,其中__consumer_offsets-29是偏移量;

第三种生产消费报错:jaas连接有问题

代码语言:javascript复制
kafka_client_jaas.conf文件配置有问题

16环境上

/opt/dataload/filesource_wangjuan/conf下kafka_client_jaas.conf

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    keyTab="/home/client/keytabs/client.keytab"

        serviceName="kafka"

    principal="client/dcp@DCP.COM";

};
18、kafka-生产报错

第一种:生产者向topic发送消息失败:

代码语言:javascript复制

[2017-03-09 09:16:00,982] [ERROR] [startJob_Worker-10] [DCPKafkaProducer.java line:62] produceR向topicdf02211发送信息出现异常

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)

原因是配置文件:kafka_client_jaas.conf中配置有问题,keyTab的路径不对,导致的;

第二种:生产消费报错:Failed to construct kafka producer

报错关键信息:Failed to construct kafka producer

解决方法:配置文件问题:KafkaClient中serviceName应该是kafka,之前配置成了zookeeper;重启后,就好了;

配置文件如下:

代码语言:javascript复制

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=kafka

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    serviceName=kafka

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

Client {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=zookeeper

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

问题描述:

代码语言:javascript复制
[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)

    at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)

    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)

    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)

    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)

    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)

    ... 4 more

Caused by: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka

    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:305)

    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)

    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)

    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)

    ... 7 more

[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties



消费时报错: ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)



[root@DCP16 bin]# ./kafka-console-consumer --zookeeper dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --from-beginning --topic topicout050511 --new-consumer --consumer.config ../etc/kafka/consumer.properties --bootstrap-server DCP16:9092

[2017-05-07 22:24:37,479] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)

    at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:53)

    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)

    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)

    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)

    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)

    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)

    ... 6 more

Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)

    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)

    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:606)

    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)

    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)

    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)

    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)

    at java.security.AccessController.doPrivileged(Native Method)

    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)

    at javax.security.auth.login.LoginContext.login(LoginContext.java:595)

    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)

    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)

    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)

    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)

衍生问题:

kafka生产消息就会报错:

代码语言:javascript复制
[2017-05-07 23:17:16,240] ERROR Error when sending message to topic topicin050511 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

把KafkaClient更改为如下的配置,就可以 了:

代码语言:javascript复制
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required

   useTicketCache=true;

};
19、kafka-消费报错

第一种错误:

代码语言:javascript复制
replication factor: 1 larger than available brokers: 0

消费时报错:

代码语言:javascript复制
Error while executing topic command : replication factor: 1 larger than available brokers: 0

解决办法:

代码语言:javascript复制
/confluent-3.0.0/bin  下重启daemon

./kafka-server-stop  -daemon   ../etc/kafka/server.properties

./kafka-server-start  -daemon   ../etc/kafka/server.properties

然后zk重启;sh zkCli.sh -server ai186;

/usr/hdp/2.4.2.0-258/zookeeper/bin/zkCli.sh --脚本的目录

如果还报错,可以查看配置文件中下面的配置:

zookeeper.connect=dcp18:2181/kafkakerberos;--是group名称

第二种错误:TOPIC_AUTHORIZATION_FAILED

代码语言:javascript复制
./bin/kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic wangjuan_topic1 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092

[2017-03-02 13:44:38,398] WARN The configuration zookeeper.connection.timeout.ms = 6000 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

[2017-03-02 13:44:38,575] WARN Error while fetching metadata with correlation id 1 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,677] WARN Error while fetching metadata with correlation id 2 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,780] WARN Error while fetching metadata with correlation id 3 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

解决方法:配置文件中下面的参数中的User的U必须是大写;

super.users=User:kafka

或者有可能是server.properties中的adver.listen的IP是不对的,有可能是代码中写死的IP;

第三种错误的可能的解决方法:

无法消费,则查看kafka的启动日志中的报错信息:日志文件的所属组不对,应该是hadoop;

或者,查看kafka对应的zookeeper的配置后缀,是否已经更改,如果更改了,则topic需要重新生成才行;

第三种错误:消费的tomcat报错:

代码语言:javascript复制
[2017-04-01 06:37:21,823] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 06:37:21,825] [WARN] [Thread-5] [ConsumerCoordinator.java line:476] Auto offset commit failed for group test-consumer-group: Commit offsets failed with retriable exception. You should retry committing offsets.

更改代码中,tomcat的心跳超时时间如下:

没有改之前的:;

代码语言:javascript复制
./webapps/web/WEB-INF/classes/com/ai/bdx/dcp/hadoop/service/impl/DCPKafkaConsumer.class;

重启后,日志中显示:

代码语言:javascript复制
[2017-04-01 10:14:56,167] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 10:14:56,286] [INFO] [Thread-5] [AbstractCoordinator.java line:505] Discovered coordinator DCP187:9092 (id: 2147483647 rack: null) for group test-consumer-group.
20、kafka-创建topic时错误

创建topic时报错:

代码语言:javascript复制
[2017-04-10 10:32:23,776] WARN SASL configuration failed: javax.security.auth.login.LoginException: Checksum failed Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

Exception in thread "main" org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure

    at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:946)

    at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:923)

    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)

    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)

    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)

    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)

    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)

    at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)

    at kafka.admin.TopicCommand.main(TopicCommand.scala)

问题定位:是jaas文件有问题:

解决方法:server.properties文件中的super.user要和jaas文件中的keytab的principle一致;

server.properties:super.users=User:client

kafka_server_jaas.conf文件改为:

代码语言:javascript复制
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=kafka

    keyTab="/data/data1/confluent-3.0.0/kafka.keytab"

    principal="kafka@DCP.COM";

};



KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    keyTab="/home/client/client.keytab"

    principal="client/DCP187@DCP.COM";

};



Client {
    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=zookeeper

    keyTab="/home/client/client.keytab"

    principal="client/DCP187@DCP.COM";

};
21、DataCaptain--->报错(kafka)组件原因

报错提示

Not has broker can connection metadataBrokerList

代码语言:javascript复制
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
    at scala.Predef$.require(Predef.scala:277)
    at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1203)
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
    at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
    at kafka.Kafka$.main(Kafka.scala:82)
    at kafka.Kafka.main(Kafka.scala)

解决方法:修改server.properties

代码语言:javascript复制
advertised.listeners=PLAINTEXT://{ip}:9092  # ip可以内网、外网ip、127.0.0.1 或域名

解析: server.properties中有两个listeners。

listeners:启动kafka服务监听的ip和端口,可以监听内网ip和0.0.0.0(不能为外网ip),默认为java.net.InetAddress.getCanonicalHostName()获取的ip。

advertised.listeners:生产者和消费者连接的地址,kafka会把该地址注册到zookeeper中,所以只能为除0.0.0.0之外的合法ip或域名 ,默认和listeners的配置一致。

22、报错“TimeoutException(Java)” 或“run out of brokers(Go)” 或 “Authentication failed for user(Python)”

首先,请确保 servers 配置正确,然后通过 ping 以及 telnet 排除网络问题。假设网络运行正常,云上 Kafka 在建立连接时,会对客户端进行鉴权。鉴权方式(sasl_mechanism)有两种:

  • ONS: 仅限 Java 语言使用;需要配置自己的 AccessKey,SecretKey。
  • PLAIN: 所有语言可用;需要配置 AccessKey,SecretKey 的后 10 位。

如果鉴权失败,云上 Kafka 会掐掉连接。 另外,请仔细参考各个 demo 的 readme 以配置正确。

23、报错“leader is not available”或“leader is in election”

首先,检查 Topic 是否有创建;其次,检查 Topic 类型是否为“Kafka消息”。

24、报错“TOPIC_AUTHORIZATION_FAILED”或“Topic or group not authorized” 的类似字眼

此类报错通常代表权限不对,即您的 AccessKey 没有访问对应 Topic 或者 Consumer ID(又称 group 或 consumer group)的权限。

Topic 和 Consumer ID 的权限规则如下:

  • Topic 必须由主账号创建;使用时,Topic 可以由主账号自己使用,也可以由主账号授权给子账号使用。
  • Consumer ID 的使用权只属于创建者;主账号创建的 Consumer ID 不能给子账号使用,反之亦然。

注意:请仔细检查 AccessKey、SecretKey 来自哪个账号,避免用错。

25、Java 客户端(包括 Spring 框架)报错“Failed to send SSL close message”

该错误后面通常还会跟“connection reset by peer”或“broken pipe”。该错误主要是因为,服务端是VIP网络环境,会主动掐掉空闲连接。建议在遇到此类错误时,重试发送一次。Java客户端内部有重试机制,可以参考 Producer 最佳实践 进行配置。其它语言客户端,请参考相关文档。你可以通过修改日志级别来避免该错误,以log4j为例,加上下面这行配置:

log4j.logger.org.apache.kafka.common.network.SslTransportLayer=ERROR

26、Spring Cloud Stream 消费信息时报错"arrayindexoutofboundexception"

该错误的产生是因为 Spring Cloud 会按自己的格式解析消息内容。如果您同时使用 Spring Cloud 发送和消费,则不会有问题,这也是推荐的使用方式。

如果您使用其他方式发送,例如,调用 Kafka 原生的 Java 客户端发送,那么用 Spring Cloud 消费时,则需要设置 headerMode 为 raw,即禁用解析消息内容。具体信息参见 Spring Cloud 官网。

27、 报错“No worthy mechs found”

C 客户端或者包装C 的客户端会报这个错。这个错说明缺少一个系统库:cyrus-sasl-plain。针对yum管理的系统的安装方法为:yum install cyrus-sasl{,-plain}。

28、什么是 CID, Consumer ID, Consumer Group 和 Group ID

这几个名称指代的都是同一个概念:Kafka 的消费组 (Consumer Group)。CID 是 Consumer ID 的缩写,也等同于 Group ID (Consumer Group ID 的简称,指代一个特定的消费组)。每个消费组可以包含多个消费实例,一起负载均衡消费订阅的 topic,所以 CID 与 topic 的关系可以总结为:每个 CID 可以订阅多个 topic,每个 topic 也可以被多个 CID 订阅;各个 CID 之间相对独立,互不影响。

假设 CID 1 订阅了 Topic 1,则 Topic 1 的每条消息会发给 CID 1 的某个实例,且只会发给其中一个实例。如果 CID 2 也订阅了 Topic 1,Topic 1 的每条消息也会发给 CID 2 里的某个实例,且只发给其中一个实例。

[backcolor=transparent]注意:控制台里看到的 Producer ID 是 Aliware MQ 里的概念,Kafka 不会用到, 后续会改进优化。

29、如何查看消费进度

如需查看某个特定订阅消费者的消费进度,请按照如下步骤操作:

  • 在ONS控制台左侧点击[backcolor=transparent]发布订阅管理-订阅管理。
  • 在搜索框中输入topic 或者 Cosumer ID,点击[backcolor=transparent]搜索,查找你想查看消费进度的 Consumer ID。
  • 找到该 Consumer ID后,点击操作列中的[backcolor=transparent]消费者状态,在跳出的页面中可查看[backcolor=transparent]堆积总量。

堆积总量 = 所有的消息数 - 已经消费的消息数 [backcolor=transparent]注意:目前消费者状态都会显示不在线,未来会进行优化。除了堆积总量,其它信息仅供参考。

30、消息堆积了怎么办

消息堆积,一般都是消费速度过慢或者消费线程阻塞造成的。建议打印出消费线程的堆栈情况查看线程执行情况。

注意:Java 进程可以用 jstack。

31、java.lang.OutOfMemoryError:Map failed

发生上述问题,原因是发生OOM啦,会导致kafka进程直接崩溃掉!因此只能重新启动broker节点了,但是为了让broker节点启动成功快一点的话,可以将一个参数的之调大:“num.recovery.threads.per.data.dir=30”,没错就是他,将他的值越调大越好。这个线程数主要是负责停止和启动broker的。因为是32core的服务器,给他分配了30个,可以尽量的把这个参数调大,便于该broker节点更快的加入到ISR列表当中。

首先,根据上面的提示恢复服务是第一件要做的事情,接下来,得分析分析为什么会出这个事情,给kafka集群分配了20G内存,如下图:

查看了近2个星期的监控图,发现可用内存在持续减少,初步怀疑可能发生了内存泄漏。

这也只是怀疑,因为出错之前没有监控JVM的情况,吃一堑,长一智,赶紧用zabbix将kafka的jvm监控起来。

之后,调整了下面的参数,先观察一段时间。

sysctl vm.max_map_count=262144 #进程中内存影视区域的最大数量。在调用malloc,直接调用mmap和mprotect和加载共享库时产生内存映射区域。虽然大多数程序需要的内存映射区域不超过1000个,但是特定的程序,特别是malloc调试器,可能需要很多,例如每次分破都会产生一道两个内存映射区域,默认值是65536。

解决方案:

第一:kafka的heap内存分配不要大于6G,我们知道kafka并不吃堆内存,如果设置默认的1G的话也并不太合理。推荐设置配置为6G即可。服务器是32G内存,然后给kafka就分配了22G的heap内存。经过参考《kafka权威指南》和《Apache kafka实战》两位大佬的笔记,他们推荐设置kafka的heap大小为5G或者6G。最后我将kafka的heap内存配置成了6G。

第二:调整“vm.max_map_count”参数,没错,就是上面提到的那个参数。在实际生产环境中,如果单台broker上topic数过多,用户可能碰到“java.lang.OutOfMemoryError:Map failed”的严重错误以导致kafka节点崩溃。这是因为大量创建topic将极大的消耗操作系统内存,用于内存映射操作。在这种情况下,用户需要调整“vm.max_map_count”参数。具体方法可以使用命令:“sysctl vm.max_map_count=N”来设置。该参数默认值是65535,可以考虑线上环境设置更大的值,比如262144甚至更大。

0 人点赞