微服务架构之Spring Boot(五十七)

2022-05-23 15:38:25 浏览数 (1)

33.3 Apache Kafka支持

通过提供 spring-kafka 项目的自动配置来支持Apache Kafka。

Kafka配置由 spring.kafka.* 中的外部配置属性控制。例如,您可以在 application.properties 中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.consumer.group-id=myGroup

要在启动时创建主题,请添加bean类型 NewTopic 。如果主题已存在,则忽略bean。

有关 KafkaProperties 更多支持选项,请参阅

33.3.1发送消息

Spring的 KafkaTemplate 是自动配置的,您可以直接在自己的beans中自动装配它,如下例所示:

@Component

public class MyBean {

private final KafkaTemplate kafkaTemplate;

@Autowired

public MyBean(KafkaTemplate kafkaTemplate) {

this.kafkaTemplate = kafkaTemplate;

}

// ...

}

如果定义了属性 spring.kafka.producer.transaction-id-prefix ,则会自动配置 KafkaTransactionManager 。此外,如果

定义了 RecordMessageConverter bean,它将自动与自动配置的 KafkaTemplate 相关联。

33.3.2接收消息

当存在Apache Kafka基础结构时,可以使用 @KafkaListener 注释任何bean以创建侦听器端点。如果未定

义 KafkaListenerContainerFactory ,则会使用 spring.kafka.listener.* 中定义的键自动配置默认值。

以下组件在 someTopic 主题上创建一个侦听器端点:

@Component

public class MyBean {

@KafkaListener(topics = "someTopic")

public void processMessage(String content) {

// ...

}

}

如果定义了 KafkaTransactionManager bean,它将自动关联到容器工厂。同样,如果定义了 RecordMessageConverter , ErrorHandler

或 AfterRollbackProcessor bean,它将自动与默认工厂相关联。

自定义 ChainedKafkaTransactionManager 必须标记为 @Primary ,因为它通常会引用自动配置

的 KafkaTransactionManager bean。

33.3.3卡夫卡流

Apache Kafka的Spring提供了一个工厂bean来创建一个 StreamsBuilder 对象并管理其流的生命周期。Spring Boot只要 kafka-streams 在

类路径上,并且通过 @EnableKafkaStreams 注释启用Kafka Streams,就会自动配置所需的 KafkaStreamsConfiguration bean。

启用Kafka Streams意味着必须设置应用程序ID和引导程序服务器。可以使用 spring.kafka.streams.application-id 配置前者,如果未设

置,则默认为 spring.application.name 。后者可以全局设置或专门为流而重写。

使用专用属性可以使用其他几个属性; 可以使用 spring.kafka.streams.properties 命名空间设置其他任意Kafka属性。有关更多信息,另请

参见第33.3.4节“其他Kafka属性”。

要使用工厂bean,只需将 StreamsBuilder 连接到 @Bean ,如下例所示:

@Configuration

@EnableKafkaStreams

static class KafkaStreamsExampleConfiguration {

@Bean

public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {

KStream<Integer, String> stream = streamsBuilder.stream("ks1In");

stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",

Produced.with(Serdes.Integer(), new JsonSerde<>()));

return stream;

}

}

默认情况下,由其创建的 StreamBuilder 对象管理的流将自动启动。您可以使用 spring.kafka.streams.auto-startup 属性自定义此行为。

33.3.4附加Kafka属性

自动配置支持的属性显示在 附录A,常见应用程序属性中。请注意,在大多数情况下,这些属性(连字符或camelCase)直接映射到Apache

Kafka点状属性。有关详细信息,请参阅Apache Kafka文档。

这些属性中的前几个适用于所有组件(生产者,使用者,管理员和流),但如果您希望使用不同的值,则可以在组件级别指定。Apache Kafka

指定重要性为HIGH,MEDIUM或LOW的属性。Spring Boot auto-configuration支持所有HIGH重要性属性,一些选定的MEDIUM和LOW属

性,以及任何没有默认值的属性。

只有Kafka支持的属性的一部分可以通过 KafkaProperties 类直接获得。如果您希望使用不直接支持的其他属性配置生产者或使用者,请使用以

下属性:

spring.kafka.properties.prop.one=first

spring.kafka.admin.properties.prop.two=second

spring.kafka.consumer.properties.prop.three=third

spring.kafka.producer.properties.prop.four=fourth

spring.kafka.streams.properties.prop.five=fifth

这将常见的 prop.one Kafka属性设置为 first (适用于生产者,消费者和管理员),将 prop.two admin属性设置为 second ,

将 prop.three 使用者属性设置为 third , prop.four 生产者属性为 fourth , prop.five 流属性为 fifth 。

您还可以按如下方式配置Spring Kafka JsonDeserializer :

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

同样,您可以禁用在标头中发送类型信息的 JsonSerializer 默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.producer.properties.spring.json.add.type.headers=false

重要

以这种方式设置的属性会覆盖Spring Boot明确支持的任何配置项。

34.使用 RestTemplate 调用REST服务

如果需要从应用程序调用远程REST服务,可以使用Spring Framework的 RestTemplate 类。由于 RestTemplate 实例在使用之前通常需要进行

自定义,因此Spring Boot不提供任何单个自动配置 RestTemplate bean。但是,它会自动配置 RestTemplateBuilder ,可在需要时用于创

建 RestTemplate 实例。自动配置的 RestTemplateBuilder 确保将合理的 HttpMessageConverters 应用于 RestTemplate 实例。

以下代码显示了一个典型示例:

@Service

public class MyService {

private final RestTemplate restTemplate;

public MyService(RestTemplateBuilder restTemplateBuilder) {

this.restTemplate = restTemplateBuilder.build();

}

public Details someRestCall(String name) {

return this.restTemplate.getForObject("/{name}/details", Details.class, name);

}

}

RestTemplateBuilder 包含许多可用于快速配置 RestTemplate 的有用方法。例如,要添加BASIC auth支持,可以使

用 builder.basicAuthentication("user", "password").build() 。

34.1 RestTemplate自定义

RestTemplate 自定义有三种主要方法,具体取决于您希望自定义应用的广泛程度。

要使任何自定义的范围尽可能窄,请注入自动配置的 RestTemplateBuilder ,然后根据需要调用其方法。每个方法调用都返回一个新

的 RestTemplateBuilder 实例,因此自定义只会影响构建器的这种使用。

要进行应用程序范围的附加自定义,请使用 RestTemplateCustomizer bean。所有这些beans都会自动注册到自动配置

的 RestTemplateBuilder ,并应用于使用它构建的任何模板。

以下示例显示了一个自定义程序,它为除 192.168.0.5 之外的所有主机配置代理的使用:

static class ProxyCustomizer implements RestTemplateCustomizer {

@Override

public void customize(RestTemplate restTemplate) {

HttpHost proxy = new HttpHost("proxy.example.com");

HttpClient httpClient = HttpClientBuilder.create()

setRoutePlanner(new DefaultProxyRoutePlanner(proxy) {

@Override

public HttpHost determineProxy(HttpHost target,

HttpRequest request, HttpContext context)

throws HttpException {

if (target.getHostName().equals("192.168.0.5")) {

return null;

}

return super.determineProxy(target, request, context);

}

}).build();

restTemplate.setRequestFactory(

new HttpComponentsClientHttpRequestFactory(httpClient));

}

}

最后,最极端(也很少使用)的选项是创建自己的 RestTemplateBuilder bean。这样做会关闭 RestTemplateBuilder 的自动配置,并阻止

使用任何 RestTemplateCustomizer beans。

0 人点赞