Apache Flink - is it possible to evenly distribute slot sharing groups?
We have a pipeline whose operations are split into 2 workloads. Source -> Transform make up the first group and are CPU-intensive workloads; they are put into the same slot sharing group, let's say source. And Sink is a RAM-intensive workload, as it uses bulk upload and holds a large amount of data in memory; it is sent to the sink slot sharing group.
Additionally, the Source -> Transform workload and the Sink workload have different parallelism levels, as the first one is limited by the source parallelism. So, for example, we have a Source -> Transform parallelism of 50, while the Sink parallelism equals 78. And we have 8 TMs, each with 16 cores (and therefore 16 slots).
In this case, the ideal slot allocation strategy for us seems to be allocating 6-7 slots on each TM for Source -> Transform, and the rest for Sink, so that the CPU- and RAM-intensive workloads end up roughly evenly distributed across all TMs.
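To spell out the arithmetic: 50 + 78 = 128 tasks exactly fill the 8 x 16 = 128 available slots, and 50 / 8 = 6.25, hence 6-7 Source -> Transform slots per TM, with the remaining 9-10 slots per TM left for Sink.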
So, I wonder whether there is some config setting which will tell Flink to distribute slot sharing groups evenly?
I only found the cluster.evenly-spread-out-slots config parameter, but I'm not sure whether it actually distributes slot sharing groups evenly rather than just slots - for example, I get TMs with 10 Source -> Transform tasks while I would expect 6 or 7.
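For reference, this option goes into flink-conf.yaml (it defaults to false):

cluster.evenly-spread-out-slots: true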
So, the question is whether it is possible to tell Flink to distribute slot sharing groups evenly across the cluster? Or is there perhaps some other way to achieve this?
"Distribute a Flink operator evenly across taskmanagers" seems a bit similar to my question, but I'm mostly asking about the distribution of slot sharing groups. That topic also only suggests using cluster.evenly-spread-out-slots, but perhaps something has changed since then.
Answer:

From the Flink documentation: you can set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have a slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from the input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").
someStream.filter(...).slotSharingGroup("name");
So, I defined different groups based on the number of task slots that I have, together with the parallelism.
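For illustration, a minimal sketch of such a setup (the operators here are trivial stand-ins; the real source, transform, and sink are assumptions, not from the original post):

import org.apache.flink.streaming.api.scala._

object SlotSharingGroupsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // CPU-intensive part: Source -> Transform, pinned to the "source" group.
    val transformed = env
      .fromElements(1, 2, 3)      // stand-in for the real source
      .slotSharingGroup("source")
      .map(_ * 2)                 // stand-in for the CPU-heavy transform
      .slotSharingGroup("source")
      .setParallelism(50)

    // RAM-intensive part: sink in its own "sink" group, higher parallelism.
    transformed
      .print()                    // stand-in for the bulk-upload sink
      .slotSharingGroup("sink")
      .setParallelism(78)

    env.execute("Slot sharing groups sketch")
  }
}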
However, it turned out that the cluster.evenly-spread-out-slots feature only guarantees spreading out tasks across the set of TMs which are registered at the time of scheduling. Hence, when you are using the active YARN mode and submit the first job, there won't be any TMs registered yet. Consequently, Flink will allocate the first container, fill it up, and only then allocate a new container. However, if you start Flink in standalone mode, or if some TMs are still registered after your first job finishes on YARN, then the next job will be spread out.
So, the idea is to start a detached YARN session with an increased idle-container timeout, first submit a short-lived fake job which simply acquires the required amount of resources from YARN and completes, and then immediately start the main pipeline, which will be assigned to the already allocated containers. In this case, cluster.evenly-spread-out-slots: true does the trick and distributes all slot sharing groups evenly.
So, to sum up, the following was done to get the evenly distributed slot sharing groups within the job:

- resourcemanager.taskmanager-timeout was increased to allow the main job to be submitted before the containers of the idle task managers are released. I increased this to 1 minute and this was more than enough.
- Started a yarn-session and submitted the job dynamically to it.
- Tweaked the main job to first run a fake job which simply allocates the resources. In my case, this simple code does the trick before configuring the main pipeline:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val job = env
  .fromElements(0)
  .map { x => x * 2 }
  .setParallelism(parallelismMax)
  .print()

val jobResult = env.execute("Resources pre-allocation job")
println(jobResult)

print("Done. Starting main job!")
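For illustration, the yarn-session startup could then look roughly like this (the exact flags and the jar name are assumptions, not from the original post; the timeout value is in milliseconds):

# Detached YARN session; idle TM containers are kept for 60 s,
# long enough to submit the main job after the fake job finishes.
./bin/yarn-session.sh -d \
  -Dcluster.evenly-spread-out-slots=true \
  -Dresourcemanager.taskmanager-timeout=60000

# Submit the job to the running session (pre-allocation job first,
# then the main pipeline).
./bin/flink run my-pipeline.jar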
Source: https://stackoverflow.com/questions/64607665/apache-flink-is-it-possible-to-evenly-distribute-slot-sharing-groups
Comment: I already defined two slot sharing groups, source and sink, with different parallelism. And the issue is that tasks within the same slot sharing group source are distributed randomly - we can have all 16 slots of a single TM occupied by the same source task, and vice versa - TMs with only the sink slot sharing group. – Mikalai Lushchytski, Oct 31, 2020 at 9:40