深入领会 Flink 的网络协议栈

深入领会 Flink 的网络协议栈

Windows命令:route命令使用实例详解(多网关配置)

作者:Nico Kruber

出处:https://ververica.cn/developers/flink-network-protocol

创作不易,迎接转载,但必须在文章开头保留此段声明,否则保留追究法律责任的权力。

Flink 的网络协议栈是组成 flink-runtime 模块的焦点组件之一,是每个 Flink 作业的焦点。它毗邻所有 TaskManager 的各个子义务(subtask),因此,对于 Flink 作业的性能包罗吞吐与延迟都至关主要。与 TaskManager 和 JobManager 之间通过基于 Akka 的 RPC 通讯的控制通道差别,TaskManager 之间的网络协议栈依赖于加倍底层的 Netty API。

本文将首先先容 Flink 露出给流算子(Stream operator)的高层抽象,然后详细先容 Flink 网络协议栈的物理实现和种种优化、优化的效果以及 Flink 在吞吐量和延迟之间的权衡。

1. 逻辑视图

Flink 的网络协议栈为相互通讯的子义务提供以下逻辑视图,例如在 A 通过 keyBy() 操作举行数据 Shuffle :

深入领会 Flink 的网络协议栈

 

这一历程建立在以下三种基本概念抽象的基础上:

▼ 子义务输出类型(ResultPartitionType):

  • Pipelined(有限的或无限的):一旦发生数据就可以连续向下游发送有限数据流或无限数据流。
  • Blocking:仅在天生完整效果后向下游发送数据。

▼ 调剂计谋:

  • 同时调剂所有义务(Eager):同时部署作业的所有子义务(用于流作业)。
  • 上游发生第一条纪录部署下游(Lazy):一旦任何生产者天生任何输出,就立刻部署下游义务。
  • 上游发生完整数据部署下游:当任何或所有生产者天生完整数据后,部署下游义务。

▼ 数据传输

  • 高吞吐:Flink 不是一个一个地发送每条纪录,而是将若干纪录缓冲到其网络缓冲区中并一次性发送它们。这降低了每条纪录的发送成本因此提高了吞吐量。
  • 低延迟:当网络缓冲区跨越一定的时间未被填满时会触发超时发送,通过减小超时时间,可以通过牺牲一定的吞吐来获取更低的延迟。

我们将在下面深入Flink网络协议栈的物理实现时看到关于吞吐延迟的优化。对于这一部门,让我们详细说明输出类型与调剂计谋。首先,需要知道的是子义务的输出类型和调剂计谋是慎密关联的,只有两者的一些特定组合才是有用的。

Pipelined效果是流式输出,需要目的subtask正在运行以便吸收数据。因此需要在上游task发生数据之前或者发生第一条数据的时刻调剂下游目的task运行。批处置作业天生有界效果数据,而流式处置作业发生无限效果数据。

批处置作业也可能以壅闭方式发生效果,详细取决于所使用的算子和毗邻模式。在这种情形下,必须守候上游task先天生完整的效果,然后才气调剂下游的吸收task运行。这能够提高批处置作业的效率而且占用更少的资源。

下表总结了 Task 输出类型以及调剂计谋的有用组合:

输出类型调剂计谋适用于…pipelined, unbounded一次性调剂所有 task流作业pipelined, unbounded上游发生第一条输出调剂下游N / A’pipelined,
bounded一次性调剂所有 taskN /A²pipelined,
bounded上游发生第一条输出调剂下游批作业blocking上游发生完整输出调剂下游批作业

注释:

[1]现在 Flink 未使用

[2]批处置 / 流盘算统一完成后,可能适用于流式作业

此外,对于具有多个输入的子义务,调剂以两种方式启动:当所有或者任何上游义务发生第一条数据或者发生完整数据时调剂义务运行。要调整批处置作业中的输出类型和调剂计谋,可以参考ExecutionConfig#setExecutionMode() ——尤其是ExecutionMode,以及ExecutionConfig#setDefaultInputDependencyConstraint()。

2. 物理数据传输

为了明白物理数据毗邻,请回忆一下,在Flink中,差别的义务可以通过slot sharing group共享相同slot。TaskManager还可以提供多个slot,以允许将统一义务的多个子义务调剂到统一个TaskManager上。

对于下图所示的示例,我们假设 2 个并发为 4 的义务部署在 2 个 TaskManager 上,每个 TaskManager 有两个 Slot。TaskManager 1 执行子义务 A.1,A.2,B.1 和 B.2,TaskManager 2 执行子义务 A.3,A.4,B.3 和 B.4。在 A 和 B 之间是 Shuffle 毗邻类型,好比来自于 A 的 keyBy() 操作,在每个 TaskManager 上会有 2×4 个逻辑毗邻,其中一些是内陆的,另一些是远程的:

深入领会 Flink 的网络协议栈

 

差别义务(远程)之间的每个网络毗邻将在 Flink 的网络客栈中获得自己的 TCP 通道。然则,若是统一义务的差别子义务被调剂到统一个 TaskManager,则它们与统一个 TaskManager 的网络毗邻将多路复用并共享统一个 TCP 信道以削减资源使用。在我们的例子中,这适用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下图所示:

深入领会 Flink 的网络协议栈

 

每个子义务的输出效果称为ResultPartition,每个ResultPartition被分成多个单独的ResultSubpartition– 每个逻辑通道一个。Flink的网络协议栈在这一点的处置上,不再处置单个纪录,而是将一组序列化的纪录填充到网络缓冲区中举行处置。每个子义务内陆缓冲区中最多可用buffer数目为(每个发送方和吸收方各一个):

#channels * buffers-per-channel + floating-buffers-per-gate

单个TaskManager上的网络层buffer总数通常不需要设置。有关若何在需要时举行设置的详细信息,请参阅设置网络缓冲区的文档

▼ 造成反压(1)

每当子义务的数据发送缓冲区耗尽时——数据驻留在 Subpartition 的缓冲区行列中或位于更底层的基于 Netty 的网络客栈内,生产者就会被壅闭,无法继续发送数据,而受到反压。吸收端以类似的方式事情:Netty 收到任何数据都需要通过网络 Buffer 通报给 Flink。若是响应子义务的网络缓冲区中没有足够可用的网络 Buffer,Flink 将住手从该通道读取,直到 Buffer 可用。这将反压该多路复用上的所有发送子义务,因此也限制了其他吸收子义务。下图说明晰过载的子义务 B.4,它会导致多路复用的反压,也会导致子义务 B.3 无法接受和处置数据,即使是 B.3 另有足够的处置能力。

深入领会 Flink 的网络协议栈

 

为了防止这种情形发生,Flink 1.5引入了自己的流量控制机制。

3.Credit-based流量控制

Credit-based 流量控制可确保发送端已经发送的任何数据,吸收端都具有足够的能力(Buffer)来吸收。新的流量控制机制基于网络缓冲区的可用性,作为 Flink 之前机制的自然延伸。每个远程输入通道(RemoteInputChannel)现在都有自己的一组独占缓冲区(Exclusive buffer),而不是只有一个共享的内陆缓冲池(LocalBufferPool)。与之前差别,内陆缓冲池中的缓冲区称为流动缓冲区(Floating buffer),由于它们会在输出通道间流动而且可用于每个输入通道。

数据吸收方会将自身的可用 Buffer 作为 Credit 见告数据发送方(1 buffer = 1 credit)。每个 Subpartition 会跟踪下游吸收端的 Credit(也就是可用于吸收数据的 Buffer 数目)。只有在响应的通道(Channel)有 Credit 的时刻 Flink 才会向更底层的网络协议栈发送数据(以 Buffer 为粒度),而且每发送一个 Buffer 的数据,响应的通道上的 Credit 会减 1。除了发送数据自己外,数据发送端还会发送响应 Subpartition 中有若干正在排队发送的 Buffer 数(称之为 Backlog)给下游。数据吸收端会行使这一信息(Backlog)去申请合适数目的 Floating buffer 用于吸收发送端的数据,这可以加速发送端聚积数据的处置。吸收端会首先申请和 Backlog 数目相等的 Buffer,但可能无法申请到所有,甚至一个都申请不到,这时吸收端会行使已经申请到的 Buffer 举行数据吸收,并监听是否有新的 Buffer 可用。

深入领会 Flink 的网络协议栈

 

Credit-based 的流控使用 buffers-per-channel 来指定每个channel有若干独占的buffer,使用 floating-buffers-per-gate 来指定共享的内陆缓冲池(local buffer pool)巨细(可选3),通过共享内陆缓冲池,credit-based流控可以使用的buffer数目可以到达与原来非credit-based流控同样的巨细。这两个参数的默认值是被经心选取的,以保证新的 credit-based 流控在网络康健延迟正常的情形下至少可以到达与原计谋相同的吞吐。您可以根据您现实的网络 RRT(round-trip-time)和带宽对这两个参数举行调整。

跨境网络小知识之IP协议

注释3:若是没有足够的 Buffer 可用,则每个缓冲池将获得全局可用 Buffer 的相同份额(±1)。

▼ 造成反压(2)

与没有流量控制的吸收端反压机制差别,Credit 提供了更直接的控制:若是吸收端的处置速率跟不上,最终它的 Credit 会削减成 0,此时发送端就不会在向网络中发送数据(数据会被序列化到 Buffer 中并缓存在发送端)。由于反压只发生在逻辑链路上,因此没必要阻断从多路复用的 TCP 毗邻中读取数据,也就不会影响其他的吸收者吸收和处置数据。

▼ Credit-based 的优势与问题

由于通过 credit-based 流控机制,多路复用中的一个信道不会由于反压壅闭其他逻辑信道,因此整体资源行使率会增添。此外,通过完全控制正在发送的数据量,我们还能够加速 checkpoint alignment:若是没有流量控制,通道需要一段时间才气填满网络协议栈的内部缓冲区并解释吸收端不再读取数据了。在这段时间里,大量的 buffer 不会被处置。任何 checkpoint barrier(触发 checkpoint 的新闻)都必须在这些数据 buffer 区后排队,因此必须等到所有这些数据都被处置后才气够触发 checkpoint(“barrier 不会再数据之前被处置!”)。

然则,来自吸收方的附加通告新闻(向发送端通知 Credit)可能会发生一些分外的开销,尤其是在使用 SSL 加密信道的场景中。此外,单个输入通道( Input channel)不能使用缓冲池中的所有 Buffer,由于存在无法共享的 Exclusive buffer。新的流控协议也有可能无法做到立刻发送尽可能多的数据(若是天生数据的速率快于吸收端反馈 Credit 的速率),这时则可能增进发送数据的时间。虽然这可能会影响作业的性能,但由于其所有优点,通常新的流量控制会显示得更好。可能会通过增添单个通道的独占 Buffer 数目,这会增大内存开销。然而,与先前实现相比,总体内存使用可能仍然会降低,由于底层的网络协议栈不再需要缓存大量数据,由于我们总是可以立刻将其传输到 Flink(一定会有响应的 Buffer 吸收数据)。

在使用新的 Credit-based 流量控制时,可能还会注重到另一件事:由于我们在发送方和吸收方之间缓冲较少的数据,反压可能会更早的到来。然而,这是我们所期望的,由于缓存更多数据并没有真正获得任何利益。若是要缓存更多的数据而且保留 Credit-based 流量控制,可以思量通过增添单个输入共享 Buffer 的数目。

深入领会 Flink 的网络协议栈

 

深入领会 Flink 的网络协议栈

 

注重:若是需要关闭 Credit-based 流量控制,可以将这个设置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。然则,此参数已过时,最终将与非 Credit-based 流控制代码一起删除。

4. 序列化与反序列化

下图从上面的扩展了更高级别的视图,其中包罗网络协议栈及其周围组件的更多详细信息,从发送算子发送纪录(record)到吸收算子获取它:

深入领会 Flink 的网络协议栈

 

在天生 record 并将其通报出去之后,例如通过 Collector#collect(),它被通报给 RecordWriter,RecordWriter 会将 JAVA 工具序列化为字节序列,最终存储在 buffer 中根据上面所形貌的在网络协议栈中举行处置。 RecordWriter 首先使用 SpanningRecordSerializer 将 Record 序列化为天真的堆上字节数组。然后,它实验将这些字节写入目的网络 channel 的 buffer 中。我们将在下面的章节回到这一部门。

 

在吸收方,底层网络协议栈(netty)将吸收到的 buffer 写入响应的输入通道(channel)。流义务的线程最终从这些行列中读取并实验在 RecordReader 的辅助下通过 SpillingAdaptiveSpanningRecordDeserializer 将累积的字节反序列化为 Java 工具。与序列化器类似,这个反序列化器还必须处置特殊情形,例如跨越多个网络 buffer 的 record,或者由于纪录自己比网络缓冲区大(默认情形下为 32KB,通过taskmanager.memory.segment-size设置)或者由于序列化 record 时,目的 buffer 中已经没有足够的剩余空间保留序列化后的字节数据,在这种情形下,Flink 将使用这些字节空间并继续将其余字节写入新的网络 buffer 中。

4.1 将网络buffer写入Netty

在上图中,Credit-based 流控制机制现实上位于“Netty Server”(和“Netty Client”)组件内部,RecordWriter 写入的 Buffer 始终以空状态(无数据)添加到 Subpartition 中,然后逐渐向其中填写序列化后的纪录。然则 Netty 在什么时刻真正的获取并发送这些 Buffer 呢?显然,不能是 Buffer 中只要有数据就发送,由于跨线程(写线程与发送线程)的数据交换与同步会造成大量的分外开销,而且会造成缓存自己失去意义(若是是这样的话,不如直接将将序列化后的字节发到网络上而不必引入中心的 Buffer)。

在 Flink 中,有三种情形可以使 Netty 服务端使用(发送)网络 Buffer:

  • 写入 Record 时 Buffer 变满,或者
  • Buffer 超时未被发送,或
  • 发送特殊新闻,例如 Checkpoint barrier。

▼ 在 Buffer 满后发送

RecordWriter 将 Record 序列化到内陆的序列化缓冲区中,并将这些序列化后的字节逐渐写入位于响应 Result subpartition 行列中的一个或多个网络 Buffer中。虽然单个 RecordWriter 可以处置多个 Subpartition,但每个 Subpartition 只会有一个 RecordWriter 向其写入数据。另一方面,Netty 服务端线程会从多个 Result subpartition 中读取并像上面所说的那样将数据写入适当的多路复用信道。这是一个典型的生产者 – 消费者模式,网络缓冲区位于生产者与消费者之间,如下图所示。在(1)序列化和(2)将数据写入 Buffer 之后,RecordWriter 会响应地更新缓冲区的写入索引。一旦 Buffer 完全填满,RecordWriter 会(3)为当前 Record 剩余的字节或者下一个 Record 从其内陆缓冲池中获取新的 Buffer,并将新的 Buffer 添加到响应 Subpartition 的行列中。这将(4)通知 Netty服务端线程有新的数据可发送(若是 Netty 还不知道有可用的数据的话4)。每当 Netty 有能力处置这些通知时,它将(5)从行列中获取可用 Buffer 并通过适当的 TCP 通道发送它。

深入领会 Flink 的网络协议栈

 

注释4:若是行列中有更多已完成的 Buffer,我们可以假设 Netty 已经收到通知。

▼ 在 Buffer 超时后发送

为了支持低延迟应用,我们不能只等到 buffer 满了才向下游发送数据。由于可能存在这种情形,某种通讯信道没有太多数据,等到 buffer 满了在发送会不必要地增添这些少量 record 的处置延迟。因此,Flink 提供了一个定期 flush 线程(the output flusher)每隔一段时间会将任何缓存的数据所有写出。可以通过 StreamExecutionEnvironment#setBufferTimeout 设置 flush 的距离,并作为延迟5的上限(对于低吞吐量通道)。下图显示了它与其他组件的交互方式:RecordWriter 如前所述序列化数据并写入网络 buffer,但同时,若是 Netty 还不知道有数据可以发送,output flusher 会(3,4)通知 Netty 服务端线程数据可读(类似与上面的“buffer 已满”的场景)。当 Netty 处置此通知(5)时,它将消费(获取并发送)buffer 中的可用数据并更新 buffer 的读取索引。Buffer 会保留在行列中——从 Netty 服务端对此 buffer 的任何进一步操作将在下次从读取索引继续读取。

深入领会 Flink 的网络协议栈

 

注释5:严格来说,Output flusher 不提供任何保证——它只向 Netty 发送通知,而 Netty 线程会根据能力与意愿举行处置。这也意味着若是存在反压,则 Output flusher 是无效的。

▼ 特殊新闻后发送

一些特殊的新闻若是通过 RecordWriter 发送,也会触发立刻 Flush 缓存的数据。其中最主要的新闻包罗 Checkpoint barrier 以及 end-of-partition 事宜,这些事宜应该尽快被发送,而不应该守候 Buffer 被填满或者 Output flusher 的下一次 Flush。

▼ 进一步的讨论

与小于 1.5 版本的 Flink 差别,请注重(a)网络 Buffer 现在会被直接放在 Subpartition 的行列中,(b)网络 Buffer 不会在 Flush 之后被关闭。这给我们带来了一些利益:

  • 同步开销较少(Output flusher 和 RecordWriter 是相互自力的)
  • 在高负荷情形下,Netty 是瓶颈(直接的网络瓶颈或反压),我们仍然可以在未完成的 Buffer 中填充数据
  • Netty 通知显著削减

然则,在低负载情形下,可能会泛起 CPU 使用率和 TCP 数据包速率的增添。这是由于,Flink 将使用任何可用的 CPU 盘算能力来实验维持所需的延迟。一旦负载增添,Flink 将通过填充更多的 Buffer 举行自我调整。由于同步开销削减,高负载场景不会受到影响,甚至可以实现更高的吞吐。

4.2 BufferBuilder 和 BufferConsumer

若是您想更深入地领会 Flink 中是若何实现生产者- 消费者机制,请仔细查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 类。虽然读取是以 buffer 为粒度,但写入它是按 record 举行的,因此是 Flink 中所有网络通讯的焦点路径。因此,我们需要在义务线程(task thread)和 Netty 线程之间实现轻量级毗邻,这意味着只管小的同步开销。你可以通过查看源代码获取加倍详细的信息。

5. 延迟与吞吐

引入网络 Buffer 的目是获得更高的资源行使率和更高的吞吐,价值是让 Record 在 Buffer 中守候一段时间。虽然可以通过 Buffer 超时给出此守候时间的上限,但可能很想知道有关这两个维度(延迟和吞吐)之间权衡的更多信息,显然,无法两者同时兼得。下图显示了差别的 Buffer 超时时间下的吞吐,超时时间从 0 最先(每个 Record 直接 Flush)到 100 毫秒(默认值),测试在具有 100 个节点每个节点 8 个 Slot 的群集上运行,每个节点运行没有营业逻辑的 Task 因此只用于测试网络协议栈的能力。为了举行对照,我们还测试了低延迟改善(如上所述)之前的 Flink 1.4 版本。

深入领会 Flink 的网络协议栈

 

如图,使用 Flink 1.5+,即使是异常低的 Buffer 超时(例如1ms)(对于低延迟场景)也提供高达超时默认参数(100ms)75% 的最大吞吐,但会缓存更少的数据。

6. 结论

领会 Result partition,批处置和流式盘算的差别网络毗邻以及调剂类型,Credit-Based 流量控制以及 Flink 网络协议栈内部的事情机理,有助于更好的明白网络协议栈相关的参数以及作业的行为。后续我们会推出更多 Flink 网络栈的相关内容,并深入更多细节,包罗运维相关的监控指标(Metrics),进一步的网络调优计谋以及需要制止的常见错误等。

路由器常见问题解决方法——看这篇就够了

分享到 :
相关推荐

发表评论

登录... 后才能评论