如何在Mule 4 Beta中实现自动流式传输

2018-06-07 16:40:44 浏览数 (1)

How Automatic Streaming in Mule 4 Beta Works

原文作者:Mariano Gonzalez

原文地址:https://dzone.com/articles/how-automatic-streaming-in-mule-4-beta-works

译者微博:@从流域到海域

译者博客:blog.csdn.net/solo95

如何在Mule 4 Beta中实现自动流式传输

现在流传输就像喝啤酒那样简单!Mule 4使您能够处理,访问,转换以及传输数据的方式有了令人难以置信的改善。对于特定的流式传输,Mule 4支持多个并行数据读取,没有副作用,并且用户无需先将数据缓存到内存中。

很多人不熟悉流传输的概念。因此,在我们深入了解Mule 4的流媒体特性之前,我们首先介绍一些能比较突出其价值的用例。

示例1:HTTP> 2 Files

在这个简单的流程中,您从HTTP(比方说,带有JSON的POST)接收内容,然后将其写入两个文件。运行后得到的结果是什么?第一个文件被正确写入。第二个文件被创建,但其内容为空。

示例2:HTTP> Logs> File

这个例子接收到相同的JSON POST,但是这一次它会记录它并将其写入文件。这个流程的输出是你所期望的。其中内容被记录并且文件也被写入。但行为是否正确?最简洁的答案是不。

长然而简洁的原因是,为了记录有效载荷,记录器必须完全处理掉(consume)流,这意味着它的全部内容将被加载到内存中。消息传到文件连接器时,内容已全部在内存中。大多数时候,这并不是问题; 但如果内容体量过大并且将其加载到内存中,则应用程序很可能会耗尽内存 - 这威胁到应用程序的稳定性。

示例3:HTTP> Scatter-Gather> Whatever

现在,让我们尝试同样的例子但使用分散收集组件(scatter-gather component)(仅用于说明目的)。这种情况只是失败。一个流不能同时被两个不同的线程使用,因此该组件只有两个选项:

  1. 将整个流加载到内存中(如记录器一样)。
  2. 失败。

分散收集组件选择了后者。

但为什么?

这是我们真正需要了解流式传输含义含义的部分。处理流有两个问题:

  1. 它只能被读取一次。
  2. 它不能并行读取。

赫拉克利特说,你不能在同一条河流洗两次澡。这是因为每次洗澡时,组成这条河流的水滴都不相同。喝一品脱啤酒也是如此。你喝的每一口都是一口不能再喝的。流传输中也发生了同样的事情。

流的思路是,为了避免完全将潜在的大块数据加载到内存中,您可以通过一次一小口一小口地加载它。这意味着,虽然你仍在“消化”(即处理)第一口饮料,但第二口饮料已经通过你的咽喉(AKA网络,磁盘IO等)。这不仅节省了内存,而且还提高了性能。问题是啜饮过的(即处理过的流)不能被回收!

回到示例1,在第一个文件出站后“饮用”数据流以处理它(将其写入磁盘)之后,数据流变空了(其中没有啤酒)。为了使示例正常工作,需要在第一个文件出站处理器之前放置一个<object-to-string />转换器。这样做效果并不明显,并且会迫使Mule将流的内容完全加载到内存中。

同样在示例2中,记录器必须将整个内容加载到内存中并替换掉消息有效负载。又一次,所有内容都被加载到内存中。

可重复流的介绍

那是否有一种方法可以再次让同样的啤酒倒满杯子?

在Mule 4中,你不再需要担心回答以下问题:

  • 哪些组件正在流式传输,哪些不是?
  • 流在是在此时被处理的吗?
  • 流到底在哪个位置?
  • 流在深层次意味着什么?

Mule 4现在确保任何需要读取流的组件都能够这样做,而不管哪些组件已经被篡改。该流将始终可用并将处于其起始位置。

文件存储可重复流

文件存储可重复流需要缓冲,而且我们有不同的缓冲策略。Mule现在在内存中保留了一部分内容。如果流内容小于该缓冲区的大小,那么我们很好。如果内容量较大,Mule会先将缓冲区的内容备份到磁盘,然后清除内存。这是Mule 4的默认策略。

在内存的可重复流中

你也可以采取内存策略。在这种模式下进行流式传输时,Mule永远不会使用磁盘来缓冲内容。如果超过缓冲区大小,则消息传送将失败。

代码语言:txt复制
< file : read path = “bigFile.json” > 
  < repeatable - in - memory - stream initialBufferSize = “512” 
                               bufferSizeIncrement = “256” 
                               maxBufferSize = “2048” 
                               bufferUnit = “KB” / > 
< / file : read >

以并行方式读取流

到现在为止都很好!但是我们只解决了例子1和例子2的问题,例子3仍然没有解决。

让我们回到我们的啤酒故事。所以我们回到酒吧,喝了一杯啤酒。假设1品脱包含500毫升啤酒。由于这个世界很小,你碰巧碰到酒吧的一位老朋友,你开始分享你的啤酒。借助使用吸管,你们可以平行喝,但你永远不会喝你的和朋友一样的一小口。而且,由于你在分享,当啤酒喝完时,你没有喝到完整的 500cc,这意味着你失去了一些内容。

流传输中发生了同样的事情。如果两个线程同时从同一个流中读取,则一个线程将占用一些字节,另一个线程将占用其他字节,但是没有一个线程拥有完整的内容。因此,内容已损坏。

Mule 4中新的可重复的流框架自动解决了这个问题。所有可重复的流都支持并行访问。Mule 4将自动确保组件A读取流时,它不会在组件B中产生任何副作用,从而消除脏读操作!

禁用可重复流

虽然不常见,但有些情况下您可能想要禁用此功能并使用普通的旧的流(处理方式)。例如,你的用例可能并不需要这个,你不想为额外的内存或性能开销付费。再次,您可以使用以下方法禁用它:

代码语言:txt复制
< file : read path = “bigFile.json” > 
  < non - repeatable - stream / > 
< / file : read >

请注意,通过禁用此功能,即使使用Mule 4,示例1,示例2和示例3的所有缺陷也会变为当前值

流媒体对象

原始字节流不是Mule 4支持的流式传输的唯一情况。早在2013年,Mule 3.5就发布了,我们引入了自动分页连接器的概念。这是一个允许连接器(如Salesforce)透明地访问分页数据的功能。这是一种流式传输!在底层,连接器读取了第一页,当它被使用时,它会去取下一页,从内存中丢弃前面的页面。实质上,这与从FTP流式传输文件完全相同。

文件存储自动分页

默认情况下,您现在将获得一个缓冲区,该缓冲区将大量对象保存到内存中,并使用该磁盘缓冲剩余的内容:

代码语言:txt复制
< sfdc : query query = “dsql:...”  / >

就像以前一样,你也可以在内存中完全做到这一点:

代码语言:txt复制
< sfdc : query query = “dsql:...” > 
  < repeatable - in - memory - iterable
    initialBufferSize = “100”  
    bufferSizeIncrement = “100”  
    maxBufferSize = “500”  / > 
< / sfdc : query >
缓冲区大小

但请注意,然而,控制缓冲区大小在这里需要不同的方法。在前面的例子中,所有的缓冲区大小都是以字节为单位来衡量的(或者是一个派生单位,如KB)。在这种情况下,我们会探讨以实例计数。

对象序列化

为了让FileStore策略将磁盘用作缓冲区,它需要序列化流式对象。这是否意味着它只适用于实现java.io序列化接口的对象?一点也不。就像批处理模块一样,该功能使用Kryo框架来序列化默认情况下JVM无法序列化的内容。尽管Kryo实现了很多黑魔法,但它既不强大也不是银弹(喻指新技术,尤指人们寄予厚望的某种新科技)。有些东西就是不能被序列化,所以尽量保持你的对象是简单的状态!

数据迁移数据迁移

0 人点赞