谈谈代码:函数式编程

2024-01-09 08:16:56 浏览数 (1)

1. 前言

一个风和日丽的下午,我看着日常看代码做重构迁移,突然看到这么段代码:

代码语言:javascript复制
    private void getTopicsDiskSizeForSomeBroker(int brokerID, AdminClient admin, Map<String, Long> topicsSizeMap) throws ExecutionException, InterruptedException {
        DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
        for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {
            Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
            for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {
                DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
                Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
                for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {
                    String topic = replicas.getKey().topic();
                    Long topicSize = topicsSizeMap.get(topic);
                    if (topicSize == null) {
                        topicsSizeMap.put(topic, replicas.getValue().size);
                    } else {
                        topicsSizeMap.put(topic, replicas.getValue().size   topicSize);
                    }
                }
            }
        }
    }

看了这段代码我整个人都不好了!

首先是那火箭式的三个嵌套for循环,再者就是那些变量声明语句.为了迭代他们,我们不得不声明它一遍...

2. 使用Stream

代码语言:javascript复制
    public List<KafkaTopicInfoBO> getTopicDiskSize() {
        return getTopicPartitionReplicaInfo().entrySet().stream()
                .map(e -> new KafkaTopicInfoBO(e.getKey().topic(), e.getValue().size))
                .collect(Collectors.toList());

    }

    protected Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> getTopicPartitionReplicaInfo() {
        Properties globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG);
        Properties adminConfig = new Properties();
        adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)));
        AdminClient adminClient = AdminClient.create(adminConfig);
        List<String> brokerIds = zkConfigService.getChildByPath(kafkaIdsPath);
        return  brokerIds.stream()
                .map(Integer::valueOf)
                .map(Collections::singletonList)
                .map(adminClient::describeLogDirs)
                .map(DescribeLogDirsResult::all)
                .map(mapKafkaFuture -> {
                    try {
                        return mapKafkaFuture.get();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .map(Map::values)
                .flatMap(Collection::stream)
                .map(Map::values)
                .flatMap(Collection::stream)
                .map(e -> e.replicaInfos)
                .map(Map::entrySet)
                .flatMap(Collection::stream)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

这样看起来似乎好了一点.但是对于不熟悉函数式编程的同学来说,理解以上代码还是有点困难的.

接下来,先来简单讲一讲函数式编程.

3. 什么是函数式编程

3.1 一句话搞懂

就像来自数学中的代数

代码语言:javascript复制
f(x)=5x^2 4x 3
g(x)=2f(x) 5=10x^2 8x 11
h(x)=f(x) g(x)=15x^2 12x 14

函数式编程定义输入数据和输出数据相关的关系——数学表达式里面其实是在做一种映射(Mapping),输入的数据和输出的数据关系是什么样的,就是用来函数定义的.

3.2 直观感受:用代码举例

代码语言:javascript复制
public class Quint{
    public static void main (String args[]){
        for (int i=0; i<25; i  ){
            System.out.println(i*i);
        }
    }
}
代码语言:javascript复制
(println (take 25 (map (fn [x] (*x x) (range)))))

简单解释一下上段Lisp代码:

  1. range函数回返回一个从0开始的整数无穷列表
  2. 然后该列表会被传入map,针对列表中的每个元素,调用平方值的匿名函数,产生了一个无穷多的,包含平方值的列表
  3. 将列表传入take函数,仅仅返回前25个
  4. println将接入的参数输出

4. 使用对函数式编程支持更好的Kotlin

代码语言:javascript复制
    protected fun getTopicPartitionReplicaInfo(): Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> {
        val globalConf = zkConfigService.getProperties(ZkPathUtils.GLOBAL_CONFIG)
        val adminConfig = Properties()
        adminConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, globalConf.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
        val adminClient = AdminClient.create(adminConfig)
        val brokerIds = zkConfigService.getChildByPath(kafkaIdsPath)
        return brokerIds.stream()
                .mapToInt {
                    Integer.valueOf(it)
                }.let { intStream ->
                    adminClient.describeLogDirs(intStream.boxed().collect(Collectors.toList()))
                }.let { describeLogDirsResult ->
                    describeLogDirsResult.all()
                }.let { mapKafkaFutrue ->
                    mapKafkaFutrue.get()
                }.let { mapStream ->
                    mapStream.values
                }.let {
                    it.stream().map { e -> e.values }.flatMap { e -> e.stream() }.collect(Collectors.toList())
                }.flatMap {
                    it.replicaInfos.entries.toList()
                }.let { it ->
                    it.associateBy({ it.key }, { it.value })
                }
    }

代码看起来大差不差.但Kotlin的这些关键字写起来更方便.我们看下Java中map函数和Kotlin中let函数的签名:

代码语言:javascript复制
     * Returns a stream consisting of the results of applying the given
     * function to the elements of this stream.
     *
     * <p>This is an <a href="package-summary.html#StreamOps">intermediate
     * operation</a>.
     *
     * @param <R> The element type of the new stream
     * @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
     *               <a href="package-summary.html#Statelessness">stateless</a>
     *               function to apply to each element
     * @return the new stream
     */
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
代码语言:javascript复制
/**
 * Calls the specified function [block] with `this` value as its argument and returns its result.
 *
 * For detailed usage information see the documentation for [scope functions](https://kotlinlang.org/docs/reference/scope-functions.html#let).
 */
@kotlin.internal.InlineOnly
public inline fun <T, R> T.let(block: (T) -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return block(this)
}

我们可以看到Java中的map是被限制在Stream API中的,而Kotlin的let并没有这种限制.

同时,我们也可以感受到,对于函数式编程的支持,明显是Kotlin更好一些——在Kotlin中,我们用一个{}就可以表示函数,而Java则需要Interface来表示(在Java中,对象是一等公民).

如果读者有兴趣的话,可以尝试一下HaskellLisp(JVM上叫Clojure).这些都是纯函数式语言.

类似,Kotlin还有很多这种函数,被称为作用域函数,在这里罗列一下常用的函数:

  • let
  • run
  • also
  • apply
  • takeIf
  • takeUnless
  • repeat

5. 小结

在《架构整洁之道》中,有这么一个总结:

  • 结构化编程是对程序控制权的直接转移的限制
  • 面向对象编程是对程序控制权的间接转移的限制
  • 函数式编程是对程序赋值操作的限制

如果说面向对象编程是对数据进行抽象,那么函数式编程则是对行为进行抽象.

5.2 函数式编程的三件套:

  1. Map
  2. Reduce
  3. Filter

举个例子,面包和蔬菜map到切碎的操作上,再reduce成汉堡.

我们可以看到map和reduce不关心输入数据,它们只控制,并不是业务.控制是描述怎么干,而业务描述要干什么.

在本文中,我们只看到了map的身影——上面提到了,map对流中的每一个元素进行操作.

可能会有读者问let是啥,在本文的代码例子中,let针对整个流进行操作.

简单来说, Map && Reduce 对应了我们日常中用的循环,而Filter对应了If

5.3 优势 && 劣势

  • 优势
    1. 无状态
    2. 并发无伤害
    3. 函数执行没有顺序上的问题
  • 劣势
    1. 数据复制严重

5.4 应用场景

  • Python的装饰器模式
  • 事件溯源:不记录最终状态,而是记录每一个事件.需要时,通过追溯(重新计算)事件来得出当前的状态.如:
    1. 数据库事务日志
    2. 版本控制器
    3. 比特币

0 人点赞