1. 前言
一个风和日丽的下午,我看着日常看代码做重构迁移,突然看到这么段代码:
代码语言:javascript复制 private void getTopicsDiskSizeForSomeBroker(int brokerID, AdminClient admin, Map<String, Long> topicsSizeMap) throws ExecutionException, InterruptedException {
DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {
Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {
DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {
String topic = replicas.getKey().topic();
Long topicSize = topicsSizeMap.get(topic);
if (topicSize == null) {
topicsSizeMap.put(topic, replicas.getValue().size);
} else {
topicsSizeMap.put(topic, replicas.getValue().size topicSize);
}
}
}
}
}
看了这段代码我整个人都不好了!
首先是那火箭式的三个嵌套for循环,再者就是那些变量声明语句.为了迭代他们,我们不得不声明它一遍...
2. 使用Stream
代码语言:javascript复制 public List<KafkaTopicInfoBO> getTopicDiskSize() {
return getTopicPartitionReplicaInfo().entrySet().stream()
.map(e -> new KafkaTopicInfoBO(e.getKey().topic(), e.getValue().size))
.collect(Collectors.toList());
}
protected Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> getTopicPartitionReplicaInfo() {
Properties globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG);
Properties adminConfig = new Properties();
adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
AdminClient adminClient = AdminClient.create(adminConfig);
List<String> brokerIds = zkConfigService.getChildByPath(kafkaIdsPath);
return brokerIds.stream()
.map(Integer::valueOf)
.map(Collections::singletonList)
.map(adminClient::describeLogDirs)
.map(DescribeLogDirsResult::all)
.map(mapKafkaFuture -> {
try {
return mapKafkaFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.map(Map::values)
.flatMap(Collection::stream)
.map(Map::values)
.flatMap(Collection::stream)
.map(e -> e.replicaInfos)
.map(Map::entrySet)
.flatMap(Collection::stream)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
这样看起来似乎好了一点.但是对于不熟悉函数式编程的同学来说,理解以上代码还是有点困难的.
接下来,先来简单讲一讲函数式编程.
3. 什么是函数式编程
3.1 一句话搞懂
就像来自数学中的代数
代码语言:javascript复制f(x)=5x^2 4x 3
g(x)=2f(x) 5=10x^2 8x 11
h(x)=f(x) g(x)=15x^2 12x 14
函数式编程定义输入数据和输出数据相关的关系——数学表达式里面其实是在做一种映射(Mapping),输入的数据和输出的数据关系是什么样的,就是用来函数定义的.
3.2 直观感受:用代码举例
代码语言:javascript复制public class Quint{
public static void main (String args[]){
for (int i=0; i<25; i ){
System.out.println(i*i);
}
}
}
代码语言:javascript复制(println (take 25 (map (fn [x] (*x x) (range)))))
简单解释一下上段Lisp
代码:
- range函数回返回一个从0开始的整数无穷列表
- 然后该列表会被传入map,针对列表中的每个元素,调用平方值的匿名函数,产生了一个无穷多的,包含平方值的列表
- 将列表传入take函数,仅仅返回前25个
- println将接入的参数输出
4. 使用对函数式编程支持更好的Kotlin
代码语言:javascript复制 protected fun getTopicPartitionReplicaInfo(): Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> {
val globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG)
val adminConfig = Properties()
adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
val adminClient = AdminClient.create(adminConfig)
val brokerIds = zkConfigService.getChildByPath(kafkaIdsPath)
return brokerIds.stream()
.mapToInt {
Integer.valueOf(it)
}.let { intStream ->
adminClient.describeLogDirs(intStream.boxed().collect(Collectors.toList()))
}.let { describeLogDirsResult ->
describeLogDirsResult.all()
}.let { mapKafkaFutrue ->
mapKafkaFutrue.get()
}.let { mapStream ->
mapStream.values
}.let {
it.stream().map { e -> e.values }.flatMap { e -> e.stream() }.collect(Collectors.toList())
}.flatMap {
it.replicaInfos.entries.toList()
}.let { it ->
it.associateBy({ it.key }, { it.value })
}
}
代码看起来大差不差.但Kotlin的这些关键字写起来更方便.我们看下Java中map
函数和Kotlin中let
函数的签名:
* Returns a stream consisting of the results of applying the given
* function to the elements of this stream.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* @param <R> The element type of the new stream
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function to apply to each element
* @return the new stream
*/
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
代码语言:javascript复制/**
* Calls the specified function [block] with `this` value as its argument and returns its result.
*
* For detailed usage information see the documentation for [scope functions](https://kotlinlang.org/docs/reference/scope-functions.html#let).
*/
@kotlin.internal.InlineOnly
public inline fun <T, R> T.let(block: (T) -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return block(this)
}
我们可以看到Java中的map
是被限制在Stream API中的,而Kotlin的let
并没有这种限制.
同时,我们也可以感受到,对于函数式编程的支持,明显是Kotlin更好一些——在Kotlin中,我们用一个{}
就可以表示函数,而Java则需要Interface来表示(在Java中,对象是一等公民).
如果读者有兴趣的话,可以尝试一下
Haskell
或Lisp
(JVM上叫Clojure).这些都是纯函数式语言.
类似,Kotlin还有很多这种函数,被称为作用域函数,在这里罗列一下常用的函数:
- let
- run
- also
- apply
- takeIf
- takeUnless
- repeat
5. 小结
在《架构整洁之道》中,有这么一个总结:
- 结构化编程是对程序控制权的直接转移的限制
- 面向对象编程是对程序控制权的间接转移的限制
- 函数式编程是对程序赋值操作的限制
如果说面向对象编程是对数据进行抽象,那么函数式编程则是对行为进行抽象.
5.2 函数式编程的三件套:
- Map
- Reduce
- Filter
举个例子,面包和蔬菜map到切碎的操作上,再reduce成汉堡.
我们可以看到map和reduce不关心输入数据,它们只控制,并不是业务.控制是描述怎么干,而业务描述要干什么.
在本文中,我们只看到了map的身影——上面提到了,map对流中的每一个元素进行操作.
可能会有读者问
let
是啥,在本文的代码例子中,let
针对整个流进行操作.
简单来说, Map && Reduce
对应了我们日常中用的循环
,而Filter
对应了If
5.3 优势 && 劣势
- 优势
- 无状态
- 并发无伤害
- 函数执行没有顺序上的问题
- 劣势
- 数据复制严重
5.4 应用场景
- Python的装饰器模式
- 事件溯源:不记录最终状态,而是记录每一个事件.需要时,通过追溯(重新计算)事件来得出当前的状态.如:
- 数据库事务日志
- 版本控制器
- 比特币