Flink 内存模型
Flink内存模型
Flink1.12.0支持更为细粒度的内存配置,本文基于Flink1.12对现行的Flink内存管理机制进行介绍,主要内容来自Flink文档
总内存说明
Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。
配置Flink内存使用的简单方法可以使用如下两个配置项
Component | Option for TaskManager | Option for JobManager |
---|---|---|
Total Flink memory | taskmanager.memory.flink.size |
jobmanager.memory.flink.size |
Total process memory | taskmanager.memory.process.size |
jobmanager.memory.process.size |
Flink会根据默认值或其他配置参数来调整剩余内存部分的大小。
配置
Flink memory
对于standalone deployments
更为合适,该参数可以声明为Flink本身提供多少内存,total Flink memory
被划分为JVM Heap
和Off-heap
内存。配置
total process memory
可以声明Flink JVM进程总计使用多少内存,对于容器话部署来说,这涉及请求多大的容器。此外,可以通过设置
total Flink memory
中的特定内部组成部分来进行内存配置,JM及TM所需要设置的内存组成部分是不一样的。详情参考TaskManager和JobManager文档。
用户需要选择至少一种方式进行配置,需要从以下无默认值的配置参数(参数组合中)选择一个给出明确的配置。
同时进行total process memory
及total flink memory
配置时可能会造成配置冲突
Flink 进程启动时,会根据配置的和自动推导出的各内存部分大小,显式地设置以下 JVM 参数:
JVM 参数 | TaskManager 取值 | JobManager 取值 |
---|---|---|
-Xmx 和 -Xms | 框架堆内存 + 任务堆内存 | JVM 堆内存 (*) |
-XX:MaxDirectMemorySize (TaskManager 始终设置,JobManager 见注释) | 框架堆外内存 + 任务堆外内存(**) + 网络内存 | 堆外内存 (**) (***) |
-XX:MaxMetaspaceSize | JVM Metaspace | JVM Metaspace |
(*) 请记住,根据所使用的 GC 算法,你可能无法使用到全部堆内存。一些 GC 算法会为它们自身分配一定量的堆内存。这会导致堆的指标返回一个不同的最大值。
(**) 请注意,堆外内存也包括了用户代码使用的本地内存(非直接内存)。
(***) 只有在 jobmanager.memory.enable-jvm-direct-memory-limit
设置为 true
时,JobManager 才会设置 JVM 直接内存限制。
TaskManager内存模型
配置堆内存和托管内存
另一种配置 Flink 内存的方式是同时设置任务堆内存和托管内存。 通过这种方式,用户可以更好地掌控用于 Flink 任务的 JVM 堆内存及 Flink 的托管内存大小。
Flink 会根据默认值或其他配置参数自动调整剩余内存部分的大小。 关于各内存部分的更多细节,请参考相关文档。
提示 如果已经明确设置了任务堆内存和托管内存,建议不要再设置进程总内存或 Flink 总内存,否则可能会造成内存配置冲突。
任务(算子)堆内存
如果希望确保指定大小的 JVM 堆内存给用户代码使用,可以明确指定任务堆内存(taskmanager.memory.task.heap.size
)。 指定的内存将被包含在总的 JVM 堆空间中,专门用于 Flink 算子及用户代码的执行。
托管内存
托管内存是由 Flink 负责分配和管理的本地(堆外)内存。 以下场景需要使用托管内存:
- 流处理作业中用于 RocksDB State Backend。
- 批处理作业中用于排序、哈希表及缓存中间结果。
- 流处理和批处理作业中用于在 Python 进程中执行用户自定义函数。
可以通过以下两种范式指定托管内存的大小:
- 通过
taskmanager.memory.managed.size
明确指定其大小。 - 通过
taskmanager.memory.managed.fraction
指定在Flink 总内存中的占比。
当同时指定二者时,会优先采用指定的大小(Size)。 若二者均未指定,会根据默认占比进行计算。
消费者权重
对于包含不同种类的托管内存消费者的作业,可以进一步控制托管内存如何在消费者之间分配。 通过 taskmanager.memory.managed.consumer-weights
可以为每一种类型的消费者指定一个权重,Flink 会按照权重的比例进行内存分配。 目前支持的消费者类型包括:
DATAPROC
:用于流处理中的 RocksDB State Backend 和批处理中的内置算法。PYTHON
:用户 Python 进程。
例如,一个流处理作业同时使用到了 RocksDB State Backend 和 Python UDF,消费者权重设置为 DATAPROC:70,PYTHON:30
,那么 Flink 会将 70%
的托管内存用于 RocksDB State Backend,30%
留给 Python 进程。
提示 只有作业中包含某种类型的消费者时,Flink 才会为该类型分配托管内存。 例如,一个流处理作业使用 Heap State Backend 和 Python UDF,消费者权重设置为 DATAPROC:70,PYTHON:30
,那么 Flink 会将全部托管内存用于 Python 进程,因为 Heap State Backend 不使用托管内存。
提示 对于未出现在消费者权重中的类型,Flink 将不会为其分配托管内存。 如果缺失的类型是作业运行所必须的,则会引发内存分配失败。 默认情况下,消费者权重中包含了所有可能的消费者类型。 上述问题仅可能出现在用户显式地配置了消费者权重的情况下。
配置堆外内存(直接内存或本地内存)
用户代码中分配的堆外内存被归为任务堆外内存(Task Off-heap Memory),可以通过 taskmanager.memory.task.off-heap.size
指定。
提示 你也可以调整框架堆外内存(Framework Off-heap Memory)。 这是一个进阶配置,建议仅在确定 Flink 框架需要更多的内存时调整该配置。
Flink 将框架堆外内存和任务堆外内存都计算在 JVM 的直接内存限制中,请参考 JVM 参数。
提示 本地内存(非直接内存)也可以被归在框架堆外内存或任务堆外内存中,在这种情况下 JVM 的直接内存限制可能会高于实际需求。
提示 网络内存(Network Memory)同样被计算在 JVM 直接内存中。 Flink 会负责管理网络内存,保证其实际用量不会超过配置大小。 因此,调整网络内存的大小不会对其他堆外内存有实质上的影响。
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。
组成部分 | 配置参数 | 描述 |
---|---|---|
框架堆内存(Framework Heap Memory) | taskmanager.memory.framework.heap.size |
用于 Flink 框架的 JVM 堆内存(进阶配置)。 |
任务堆内存(Task Heap Memory) | taskmanager.memory.task.heap.size |
用于 Flink 应用的算子及用户代码的 JVM 堆内存。 |
托管内存(Managed memory) | taskmanager.memory.managed.size taskmanager.memory.managed.fraction |
由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。 |
框架堆外内存(Framework Off-heap Memory) | taskmanager.memory.framework.off-heap.size |
用于 Flink 框架的堆外内存(直接内存或本地内存)(进阶配置)。 |
任务堆外内存(Task Off-heap Memory) | taskmanager.memory.task.off-heap.size |
用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)。 |
网络内存(Network Memory) | taskmanager.memory.network.min taskmanager.memory.network.max taskmanager.memory.network.fraction |
用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存的受限的等比内存部分。 |
JVM Metaspace | taskmanager.memory.jvm-metaspace.size |
Flink JVM 进程的 Metaspace。 |
JVM 开销 | taskmanager.memory.jvm-overhead.min taskmanager.memory.jvm-overhead.max taskmanager.memory.jvm-overhead.fraction |
用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存的受限的等比内存部分。 |
Web UI 界面介绍
内存根据Slot数量划分部分源码
1 | // org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java |
获取已用Managed内存核心源码
1 | // org/apache/flink/runtime/metrics/util/MetricUtils.java |
Flink源码修改
增加显示算子实例与slot之间的对应关系
请求地址
:
数据样例
:
- 增加Taskmanager中对slotsStatus的显示