33.3 Apache Kafka支持
通过提供 spring-kafka 项目的自动配置来支持Apache Kafka。
Kafka配置由 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在启动时创建主题,请添加bean类型 NewTopic 。如果主题已存在,则忽略bean。
有关 KafkaProperties 更多支持选项,请参阅
33.3.1发送消息
Spring的 KafkaTemplate 是自动配置的,您可以直接在自己的beans中自动装配它,如下例所示:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
如果定义了属性 spring.kafka.producer.transaction-id-prefix ,则会自动配置 KafkaTransactionManager 。此外,如果
定义了 RecordMessageConverter bean,它将自动与自动配置的 KafkaTemplate 相关联。
33.3.2接收消息
当存在Apache Kafka基础结构时,可以使用 @KafkaListener 注释任何bean以创建侦听器端点。如果未定
义 KafkaListenerContainerFactory ,则会使用 spring.kafka.listener.* 中定义的键自动配置默认值。
以下组件在 someTopic 主题上创建一个侦听器端点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
如果定义了 KafkaTransactionManager bean,它将自动关联到容器工厂。同样,如果定义了 RecordMessageConverter , ErrorHandler
或 AfterRollbackProcessor bean,它将自动与默认工厂相关联。
自定义 ChainedKafkaTransactionManager 必须标记为 @Primary ,因为它通常会引用自动配置
的 KafkaTransactionManager bean。
33.3.3卡夫卡流
Apache Kafka的Spring提供了一个工厂bean来创建一个 StreamsBuilder 对象并管理其流的生命周期。Spring Boot只要 kafka-streams 在
类路径上,并且通过 @EnableKafkaStreams 注释启用Kafka Streams,就会自动配置所需的 KafkaStreamsConfiguration bean。
启用Kafka Streams意味着必须设置应用程序ID和引导程序服务器。可以使用 spring.kafka.streams.application-id 配置前者,如果未设
置,则默认为 spring.application.name 。后者可以全局设置或专门为流而重写。
使用专用属性可以使用其他几个属性; 可以使用 spring.kafka.streams.properties 命名空间设置其他任意Kafka属性。有关更多信息,另请
参见第33.3.4节“其他Kafka属性”。
要使用工厂bean,只需将 StreamsBuilder 连接到 @Bean ,如下例所示:
@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
}
默认情况下,由其创建的 StreamBuilder 对象管理的流将自动启动。您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。
33.3.4附加Kafka属性
自动配置支持的属性显示在 附录A,常见应用程序属性中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache
Kafka点状属性。有关详细信息,请参阅Apache Kafka文档。
这些属性中的前几个适用于所有组件(生产者,使用者,管理员和流),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka
指定重要性为HIGH,MEDIUM或LOW的属性。Spring Boot auto-configuration支持所有HIGH重要性属性,一些选定的MEDIUM和LOW属
性,以及任何没有默认值的属性。
只有Kafka支持的属性的一部分可以通过 KafkaProperties 类直接获得。如果您希望使用不直接支持的其他属性配置生产者或使用者,请使用以
下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
这将常见的 prop.one Kafka属性设置为 first (适用于生产者,消费者和管理员),将 prop.two admin属性设置为 second ,
将 prop.three 使用者属性设置为 third , prop.four 生产者属性为 fourth , prop.five 流属性为 fifth 。
您还可以按如下方式配置Spring Kafka JsonDeserializer :
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
同样,您可以禁用在标头中发送类型信息的 JsonSerializer 默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
重要
以这种方式设置的属性会覆盖Spring Boot明确支持的任何配置项。
34.使用 RestTemplate 调用REST服务
如果需要从应用程序调用远程REST服务,可以使用Spring Framework的 RestTemplate 类。由于 RestTemplate 实例在使用之前通常需要进行
自定义,因此Spring Boot不提供任何单个自动配置 RestTemplate bean。但是,它会自动配置 RestTemplateBuilder ,可在需要时用于创
建 RestTemplate 实例。自动配置的 RestTemplateBuilder 确保将合理的 HttpMessageConverters 应用于 RestTemplate 实例。
以下代码显示了一个典型示例:
@Service
public class MyService {
private final RestTemplate restTemplate;
public MyService(RestTemplateBuilder restTemplateBuilder) {
this.restTemplate = restTemplateBuilder.build();
}
public Details someRestCall(String name) {
return this.restTemplate.getForObject("/{name}/details", Details.class, name);
}
}
RestTemplateBuilder 包含许多可用于快速配置 RestTemplate 的有用方法。例如,要添加BASIC auth支持,可以使
用 builder.basicAuthentication("user", "password").build() 。
34.1 RestTemplate自定义
RestTemplate 自定义有三种主要方法,具体取决于您希望自定义应用的广泛程度。
要使任何自定义的范围尽可能窄,请注入自动配置的 RestTemplateBuilder ,然后根据需要调用其方法。每个方法调用都返回一个新
的 RestTemplateBuilder 实例,因此自定义只会影响构建器的这种使用。
要进行应用程序范围的附加自定义,请使用 RestTemplateCustomizer bean。所有这些beans都会自动注册到自动配置
的 RestTemplateBuilder ,并应用于使用它构建的任何模板。
以下示例显示了一个自定义程序,它为除 192.168.0.5 之外的所有主机配置代理的使用:
static class ProxyCustomizer implements RestTemplateCustomizer {
@Override
public void customize(RestTemplate restTemplate) {
HttpHost proxy = new HttpHost("proxy.example.com");
HttpClient httpClient = HttpClientBuilder.create()
setRoutePlanner(new DefaultProxyRoutePlanner(proxy) {
@Override
public HttpHost determineProxy(HttpHost target,
HttpRequest request, HttpContext context)
throws HttpException {
if (target.getHostName().equals("192.168.0.5")) {
return null;
}
return super.determineProxy(target, request, context);
}
}).build();
restTemplate.setRequestFactory(
new HttpComponentsClientHttpRequestFactory(httpClient));
}
}
最后,最极端(也很少使用)的选项是创建自己的 RestTemplateBuilder bean。这样做会关闭 RestTemplateBuilder 的自动配置,并阻止
使用任何 RestTemplateCustomizer beans。