The TaskGraph System in UE4


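TaskGraph (task graph) is UE4's system for running asynchronous tasks in parallel. When you define a task you can also specify its dependencies, and TaskGraph runs the tasks in the order those dependencies dictate.

Before a task starts, it can be given several prerequisite tasks; it starts only after all of them have finished. Taken together, the tasks and their dependencies form a directed acyclic graph (DAG).

Every task that other tasks can depend on has an associated FGraphEvent object (the blue-filled boxes in the figure), and dependencies between tasks are established through these FGraphEvent objects.

In the figure above, task D1 depends on tasks C1 and C2. A task may also have no prerequisites at all.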
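To make the prerequisite mechanism concrete, here is a minimal single-threaded sketch in standard C++; it is not engine code. SketchTask, SketchGraph and RunExample are hypothetical names: each task's prerequisite counter and Subsequents list stand in for what an FGraphEvent tracks, and a task becomes ready when its last unfinished prerequisite completes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a graph task plus its completion event:
// it counts unfinished prerequisites and remembers who waits on it.
struct SketchTask {
    std::function<void()> Work;
    int PendingPrerequisites = 0;
    bool bDone = false;
    std::vector<std::shared_ptr<SketchTask>> Subsequents;
};
using SketchTaskRef = std::shared_ptr<SketchTask>;

class SketchGraph {
public:
    // Creates a task that becomes ready once every task in Prereqs has run.
    SketchTaskRef CreateTask(std::function<void()> Work,
                             const std::vector<SketchTaskRef>& Prereqs = {}) {
        auto Task = std::make_shared<SketchTask>();
        Task->Work = std::move(Work);
        for (const SketchTaskRef& Prereq : Prereqs) {
            if (Prereq->bDone) continue;  // already finished: nothing to wait for
            Prereq->Subsequents.push_back(Task);
            ++Task->PendingPrerequisites;
        }
        if (Task->PendingPrerequisites == 0) Ready.push(Task);
        return Task;
    }

    // Runs ready tasks until none are left; finishing a task releases every
    // subsequent whose last prerequisite it was.
    void RunAll() {
        while (!Ready.empty()) {
            SketchTaskRef Task = Ready.front();
            Ready.pop();
            Task->Work();
            Task->bDone = true;
            for (const SketchTaskRef& Next : Task->Subsequents) {
                if (--Next->PendingPrerequisites == 0) Ready.push(Next);
            }
            Task->Subsequents.clear();
        }
    }

private:
    std::queue<SketchTaskRef> Ready;
};

// The figure's example: D1 depends on C1 and C2.
std::string RunExample() {
    SketchGraph Graph;
    std::string Order;
    SketchTaskRef C1 = Graph.CreateTask([&Order] { Order += "C1 "; });
    SketchTaskRef C2 = Graph.CreateTask([&Order] { Order += "C2 "; });
    Graph.CreateTask([&Order] { Order += "D1 "; }, {C1, C2});
    Graph.RunAll();
    return Order;
}
```

RunExample() returns "C1 C2 D1 ": D1 only becomes ready after both of its prerequisites have run.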

The implementation is in UnrealEngine\Engine\Source\Runtime\Core\Public\Async\TaskGraphInterfaces.h and UnrealEngine\Engine\Source\Runtime\Core\Private\Async\TaskGraph.cpp.

Test code: UnrealEngine\Engine\Source\Runtime\Core\Private\Tests\Async\TaskGraphTest.cpp

A large part of the engine's parallel work relies on TaskGraph:

① Ticking Actors and ActorComponents

② GC marking

③ Executing rendering commands

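TaskGraph manages two kinds of threads: external threads (NamedThread) and internal threads (AnyThread).

External threads: threads that TaskGraph does not create itself, namely GameThread, RenderThread, RHIThread, StatsThread and AudioThread. They are added by calling FTaskGraphInterface::Get().AttachToThread().

Internal threads: worker threads that TaskGraph creates during engine initialization (TaskGraphThreadHP, TaskGraphThreadNP and TaskGraphThreadBP, with priority HP > NP > BP). The creation logic is in the FTaskGraphImplementation constructor.

The number of internal threads is determined by FPlatformMisc::NumberOfWorkerThreadsToSpawn(). If the platform does not support multithreading, the work TaskGraph would otherwise run falls back to the GameThread. See the FTaskGraphImplementation constructor for details.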
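The thread-count arithmetic from the constructor can be reproduced in isolation. This sketch assumes STATS is enabled (so ActualRenderingThread is 4 and MAX_THREADS is 83 on non-iOS platforms, as the constructor comments state) and takes the result of FPlatformMisc::NumberOfWorkerThreadsToSpawn() as a plain parameter; ComputeLayout and ThreadLayout are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>

struct ThreadLayout {
    int NumNamedThreads;
    int NumTaskThreadSets;
    int NumThreads;
    int NumTaskThreadsPerSet;
};

// Assumed constants: STATS on, non-iOS build.
constexpr int kActualRenderingThread = 4;
constexpr int kMaxThreads = 83;

// Mirrors the NumNamedThreads / NumThreads / NumTaskThreadsPerSet computation.
ThreadLayout ComputeLayout(int NumWorkerThreadsToSpawn, bool bMultithread,
                           bool bHiPri, bool bBackground) {
    int NumTaskThreads = NumWorkerThreadsToSpawn;
    int LastExternalThread = kActualRenderingThread;
    if (!bMultithread) {
        // Single-threaded: one worker, no separate rendering thread, no HP/BP sets.
        NumTaskThreads = 1;
        LastExternalThread = kActualRenderingThread - 1;
        bHiPri = bBackground = false;
    }
    ThreadLayout L{};
    L.NumNamedThreads = LastExternalThread + 1;
    L.NumTaskThreadSets = 1 + (bHiPri ? 1 : 0) + (bBackground ? 1 : 0);
    const int Wanted = NumTaskThreads * L.NumTaskThreadSets + L.NumNamedThreads;
    L.NumThreads = std::max(std::min(Wanted, kMaxThreads), L.NumNamedThreads + 1);
    L.NumThreads = std::min(L.NumThreads, Wanted);
    L.NumTaskThreadsPerSet = (L.NumThreads - L.NumNamedThreads) / L.NumTaskThreadSets;
    return L;
}
```

With 8 workers per set and all three sets enabled this gives 5 named threads plus 3 x 8 workers, 29 in total; in single-threaded mode it falls back to 4 named threads and a single worker.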

ENamedThreads::Type

ENamedThreads::Type is an int32 enum that encodes the thread type, the queue index, the task priority and the thread priority.

namespace ENamedThreads
{
    enum Type : int32
    {
        UnusedAnchor = -1,
        /** The always-present, named threads are listed next **/
#if STATS
        StatsThread,   // 0 Stats thread (the indices in these comments assume STATS is enabled)
#endif
        RHIThread,       // 1 RHI thread
        AudioThread,     // 2 Audio thread
        GameThread,    // 3 Game thread
        // The render thread is sometimes the game thread and is sometimes the actual rendering thread
        ActualRenderingThread = GameThread + 1,  // 4 Rendering thread
        // CAUTION ThreadedRenderingThread must be the last named thread, insert new named threads before it

        /** not actually a thread index. Means "Unknown Thread" or "Any Unnamed Thread" **/
        AnyThread = 0xff,  // TaskGraph internal threads

        /** High bits are used for a queue index and priority**/

        MainQueue =            0x000,
        LocalQueue =        0x100,

        NumQueues =            2,
        ThreadIndexMask =    0xff,
        QueueIndexMask =    0x100,
        QueueIndexShift =    8,

        /** High bits are used for a queue index task priority and thread priority**/

        NormalTaskPriority =    0x000,  // normal task priority
        HighTaskPriority =        0x200,  // high task priority

        NumTaskPriorities =        2,
        TaskPriorityMask =        0x200,
        TaskPriorityShift =        9,

        NormalThreadPriority = 0x000,     // NP threads: normal-priority worker threads
        HighThreadPriority = 0x400,       // HP threads: high-priority worker threads
        BackgroundThreadPriority = 0x800, // BP threads: low-priority (background) worker threads

        NumThreadPriorities = 3,
        ThreadPriorityMask = 0xC00,
        ThreadPriorityShift = 10,

        /** Combinations **/
#if STATS
        StatsThread_Local = StatsThread | LocalQueue,
#endif
        GameThread_Local = GameThread | LocalQueue,
        ActualRenderingThread_Local = ActualRenderingThread | LocalQueue,

        AnyHiPriThreadNormalTask = AnyThread | HighThreadPriority | NormalTaskPriority,  // normal-priority task, run on an HP thread
        AnyHiPriThreadHiPriTask = AnyThread | HighThreadPriority | HighTaskPriority,     // high-priority task, run on an HP thread

        AnyNormalThreadNormalTask = AnyThread | NormalThreadPriority | NormalTaskPriority,  // normal-priority task, run on an NP thread
        AnyNormalThreadHiPriTask = AnyThread | NormalThreadPriority | HighTaskPriority,     // high-priority task, run on an NP thread

        AnyBackgroundThreadNormalTask = AnyThread | BackgroundThreadPriority | NormalTaskPriority,  // normal-priority task, run on a BP thread
        AnyBackgroundHiPriTask = AnyThread | BackgroundThreadPriority | HighTaskPriority,           // high-priority task, run on a BP thread
    };
    
    ... ...
}

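The bits are laid out as follows:

bits 0-7 (ThreadIndexMask 0xff): thread index; 0xff means AnyThread
bit 8 (QueueIndexMask 0x100): queue index, 0 = MainQueue, 1 = LocalQueue
bit 9 (TaskPriorityMask 0x200): task priority, 0 = normal, 1 = high
bits 10-11 (ThreadPriorityMask 0xC00): thread priority index, 0 = NP, 1 = HP, 2 = BP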

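Note 1: MainQueue and LocalQueue are the queues at indices 0 and 1 of the FThreadTaskQueue Queues[ENamedThreads::NumQueues] array in FNamedTaskThread.

Note 2: Unlike an AnyThread, a NamedThread does not simply loop fetching tasks, so it cannot easily support recursive task dispatch. For example, a task running on the GameThread cannot simply dispatch another task to the GameThread and have it processed recursively. This is why the extra LocalQueue was introduced.

Tasks sent to the LocalQueue are not dispatched or executed on their own. You have to call ProcessThreadUntilIdle manually, which then keeps executing on that thread until the LocalQueue is empty.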
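Note 2 can be illustrated with a hypothetical single-threaded model of a named thread that owns a MainQueue (index 0) and a LocalQueue (index 1), each with its own recursion guard. SketchNamedThread and RunLocalQueueExample are illustrative names, and the guard here returns false where the engine would fail a verify() on RecursionGuard.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

class SketchNamedThread {
public:
    static constexpr int MainQueue = 0;
    static constexpr int LocalQueue = 1;

    void Enqueue(int QueueIndex, std::function<void()> Task) {
        Queues[QueueIndex].Tasks.push_back(std::move(Task));
    }

    // Drains one queue. Refuses to re-enter a queue that is already being
    // processed on this thread (the RecursionGuard idea).
    bool ProcessUntilIdle(int QueueIndex, int* OutProcessed = nullptr) {
        Queue& Q = Queues[QueueIndex];
        if (Q.RecursionGuard != 0) return false;
        ++Q.RecursionGuard;
        int Processed = 0;
        while (!Q.Tasks.empty()) {
            std::function<void()> Task = std::move(Q.Tasks.front());
            Q.Tasks.pop_front();
            Task();
            ++Processed;
        }
        --Q.RecursionGuard;
        if (OutProcessed) *OutProcessed = Processed;
        return true;
    }

private:
    struct Queue {
        std::deque<std::function<void()>> Tasks;
        int RecursionGuard = 0;
    };
    Queue Queues[2];
};

// Returns {could the main queue be re-entered?, number of local tasks run}.
std::pair<bool, int> RunLocalQueueExample() {
    SketchNamedThread GameThread;
    bool bReenteredMain = true;
    int LocalProcessed = 0;
    GameThread.Enqueue(SketchNamedThread::MainQueue, [&] {
        // A task running from the main queue cannot pump the main queue again...
        bReenteredMain = GameThread.ProcessUntilIdle(SketchNamedThread::MainQueue);
        // ...but it can push work to the LocalQueue and drain that explicitly.
        GameThread.Enqueue(SketchNamedThread::LocalQueue, [] {});
        GameThread.Enqueue(SketchNamedThread::LocalQueue, [] {});
        GameThread.ProcessUntilIdle(SketchNamedThread::LocalQueue, &LocalProcessed);
    });
    GameThread.ProcessUntilIdle(SketchNamedThread::MainQueue);
    return {bReenteredMain, LocalProcessed};
}
```

The re-entry attempt fails while both LocalQueue tasks run, which is the role the manual ProcessThreadUntilIdle call plays above.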

Global functions that operate on ENamedThreads::Type:

namespace ENamedThreads 
{
    // Get the thread index
    FORCEINLINE Type GetThreadIndex(Type ThreadAndIndex)
    {
        return ((ThreadAndIndex & ThreadIndexMask) == AnyThread) ? AnyThread : Type(ThreadAndIndex & ThreadIndexMask);
    }

    // Get the queue index
    FORCEINLINE int32 GetQueueIndex(Type ThreadAndIndex)
    {
        return (ThreadAndIndex & QueueIndexMask) >> QueueIndexShift;
    }

    // Get the task priority
    FORCEINLINE int32 GetTaskPriority(Type ThreadAndIndex)
    {
        return (ThreadAndIndex & TaskPriorityMask) >> TaskPriorityShift;
    }

    // Get the CPU thread priority index
    FORCEINLINE int32 GetThreadPriorityIndex(Type ThreadAndIndex)
    {
        int32 Result = (ThreadAndIndex & ThreadPriorityMask) >> ThreadPriorityShift;
        check(Result >= 0 && Result < NumThreadPriorities);
        return Result;
    }

    // Set the CPU thread priority and the task priority
    FORCEINLINE Type SetPriorities(Type ThreadAndIndex, Type ThreadPriority, Type TaskPriority)
    {
        check(
            !(ThreadAndIndex & ~ThreadIndexMask) &&  // not a thread index
            !(ThreadPriority & ~ThreadPriorityMask) && // not a thread priority
            (ThreadPriority & ThreadPriorityMask) != ThreadPriorityMask && // not a valid thread priority
            !(TaskPriority & ~TaskPriorityMask) // not a task priority
            );
        return Type(ThreadAndIndex | ThreadPriority | TaskPriority);
    }

    // Set the CPU thread priority and the task priority. PriorityIndex is 0, 1 or 2; bHiPri true means high task priority, false means normal
    FORCEINLINE Type SetPriorities(Type ThreadAndIndex, int32 PriorityIndex, bool bHiPri)
    {
        check(
            !(ThreadAndIndex & ~ThreadIndexMask) && // not a thread index
            PriorityIndex >= 0 && PriorityIndex < NumThreadPriorities // not a valid thread priority
            );
        return Type(ThreadAndIndex | (PriorityIndex << ThreadPriorityShift) | (bHiPri ? HighTaskPriority : NormalTaskPriority));
    }

    // Set the CPU thread priority
    FORCEINLINE Type SetThreadPriority(Type ThreadAndIndex, Type ThreadPriority)
    {
        check(
            !(ThreadAndIndex & ~ThreadIndexMask) &&  // not a thread index
            !(ThreadPriority & ~ThreadPriorityMask) && // not a thread priority
            (ThreadPriority & ThreadPriorityMask) != ThreadPriorityMask // not a valid thread priority
            );
        return Type(ThreadAndIndex | ThreadPriority);
    }

    // Set the task priority
    FORCEINLINE Type SetTaskPriority(Type ThreadAndIndex, Type TaskPriority)
    {
        check(
            !(ThreadAndIndex & ~ThreadIndexMask) &&  // not a thread index
            !(TaskPriority & ~TaskPriorityMask) // not a task priority
            );
        return Type(ThreadAndIndex | TaskPriority);
    }
}
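Decoding a value by hand is a quick way to check the bit layout. The standalone copy below repeats only the constants it needs (assuming STATS is enabled, so GameThread is 3); the SketchNamedThreads namespace and its helper names are illustrative, not engine API. Plain masking gives the same thread index that GetThreadIndex returns, including 0xff for AnyThread.

```cpp
#include <cassert>
#include <cstdint>

namespace SketchNamedThreads {
    constexpr int32_t GameThread = 3;  // assumes STATS is enabled
    constexpr int32_t AnyThread = 0xff;

    constexpr int32_t MainQueue = 0x000, LocalQueue = 0x100;
    constexpr int32_t ThreadIndexMask = 0xff, QueueIndexMask = 0x100, QueueIndexShift = 8;

    constexpr int32_t NormalTaskPriority = 0x000, HighTaskPriority = 0x200;
    constexpr int32_t TaskPriorityMask = 0x200, TaskPriorityShift = 9;

    constexpr int32_t NormalThreadPriority = 0x000, HighThreadPriority = 0x400;
    constexpr int32_t BackgroundThreadPriority = 0x800;
    constexpr int32_t ThreadPriorityMask = 0xC00, ThreadPriorityShift = 10;

    // Each helper isolates one field of the packed value.
    constexpr int32_t ThreadIndex(int32_t V) { return V & ThreadIndexMask; }
    constexpr int32_t QueueIndex(int32_t V) { return (V & QueueIndexMask) >> QueueIndexShift; }
    constexpr int32_t TaskPriority(int32_t V) { return (V & TaskPriorityMask) >> TaskPriorityShift; }
    constexpr int32_t ThreadPriorityIndex(int32_t V) {
        return (V & ThreadPriorityMask) >> ThreadPriorityShift;
    }
}
```

For example, AnyHiPriThreadHiPriTask (0x6ff) decodes to thread index 0xff, queue 0, task priority 1 and thread priority index 1 (HP), while GameThread_Local (0x103) decodes to thread 3 on queue 1.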

ESubsequentsMode::Type

There are two kinds of task: TrackSubsequents (other tasks can depend on it) and FireAndForget (the task just runs; other tasks cannot depend on it).

namespace ESubsequentsMode
{
    enum Type
    {
        /** Necessary when another task will depend on this task. */
        TrackSubsequents,  // other tasks can depend on this task
        /** Can be used to save task graph overhead when firing off a task that will not be a dependency of other tasks. */
        FireAndForget // the task just runs; no other task can depend on it
    };
}
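The practical difference between the two modes is whether a completion event is created for other tasks to wait on. A minimal sketch, assuming a single-threaded Launch() that runs the task immediately; SketchSubsequentsMode, SketchEvent and Launch are hypothetical names and only mimic the overhead FireAndForget avoids.

```cpp
#include <cassert>
#include <functional>
#include <memory>

enum class SketchSubsequentsMode { TrackSubsequents, FireAndForget };

// Hypothetical completion event that dependants could wait on.
struct SketchEvent {
    bool bComplete = false;
};

// Runs Work right away. Only TrackSubsequents allocates and returns an event;
// FireAndForget returns an empty pointer, so nothing can depend on the task.
std::shared_ptr<SketchEvent> Launch(const std::function<void()>& Work,
                                    SketchSubsequentsMode Mode) {
    std::shared_ptr<SketchEvent> Event;
    if (Mode == SketchSubsequentsMode::TrackSubsequents) {
        Event = std::make_shared<SketchEvent>();
    }
    Work();
    if (Event) Event->bComplete = true;
    return Event;
}
```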

FTaskThreadBase

FTaskThreadBase derives from FRunnable and FSingleThreadRunnable. It is the common base class of external threads (FNamedTaskThread) and internal threads (FTaskThreadAnyThread) and gives them a uniform interface.

class FTaskThreadBase : public FRunnable, FSingleThreadRunnable
{
public:
    // Constructor
    FTaskThreadBase()
        : ThreadId(ENamedThreads::AnyThread)
        , PerThreadIDTLSSlot(0xffffffff)
        , OwnerWorker(nullptr)
    {
        NewTasks.Reset(128); // empty NewTasks and reserve room for 128 elements
    }

    // Initialize the data members
    void Setup(ENamedThreads::Type InThreadId, uint32 InPerThreadIDTLSSlot, FWorkerThread* InOwnerWorker)
    {
        ThreadId = InThreadId;
        check(ThreadId >= 0);
        PerThreadIDTLSSlot = InPerThreadIDTLSSlot;
        OwnerWorker = InOwnerWorker;
    }

    // Store the FWorkerThread* OwnerWorker pointer in the TLS slot PerThreadIDTLSSlot
    void InitializeForCurrentThread()
    {
        FPlatformTLS::SetTlsValue(PerThreadIDTLSSlot,OwnerWorker);
    }

    // Get ThreadId. Note: ThreadId is this thread's index in FTaskGraphImplementation's FWorkerThread WorkerThreads[MAX_THREADS] array
    ENamedThreads::Type GetThreadId() const
    {
        checkThreadGraph(OwnerWorker); // make sure we are started up
        return ThreadId;
    }

    // Loop forever taking tasks from this thread's queue QueueIndex and executing them, until a quit is explicitly requested
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) = 0;

    // Process every task in this thread's queue and return once there is nothing left to process
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex)
    {
        check(0);
        return 0;
    }

    // Called on this thread: add a task to this thread's queue QueueIndex
    virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task)
    {
        check(0);
    }

    // Request that the loop over queue QueueIndex quit
    virtual void RequestQuit(int32 QueueIndex) = 0;

    // Called from another thread: add a task to this thread's queue QueueIndex
    virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task)
    {
        check(0);
        return false;
    }

    // Wake the thread so it processes its queue QueueIndex
    virtual void WakeUp(int32 QueueIndex = 0) = 0;

    // Whether queue QueueIndex is currently processing tasks
    virtual bool IsProcessingTasks(int32 QueueIndex) = 0;

    // SingleThreaded API

    // In single-threaded mode, Tick drives task processing
    virtual void Tick() override
    {
        ProcessTasksUntilIdle(0);
    }


    // Thread Init function
    virtual bool Init() override
    {
        InitializeForCurrentThread();
        return true;
    }

    // Thread body
    virtual uint32 Run() override
    {
        check(OwnerWorker); // make sure we are started up
        ProcessTasksUntilQuit(0);
        FMemory::ClearAndDisableTLSCachesOnCurrentThread();
        return 0;
    }

    // Thread Stop function
    virtual void Stop() override
    {
        RequestQuit(-1);
    }

    // Thread Exit function
    virtual void Exit() override
    {
    }

    // Single-threaded mode support
    virtual FSingleThreadRunnable* GetSingleThreadInterface() override
    {
        return this;
    }

protected:

    // This thread's index in FTaskGraphImplementation's FWorkerThread WorkerThreads[MAX_THREADS] array
    ENamedThreads::Type                                    ThreadId;
    // TLS slot that stores the FWorkerThread* OwnerWorker pointer
    uint32                                                PerThreadIDTLSSlot;
    /** Used to signal stalling. Not safe for synchronization in most cases. **/
    FThreadSafeCounter                                    IsStalled;
    /** Array of tasks for this task thread. */
    TArray<FBaseGraphTask*> NewTasks;
    // The FWorkerThread this object is attached to
    FWorkerThread* OwnerWorker;
};
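FTaskThreadBase::Init() writes the owning FWorkerThread pointer into a TLS slot so that code running on that OS thread can later find out which TaskGraph thread it is. The sketch below models this with C++ thread_local in place of FPlatformTLS, and lets Tick() drain the queue as in single-threaded mode; SketchWorker and CurrentWorker are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <thread>
#include <utility>

struct SketchWorker;  // plays the role of FWorkerThread

// Stand-in for the TLS slot written by InitializeForCurrentThread():
// every OS thread sees its own copy, initially nullptr.
thread_local SketchWorker* CurrentWorker = nullptr;

struct SketchWorker {
    int ThreadId = -1;
    std::deque<std::function<void()>> Tasks;

    // Like Init(): record which worker owns the calling OS thread.
    void InitializeForCurrentThread() { CurrentWorker = this; }

    // Like ProcessTasksUntilIdle(): run everything currently queued.
    int ProcessTasksUntilIdle() {
        int Processed = 0;
        while (!Tasks.empty()) {
            std::function<void()> Task = std::move(Tasks.front());
            Tasks.pop_front();
            Task();
            ++Processed;
        }
        return Processed;
    }

    // Like Tick() in single-threaded mode.
    void Tick() { ProcessTasksUntilIdle(); }
};
```

A task queued on a worker whose OS thread called InitializeForCurrentThread() sees that worker through CurrentWorker, while any other thread still sees nullptr.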

// External thread
class FNamedTaskThread : public FTaskThreadBase
{
public:

    // Loop forever taking tasks from queue QueueIndex and executing them, until a quit is explicitly requested
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
    {
        check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

        Queue(QueueIndex).QuitForReturn = false;
        verify(++Queue(QueueIndex).RecursionGuard == 1);
        const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
        do
        {
            const bool bAllowStall = bIsMultiThread;
            ProcessTasksNamedThread(QueueIndex, bAllowStall); // bAllowStall is true in multithreaded mode
        } while (!Queue(QueueIndex).QuitForReturn && !Queue(QueueIndex).QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
        verify(!--Queue(QueueIndex).RecursionGuard);
    }

    // Process every task in queue QueueIndex and return once there is nothing left to process
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
    {
        check(Queue(QueueIndex).StallRestartEvent); // make sure we are started up

        Queue(QueueIndex).QuitForReturn = false;
        verify(++Queue(QueueIndex).RecursionGuard == 1);
        uint64 ProcessedTasks = ProcessTasksNamedThread(QueueIndex, false); // passing false: return immediately instead of stalling when there is no task
        verify(!--Queue(QueueIndex).RecursionGuard);
        return ProcessedTasks;
    }

    // Loop taking tasks from queue QueueIndex and executing them. Note: when bAllowStall is true, the thread stalls (sleeps) when no task is available
    uint64 ProcessTasksNamedThread(int32 QueueIndex, bool bAllowStall)
    {
        uint64 ProcessedTasks = 0;

        // ... ...

        TStatId StallStatId;
        bool bCountAsStall = false;
        
        // ... ...
        
        const bool bIsRenderThreadMainQueue = (ENamedThreads::GetThreadIndex(ThreadId) == ENamedThreads::ActualRenderingThread) && (QueueIndex == 0);
        while (!Queue(QueueIndex).QuitForReturn)
        {
            const bool bIsRenderThreadAndPolling = bIsRenderThreadMainQueue && (GRenderThreadPollPeriodMs >= 0);
            const bool bStallQueueAllowStall = bAllowStall && !bIsRenderThreadAndPolling;
            FBaseGraphTask* Task = Queue(QueueIndex).StallQueue.Pop(0, bStallQueueAllowStall);
            TestRandomizedThreads();
            if (!Task)
            {
                // ... ...
                if (bAllowStall)
                {
                    {
                        FScopeCycleCounter Scope(StallStatId);
                        Queue(QueueIndex).StallRestartEvent->Wait(bIsRenderThreadAndPolling ? GRenderThreadPollPeriodMs : MAX_uint32, bCountAsStall); // stall
                        if (Queue(QueueIndex).QuitForShutdown)
                        {
                            return ProcessedTasks;
                        }
                        TestRandomizedThreads();
                    }
                    
                    // ... ...
                    
                    continue;
                }
                else
                {
                    break; // we were asked to quit
                }
            }
            else
            {
                Task->Execute(NewTasks, ENamedThreads::Type(ThreadId | (QueueIndex << ENamedThreads::QueueIndexShift))); // execute the task
                ProcessedTasks++;
                TestRandomizedThreads();
            }
        }
        
        // ... ...
        
        return ProcessedTasks;
    }
    
    // Called on this thread: add a task to queue QueueIndex
    virtual void EnqueueFromThisThread(int32 QueueIndex, FBaseGraphTask* Task) override
    {
        checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up
        uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1;
        int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex);
        check(ThreadToStart < 0); // if I am stalled, then how can I be queueing a task?
    }

    // Request quit
    virtual void RequestQuit(int32 QueueIndex) override
    {
        // this will not work under arbitrary circumstances. For example you should not attempt to stop threads unless they are known to be idle.
        if (!Queue(0).StallRestartEvent)
        {
            return;
        }
        if (QueueIndex == -1) // quit both MainQueue and LocalQueue
        {
            // we are shutting down
            checkThreadGraph(Queue(0).StallRestartEvent); // make sure we are started up
            checkThreadGraph(Queue(1).StallRestartEvent); // make sure we are started up
            Queue(0).QuitForShutdown = true;
            Queue(1).QuitForShutdown = true;
            Queue(0).StallRestartEvent->Trigger();
            Queue(1).StallRestartEvent->Trigger();
        }
        else
        {
            checkThreadGraph(Queue(QueueIndex).StallRestartEvent); // make sure we are started up
            Queue(QueueIndex).QuitForReturn = true;
        }
    }

    // Called from another thread: add a task to queue QueueIndex
    virtual bool EnqueueFromOtherThread(int32 QueueIndex, FBaseGraphTask* Task) override
    {
        TestRandomizedThreads();
        checkThreadGraph(Task && Queue(QueueIndex).StallRestartEvent); // make sure we are started up

        uint32 PriIndex = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn) ? 0 : 1;
        int32 ThreadToStart = Queue(QueueIndex).StallQueue.Push(Task, PriIndex);

        if (ThreadToStart >= 0)
        {
            checkThreadGraph(ThreadToStart == 0);
            QUICK_SCOPE_CYCLE_COUNTER(STAT_TaskGraph_EnqueueFromOtherThread_Trigger);
            TASKGRAPH_SCOPE_CYCLE_COUNTER(1, STAT_TaskGraph_EnqueueFromOtherThread_Trigger);
            Queue(QueueIndex).StallRestartEvent->Trigger(); // wake up
            return true;
        }
        return false;
    }

    // Whether queue QueueIndex is currently processing tasks
    virtual bool IsProcessingTasks(int32 QueueIndex) override
    {
        return !!Queue(QueueIndex).RecursionGuard;
    }

    // Wake queue QueueIndex so it continues executing tasks
    virtual void WakeUp(int32 QueueIndex) override
    {
        QUICK_SCOPE_CYCLE_COUNTER(STAT_TaskGraph_Wakeup_Trigger);
        TASKGRAPH_SCOPE_CYCLE_COUNTER(1, STAT_TaskGraph_Wakeup_Trigger);
        Queue(QueueIndex).StallRestartEvent->Trigger(); // wake up
    }

private:

    // ... ...

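    // Task-queue structure of a NamedThread
    struct FThreadTaskQueue
    {
        // Queues for the two task priorities (NormalTaskPriority, HighTaskPriority)
        FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> StallQueue;

        // Recursion guard: 1 while tasks are being processed, 0 otherwise
        uint32 RecursionGuard;

        // Whether to return; when set, the task loop exits
        bool QuitForReturn;

        // Whether to shut down; on shutdown the task loop exits
        bool QuitForShutdown;

        // Used to block and wake the thread
        FEvent*    StallRestartEvent;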

        FThreadTaskQueue()
            : RecursionGuard(0)
            , QuitForReturn(false)
            , QuitForShutdown(false)
            , StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
        {

        }
        ~FThreadTaskQueue()
        {
            FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
            StallRestartEvent = nullptr;
        }
    };

    // Return queue QueueIndex
    FORCEINLINE FThreadTaskQueue& Queue(int32 QueueIndex)
    {
        checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
        return Queues[QueueIndex];
    }
    FORCEINLINE const FThreadTaskQueue& Queue(int32 QueueIndex) const
    {
        checkThreadGraph(QueueIndex >= 0 && QueueIndex < ENamedThreads::NumQueues);
        return Queues[QueueIndex];
    }

    // The MainQueue and LocalQueue task queues
    FThreadTaskQueue Queues[ENamedThreads::NumQueues]; // note: ENamedThreads::NumQueues is 2
};
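EnqueueFromOtherThread only triggers StallRestartEvent when Push reports that the consumer had stalled. The lock-based queue below is a simplified stand-in for FStallingTaskQueue with two priorities (the engine's version is lock-free and tracks stalled threads differently); index 0 holds high-priority tasks, matching the PriIndex computation above, and SketchStallingQueue is a hypothetical name.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

template <typename T>
class SketchStallingQueue {
public:
    // Adds an item at PriIndex (0 = high, 1 = normal). Returns 0 if the
    // consumer had stalled and must be woken, -1 otherwise.
    int Push(const T& Item, int PriIndex) {
        std::lock_guard<std::mutex> Lock(Mutex);
        Queues[PriIndex].push_back(Item);
        if (bStalled) {
            bStalled = false;
            return 0;
        }
        return -1;
    }

    // Pops high priority first. When empty and bAllowStall is true, records
    // that the consumer is about to sleep so the next Push reports it.
    bool Pop(T& Out, bool bAllowStall) {
        std::lock_guard<std::mutex> Lock(Mutex);
        for (std::deque<T>& Q : Queues) {
            if (!Q.empty()) {
                Out = Q.front();
                Q.pop_front();
                return true;
            }
        }
        if (bAllowStall) bStalled = true;
        return false;
    }

private:
    std::mutex Mutex;
    std::deque<T> Queues[2];
    bool bStalled = false;
};
```

Only the first Push after a stalling Pop returns 0, so the producer triggers the event once instead of on every enqueue.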

// Internal thread
class FTaskThreadAnyThread : public FTaskThreadBase
{
public:
    FTaskThreadAnyThread(int32 InPriorityIndex)
        : PriorityIndex(InPriorityIndex)
    {
    }
    
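    // Loop forever taking tasks from this thread's queue and executing them, until a quit is explicitly requested. Note: an AnyThread has only one queue, so QueueIndex is always 0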
    virtual void ProcessTasksUntilQuit(int32 QueueIndex) override
    {
        if (PriorityIndex != (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift))
        {
            FMemory::SetupTLSCachesOnCurrentThread();
        }
        check(!QueueIndex);
        const bool bIsMultiThread = FTaskGraphInterface::IsMultithread();
        do
        {
            ProcessTasks();            
        } while (!Queue.QuitForShutdown && bIsMultiThread); // @Hack - quit now when running with only one thread.
    }

    // Single-threaded mode support. Note: an AnyThread has only one queue, so QueueIndex is always 0
    virtual uint64 ProcessTasksUntilIdle(int32 QueueIndex) override
    {
        if (FTaskGraphInterface::IsMultithread() == false)
        {
            return ProcessTasks();
        }
        else
        {
            check(0);
            return 0;
        }
    }

    // Request quit. Note: an AnyThread has only one queue, so QueueIndex is always 0
    virtual void RequestQuit(int32 QueueIndex) override
    {
        check(QueueIndex < 1);

        // this will not work under arbitrary circumstances. For example you should not attempt to stop threads unless they are known to be idle.
        checkThreadGraph(Queue.StallRestartEvent); // make sure we are started up
        Queue.QuitForShutdown = true;
        Queue.StallRestartEvent->Trigger(); // wake up
    }

    // Wake the task queue so it continues processing tasks. Note: an AnyThread has only one queue, so QueueIndex is always 0
    virtual void WakeUp(int32 QueueIndex = 0) final override
    {
        QUICK_SCOPE_CYCLE_COUNTER(STAT_TaskGraph_Wakeup_Trigger);
        TASKGRAPH_SCOPE_CYCLE_COUNTER(1, STAT_TaskGraph_Wakeup_Trigger);
        Queue.StallRestartEvent->Trigger(); // wake up
    }

    // ... ...
    
    // Whether the task queue is currently processing tasks. Note: an AnyThread has only one queue, so QueueIndex is always 0
    virtual bool IsProcessingTasks(int32 QueueIndex) override
    {
        check(!QueueIndex);
        return !!Queue.RecursionGuard;
    }

    // ... ...

private:

    // ... ...

    // Loop processing tasks
    uint64 ProcessTasks()
    {
        LLM_SCOPE(ELLMTag::TaskGraphTasksMisc);

        TStatId StallStatId;
        bool bCountAsStall = true;
        uint64 ProcessedTasks = 0;
        
        // ... ...
        
        verify(++Queue.RecursionGuard == 1);
        bool bDidStall = false;
        while (1)
        {
            FBaseGraphTask* Task = FindWork();
            if (!Task)
            {
                // ... ...

                TestRandomizedThreads();
                const bool bIsMultithread = FTaskGraphInterface::IsMultithread();
                if (bIsMultithread)
                {
                    FScopeCycleCounter Scope(StallStatId);
                    Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall); // stall
                    bDidStall = true;
                }
                if (Queue.QuitForShutdown || !bIsMultithread)
                {
                    break;
                }
                TestRandomizedThreads();

                // ... ...
                continue;
            }
            TestRandomizedThreads();

            // ... ...
            
            bDidStall = false;
            Task->Execute(NewTasks, ENamedThreads::Type(ThreadId)); // execute the task
            ProcessedTasks++;
            TestRandomizedThreads();
            
            // ... ...
        }
        verify(!--Queue.RecursionGuard);
        return ProcessedTasks;
    }

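    // Basic task-queue data for an AnyThread
    struct FThreadTaskQueue
    {
        // Used to block and wake the thread
        FEvent* StallRestartEvent;
        // Recursion guard: 1 while tasks are being processed, 0 otherwise
        uint32 RecursionGuard;
        // Whether to shut down; on shutdown the task loop exits
        bool QuitForShutdown;
        // Used by a debug command: while true the thread's execution is paused; setting it back to false resumes it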
        bool bStallForTuning;
        FCriticalSection StallForTuning;

        FThreadTaskQueue()
            : StallRestartEvent(FPlatformProcess::GetSynchEventFromPool(false))
            , RecursionGuard(0)
            , QuitForShutdown(false)
            , bStallForTuning(false)
        {

        }
        ~FThreadTaskQueue()
        {
            FPlatformProcess::ReturnSynchEventToPool(StallRestartEvent);
            StallRestartEvent = nullptr;
        }
    };

    // Pop a task from the matching queue in FTaskGraphImplementation's IncomingAnyThreadTasks
    FBaseGraphTask* FindWork()
    {
        return FTaskGraphImplementation::Get().FindWork(ThreadId);
    }

    // Queue bookkeeping data. Note: the AnyThread tasks themselves are stored in FTaskGraphImplementation's IncomingAnyThreadTasks
    FThreadTaskQueue Queue;

    // Thread priority level: NP(0), HP(1), BP(2)
    int32 PriorityIndex;
};
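The AnyThread loop (FindWork, otherwise wait on StallRestartEvent, exit on QuitForShutdown) maps naturally onto a condition-variable worker pool. This is a sketch with standard threads, assuming a single shared queue in place of IncomingAnyThreadTasks and a mutex instead of the engine's lock-free structures. SketchAnyThreadPool is hypothetical, and unlike the engine loop it drains the remaining tasks before honouring shutdown.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class SketchAnyThreadPool {
public:
    explicit SketchAnyThreadPool(int NumWorkers) {
        for (int i = 0; i < NumWorkers; ++i) {
            Workers.emplace_back([this] { ProcessTasks(); });
        }
    }
    ~SketchAnyThreadPool() { Shutdown(); }

    void Enqueue(std::function<void()> Task) {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            Incoming.push_back(std::move(Task));
        }
        WakeUp.notify_one();  // analogous to StallRestartEvent->Trigger()
    }

    // Like RequestQuit(): set the shutdown flag, wake every worker, then join.
    void Shutdown() {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            bQuitForShutdown = true;
        }
        WakeUp.notify_all();
        for (std::thread& W : Workers) {
            if (W.joinable()) W.join();
        }
    }

private:
    void ProcessTasks() {
        for (;;) {
            std::function<void()> Task;
            {
                std::unique_lock<std::mutex> Lock(Mutex);
                // FindWork() failed -> stall until work arrives or shutdown is requested.
                WakeUp.wait(Lock, [this] { return !Incoming.empty() || bQuitForShutdown; });
                if (Incoming.empty()) return;  // shutting down and nothing left
                Task = std::move(Incoming.front());
                Incoming.pop_front();
            }
            Task();  // like Task->Execute(...), run outside the lock
        }
    }

    std::mutex Mutex;
    std::condition_variable WakeUp;
    std::deque<std::function<void()>> Incoming;
    bool bQuitForShutdown = false;
    std::vector<std::thread> Workers;  // declared last so the other members exist before threads start
};
```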

FTaskGraphInterface

FTaskGraphInterface defines TaskGraph's public API.

class FTaskGraphInterface
{
    // ... ...
public:

    // ... ...
    
    // Creates the FTaskGraphImplementation object, whose constructor performs the initialization. Called from FEngineLoop::PreInitPreStartupScreen
    static CORE_API void Startup(int32 NumThreads);
    
    // Shuts down the TaskGraph system and releases its resources
    static CORE_API void Shutdown();
    
    // Whether the TaskGraph system is still running, i.e. whether FTaskGraphImplementation* TaskGraphImplementationSingleton is non-null
    static CORE_API bool IsRunning();
    
    // Returns the FTaskGraphImplementation* TaskGraphImplementationSingleton singleton
    static CORE_API FTaskGraphInterface& Get();

    // Whether running in multithreaded mode
    static bool IsMultithread();

    // [API] implemented by the derived class FTaskGraphImplementation
    virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue = false) = 0;
    virtual    int32 GetNumWorkerThreads() = 0;
    virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) = 0;
    virtual void AttachToThread(ENamedThreads::Type CurrentThread)=0;
    virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread)=0; // process every task in CurrentThread's queue and return once there is nothing left. Note: CurrentThread is a NamedThread
    virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread)=0; // loop taking tasks from CurrentThread's queue and executing them until a return is explicitly requested. Note: CurrentThread is a NamedThread
    virtual void RequestReturn(ENamedThreads::Type CurrentThread)=0; // ask CurrentThread to break out of its task loop and return. Note: CurrentThread is a NamedThread
    virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)=0; // wait until all of Tasks have completed
    virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask)=0;
    void WaitUntilTaskCompletes(const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        WaitUntilTasksComplete({ Task }, CurrentThreadIfKnown);
    }

    void WaitUntilTaskCompletes(FGraphEventRef&& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        WaitUntilTasksComplete({ MoveTemp(Task) }, CurrentThreadIfKnown);
    }
    void TriggerEventWhenTaskCompletes(FEvent* InEvent, const FGraphEventRef& Task, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask)
    {
        FGraphEventArray Prerequistes;
        Prerequistes.Add(Task);
        TriggerEventWhenTasksComplete(InEvent, Prerequistes, CurrentThreadIfKnown, TriggerThread);
    }
    virtual void AddShutdownCallback(TFunction<void()>& Callback) = 0;
    virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) = 0; // wake the thread corresponding to ThreadToWake

    // ... ...
};
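ProcessThreadUntilRequestReturn and RequestReturn let a named thread keep pumping its own queue until some task asks it to stop, so the thread keeps doing useful work while it waits. The single-threaded model below shows only the QuitForReturn flag; SketchNamedQueue is a hypothetical name, and a real named thread would stall on its event rather than return when its queue runs dry.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

class SketchNamedQueue {
public:
    void Enqueue(std::function<void()> Task) { Tasks.push_back(std::move(Task)); }

    // Like RequestReturn(): sets the QuitForReturn flag checked by the loop.
    void RequestReturn() { bQuitForReturn = true; }

    // Like ProcessThreadUntilRequestReturn(): runs tasks until one of them
    // requests a return (or, in this sketch only, the queue is empty).
    int ProcessUntilRequestReturn() {
        bQuitForReturn = false;
        int Processed = 0;
        while (!bQuitForReturn && !Tasks.empty()) {
            std::function<void()> Task = std::move(Tasks.front());
            Tasks.pop_front();
            Task();
            ++Processed;
        }
        return Processed;
    }

    std::size_t Pending() const { return Tasks.size(); }

private:
    std::deque<std::function<void()>> Tasks;
    bool bQuitForReturn = false;
};
```

If two ordinary tasks, a task that calls RequestReturn() and one more task are queued, the loop runs three tasks and leaves the last one pending.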

FTaskGraphImplementation

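FTaskGraphImplementation inherits from the FTaskGraphInterface interface class and provides the concrete implementation.

FNamedTaskThread and FTaskThreadAnyThread are wrapped in FWorkerThread so that they can be managed uniformly.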

struct FWorkerThread
{
    FTaskThreadBase*    TaskGraphWorker;  // TaskGraph thread object: either an AnyThread or a NamedThread
    FRunnableThread*    RunnableThread; // The OS thread running on the CPU. Set only when TaskGraphWorker is an AnyThread; nullptr for a NamedThread
    bool                bAttached; // whether it has been attached

    /** Constructor to set reasonable defaults. **/
    FWorkerThread()
        : TaskGraphWorker(nullptr)
        , RunnableThread(nullptr)
        , bAttached(false)
    {
    }
};

The FTaskGraphImplementation member FWorkerThread WorkerThreads[MAX_THREADS] holds every thread object used by the TaskGraph system, laid out as follows:
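ThreadIndexToPriorityIndex is not shown in this excerpt. A plausible reconstruction, assuming the AnyThread slots after the named threads are split into NumTaskThreadSets equal blocks in the order NP, HP, BP (consistent with the constructor's Priority == 1 for HP and Priority == 2 for BP branches), is sketched below; both function names are hypothetical.

```cpp
#include <cassert>
#include <string>

// Assumed mapping: slot offset past the named threads, divided by the set size.
int SketchThreadIndexToPriorityIndex(int ThreadIndex, int NumNamedThreads, int NumTaskThreadsPerSet) {
    return (ThreadIndex - NumNamedThreads) / NumTaskThreadsPerSet;
}

// Builds the thread name the way the constructor's Printf calls do.
std::string SketchThreadName(int ThreadIndex, int NumNamedThreads, int NumTaskThreadsPerSet) {
    if (ThreadIndex < NumNamedThreads) return "Named";  // slots 0..NumNamedThreads-1
    const int Priority = SketchThreadIndexToPriorityIndex(ThreadIndex, NumNamedThreads, NumTaskThreadsPerSet);
    const char* Prefix = Priority == 1 ? "TaskGraphThreadHP "
                       : Priority == 2 ? "TaskGraphThreadBP "
                                       : "TaskGraphThreadNP ";
    // The suffix counts from the first AnyThread slot (LastExternalThread + 1).
    return Prefix + std::to_string(ThreadIndex - NumNamedThreads);
}
```

With 5 named threads and 8 workers per set, indices 5-12 become TaskGraphThreadNP, 13-20 TaskGraphThreadHP and 21-28 TaskGraphThreadBP; the numeric suffix keeps counting across sets instead of restarting, as in the constructor.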

class FTaskGraphImplementation : public FTaskGraphInterface
{
public:

    // Get the static global FTaskGraphImplementation singleton
    static FTaskGraphImplementation& Get()
    {        
        checkThreadGraph(TaskGraphImplementationSingleton);
        return *TaskGraphImplementationSingleton;
    }

    // Initialize, and create the CPU threads that run the internal (AnyThread) workers
    FTaskGraphImplementation(int32)
    {
        bCreatedHiPriorityThreads = !!ENamedThreads::bHasHighPriorityThreads; // 0 on iOS, 1 on other platforms. Note: !! converts the int32 ENamedThreads::bHasHighPriorityThreads to bool
        bCreatedBackgroundPriorityThreads = !!ENamedThreads::bHasBackgroundThreads; // 0 on iOS, 1 on other platforms

        int32 MaxTaskThreads = MAX_THREADS; // 83 on non-iOS platforms, 31 on iOS; both are 1 lower when STATS is not compiled in
        int32 NumTaskThreads = FPlatformMisc::NumberOfWorkerThreadsToSpawn(); // internal (AnyThread) threads per set, as chosen by the platform layer (typically derived from the CPU core count)

        // if we don't want any performance-based threads, then force the task graph to not create any worker threads, and run in game thread
        if (!FTaskGraphInterface::IsMultithread()) // multithreading is not supported
        {
            // this is the logic that used to be spread over a couple of places, that will make the rest of this function disable a worker thread
            // @todo: it could probably be made simpler/clearer
            // this - 1 tells the below code there is no rendering thread
            MaxTaskThreads = 1;
            NumTaskThreads = 1;
            LastExternalThread = (ENamedThreads::Type)(ENamedThreads::ActualRenderingThread - 1);
            bCreatedHiPriorityThreads = false;
            bCreatedBackgroundPriorityThreads = false;
            ENamedThreads::bHasBackgroundThreads = 0;
            ENamedThreads::bHasHighPriorityThreads = 0;
        }
        else
        {
            LastExternalThread = ENamedThreads::ActualRenderingThread;

            if (FForkProcessHelper::IsForkedMultithreadInstance())
            {
                NumTaskThreads = CVar_ForkedProcess_MaxWorkerThreads;
            }
        }
        
        NumNamedThreads = LastExternalThread + 1; // total number of external (NamedThread) threads

        NumTaskThreadSets = 1 + bCreatedHiPriorityThreads + bCreatedBackgroundPriorityThreads; // number of internal (AnyThread) priority sets, e.g. 3 when HP, NP and BP all exist

        // if we don't have enough threads to allow all of the sets asked for, then we can't create what was asked for.
        check(NumTaskThreadSets == 1 || FMath::Min(NumTaskThreads * NumTaskThreadSets + NumNamedThreads, MAX_THREADS) == NumTaskThreads * NumTaskThreadSets + NumNamedThreads);
        NumThreads = FMath::Max(FMath::Min(NumTaskThreads * NumTaskThreadSets + NumNamedThreads, MAX_THREADS), NumNamedThreads + 1);

        // Cap number of extra threads to the platform worker thread count
        // if we don't have enough threads to allow all of the sets asked for, then we can't create what was asked for.
        check(NumTaskThreadSets == 1 || FMath::Min(NumThreads, NumNamedThreads + NumTaskThreads * NumTaskThreadSets) == NumThreads);
        NumThreads = FMath::Min(NumThreads, NumNamedThreads + NumTaskThreads * NumTaskThreadSets); // total number of threads managed by TaskGraph: external (NamedThread) plus internal (AnyThread)

        NumTaskThreadsPerSet = (NumThreads - NumNamedThreads) / NumTaskThreadSets; // internal (AnyThread) threads per set; HP, NP and BP all get the same number
        check((NumThreads - NumNamedThreads) % NumTaskThreadSets == 0); // should be equal numbers of threads per priority set

        UE_LOG(LogTaskGraph, Log, TEXT("Started task graph with %d named threads and %d total threads with %d sets of task threads."), NumNamedThreads, NumThreads, NumTaskThreadSets);
        check(NumThreads - NumNamedThreads >= 1);  // ensure there is at least one internal (AnyThread) thread
        check(NumThreads <= MAX_THREADS);
        check(!ReentrancyCheck.GetValue()); // reentrant?
        ReentrancyCheck.Increment(); // just checking for reentrancy
        PerThreadIDTLSSlot = FPlatformTLS::AllocTlsSlot();  // allocate a TLS slot

        for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
        {
            check(!WorkerThreads[ThreadIndex].bAttached); // reentrant?
            bool bAnyTaskThread = ThreadIndex >= NumNamedThreads;  // NumNamedThreads is 5 (multithreaded, STATS enabled)
            if (bAnyTaskThread)
            {
                WorkerThreads[ThreadIndex].TaskGraphWorker = new FTaskThreadAnyThread(ThreadIndexToPriorityIndex(ThreadIndex)); // internal (AnyThread) threads start at index 5
            }
            else
            {
                WorkerThreads[ThreadIndex].TaskGraphWorker = new FNamedTaskThread;// indices 0-4 are external (NamedThread) threads
            }
            WorkerThreads[ThreadIndex].TaskGraphWorker->Setup(ENamedThreads::Type(ThreadIndex), PerThreadIDTLSSlot, &WorkerThreads[ThreadIndex]); // initialize FTaskThreadBase: record ThreadId, the TLS slot and the owning FWorkerThread (the TLS value itself is written later by InitializeForCurrentThread)
        }

        TaskGraphImplementationSingleton = this; // store this in the static global singleton FTaskGraphImplementation* TaskGraphImplementationSingleton

        // Create a CPU thread for each internal (AnyThread) worker according to its priority set (HP, NP or BP), keeping the returned RunnableThread
        const TCHAR* PrevGroupName = nullptr;
        for (int32 ThreadIndex = LastExternalThread + 1; ThreadIndex < NumThreads; ThreadIndex++)
        {
            FString Name;
            const TCHAR* GroupName = TEXT("TaskGraphNormal");
            int32 Priority = ThreadIndexToPriorityIndex(ThreadIndex);
            // These are below normal threads so that they sleep when the named threads are active
            EThreadPriority ThreadPri;
            uint64 Affinity = FPlatformAffinity::GetTaskGraphThreadMask();
            if (Priority == 1)
            {
                Name = FString::Printf(TEXT("TaskGraphThreadHP %d"), ThreadIndex - (LastExternalThread + 1));
                GroupName = TEXT("TaskGraphHigh");
                ThreadPri = TPri_SlightlyBelowNormal; // we want even hi priority tasks below the normal threads

                // If the platform defines FPlatformAffinity::GetTaskGraphHighPriorityTaskMask then use it
                if (FPlatformAffinity::GetTaskGraphHighPriorityTaskMask() != 0xFFFFFFFFFFFFFFFF)
                {
                    Affinity = FPlatformAffinity::GetTaskGraphHighPriorityTaskMask();
                }
            }
            else if (Priority == 2)
            {
                Name = FString::Printf(TEXT("TaskGraphThreadBP %d"), ThreadIndex - (LastExternalThread + 1));
                GroupName = TEXT("TaskGraphLow");
                ThreadPri = TPri_Lowest;
                // If the platform defines FPlatformAffinity::GetTaskGraphBackgroundTaskMask then use it
                if ( FPlatformAffinity::GetTaskGraphBackgroundTaskMask() != 0xFFFFFFFFFFFFFFFF )
                {
                    Affinity = FPlatformAffinity::GetTaskGraphBackgroundTaskMask();
                }
            }
            else
            {
                Name = FString::Printf(TEXT("TaskGraphThreadNP %d"), ThreadIndex - (LastExternalThread + 1));
                ThreadPri = TPri_BelowNormal; // we want normal tasks below normal threads like the game thread
            }
#if WITH_EDITOR
            uint32 StackSize = 1024 * 1024;
#elif ( UE_BUILD_SHIPPING || UE_BUILD_TEST )
            uint32 StackSize = 384 * 1024;
#else
            uint32 StackSize = 512 * 1024;
#endif
            if (GroupName != PrevGroupName)
            {
                Trace::ThreadGroupEnd();
                Trace::ThreadGroupBegin(GroupName);
                PrevGroupName = GroupName;
            }

            // We only create forkable threads on the Forked instance since the TaskGraph needs to be shutdown and recreated to properly make the switch from singlethread to multithread.
            if (FForkProcessHelper::IsForkedMultithreadInstance() && GAllowTaskGraphForkMultithreading)
            {
                WorkerThreads[ThreadIndex].RunnableThread = FForkProcessHelper::CreateForkableThread(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity);
            }
            else
            {
                WorkerThreads[ThreadIndex].RunnableThread = FRunnableThread::Create(&Thread(ThreadIndex), *Name, StackSize, ThreadPri, Affinity); 
            }
            
            WorkerThreads[ThreadIndex].bAttached = true;  // Mark the worker as attached
        }
        Trace::ThreadGroupEnd();
    }
    
    virtual ~FTaskGraphImplementation()
    {
        for (auto& Callback : ShutdownCallbacks)
        {
            Callback();  // Invoke every shutdown callback
        }
        ShutdownCallbacks.Empty();
        for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
        {
            Thread(ThreadIndex).RequestQuit(-1); // Request quit: the thread leaves its task-processing loop
        }
        for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
        {
            if (ThreadIndex > LastExternalThread) // Internal threads (AnyThread) only: wait for and destroy their RunnableThread
            {
                WorkerThreads[ThreadIndex].RunnableThread->WaitForCompletion();
                delete WorkerThreads[ThreadIndex].RunnableThread;
                WorkerThreads[ThreadIndex].RunnableThread = NULL;
            }
            WorkerThreads[ThreadIndex].bAttached = false; // Mark the worker as detached
        }
        TaskGraphImplementationSingleton = NULL;
        NumTaskThreadsPerSet = 0;
        FPlatformTLS::FreeTlsSlot(PerThreadIDTLSSlot); // Free the TLS slot
    }
    // Queues Task onto the task queue of ThreadToExecuteOn; InCurrentThreadIfKnown identifies the calling thread (AnyThread means unknown)
    virtual void QueueTask(FBaseGraphTask* Task, ENamedThreads::Type ThreadToExecuteOn, ENamedThreads::Type InCurrentThreadIfKnown = ENamedThreads::AnyThread) final override
    {
        TASKGRAPH_SCOPE_CYCLE_COUNTER(2, STAT_TaskGraph_QueueTask);

        if (ENamedThreads::GetThreadIndex(ThreadToExecuteOn) == ENamedThreads::AnyThread) // The task targets an AnyThread
        {
            TASKGRAPH_SCOPE_CYCLE_COUNTER(3, STAT_TaskGraph_QueueTask_AnyThread);
            if (FTaskGraphInterface::IsMultithread()) // Multithreading is available
            {
                uint32 TaskPriority = ENamedThreads::GetTaskPriority(Task->ThreadToExecuteOn); // Task priority (NormalTask or HighTask)
                int32 Priority = ENamedThreads::GetThreadPriorityIndex(Task->ThreadToExecuteOn); // Requested thread priority set (NP, HP or BP)
                if (Priority == (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedBackgroundPriorityThreads || !ENamedThreads::bHasBackgroundThreads))
                {
                    Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have background threads, promote to normal
                    TaskPriority = ENamedThreads::NormalTaskPriority >> ENamedThreads::TaskPriorityShift; // demote to normal task pri
                }
                else if (Priority == (ENamedThreads::HighThreadPriority >> ENamedThreads::ThreadPriorityShift) && (!bCreatedHiPriorityThreads || !ENamedThreads::bHasHighPriorityThreads))
                {
                    Priority = ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift; // we don't have hi priority threads, demote to normal
                    TaskPriority = ENamedThreads::HighTaskPriority >> ENamedThreads::TaskPriorityShift; // promote to hi task pri
                }
                uint32 PriIndex = TaskPriority ? 0 : 1; // Queue slot by task priority: 0 = HighTask, 1 = NormalTask
                check(Priority >= 0 && Priority < MAX_THREAD_PRIORITIES); // Priority is the thread priority set (HP, BP, NP)
                {
                    TASKGRAPH_SCOPE_CYCLE_COUNTER(4, STAT_TaskGraph_QueueTask_IncomingAnyThreadTasks_Push);
                    int32 IndexToStart = IncomingAnyThreadTasks[Priority].Push(Task, PriIndex); // Push into the queue of this thread priority set, in the slot for this task priority; returns the index of a stalled thread to wake, or a negative value if none
                    if (IndexToStart >= 0) // A stalled thread needs waking
                    {
                        StartTaskThread(Priority, IndexToStart); // Wake up that AnyThread
                    }
                }
                return;
            }
            else
            {
                ThreadToExecuteOn = ENamedThreads::GameThread;  // No multithreading: run on the GameThread instead
            }
        }
        ENamedThreads::Type CurrentThreadIfKnown;
        if (ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown) == ENamedThreads::AnyThread) // The caller did not specify its thread
        {
            CurrentThreadIfKnown = GetCurrentThread(); // Read the ENamedThreads::Type from the current thread's TLS
        }
        else
        {
            CurrentThreadIfKnown = ENamedThreads::GetThreadIndex(InCurrentThreadIfKnown); // Keep only the caller's index in the WorkerThreads array
            checkThreadGraph(CurrentThreadIfKnown == ENamedThreads::GetThreadIndex(GetCurrentThread()));
        }
        {
            int32 QueueToExecuteOn = ENamedThreads::GetQueueIndex(ThreadToExecuteOn);  // Target queue index (MainQueue or LocalQueue)
            ThreadToExecuteOn = ENamedThreads::GetThreadIndex(ThreadToExecuteOn);  // Target thread's index in the WorkerThreads array
            FTaskThreadBase* Target = &Thread(ThreadToExecuteOn); // FTaskThreadBase stored at WorkerThreads[ThreadToExecuteOn]
            if (ThreadToExecuteOn == ENamedThreads::GetThreadIndex(CurrentThreadIfKnown)) // The calling thread is also the target thread
            {
                Target->EnqueueFromThisThread(QueueToExecuteOn, Task);
            }
            else
            {
                Target->EnqueueFromOtherThread(QueueToExecuteOn, Task);
            }
        }
    }

    // Number of AnyThreads in each priority set (HP, BP and NP all have the same count), minus GNumWorkerThreadsToIgnore
    virtual    int32 GetNumWorkerThreads() final override
    {
        int32 Result = (NumThreads - NumNamedThreads) / NumTaskThreadSets - GNumWorkerThreadsToIgnore;
        check(Result > 0); // can't tune it to zero task threads
        return Result;
    }
    // Returns the current thread's ENamedThreads::Type. Note: if bLocalQueue is true and the current thread is a NamedThread, ENamedThreads::LocalQueue is OR'ed into the result
    virtual ENamedThreads::Type GetCurrentThreadIfKnown(bool bLocalQueue) final override
    {
        ENamedThreads::Type Result = GetCurrentThread(); // Read the ENamedThreads::Type from the current thread's TLS
        if (bLocalQueue && ENamedThreads::GetThreadIndex(Result) >= 0 && ENamedThreads::GetThreadIndex(Result) < NumNamedThreads) // bLocalQueue is true and this is a NamedThread
        {
            Result = ENamedThreads::Type(int32(Result) | int32(ENamedThreads::LocalQueue));
        }
        return Result;
    }

    // Whether ThreadToCheck is currently processing tasks. Note: ThreadToCheck must be a NamedThread
    virtual bool IsThreadProcessingTasks(ENamedThreads::Type ThreadToCheck) final override
    {
        int32 QueueIndex = ENamedThreads::GetQueueIndex(ThreadToCheck);
        ThreadToCheck = ENamedThreads::GetThreadIndex(ThreadToCheck);
        check(ThreadToCheck >= 0 && ThreadToCheck < NumNamedThreads);
        return Thread(ThreadToCheck).IsProcessingTasks(QueueIndex);
    }

    // Binds the calling thread to its slot in WorkerThreads: InitializeForCurrentThread stores that FWorkerThread pointer in the calling thread's PerThreadIDTLSSlot TLS slot. Note: CurrentThread must be a NamedThread
    virtual void AttachToThread(ENamedThreads::Type CurrentThread) final override
    {
        CurrentThread = ENamedThreads::GetThreadIndex(CurrentThread);
        check(NumTaskThreadsPerSet);
        check(CurrentThread >= 0 && CurrentThread < NumNamedThreads);
        check(!WorkerThreads[CurrentThread].bAttached);
        Thread(CurrentThread).InitializeForCurrentThread();
    }

    // Executes every task queued for CurrentThread and returns once there is nothing left to process. Note: CurrentThread must be a NamedThread
    virtual uint64 ProcessThreadUntilIdle(ENamedThreads::Type CurrentThread) final override
    {
        SCOPED_NAMED_EVENT(ProcessThreadUntilIdle, FColor::Red);
        int32 QueueIndex = ENamedThreads::GetQueueIndex(CurrentThread);
        CurrentThread = ENamedThreads::GetThreadIndex(CurrentThread);
        check(CurrentThread >= 0 && CurrentThread < NumNamedThreads);
        check(CurrentThread == GetCurrentThread());
        return Thread(CurrentThread).ProcessTasksUntilIdle(QueueIndex);
    }

    // Loops forever, popping tasks from CurrentThread's queue and executing them, until a return is requested. Note: CurrentThread must be a NamedThread
    virtual void ProcessThreadUntilRequestReturn(ENamedThreads::Type CurrentThread) final override
    {
        int32 QueueIndex = ENamedThreads::GetQueueIndex(CurrentThread);
        CurrentThread = ENamedThreads::GetThreadIndex(CurrentThread);
        check(CurrentThread >= 0 && CurrentThread < NumNamedThreads);
        check(CurrentThread == GetCurrentThread());
        Thread(CurrentThread).ProcessTasksUntilQuit(QueueIndex);
    }

    // Makes CurrentThread break out of its task-processing loop and return. Note: CurrentThread must be a NamedThread
    virtual void RequestReturn(ENamedThreads::Type CurrentThread) final override
    {
        int32 QueueIndex = ENamedThreads::GetQueueIndex(CurrentThread);
        CurrentThread = ENamedThreads::GetThreadIndex(CurrentThread);
        check(CurrentThread != ENamedThreads::AnyThread);
        Thread(CurrentThread).RequestQuit(QueueIndex);
    }

    // Blocks CurrentThreadIfKnown until every task in Tasks has completed
    virtual void WaitUntilTasksComplete(const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread) final override
    {
        TRACE_CPUPROFILER_EVENT_SCOPE(WaitUntilTasksComplete);
        ENamedThreads::Type CurrentThread = CurrentThreadIfKnown;
        if (ENamedThreads::GetThreadIndex(CurrentThreadIfKnown) == ENamedThreads::AnyThread) // Caller passed AnyThread (thread unknown): look up the real thread, keeping the requested priorities
        {
            bool bIsHiPri = !!ENamedThreads::GetTaskPriority(CurrentThreadIfKnown);
            int32 Priority = ENamedThreads::GetThreadPriorityIndex(CurrentThreadIfKnown);
            check(!ENamedThreads::GetQueueIndex(CurrentThreadIfKnown));
            CurrentThreadIfKnown = ENamedThreads::GetThreadIndex(GetCurrentThread());
            CurrentThread = ENamedThreads::SetPriorities(CurrentThreadIfKnown, Priority, bIsHiPri);
        }
        else
        {
            CurrentThreadIfKnown = ENamedThreads::GetThreadIndex(CurrentThreadIfKnown);
            check(CurrentThreadIfKnown == ENamedThreads::GetThreadIndex(GetCurrentThread()));
            // we don't modify CurrentThread here because it might be a local queue
        }

        if (CurrentThreadIfKnown != ENamedThreads::AnyThread && CurrentThreadIfKnown < NumNamedThreads && !IsThreadProcessingTasks(CurrentThread)) // A NamedThread that is not already processing tasks
        {
            if (Tasks.Num() < 8) // don't bother to check for completion if there are lots of prereqs...too expensive to check
            {
                bool bAnyPending = false;
                for (int32 Index = 0; Index < Tasks.Num(); Index++)
                {
                    FGraphEvent* Task = Tasks[Index].GetReference();
                    if (Task && !Task->IsComplete())
                    {
                        bAnyPending = true;
                        break;
                    }
                }
                if (!bAnyPending) // All of the (fewer than 8) tasks have already completed: return immediately
                {
                    return;
                }
            }
            // Create an FReturnGraphTask on CurrentThread whose prerequisites are Tasks, dispatched as soon as they complete
            TGraphTask<FReturnGraphTask>::CreateTask(&Tasks, CurrentThread).ConstructAndDispatchWhenReady(CurrentThread);
            ProcessThreadUntilRequestReturn(CurrentThread); // Keep executing this thread's tasks until the FReturnGraphTask runs and requests a return
        }
        else // An AnyThread, or a NamedThread that is already processing tasks
        {
            if (!FTaskGraphInterface::IsMultithread()) // Single-threaded mode
            {
                bool bAnyPending = false;
                for (int32 Index = 0; Index < Tasks.Num(); Index++)
                {
                    FGraphEvent* Task = Tasks[Index].GetReference();
                    if (Task && !Task->IsComplete())
                    {
                        bAnyPending = true;
                        break;
                    }
                }
                if (!bAnyPending) // Every task has completed: return immediately
                {
                    return;
                }
                UE_LOG(LogTaskGraph, Fatal, TEXT("Recursive waits are not allowed in single threaded mode."));
            }
            // We will just stall this thread on an event while we wait
            FScopedEvent Event;
            TriggerEventWhenTasksComplete(Event.Get(), Tasks, CurrentThreadIfKnown); // Event is triggered once every task completes; FScopedEvent's destructor blocks until then
        }
    }

    // Triggers InEvent once every task in Tasks has completed (the triggering task runs on TriggerThread)
    virtual void TriggerEventWhenTasksComplete(FEvent* InEvent, const FGraphEventArray& Tasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread, ENamedThreads::Type TriggerThread = ENamedThreads::AnyHiPriThreadHiPriTask) final override
    {
        check(InEvent);
        bool bAnyPending = true;
        if (Tasks.Num() < 8) // don't bother to check for completion if there are lots of prereqs...too expensive to check
        {
            bAnyPending = false;
            for (int32 Index = 0; Index < Tasks.Num(); Index++)
            {
                FGraphEvent* Task = Tasks[Index].GetReference();
                if (Task && !Task->IsComplete())
                {
                    bAnyPending = true;
                    break;
                }
            }
        }
        if (!bAnyPending)
        {
            TestRandomizedThreads();
            InEvent->Trigger();
            return;
        }
        // Create an FTriggerEventGraphTask whose prerequisites are Tasks and dispatch it when they complete; it triggers InEvent, ending the wait
        TGraphTask<FTriggerEventGraphTask>::CreateTask(&Tasks, CurrentThreadIfKnown).ConstructAndDispatchWhenReady(InEvent, TriggerThread);
    }

    // Registers a callback to invoke at shutdown
    virtual void AddShutdownCallback(TFunction<void()>& Callback)
    {
        ShutdownCallbacks.Emplace(Callback);
    }

    // Wakes ThreadToWake so it processes its tasks. Note: ThreadToWake must be a NamedThread
    virtual void WakeNamedThread(ENamedThreads::Type ThreadToWake) override
    {
        const ENamedThreads::Type ThreadIndex = ENamedThreads::GetThreadIndex(ThreadToWake);
        if (ThreadIndex < NumNamedThreads)
        {
            Thread(ThreadIndex).WakeUp(ENamedThreads::GetQueueIndex(ThreadToWake));
        }
    }

    // Wakes the thread at IndexToStart within priority set Priority (HP, BP, NP) so it processes tasks. Note: AnyThreads only
    void StartTaskThread(int32 Priority, int32 IndexToStart)
    {
        ENamedThreads::Type ThreadToWake = ENamedThreads::Type(IndexToStart + Priority * NumTaskThreadsPerSet + NumNamedThreads);
        ((FTaskThreadAnyThread&)Thread(ThreadToWake)).WakeUp();
    }

    // Wakes every NP thread, plus every HP thread if that set was created; when bDoBackgroundThreads is true, also wakes every BP thread. Note: AnyThreads only
    void StartAllTaskThreads(bool bDoBackgroundThreads)
    {
        for (int32 Index = 0; Index < GetNumWorkerThreads(); Index++)
        {
            for (int32 Priority = 0; Priority < ENamedThreads::NumThreadPriorities; Priority++)
            {
                if (Priority == (ENamedThreads::NormalThreadPriority >> ENamedThreads::ThreadPriorityShift) ||
                    (Priority == (ENamedThreads::HighThreadPriority >> ENamedThreads::ThreadPriorityShift) && bCreatedHiPriorityThreads) ||
                    (Priority == (ENamedThreads::BackgroundThreadPriority >> ENamedThreads::ThreadPriorityShift) && bCreatedBackgroundPriorityThreads && bDoBackgroundThreads)
                    )
                {
                    StartTaskThread(Priority, Index);
                }
            }
        }
    }

    // Pops the next FBaseGraphTask for ThreadInNeed from its priority set's queue. Note: ThreadInNeed must be an AnyThread
    FBaseGraphTask* FindWork(ENamedThreads::Type ThreadInNeed)
    {
        int32 LocalNumWorkingThread = GetNumWorkerThreads() + GNumWorkerThreadsToIgnore;
        int32 MyIndex = int32((uint32(ThreadInNeed) - NumNamedThreads) % NumTaskThreadsPerSet);
        int32 Priority = int32((uint32(ThreadInNeed) - NumNamedThreads) / NumTaskThreadsPerSet);
        check(MyIndex >= 0 && MyIndex < LocalNumWorkingThread && MyIndex < (PLATFORM_64BITS ? 63 : 32) && Priority >= 0 && Priority < ENamedThreads::NumThreadPriorities);
        return IncomingAnyThreadTasks[Priority].Pop(MyIndex, true);
    }

    // Debug/tuning command: Stall == true stalls the thread at Index in each of the HP, BP and NP sets; false resumes it. Note: AnyThreads only
    void StallForTuning(int32 Index, bool Stall)
    {
        for (int32 Priority = 0; Priority < ENamedThreads::NumThreadPriorities; Priority++)
        {
            ENamedThreads::Type ThreadToWake = ENamedThreads::Type(Index + Priority * NumTaskThreadsPerSet + NumNamedThreads);
            ((FTaskThreadAnyThread&)Thread(ThreadToWake)).StallForTuning(Stall);
        }
    }

    // Sets the OS thread priority of every AnyThread
    void SetTaskThreadPriorities(EThreadPriority Pri)
    {
        check(NumTaskThreadSets == 1); // otherwise tuning this doesn't make a lot of sense
        for (int32 ThreadIndex = 0; ThreadIndex < NumThreads; ThreadIndex++)
        {
            if (ThreadIndex > LastExternalThread)
            {
                WorkerThreads[ThreadIndex].RunnableThread->SetThreadPriority(Pri);
            }
        }
    }

private:
    // Returns the FTaskThreadBase stored in WorkerThreads[Index]
    FTaskThreadBase& Thread(int32 Index)
    {
        checkThreadGraph(Index >= 0 && Index < NumThreads);
        checkThreadGraph(WorkerThreads[Index].TaskGraphWorker->GetThreadId() == Index);
        return *WorkerThreads[Index].TaskGraphWorker;
    }

    // Uses the TLS value to find the current thread's slot in WorkerThreads and returns the matching ENamedThreads::Type (AnyThread if the thread is not attached)
    ENamedThreads::Type GetCurrentThread()
    {
        ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread;
        FWorkerThread* TLSPointer = (FWorkerThread*)FPlatformTLS::GetTlsValue(PerThreadIDTLSSlot); // Read the current thread's TLS value
        if (TLSPointer)
        {
            checkThreadGraph(TLSPointer - WorkerThreads >= 0 && TLSPointer - WorkerThreads < NumThreads);
            int32 ThreadIndex = UE_PTRDIFF_TO_INT32(TLSPointer - WorkerThreads); // Index of the current thread in WorkerThreads
            checkThreadGraph(Thread(ThreadIndex).GetThreadId() == ThreadIndex);
            if (ThreadIndex < NumNamedThreads) // NamedThread
            {
                CurrentThreadIfKnown = ENamedThreads::Type(ThreadIndex);
            }
            else // AnyThread
            {
                int32 Priority = (ThreadIndex - NumNamedThreads) / NumTaskThreadsPerSet; // Priority set (HP, BP or NP)
                CurrentThreadIfKnown = ENamedThreads::SetPriorities(ENamedThreads::Type(ThreadIndex), Priority, false);
            }
        }
        return CurrentThreadIfKnown;
    }

    // Maps a WorkerThreads index to the index of its priority set (0 = NP, 1 = HP, 2 = BP)
    int32 ThreadIndexToPriorityIndex(int32 ThreadIndex)
    {
        check(ThreadIndex >= NumNamedThreads && ThreadIndex < NumThreads);
        int32 Result = (ThreadIndex - NumNamedThreads) / NumTaskThreadsPerSet;
        check(Result >= 0 && Result < NumTaskThreadSets);
        return Result;
    }

    enum
    {
        // MAX_THREADS is 83 on non-iOS platforms and 31 on iOS; without the STATS macro, both are 1 lower
        MAX_THREADS = 26 * (CREATE_HIPRI_TASK_THREADS + CREATE_BACKGROUND_TASK_THREADS + 1) + ENamedThreads::ActualRenderingThread + 1,
        MAX_THREAD_PRIORITIES = 3 // Number of thread priority sets (HP, BP, NP)
    };

    /** Per thread data. **/
    FWorkerThread WorkerThreads[MAX_THREADS]; // FWorkerThread array covering both NamedThreads and AnyThreads
    /** Number of threads actually in use. **/
    int32 NumThreads; // Total number of threads managed by TaskGraph: external (NamedThread) plus internal (AnyThread)
    /** Number of named threads actually in use. **/
    int32 NumNamedThreads; // Number of external threads (NamedThread)
    /** Number of tasks thread sets for priority **/
    int32 NumTaskThreadSets; // Number of priority sets for internal threads (AnyThread), e.g. 3 for HP, BP and NP
    /** Number of tasks threads per priority set **/
    int32 NumTaskThreadsPerSet; // Internal threads (AnyThread) per set. Note: HP, BP and NP all have the same count
    bool bCreatedHiPriorityThreads; // true on non-iOS platforms, false on iOS
    bool bCreatedBackgroundPriorityThreads; // true on non-iOS platforms, false on iOS
    /**
     * "External Threads" are not created, the thread is created elsewhere and makes an explicit call to run
     * Here all of the named threads are external but that need not be the case.
     * All unnamed threads must be internal
     **/
    ENamedThreads::Type LastExternalThread; // ENamedThreads::Type of the last external thread (NamedThread)
    FThreadSafeCounter ReentrancyCheck; // Detects re-entry into the constructor
    /** Index of TLS slot for FWorkerThread* pointer. **/
    uint32 PerThreadIDTLSSlot; // TLS slot holding each thread's FWorkerThread pointer
    /** Array of callbacks to call before shutdown. **/
    TArray<TFunction<void()> > ShutdownCallbacks; // Shutdown callbacks
    // AnyThread task queues: one per thread priority set (HP, BP, NP), each with one slot per task priority (NormalTask, HighTask). Note: the enum constant MAX_THREAD_PRIORITIES is 3, the number of thread priority sets; the macro PLATFORM_CACHE_LINE_SIZE is 64
    FStallingTaskQueue<FBaseGraphTask, PLATFORM_CACHE_LINE_SIZE, 2> IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES];
};
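The index arithmetic shared by the constructor, StartTaskThread, FindWork, GetCurrentThread and the MAX_THREADS enum can be checked in isolation. The following is a standalone sketch, not engine code: TaskThreadLayout, WorkerName and MaxThreads are hypothetical names, and it assumes NamedThreads occupy indices 0 to NumNamedThreads - 1 (so LastExternalThread + 1 == NumNamedThreads), followed by NumTaskThreadSets sets of NumTaskThreadsPerSet AnyThreads, with set 0 = NP, 1 = HP, 2 = BP as in the constructor.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the index bookkeeping in FTaskGraphImplementation.
struct TaskThreadLayout
{
    int NumNamedThreads;      // e.g. 5 when STATS is enabled
    int NumTaskThreadsPerSet; // AnyThreads per priority set
    int NumTaskThreadSets;    // e.g. 3: NP, HP, BP

    int NumThreads() const { return NumNamedThreads + NumTaskThreadsPerSet * NumTaskThreadSets; }

    // Like StartTaskThread / StallForTuning: (set, index in set) -> WorkerThreads index.
    int ThreadIndex(int Priority, int IndexInSet) const
    {
        return IndexInSet + Priority * NumTaskThreadsPerSet + NumNamedThreads;
    }

    // Like ThreadIndexToPriorityIndex and FindWork's Priority: which set a worker is in.
    int PriorityIndex(int ThreadIndex) const
    {
        return (ThreadIndex - NumNamedThreads) / NumTaskThreadsPerSet;
    }

    // Like FindWork's MyIndex: position inside the set.
    int IndexInSet(int ThreadIndex) const
    {
        return (ThreadIndex - NumNamedThreads) % NumTaskThreadsPerSet;
    }

    // Like the constructor's naming: the numeric suffix is the offset from the first
    // AnyThread (ThreadIndex - (LastExternalThread + 1)), not the index inside the set.
    std::string WorkerName(int ThreadIndex) const
    {
        const int Priority = PriorityIndex(ThreadIndex);
        const char* Prefix = Priority == 1 ? "TaskGraphThreadHP "
                           : Priority == 2 ? "TaskGraphThreadBP "
                                           : "TaskGraphThreadNP ";
        return Prefix + std::to_string(ThreadIndex - NumNamedThreads);
    }
};

// The MAX_THREADS formula: 26 * (HP flag + BP flag + 1) + ActualRenderingThread + 1.
constexpr int MaxThreads(int CreateHiPri, int CreateBackground, int ActualRenderingThread)
{
    return 26 * (CreateHiPri + CreateBackground + 1) + ActualRenderingThread + 1;
}

static_assert(MaxThreads(1, 1, 4) == 83, "non-iOS, STATS enabled");
static_assert(MaxThreads(0, 0, 4) == 31, "iOS, STATS enabled");
static_assert(MaxThreads(1, 1, 3) == 82, "non-iOS, no STATS");
```

For example, with 5 named threads and 4 AnyThreads per set, WorkerThreads index 11 is the third thread (index 2) of the HP set, while index 9, the first HP thread, is named "TaskGraphThreadHP 4" because the suffix counts from the first AnyThread.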

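The AnyThreads of the three priority sets (HP, BP, NP) each use their own task queue in IncomingAnyThreadTasks[MAX_THREAD_PRIORITIES] (MAX_THREAD_PRIORITIES is 3), which FTaskGraphImplementation owns directly.

Each thread priority set has two queues, one per task priority. These queues are lock-free, and each thread pops tasks from the queues of its own priority set.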
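The routing that QueueTask performs before pushing into IncomingAnyThreadTasks can be modeled in a few lines. This is a simplified sketch under stated assumptions: a std::mutex-guarded std::deque stands in for the lock-free FStallingTaskQueue, tasks are plain ints, and ResolvePriorities, Routing and TwoSlotQueue are hypothetical names. Slot 0 holds high-priority tasks and is checked first, matching PriIndex = TaskPriority ? 0 : 1.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Hypothetical model; indices follow the constructor: 0 = NP, 1 = HP, 2 = BP.
enum ThreadPriorityIndex { NP = 0, HP = 1, BP = 2 };

struct Routing
{
    int ThreadPriority; // NP, HP or BP
    int TaskPriority;   // 0 = normal task, 1 = high task
};

// Mirrors QueueTask's fallback when the requested priority set does not exist.
Routing ResolvePriorities(Routing Requested, bool bHasHiPriThreads, bool bHasBackgroundThreads)
{
    if (Requested.ThreadPriority == BP && !bHasBackgroundThreads)
    {
        return {NP, 0}; // no background threads: run on normal threads as a normal task
    }
    if (Requested.ThreadPriority == HP && !bHasHiPriThreads)
    {
        return {NP, 1}; // no high-priority threads: run on normal threads as a high task
    }
    return Requested;
}

// Stand-in for one IncomingAnyThreadTasks[Priority] entry: two slots, high first.
class TwoSlotQueue
{
public:
    void Push(int Task, int TaskPriority)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        Slots[TaskPriority ? 0 : 1].push_back(Task); // PriIndex = TaskPriority ? 0 : 1
    }

    // Returns false when both slots are empty.
    bool Pop(int& OutTask)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        for (std::deque<int>& Slot : Slots) // slot 0 (high) before slot 1 (normal)
        {
            if (!Slot.empty())
            {
                OutTask = Slot.front();
                Slot.pop_front();
                return true;
            }
        }
        return false;
    }

private:
    std::mutex Mutex;
    std::deque<int> Slots[2];
};
```

The sketch pops in FIFO order within a slot for simplicity; the engine's lock-free lists do not necessarily preserve that order, only the high-before-normal preference between slots.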

class FTaskThreadAnyThread : public FTaskThreadBase 
{
public:
    // ... ...
    
    // Loops forever, popping tasks from the AnyThread queues and executing them; leaves the loop once Queue.QuitForShutdown is true
    uint64 ProcessTasks()
    {
        LLM_SCOPE(ELLMTag::TaskGraphTasksMisc);

        TStatId StallStatId;
        bool bCountAsStall = true;
        uint64 ProcessedTasks = 0;
        
        // ... ...
        
        verify(++Queue.RecursionGuard == 1);
        bool bDidStall = false;
        while (1)
        {
            FBaseGraphTask* Task = FindWork(); // Pop a task from the matching IncomingAnyThreadTasks queue
            if (!Task)
            {
                // ... ...

                TestRandomizedThreads();
                const bool bIsMultithread = FTaskGraphInterface::IsMultithread();
                if (bIsMultithread)
                {
                    FScopeCycleCounter Scope(StallStatId);
                    // In multithreaded mode, stall the thread when there is no task so it does not spin the CPU.
                    // StallRestartEvent->Trigger() wakes it again when a task is queued, or when WakeUp or RequestQuit is called
                    Queue.StallRestartEvent->Wait(MAX_uint32, bCountAsStall); 
                    bDidStall = true;
                }
                if (Queue.QuitForShutdown || !bIsMultithread) // Leave the loop only once Queue.QuitForShutdown is true (or in single-threaded mode)
                {
                    break;
                }
                TestRandomizedThreads();

                // ... ...
                continue;
            }
            TestRandomizedThreads();

            // ... ...
            
            bDidStall = false;
            Task->Execute(NewTasks, ENamedThreads::Type(ThreadId)); // Execute the task
            ProcessedTasks++;
            TestRandomizedThreads();
            // ... ...
        }
        verify(!--Queue.RecursionGuard);
        return ProcessedTasks;
    }
};
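The find-work / stall / wake cycle of ProcessTasks can be modeled with standard primitives. This is a minimal sketch, assuming a std::condition_variable in place of StallRestartEvent and a std::function queue in place of IncomingAnyThreadTasks; AnyThreadWorkerModel and its members are hypothetical names. Like the original, it stalls instead of spinning when there is no work, and checks the quit flag only after waking from a stall.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

class AnyThreadWorkerModel
{
public:
    // Queue a task and wake the worker (cf. Push followed by StartTaskThread).
    void Enqueue(std::function<void()> Task)
    {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            Tasks.push_back(std::move(Task));
        }
        StallRestart.notify_one();
    }

    // Ask the loop to exit (cf. RequestQuit setting Queue.QuitForShutdown).
    void RequestQuit()
    {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            bQuitForShutdown = true;
        }
        StallRestart.notify_one();
    }

    // Cf. FTaskThreadAnyThread::ProcessTasks: run tasks while there are any, otherwise stall.
    uint64_t ProcessTasks()
    {
        uint64_t ProcessedTasks = 0;
        std::unique_lock<std::mutex> Lock(Mutex);
        while (true)
        {
            if (Tasks.empty()) // cf. FindWork() returning nullptr
            {
                // Stall until a task arrives or a quit is requested.
                StallRestart.wait(Lock, [this] { return !Tasks.empty() || bQuitForShutdown; });
                if (bQuitForShutdown)
                {
                    break;
                }
                continue;
            }
            std::function<void()> Task = std::move(Tasks.front());
            Tasks.pop_front();
            Lock.unlock(); // execute without holding the queue lock
            Task();
            ++ProcessedTasks;
            Lock.lock();
        }
        return ProcessedTasks;
    }

private:
    std::mutex Mutex;
    std::condition_variable StallRestart;
    std::deque<std::function<void()>> Tasks;
    bool bQuitForShutdown = false;
};
```

In a real setup ProcessTasks would run on its own std::thread while other threads call Enqueue; RequestQuit followed by joining that thread shuts it down, mirroring RequestQuit and WaitForCompletion in the FTaskGraphImplementation destructor.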

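NamedThreads, by contrast, do not get an FRunnableThread from TaskGraph. The thread that actually executes their tasks is created by the owning module, which then calls FTaskGraphInterface::Get().AttachToThread itself (for example FTaskGraphInterface::Get().AttachToThread(ENamedThreads::GameThread)) to associate that thread with its worker.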

FNamedTaskThread, the worker used for NamedThreads, holds two FThreadTaskQueue instances, MainQueue and LocalQueue: FThreadTaskQueue Queues[ENamedThreads::NumQueues] (ENamedThreads::NumQueues is 2). Each FThreadTaskQueue in turn contains two lock-free task queues, one per task priority.

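Every task meant to run on a NamedThread is enqueued into that thread's queues. The NamedThread then calls the FTaskGraphInterface API at suitable points to execute the queued tasks in task-priority order.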
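QueueTask and the Process* functions all split an ENamedThreads::Type value into a thread index and a queue index before touching a FNamedTaskThread. The sketch below shows that decoding with plain bit masks. ThreadIndexMask = 0xff matches AnyThread = 0xff and MainQueue = 0x000 from the enum earlier in this article; LocalQueue = 0x100 with an 8-bit shift is an assumption made for illustration, so verify it against TaskGraphInterfaces.h for your engine version. The namespace SketchNamedThreads is hypothetical, and priority bits are deliberately left out.

```cpp
#include <cassert>
#include <cstdint>

namespace SketchNamedThreads
{
    // Assumed layout: low 8 bits = thread index, bit 8 = queue index.
    constexpr int32_t ThreadIndexMask = 0xff;  // AnyThread == 0xff
    constexpr int32_t MainQueue       = 0x000;
    constexpr int32_t LocalQueue      = 0x100; // assumption
    constexpr int32_t QueueIndexMask  = 0x100; // assumption
    constexpr int32_t QueueIndexShift = 8;     // assumption

    constexpr int32_t GetThreadIndex(int32_t Type) { return Type & ThreadIndexMask; }
    constexpr int32_t GetQueueIndex(int32_t Type) { return (Type & QueueIndexMask) >> QueueIndexShift; }
}

// With STATS enabled, GameThread is 3 in the enum above.
constexpr int32_t GameThread = 3;
constexpr int32_t GameThread_Local = GameThread | SketchNamedThreads::LocalQueue;

// Both values address the same FNamedTaskThread, but different FThreadTaskQueues.
static_assert(SketchNamedThreads::GetThreadIndex(GameThread_Local) == GameThread, "same worker");
static_assert(SketchNamedThreads::GetQueueIndex(GameThread | SketchNamedThreads::MainQueue) == 0, "main queue");
static_assert(SketchNamedThreads::GetQueueIndex(GameThread_Local) == 1, "local queue");
```

This is why GetCurrentThreadIfKnown(true) can simply OR LocalQueue into the result: the thread index stays the same, only the queue selection changes.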

Tasks in the GameThread's main queue are executed during the synchronization at the end of every frame of the game main loop:

bool bEmptyGameThreadTasks = !FTaskGraphInterface::Get().IsThreadProcessingTasks(ENamedThreads::GameThread);

if (bEmptyGameThreadTasks)
{
    // ProcessThreadUntilIdle finds the FNamedTaskThread of the GameThread's FWorkerThread created earlier, executes every task in its queue, and returns once there is nothing left to process
    FTaskGraphInterface::Get().ProcessThreadUntilIdle(ENamedThreads::GameThread);
}

RenderThread: the Render module creates its thread object and the corresponding Runnable, and FRenderingThread::Run() calls RenderingThreadMain, which processes the tasks in the corresponding NamedThread queue:

void RenderingThreadMain( FEvent* TaskGraphBoundSyncEvent )
{
    LLM_SCOPE(ELLMTag::RenderingThreadMemory);

    ENamedThreads::Type RenderThread = ENamedThreads::Type(ENamedThreads::ActualRenderingThread);

    ENamedThreads::SetRenderThread(RenderThread);
    ENamedThreads::SetRenderThread_Local(ENamedThreads::Type(ENamedThreads::ActualRenderingThread_Local));

    FTaskGraphInterface::Get().AttachToThread(RenderThread);
    FPlatformMisc::MemoryBarrier();

    // Inform main thread that the render thread has been attached to the taskgraph and is ready to receive tasks
    if( TaskGraphBoundSyncEvent != NULL )
    {
        TaskGraphBoundSyncEvent->Trigger();
    }

    // set the thread back to real time mode
    FPlatformProcess::SetRealTimeMode();

    // ... ...

    FCoreDelegates::PostRenderingThreadCreated.Broadcast();
    check(GIsThreadedRendering);
    
    // ProcessThreadUntilRequestReturn finds the FNamedTaskThread of the ActualRenderingThread FWorkerThread created earlier and loops forever, popping and executing tasks from its queue, until a return is requested
    FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(RenderThread);
    FPlatformMisc::MemoryBarrier();
    check(!GIsThreadedRendering);
    FCoreDelegates::PreRenderingThreadDestroyed.Broadcast();
    
    // ... ...
    
    ENamedThreads::SetRenderThread(ENamedThreads::GameThread);
    ENamedThreads::SetRenderThread_Local(ENamedThreads::GameThread_Local);
    FPlatformMisc::MemoryBarrier();
}
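The two ways a NamedThread drains its queue, ProcessThreadUntilIdle (GameThread) and ProcessThreadUntilRequestReturn (RenderThread, RHIThread), can be contrasted in a single-threaded model. This is a minimal sketch with hypothetical names (NamedThreadQueueModel and its methods). It ignores the Main/Local split and task priorities and, as an assumption of the model, lets a task request a return by calling RequestReturn, roughly what the FReturnGraphTask created in WaitUntilTasksComplete does when it executes.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

class NamedThreadQueueModel
{
public:
    void Enqueue(std::function<void()> Task) { Tasks.push_back(std::move(Task)); }

    // Cf. ProcessThreadUntilIdle: run whatever is queued, return once the queue is empty.
    uint64_t ProcessUntilIdle()
    {
        uint64_t Processed = 0;
        while (!Tasks.empty())
        {
            RunFront();
            ++Processed;
        }
        return Processed;
    }

    // Cf. ProcessThreadUntilRequestReturn: keep running tasks until one calls RequestReturn.
    // A real NamedThread stalls when its queue is empty; this model just stops.
    uint64_t ProcessUntilRequestReturn()
    {
        bReturnRequested = false;
        uint64_t Processed = 0;
        while (!bReturnRequested && !Tasks.empty())
        {
            RunFront();
            ++Processed;
        }
        return Processed;
    }

    // Cf. RequestReturn.
    void RequestReturn() { bReturnRequested = true; }

private:
    void RunFront()
    {
        std::function<void()> Task = std::move(Tasks.front());
        Tasks.pop_front();
        Task(); // may enqueue more tasks or request a return
    }

    std::deque<std::function<void()>> Tasks;
    bool bReturnRequested = false;
};
```

Tasks queued after the one that requests a return stay in the queue; a later ProcessUntilIdle picks them up, which matches the GameThread draining its queue once per frame.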

RHIThread: the Render module likewise creates its thread object and the corresponding Runnable, and FRHIThread::Run() processes the tasks in the corresponding NamedThread queue:

class FRHIThread : public FRunnable
{
public:

    // ... ...
    virtual uint32 Run() override
    {
        // ... ...

        FMemory::SetupTLSCachesOnCurrentThread();
        FTaskGraphInterface::Get().AttachToThread(ENamedThreads::RHIThread);
        
        // ProcessThreadUntilRequestReturn finds the FNamedTaskThread of the RHIThread FWorkerThread created earlier and loops forever, popping and executing tasks from its queue, until a return is requested
        FTaskGraphInterface::Get().ProcessThreadUntilRequestReturn(ENamedThreads::RHIThread);
        FMemory::ClearAndDisableTLSCachesOnCurrentThread();
        return 0;
    }
    
};

FBaseGraphTask

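The base class of every task in the TaskGraph system. It defines the 6 mandatory stages of a task's life cycle and exposes Execute, the entry point FTaskGraphInterface uses to run a concrete task. It also keeps a counter of outstanding prerequisite tasks, which decides when the task is placed into a task queue.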
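The LifeStage checks in the listing below amount to a counter that must advance by exactly one stage per call. Here is a minimal sketch of that idea, with hypothetical names (ELifeStageSketch, LifeStageTracker, Advance) and std::atomic in place of FThreadSafeCounter. The stage order is copied from ELifeStage; the comments note which FBaseGraphTask function performs each check.

```cpp
#include <atomic>
#include <cassert>

enum ELifeStageSketch
{
    LS_BaseContructed = 0,
    LS_Contructed,       // 1: FBaseGraphTask constructor
    LS_ThreadSet,        // 2: SetThreadToExecuteOn
    LS_PrequisitesSetup, // 3: PrerequisitesComplete
    LS_Queued,           // 4: QueueTask
    LS_Executing,        // 5: Execute
    LS_Deconstucted,     // 6: destructor
};

class LifeStageTracker
{
public:
    // Cf. checkThreadGraph(LifeStage.Increment() == int32(Expected)):
    // true only if this call moves the counter to exactly Expected.
    bool Advance(ELifeStageSketch Expected)
    {
        return ++Stage == static_cast<int>(Expected); // pre-increment yields the new value
    }

private:
    std::atomic<int> Stage{LS_BaseContructed};
};
```

Because every stage is checked against the incremented value, skipping a stage (for example executing a task that was never queued) or repeating one fails the check immediately.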

class FBaseGraphTask
{
    // ... ...
protected:
    FBaseGraphTask(int32 InNumberOfPrerequistitesOutstanding)
        : ThreadToExecuteOn(ENamedThreads::AnyThread)
        , NumberOfPrerequistitesOutstanding(InNumberOfPrerequistitesOutstanding + 1) // Starts at the real prerequisite count + 1; the extra 1 is a "lock" released by PrerequisitesComplete
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Contructed)); // Equivalent to checkThreadGraph(++LifeStage == 1)
        LLM(InheritedLLMTag = FLowLevelMemTracker::bIsDisabled ? ELLMTag::Untagged : (ELLMTag)FLowLevelMemTracker::Get().GetActiveTag(ELLMTracker::Default));
    }
    
    // Sets the thread this task will run on to InThreadToExecuteOn
    void SetThreadToExecuteOn(ENamedThreads::Type InThreadToExecuteOn)
    {
        ThreadToExecuteOn = InThreadToExecuteOn;
        checkThreadGraph(LifeStage.Increment() == int32(LS_ThreadSet)); // Equivalent to checkThreadGraph(++LifeStage == 2)
    }

    // Called once prerequisites have been registered: subtracts the NumAlreadyFinishedPrequistes prerequisites that were already complete, plus the constructor's lock when bUnlock is true. If the count reaches 0, the task is queued onto ThreadToExecuteOn (CurrentThread identifies the calling thread)
    void PrerequisitesComplete(ENamedThreads::Type CurrentThread, int32 NumAlreadyFinishedPrequistes, bool bUnlock = true)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_PrequisitesSetup)); // Equivalent to checkThreadGraph(++LifeStage == 3)
        int32 NumToSub = NumAlreadyFinishedPrequistes + (bUnlock ? 1 : 0); // the +1 is for the "lock" we set up in the constructor
        
        // int NumberOfPrerequistitesOutstandingBackup = NumberOfPrerequistitesOutstanding;
        // NumberOfPrerequistitesOutstanding = NumberOfPrerequistitesOutstanding - NumToSub;
        // if (NumberOfPrerequistitesOutstandingBackup == NumToSub)
        if (NumberOfPrerequistitesOutstanding.Subtract(NumToSub) == NumToSub) // Logically equivalent to the 3 commented lines above: Subtract returns the old value
        {
            QueueTask(CurrentThread);
        }
    }
    
    virtual ~FBaseGraphTask()
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Deconstucted)); // Equivalent to checkThreadGraph(++LifeStage == 6)
    }
    
    // ... ...

    // Called when one prerequisite completes: decrements the outstanding count and, when it reaches 0, queues this task onto ThreadToExecuteOn
    void ConditionalQueueTask(ENamedThreads::Type CurrentThread)
    {
        if (NumberOfPrerequistitesOutstanding.Decrement()==0) // Equivalent to if (--NumberOfPrerequistitesOutstanding == 0)
        {
            QueueTask(CurrentThread); 
        }
    }

private:
    // ... ...

    // Executes the task; implemented by subclasses
    virtual void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread)=0;
    // Executes the task
    FORCEINLINE void Execute(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread)
    {
        LLM_SCOPE(InheritedLLMTag);
        checkThreadGraph(LifeStage.Increment() == int32(LS_Executing)); // Equivalent to checkThreadGraph(++LifeStage == 5)
        ExecuteTask(NewTasks, CurrentThread);
    }

    // Queues this task onto the queue of ThreadToExecuteOn
    void QueueTask(ENamedThreads::Type CurrentThreadIfKnown)
    {
        checkThreadGraph(LifeStage.Increment() == int32(LS_Queued)); // Equivalent to checkThreadGraph(++LifeStage == 4)
        FTaskGraphInterface::Get().QueueTask(this, ThreadToExecuteOn, CurrentThreadIfKnown);
    }

    // The thread this task runs on
    ENamedThreads::Type ThreadToExecuteOn;
    // Outstanding prerequisite count (plus the constructor's lock); the task is queued when it reaches 0
    FThreadSafeCounter NumberOfPrerequistitesOutstanding;

#if !UE_BUILD_SHIPPING
    // Life stage verification
    // Tasks go through 8 steps, in order. In non-final builds, we track them with a thread safe counter and verify that the progression is correct.
    enum ELifeStage
    {
        LS_BaseContructed = 0, // 0
        LS_Contructed,         // 1
        LS_ThreadSet,          // 2
        LS_PrequisitesSetup,   // 3
        LS_Queued,             // 4
        LS_Executing,          // 5
        LS_Deconstucted,       // 6
    };
    FThreadSafeCounter LifeStage; // Life stage counter
#endif

    // ... ...
};
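Why does NumberOfPrerequistitesOutstanding start at N + 1? The extra 1 keeps the task from being queued while its prerequisites are still being registered, even if one of them completes in the middle of setup. The sketch below replays that protocol with std::atomic. PrereqCounterModel, OnPrerequisiteDone and FinishSetup are hypothetical names standing in for ConditionalQueueTask and PrerequisitesComplete, and "queued" is reduced to a flag.

```cpp
#include <atomic>
#include <cassert>

class PrereqCounterModel
{
public:
    explicit PrereqCounterModel(int NumPrerequisites)
        : Outstanding(NumPrerequisites + 1) // +1 is the "lock" taken in the constructor
    {
    }

    // Cf. ConditionalQueueTask: a registered prerequisite has completed.
    void OnPrerequisiteDone()
    {
        if (--Outstanding == 0) // pre-decrement yields the new value
        {
            bQueued = true;
        }
    }

    // Cf. PrerequisitesComplete: subtract the prerequisites that were already complete
    // when they were registered, plus the lock when bUnlock is true.
    void FinishSetup(int NumAlreadyFinished, bool bUnlock = true)
    {
        const int NumToSub = NumAlreadyFinished + (bUnlock ? 1 : 0);
        if (Outstanding.fetch_sub(NumToSub) == NumToSub) // fetch_sub yields the old value
        {
            bQueued = true;
        }
    }

    bool IsQueued() const { return bQueued; }

private:
    std::atomic<int> Outstanding;
    bool bQueued = false;
};
```

With bUnlock = false, FinishSetup leaves the extra 1 in place, so the task stays held until one more decrement releases it.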

FGraphEvent

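FGraphEvent represents the completion of a task, so every FGraphEvent is tied to one task and is created when that task is initialized. FGraphEvent is what implements the dependencies between tasks.

A task is queued only after all of its prerequisite tasks have completed. When a task finishes executing, its associated event is complete, and the event immediately processes every subsequent task that depends on it.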
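The event side of this mechanism can be modeled as a closable list of waiting tasks. This is a minimal sketch, assuming a std::mutex in place of the lock-free SubsequentList; GraphEventModel and WaitingTask are hypothetical names, and WaitingTask's counter is a plain int rather than the atomic counter the engine uses. AddSubsequent fails once the event has fired, which is presumably how a task that is registering its prerequisites learns that one of them is already complete; DispatchSubsequents closes the list and decrements each waiting task's counter.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical stand-in for an FBaseGraphTask waiting on prerequisites.
struct WaitingTask
{
    int Outstanding = 0; // prerequisites not yet complete (plain int in this model)
    bool bQueued = false;

    void ConditionalQueue()
    {
        if (--Outstanding == 0)
        {
            bQueued = true;
        }
    }
};

class GraphEventModel
{
public:
    // Cf. AddSubsequent / PushIfNotClosed: false once the event has already fired.
    bool AddSubsequent(WaitingTask* Task)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (bClosed)
        {
            return false;
        }
        Subsequents.push_back(Task);
        return true;
    }

    // Cf. DispatchSubsequents without DontCompleteUntil: close the list, then let
    // every waiting task drop one prerequisite. A vector keeps insertion order, so
    // no reversal is needed (the engine reverses because PopAll is LIFO).
    void DispatchSubsequents()
    {
        std::vector<WaitingTask*> NewTasks;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            bClosed = true;
            NewTasks.swap(Subsequents);
        }
        for (WaitingTask* Task : NewTasks)
        {
            Task->ConditionalQueue();
        }
    }

    // Cf. IsComplete, i.e. SubsequentList.IsClosed().
    bool IsComplete()
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        return bClosed;
    }

private:
    std::mutex Mutex;
    std::vector<WaitingTask*> Subsequents;
    bool bClosed = false;
};
```

A task that depends on two events is queued only when the second of them dispatches, which is the D1-depends-on-C1-and-C2 case from the diagram at the start of the article.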

typedef TRefCountPtr<class FGraphEvent> FGraphEventRef;
typedef TArray<FGraphEventRef, TInlineAllocator<4> > FGraphEventArray;

class FGraphEvent 
{
public:

    // Allocates a new FGraphEvent from TheGraphEventAllocator
    static CORE_API FGraphEventRef CreateGraphEvent()
    {
        return TheGraphEventAllocator.New();
    }

    // Registers Task as a subsequent of this event (Task depends on this event) by pushing it onto this event's subsequent list
    // Returns false if this event has already fired (the list is closed), true if Task was added
    bool AddSubsequent(class FBaseGraphTask* Task)
    {
        return SubsequentList.PushIfNotClosed(Task);
    }

    // Asserts that this event has no outstanding events to wait for (EventsToWaitFor is empty)
    void CheckDontCompleteUntilIsEmpty()
    {
        checkThreadGraph(!EventsToWaitFor.Num());
    }

    // Adds EventToWaitFor to EventsToWaitFor: this event will not complete until EventToWaitFor has completed
    void DontCompleteUntil(FGraphEventRef EventToWaitFor)
    {
        checkThreadGraph(!IsComplete());
        // EventsToWaitFor.Add(EventToWaitFor); // i.e. append EventToWaitFor to the EventsToWaitFor array
        new (EventsToWaitFor) FGraphEventRef(EventToWaitFor); // Same effect as the line above, but constructs the element in place to avoid extra copies. TArray overloads placement new; see UnrealEngine\Engine\Source\Runtime\Core\Public\Containers\Array.h
    }

    CORE_API void DispatchSubsequents(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        TArray<FBaseGraphTask*> NewTasks;
        DispatchSubsequents(NewTasks, CurrentThreadIfKnown);
    }

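    // If this event still has outstanding DontCompleteUntil events, spawn a gather task that waits for them and dispatches the subsequents later; otherwise close the subsequent list and try to queue every task that depends on this event
    CORE_API void DispatchSubsequents(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)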
    {
        if (EventsToWaitFor.Num())
        {
            // need to save this first and empty the actual tail of the task might be recycled faster than it is cleared.
            FGraphEventArray TempEventsToWaitFor;
            Exchange(EventsToWaitFor, TempEventsToWaitFor);

            bool bSpawnGatherTask = true;

            if (GTestDontCompleteUntilForAlreadyComplete)
            {
                bSpawnGatherTask = false;
                for (FGraphEventRef& Item : TempEventsToWaitFor)
                {
                    if (!Item->IsComplete())
                    {
                        bSpawnGatherTask = true;
                        break;
                    }
                }
            }

            if (bSpawnGatherTask)  // Some of the events to wait for are still incomplete
            {
                // create the Gather...this uses a special version of private CreateTask that "assumes" the subsequent list (which other threads might still be adding too).
                DECLARE_CYCLE_STAT(TEXT("FNullGraphTask.DontCompleteUntil"),
                STAT_FNullGraphTask_DontCompleteUntil,
                    STATGROUP_TaskGraphTasks);

                ENamedThreads::Type LocalThreadToDoGatherOn = ENamedThreads::AnyHiPriThreadHiPriTask;
                if (!GIgnoreThreadToDoGatherOn)
                {
                    LocalThreadToDoGatherOn = ThreadToDoGatherOn;
                }
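                // Wrap this in an FGraphEventRef and let an FNullGraphTask take it over as its completion event. The FNullGraphTask depends on TempEventsToWaitFor, so once they finish it runs and dispatches this event's subsequents.
                TGraphTask<FNullGraphTask>::CreateTask(FGraphEventRef(this), &TempEventsToWaitFor, CurrentThreadIfKnown).ConstructAndDispatchWhenReady(GET_STATID(STAT_FNullGraphTask_DontCompleteUntil), LocalThreadToDoGatherOn);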
                return;
            }
        }

        // Reaching this point means there is nothing left to wait for
        SubsequentList.PopAllAndClose(NewTasks);  // Pop every task that depends on this event out of SubsequentList into NewTasks, and close the list
        for (int32 Index = NewTasks.Num() - 1; Index >= 0 ; Index--) // reverse the order since PopAll is implicitly backwards
        {
            FBaseGraphTask* NewTask = NewTasks[Index];
            checkThreadGraph(NewTask);
            NewTask->ConditionalQueueTask(CurrentThreadIfKnown); // Queue the task if this was its last outstanding prerequisite
        }
        NewTasks.Reset(); // Empty the NewTasks array
    }

    // Whether the event has completed, i.e. whether its SubsequentList has been closed
    bool IsComplete() const
    {
        return SubsequentList.IsClosed();
    }

    // Block until the task associated with this event has completed
    void Wait(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        FTaskGraphInterface::Get().WaitUntilTaskCompletes(this, CurrentThreadIfKnown);
    }

    // ... ...

private:
    // ... ...
    
    // Return ToRecycle's memory to the graph event allocator
    static CORE_API void Recycle(FGraphEvent* ToRecycle)
    {
        TheGraphEventAllocator.Free(ToRecycle);
    }

    // Constructor
    friend struct FGraphEventAndSmallTaskStorage;
    FGraphEvent(bool bInInline = false)
        : ThreadToDoGatherOn(ENamedThreads::AnyHiPriThreadHiPriTask)
    {
    }

    // Destructor
    ~FGraphEvent()
    {
#if DO_CHECK
        if (!IsComplete())
        {
            check(SubsequentList.IsClosed());
        }
#endif
        CheckDontCompleteUntilIsEmpty(); // We should not have any wait untils outstanding
    }

    // Interface for TRefCountPtr

public:
    // Increment the reference count
    uint32 AddRef()
    {
        int32 RefCount = ReferenceCount.Increment();
        checkThreadGraph(RefCount > 0);
        return RefCount;
    }
    // Decrement the reference count
    uint32 Release()
    {
        int32 RefCount = ReferenceCount.Decrement();
        checkThreadGraph(RefCount >= 0);
        if (RefCount == 0) // When it reaches 0, call Recycle to reclaim this event's memory
        {
            Recycle(this);
        }
        return RefCount;
    }


private:

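    // Tasks that have this event as a prerequisite
    TClosableLockFreePointerListUnorderedSingleConsumer<FBaseGraphTask, 0>    SubsequentList;
    // Events that must complete before this event can complete (added via DontCompleteUntil)
    FGraphEventArray                                                        EventsToWaitFor;
    // Reference counter
    FThreadSafeCounter                                                        ReferenceCount;
    // Thread on which the gather FNullGraphTask (which waits for EventsToWaitFor) runs
    ENamedThreads::Type                                                        ThreadToDoGatherOn;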

    // ... ...
};
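The interplay between SubsequentList, AddSubsequent and DispatchSubsequents can be illustrated with a small model in standard C++. This is not engine code. The names ToyEvent and ToyTask are hypothetical, and a mutex-protected vector stands in for the engine's lock-free closable list. The sketch also leaves out DontCompleteUntil/EventsToWaitFor. What it shows: once an event has fired, its list is closed and AddSubsequent fails. Firing the event lets each waiting task decrement its outstanding-prerequisite count.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical stand-in for FBaseGraphTask: only tracks outstanding prerequisites.
struct ToyTask
{
    int Outstanding = 0;   // prerequisites that have not fired yet
    bool Queued = false;   // set when the task would be pushed to a thread queue

    // Same idea as ConditionalQueueTask: queue once the count reaches zero.
    void ConditionalQueueTask()
    {
        if (--Outstanding == 0)
        {
            Queued = true;
        }
    }
};

// Hypothetical stand-in for FGraphEvent: a closable list of subsequent tasks.
class ToyEvent
{
public:
    // Returns false once the event has fired, like FGraphEvent::AddSubsequent.
    bool AddSubsequent(ToyTask* Task)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (Closed)
        {
            return false;
        }
        Subsequents.push_back(Task);
        return true;
    }

    bool IsComplete() const
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        return Closed;
    }

    // Close the list (the event is now complete), then notify every waiting task.
    void DispatchSubsequents()
    {
        std::vector<ToyTask*> NewTasks;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            Closed = true;
            NewTasks.swap(Subsequents);
        }
        for (ToyTask* Task : NewTasks)
        {
            Task->ConditionalQueueTask();
        }
    }

private:
    mutable std::mutex Mutex;
    bool Closed = false;
    std::vector<ToyTask*> Subsequents;
};
```

A late AddSubsequent that returns false is the case SetupPrereqs counts as an already completed prerequisite.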

TGraphTask

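TGraphTask derives from FBaseGraphTask and is a class template parameterized on the user task type TTask.

TGraphTask embeds the user-defined task and relies on FGraphEvent to handle its prerequisite and subsequent tasks.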

template<typename TTask>
class TGraphTask final : public FBaseGraphTask
{
public:
    // Internal helper class used to construct the TTask embedded in a TGraphTask
    class FConstructor
    {
    public:
        // Construct the embedded TTask; if all prerequisites have already completed, queue the task on its target thread
        template<typename... T>
        FGraphEventRef ConstructAndDispatchWhenReady(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Setup(Prerequisites, CurrentThreadIfKnown);
        }

        // Construct the embedded TTask but do not queue it yet; call Unlock later to release it
        template<typename... T>
        TGraphTask* ConstructAndHold(T&&... Args)
        {
            new ((void *)&Owner->TaskStorage) TTask(Forward<T>(Args)...);
            return Owner->Hold(Prerequisites, CurrentThreadIfKnown);
        }

    private:
        friend class TGraphTask;

        /** The task that created me to assist with embeded task construction and preparation. **/
        TGraphTask*                        Owner;
        /** The list of prerequisites. **/
        const FGraphEventArray*            Prerequisites;
        /** If known, the current thread.  ENamedThreads::AnyThread is also fine, and if that is the value, we will determine the current thread, as needed, via TLS. **/
        ENamedThreads::Type                CurrentThreadIfKnown;

        // ... ...
    };

    // Allocate a TGraphTask, initialize it with a newly created FGraphEvent (or none for FireAndForget tasks), and return an FConstructor wrapping it
    static FConstructor CreateTask(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        int32 NumPrereq = Prerequisites ? Prerequisites->Num() : 0;
        if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)  // If the TGraphTask is at most SMALL_TASK_SIZE (256) bytes, take the memory from the small task allocator
        {
            void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate();
            return FConstructor(new (Mem) TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown);
        }
        
        // Otherwise allocate the TGraphTask with plain new
        return FConstructor(new TGraphTask(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget ? NULL : FGraphEvent::CreateGraphEvent(), NumPrereq), Prerequisites, CurrentThreadIfKnown);
    }

    // A task created with ConstructAndHold must be released with Unlock; it is queued on its target thread once all prerequisites have completed
    void Unlock(ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        ConditionalQueueTask(CurrentThreadIfKnown);
    }

    // Return this task's completion event
    FGraphEventRef GetCompletionEvent()
    {
        return Subsequents;
    }

private:
    friend class FConstructor;
    friend class FGraphEvent;

    // Execute the task. NewTasks is an in/out scratch array passed on to DispatchSubsequents
    void ExecuteTask(TArray<FBaseGraphTask*>& NewTasks, ENamedThreads::Type CurrentThread) override
    {
        checkThreadGraph(TaskConstructed);

        // Fire and forget mode must not have subsequents
        // Track subsequents mode must have subsequents
        checkThreadGraph(XOR(TTask::GetSubsequentsMode() == ESubsequentsMode::FireAndForget, IsValidRef(Subsequents))); 

        if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents) 
        {
            Subsequents->CheckDontCompleteUntilIsEmpty(); // we can only add wait for tasks while executing the task
        }
        
        TTask& Task = *(TTask*)&TaskStorage;
        {
            FScopeCycleCounter Scope(Task.GetStatId(), true); 
            Task.DoTask(CurrentThread, Subsequents);  // Run the user task
            Task.~TTask(); // Destroy the embedded task
            checkThreadGraph(ENamedThreads::GetThreadIndex(CurrentThread) <= ENamedThreads::GetRenderThread() || FMemStack::Get().IsEmpty()); // you must mark and pop memstacks if you use them in tasks! Named threads are excepted.
        }
        
        TaskConstructed = false;

        if (TTask::GetSubsequentsMode() == ESubsequentsMode::TrackSubsequents) // Other tasks may depend on this one
        {
            FPlatformMisc::MemoryBarrier();
            Subsequents->DispatchSubsequents(NewTasks, CurrentThread);
        }

        if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)
        {
            this->TGraphTask::~TGraphTask();  // Destroy the TGraphTask
            FBaseGraphTask::GetSmallTaskAllocator().Free(this); // Return the memory to the small task allocator
        }
        else
        {
            delete this; // The TGraphTask was allocated with new, so delete it directly
        }
    }

    // ... ...

    // Set the thread this task should run on and register its prerequisites
    void SetupPrereqs(const FGraphEventArray* Prerequisites, ENamedThreads::Type CurrentThreadIfKnown, bool bUnlock)
    {
        checkThreadGraph(!TaskConstructed);
        TaskConstructed = true;
        TTask& Task = *(TTask*)&TaskStorage;
        SetThreadToExecuteOn(Task.GetDesiredThread()); // Thread the task wants to run on
        int32 AlreadyCompletedPrerequisites = 0; // Number of prerequisites that have already completed
        if (Prerequisites)
        {
            for (int32 Index = 0; Index < Prerequisites->Num(); Index++)
            {
                FGraphEvent* Prerequisite = (*Prerequisites)[Index];
                // Count the prerequisite as already completed if it is null, or if AddSubsequent fails (failure means the prerequisite event has already fired)
                if (Prerequisite == nullptr || !Prerequisite->AddSubsequent(this))
                {
                    AlreadyCompletedPrerequisites++;
                }
            }
        }
        // If all prerequisites have completed and bUnlock is true, queue the task on its target thread right away
        PrerequisitesComplete(CurrentThreadIfKnown, AlreadyCompletedPrerequisites, bUnlock);
    }

    // Set the target thread and prerequisites; if all prerequisites have completed, queue the task right away. Returns the task's own FGraphEventRef
    FGraphEventRef Setup(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        FGraphEventRef ReturnedEventRef = Subsequents; // very important so that this doesn't get destroyed before we return
        SetupPrereqs(Prerequisites, CurrentThreadIfKnown, true);
        return ReturnedEventRef;
    }

    // Set the target thread and prerequisites without queuing the task, and return a pointer to this TGraphTask
    TGraphTask* Hold(const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        SetupPrereqs(Prerequisites, CurrentThreadIfKnown, false);
        return this;
    }

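    // Allocate a TGraphTask that takes over the given SubsequentsToAssume event instead of creating a new one, and return an FConstructor wrapping it (DispatchSubsequents uses this for the gather task)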
    static FConstructor CreateTask(FGraphEventRef SubsequentsToAssume, const FGraphEventArray* Prerequisites = NULL, ENamedThreads::Type CurrentThreadIfKnown = ENamedThreads::AnyThread)
    {
        if (sizeof(TGraphTask) <= FBaseGraphTask::SMALL_TASK_SIZE)
        {
            void *Mem = FBaseGraphTask::GetSmallTaskAllocator().Allocate();
            return FConstructor(new (Mem) TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
        }
        return FConstructor(new TGraphTask(SubsequentsToAssume, Prerequisites ? Prerequisites->Num() : 0), Prerequisites, CurrentThreadIfKnown);
    }

    TAlignedBytes<sizeof(TTask),alignof(TTask)> TaskStorage;  // Raw storage for the embedded TTask
    bool                        TaskConstructed; // Whether the task has been constructed, i.e. whether SetupPrereqs has been called
    
    // This task's completion event. Tasks that use this task as a prerequisite wait on this event.
    // For ESubsequentsMode::FireAndForget it is null; only for ESubsequentsMode::TrackSubsequents does Subsequents hold a valid event.
    FGraphEventRef                Subsequents;
};
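Both Hold/Unlock and AlreadyCompletedPrerequisites depend on a per-task counter of outstanding prerequisites. That counter lives in FBaseGraphTask, which is not shown above. As we read the engine source, the counter starts at NumPrereq + 1, and the extra 1 acts as a lock held until setup finishes. PrerequisitesComplete subtracts the already finished prerequisites, and also subtracts the lock when bUnlock is true. Unlock later removes the lock through ConditionalQueueTask. The standard C++ sketch below models only that counting logic. The names ToyGraphTask and QueueCount are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the outstanding-prerequisite counter kept by FBaseGraphTask.
class ToyGraphTask
{
public:
    // The +1 is a "lock" so the task cannot be queued while prerequisites are being registered.
    explicit ToyGraphTask(int NumPrereq) : Outstanding(NumPrereq + 1) {}

    // End of setup (like PrerequisitesComplete): subtract prerequisites that had already fired,
    // plus the lock when bUnlock is true (the ConstructAndDispatchWhenReady path).
    void PrerequisitesComplete(int NumAlreadyFinished, bool bUnlock)
    {
        const int NumToSub = NumAlreadyFinished + (bUnlock ? 1 : 0);
        // fetch_sub returns the previous value; equal to NumToSub means the counter reached zero.
        if (Outstanding.fetch_sub(NumToSub) == NumToSub)
        {
            Queue();
        }
    }

    // Called once per prerequisite that fires later, and by Unlock() for held tasks.
    void ConditionalQueueTask()
    {
        if (Outstanding.fetch_sub(1) == 1)
        {
            Queue();
        }
    }

    void Unlock() { ConditionalQueueTask(); }

    int QueueCount = 0; // how many times the task was queued (should end at exactly 1)

private:
    void Queue() { ++QueueCount; }

    std::atomic<int> Outstanding;
};
```

A held task with no prerequisites stays at 1 until Unlock runs. A dispatched task with no prerequisites drops to 0 during setup and is queued immediately.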

Task classes

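In the TaskGraph system you define the task class yourself. It needs a DoTask function containing the work to perform and a GetDesiredThread function that chooses the thread to run on. TGraphTask also calls its GetStatId function and its static GetSubsequentsMode function. A task class looks roughly like this: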

class FMyTestTask
{
public:
    FMyTestTask(ENamedThreads::Type InDesiredThread, int32 InCount, const FString& InDesc)
        : DesiredThread(InDesiredThread)
        , Count(InCount)
        , Desc(InDesc)
    {
    }
    FORCEINLINE static TStatId GetStatId()
    {
        RETURN_QUICK_DECLARE_CYCLE_STAT(FMyTestTask, STATGROUP_TestGroup);
    }
    
    ENamedThreads::Type GetDesiredThread()
    {
        return DesiredThread; // The task runs on the DesiredThread passed to the constructor
    }

    static ESubsequentsMode::Type GetSubsequentsMode()
    {
        return ESubsequentsMode::TrackSubsequents; // Other tasks may depend on this task
    }

    void DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
    {
        uint32 CurrentThreadId = FPlatformTLS::GetCurrentThreadId();
        FString CurrentThreadName = FThreadManager::Get().GetThreadName(CurrentThreadId);
        UE_LOG(LogTemp, Log, TEXT("FMyTestTask %s[%d] Count:%d Desc:%s"), *CurrentThreadName, CurrentThreadId, Count, *Desc);
    }

private:
    ENamedThreads::Type DesiredThread;
    int32 Count;
    FString Desc;
};
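TGraphTask does not require the task class to derive from any base class. It simply calls these members on TTask, so a missing member only shows up as a compile error inside the template. As a sketch, this duck-typed contract can be written down with C++17 detection helpers. The trait IsGraphTaskLike and the simplified ThreadType/SubsequentsMode enums are hypothetical stand-ins, not engine types. DoTask is also reduced to a single parameter (no FGraphEventRef), and GetStatId is left out.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Simplified stand-ins for the engine types (hypothetical).
enum class ThreadType { GameThread, AnyThread };
enum class SubsequentsMode { TrackSubsequents, FireAndForget };

// Primary template: assume the type does not satisfy the task interface.
template <typename T, typename = void>
struct IsGraphTaskLike : std::false_type {};

// Chosen only if all three expressions TGraphTask relies on are well-formed.
template <typename T>
struct IsGraphTaskLike<T, std::void_t<
    decltype(T::GetSubsequentsMode()),                                 // static member
    decltype(std::declval<T&>().GetDesiredThread()),                   // member function
    decltype(std::declval<T&>().DoTask(std::declval<ThreadType>()))>>  // the work itself
    : std::true_type {};

struct GoodTask
{
    static SubsequentsMode GetSubsequentsMode() { return SubsequentsMode::TrackSubsequents; }
    ThreadType GetDesiredThread() { return ThreadType::AnyThread; }
    void DoTask(ThreadType /*CurrentThread*/) { ++RunCount; }
    int RunCount = 0;
};

struct MissingDoTask
{
    static SubsequentsMode GetSubsequentsMode() { return SubsequentsMode::FireAndForget; }
    ThreadType GetDesiredThread() { return ThreadType::GameThread; }
};

static_assert(IsGraphTaskLike<GoodTask>::value, "GoodTask provides the full interface");
static_assert(!IsGraphTaskLike<MissingDoTask>::value, "MissingDoTask lacks DoTask");
```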

The engine predefines several commonly used task classes:

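Task class | GetDesiredThread | GetSubsequentsMode
FNullGraphTask | Passed in through the constructor | TrackSubsequents
FReturnGraphTask | Passed in through the constructor; must not be AnyThread | TrackSubsequents
FTriggerEventGraphTask | Passed in through the constructor | TrackSubsequents
FSimpleDelegateGraphTask | Passed in through the constructor | TrackSubsequents
FDelegateGraphTask | Passed in through the constructor | TrackSubsequents
TFunctionGraphTaskImpl (class template; the static FFunctionGraphTask::CreateAndDispatchWhenReady uses it internally to create lambda tasks) | Passed in through the constructor | Given by a template parameter
FTickFunctionTask | Passed in through the constructor | TrackSubsequents
FPhysXTask | CPrio_FPhysXTask.Get() (preferably AnyThread on HP threads with high task priority) | TrackSubsequents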
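Several of these predefined classes are thin adapters: they store a callable or delegate plus a desired thread and forward DoTask to the callable. The following standard C++ sketch shows only that adapter idea, in the spirit of TFunctionGraphTaskImpl wrapping a lambda. FunctionTask and ThreadType are hypothetical. The sketch is much simpler than the engine class: it has no stat ID, no completion event and no subsequents mode.

```cpp
#include <cassert>
#include <functional>
#include <utility>

enum class ThreadType { GameThread, AnyThread }; // hypothetical stand-in for ENamedThreads::Type

// Hypothetical adapter: turns any void() callable into an object with the task interface.
class FunctionTask
{
public:
    FunctionTask(std::function<void()> InFunction, ThreadType InDesiredThread)
        : Function(std::move(InFunction)), DesiredThread(InDesiredThread) {}

    // The desired thread comes from the constructor argument, as in the table above.
    ThreadType GetDesiredThread() const { return DesiredThread; }

    // DoTask simply forwards to the stored callable.
    void DoTask(ThreadType /*CurrentThread*/) { Function(); }

private:
    std::function<void()> Function;
    ThreadType DesiredThread;
};
```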

[Figure: detailed class diagram of the task classes]

Examples

Creating tasks on the GameThread that run on the GameThread, without blocking to wait for them

// Create an FMyTestTask with no prerequisites that runs on the GameThread. ConstructAndHold keeps it out of the queue, so it does not run yet.
TGraphTask<FMyTestTask>* Task1 = TGraphTask<FMyTestTask>::CreateTask().ConstructAndHold(ENamedThreads::Type::GameThread, 10, TEXT("China")); // The arguments ENamedThreads::Type::GameThread, 10, TEXT("China") are forwarded to the FMyTestTask constructor

// Create an FMyTestTask with no prerequisites that runs on the GameThread, and queue it immediately if possible.
FGraphEventRef Task2Event = TGraphTask<FMyTestTask>::CreateTask().ConstructAndDispatchWhenReady(ENamedThreads::Type::GameThread, 20, TEXT("Hello")); // The arguments ENamedThreads::Type::GameThread, 20, TEXT("Hello") are forwarded to the FMyTestTask constructor

// Create an FMyTestTask that runs on the GameThread with Task1 and Task2 as prerequisites; it is queued once both have completed.
FGraphEventArray Task3PreTasks = { Task1->GetCompletionEvent(), Task2Event };
FGraphEventRef Task3Event = TGraphTask<FMyTestTask>::CreateTask(&Task3PreTasks).ConstructAndDispatchWhenReady(ENamedThreads::Type::GameThread, 30, TEXT("Go")); // The arguments ENamedThreads::Type::GameThread, 30, TEXT("Go") are forwarded to the FMyTestTask constructor

// A task created with ConstructAndHold must be released manually with Unlock; it is then queued if possible.
Task1->Unlock(); 
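Task3 is created while Task1 is still held, so Task3 cannot run until Task1 has been unlocked and executed and Task2 has finished. The order in which the tasks were created does not change this. A hypothetical single-threaded scheduler in standard C++ makes the resulting order visible on a FIFO queue of the kind a named thread drains. ToyScheduler and its methods are invented for this sketch. It ignores thread affinity, and the order it records is that of this toy FIFO queue, not a guarantee about the engine.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a named thread's queue plus task dependencies.
class ToyScheduler
{
public:
    // Creates a task with the given prerequisite IDs; bHold mirrors ConstructAndHold.
    int CreateTask(std::string Name, std::vector<int> Prereqs, bool bHold)
    {
        const int Id = static_cast<int>(Tasks.size());
        Tasks.push_back({std::move(Name), 1, false, {}}); // 1 = setup lock
        for (int P : Prereqs)
        {
            if (!Tasks[P].Done) // unfinished prerequisite: register as a subsequent
            {
                ++Tasks[Id].Outstanding;
                Tasks[P].Subsequents.push_back(Id);
            }
        }
        if (!bHold)
        {
            Release(Id); // ConstructAndDispatchWhenReady drops the setup lock right away
        }
        return Id;
    }

    void Unlock(int Id) { Release(Id); }

    // Drains the queue the way a named thread processes tasks, recording execution order.
    void ProcessUntilIdle()
    {
        while (!Queue.empty())
        {
            const int Id = Queue.front();
            Queue.pop_front();
            Order.push_back(Tasks[Id].Name);
            Tasks[Id].Done = true;
            for (int S : Tasks[Id].Subsequents)
            {
                Release(S); // like DispatchSubsequents
            }
        }
    }

    std::vector<std::string> Order;

private:
    struct Task
    {
        std::string Name;
        int Outstanding;
        bool Done;
        std::vector<int> Subsequents;
    };

    void Release(int Id)
    {
        if (--Tasks[Id].Outstanding == 0)
        {
            Queue.push_back(Id);
        }
    }

    std::vector<Task> Tasks;
    std::deque<int> Queue;
};
```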

Creating a task on the GameThread that runs on the GameThread, and blocking until it completes

// Create an FMyTestTask with no prerequisites that runs on GameThread_Local (the GameThread's local queue), and queue it immediately if possible.
FGraphEventRef Task1Event = TGraphTask<FMyTestTask>::CreateTask().ConstructAndDispatchWhenReady(ENamedThreads::Type::GameThread_Local, 1, TEXT("Test")); // The arguments ENamedThreads::Type::GameThread_Local, 1, TEXT("Test") are forwarded to the FMyTestTask constructor

// Block until Task1 in the local queue has completed
FTaskGraphInterface::Get().WaitUntilTasksComplete({ Task1Event }, ENamedThreads::GameThread_Local); // Alternatively: Task1Event->Wait(ENamedThreads::GameThread_Local);

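Note: the task has to be posted to the GameThread's local queue (GameThread_Local). The GameThread then blocks and processes the tasks in that local queue until they are done.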
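A named thread such as the GameThread does not simply sleep while it waits. Roughly speaking, it keeps executing tasks from the queue it was told to process until the awaited event completes. The standard C++ sketch below models that with a hypothetical NamedThreadQueue whose WaitUntil method pumps queued work on the calling thread. It has a single queue and ignores the engine's main/local queue split.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical model of a named thread's task queue (single-threaded for clarity).
class NamedThreadQueue
{
public:
    void Enqueue(std::function<void()> Task) { Tasks.push_back(std::move(Task)); }

    // Instead of sleeping, run queued tasks on the calling thread until Done becomes true.
    // Returns false if the queue runs dry first; in this single-threaded model nothing
    // else could ever complete the event, so waiting longer would never finish.
    bool WaitUntil(const bool& Done)
    {
        while (!Done)
        {
            if (Tasks.empty())
            {
                return false;
            }
            std::function<void()> Task = std::move(Tasks.front());
            Tasks.pop_front();
            Task();
        }
        return true;
    }

private:
    std::deque<std::function<void()>> Tasks;
};
```

Processing stops as soon as the awaited task has run, so later tasks stay queued for the thread's normal processing.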

Creating tasks on the GameThread that run on AnyThread, and blocking until they complete

FGraphEventArray PreTasks;
for (int i = 0; i < 5; ++i)
{
    PreTasks.Add(FFunctionGraphTask::CreateAndDispatchWhenReady([i]()
        {
            UE_LOG(LogTemp, Log, TEXT("Task %d"), i);
        }
    ));
}

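// Create an FNullGraphTask (AnyThread type) with no prerequisites that runs on an NP (normal-priority) worker thread as a high-priority task, and queue it immediately if possible.
PreTasks.Add(TGraphTask<FNullGraphTask>::CreateTask().ConstructAndDispatchWhenReady(TStatId(), ENamedThreads::Type::AnyNormalThreadHiPriTask));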

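// Create a TFunctionGraphTaskImpl task (AnyThread type) with no prerequisites that runs on an HP (high-priority) worker thread as a high-priority task, and queue it immediately if possible.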
FGraphEventRef Event1 = FFunctionGraphTask::CreateAndDispatchWhenReady([]()
    {
        UE_LOG(LogTemp, Log, TEXT("Main Task"));
    },
    TStatId{},
    nullptr,
    ENamedThreads::Type::AnyHiPriThreadHiPriTask
    );
PreTasks.Add(Event1);

// Create an FSimpleDelegateGraphTask with no prerequisites that runs on AnyThread, and queue it immediately if possible.
FSimpleDelegateGraphTask::FDelegate LambdaSimpleDelegateProc = FSimpleDelegateGraphTask::FDelegate::CreateLambda([]()
    {
        UE_LOG(LogTemp, Log, TEXT("Simple Delegate"));
    }
);
PreTasks.Add(FSimpleDelegateGraphTask::CreateAndDispatchWhenReady(LambdaSimpleDelegateProc,TStatId()));

// Create an FDelegateGraphTask with no prerequisites that runs on AnyThread, and queue it immediately if possible.
FDelegateGraphTask::FDelegate LambdaDelegateProc = FDelegateGraphTask::FDelegate::CreateLambda([](ENamedThreads::Type InCurrentThread, const FGraphEventRef& MyCompletionGraphEvent)
    {
        UE_LOG(LogTemp, Log, TEXT("Delegate %d"), InCurrentThread);
    }
);
PreTasks.Add(FDelegateGraphTask::CreateAndDispatchWhenReady(LambdaDelegateProc, TStatId()));

FEvent* WaitEvent = FPlatformProcess::GetSynchEventFromPool();
// Create an FTriggerEventGraphTask that runs on AnyThread with the 9 tasks in PreTasks as prerequisites; it is queued once they have all completed.
TGraphTask<FTriggerEventGraphTask>::CreateTask(&PreTasks).ConstructAndDispatchWhenReady(WaitEvent, ENamedThreads::Type::AnyThread);
WaitEvent->Wait(); // Block here. When the FTriggerEventGraphTask runs, it calls WaitEvent->Trigger() and ends the wait
FPlatformProcess::ReturnSynchEventToPool(WaitEvent);

UE_LOG(LogTemp, Log, TEXT("Task Finish!"));
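The FTriggerEventGraphTask pattern above is a join: a final task depends on every task in PreTasks and signals an FEvent that the caller sleeps on. A standard C++ approximation is sketched below. It assumes std::thread workers in place of TaskGraph worker threads and a hypothetical SyncEvent, built from std::mutex and std::condition_variable, in place of FEvent. A "last finisher triggers" counter stands in for the prerequisite tracking that TaskGraph does for you. RunAndJoin is likewise a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for FEvent: Wait() blocks until Trigger() has been called.
class SyncEvent
{
public:
    void Trigger()
    {
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            bTriggered = true;
        }
        Cond.notify_all();
    }

    void Wait()
    {
        std::unique_lock<std::mutex> Lock(Mutex);
        Cond.wait(Lock, [this] { return bTriggered; });
    }

private:
    std::mutex Mutex;
    std::condition_variable Cond;
    bool bTriggered = false;
};

// Runs NumTasks jobs on worker threads; the job that finishes last plays the role of the
// join task and triggers the event. Returns how many jobs had run when Wait() returned.
int RunAndJoin(int NumTasks)
{
    if (NumTasks <= 0)
    {
        return 0; // nothing would ever trigger the event
    }
    SyncEvent Done;
    std::atomic<int> Remaining(NumTasks);
    std::atomic<int> Ran(0);
    std::vector<std::thread> Workers;
    for (int i = 0; i < NumTasks; ++i)
    {
        Workers.emplace_back([&] {
            Ran.fetch_add(1);
            if (Remaining.fetch_sub(1) == 1) // the last prerequisite just finished
            {
                Done.Trigger();
            }
        });
    }
    Done.Wait(); // like WaitEvent->Wait()
    const int Result = Ran.load();
    for (std::thread& Worker : Workers)
    {
        Worker.join(); // std::thread objects must be joined before destruction
    }
    return Result;
}
```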

Creating tasks on the GameThread that compute on AnyThread, blocking until they complete, and then reading back the result

int32 TotalNum = 18;
    
FSimpleDelegateGraphTask::FDelegate SimpleDelegateProc = FSimpleDelegateGraphTask::FDelegate::CreateLambda([&TotalNum]()
    {
        int32 RandSeed;
        FMath::RandInit(int64(&RandSeed));
        int32 Seconds = FMath::RandRange(1, 5);
        FPlatformAtomics::InterlockedAdd(&TotalNum, Seconds); // Add to TotalNum atomically so concurrent worker threads do not race
        FPlatformProcess::Sleep(Seconds);

        UE_LOG(LogTemp, Log, TEXT("FSimpleDelegateGraphTask Sleep:%d"), Seconds);
    }
);

FGraphEventArray PreTasks;
// Create three FSimpleDelegateGraphTask tasks with no prerequisites that run on AnyThread, and queue each immediately if possible.
PreTasks.Add(FSimpleDelegateGraphTask::CreateAndDispatchWhenReady(SimpleDelegateProc, TStatId()));
PreTasks.Add(FSimpleDelegateGraphTask::CreateAndDispatchWhenReady(SimpleDelegateProc, TStatId()));
PreTasks.Add(FSimpleDelegateGraphTask::CreateAndDispatchWhenReady(SimpleDelegateProc, TStatId()));

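// Block until every task in PreTasks has completed
FTaskGraphInterface::Get().WaitUntilTasksComplete(MoveTemp(PreTasks), ENamedThreads::GameThread); // WaitUntilTasksComplete takes a const FGraphEventArray&, so MoveTemp does not actually avoid a copy here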

UE_LOG(LogTemp, Log, TEXT("TotalNum: %d"), TotalNum);
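FPlatformAtomics::InterlockedAdd is needed above because several worker threads update TotalNum concurrently, and a plain += would be a data race. Outside the engine, the same idea can be sketched with std::atomic and std::thread. In this sketch, fixed per-task increments replace the random sleep durations so that the final value is predictable. AccumulateConcurrently is a hypothetical helper name.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Starts from Initial and lets each worker add its own increment atomically,
// mirroring InterlockedAdd(&TotalNum, Seconds) in the delegate above.
int AccumulateConcurrently(int Initial, const std::vector<int>& Increments)
{
    std::atomic<int> Total(Initial);
    std::vector<std::thread> Workers;
    for (int Increment : Increments)
    {
        Workers.emplace_back([&Total, Increment] {
            Total.fetch_add(Increment); // atomic read-modify-write, so no update is lost
        });
    }
    for (std::thread& Worker : Workers)
    {
        Worker.join(); // plays the role of WaitUntilTasksComplete
    }
    return Total.load();
}
```

For example, starting from 18 with increments 1, 3 and 5 always yields 27, whatever order the threads run in.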

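Why call the static CreateTask first and then call ConstructAndDispatchWhenReady on its return value?

The main reason is that two separate sets of arguments have to be passed in. One set specifies the prerequisite events and belongs to the task system itself. The other set is forwarded to the constructor of the user-defined task.

To achieve this, UE first uses the factory method CreateTask to allocate the TGraphTask and store the task-system settings in it. It then uses the internal helper class FConstructor to construct the actual user-defined task.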
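The two-stage call can be reduced to a small standard C++ sketch. A static CreateTask captures the task-system arguments (here only a prerequisite count) and returns a helper. The helper's variadic ConstructAndDispatchWhenReady perfect-forwards the user arguments into placement new on aligned raw storage. This follows the same shape as TGraphTask::FConstructor. MiniGraphTask, PrintTask and the plain heap allocation are hypothetical, and the sketch does no queuing or event handling.

```cpp
#include <cassert>
#include <new>
#include <string>
#include <utility>

template <typename TTask>
class MiniGraphTask
{
public:
    // Helper returned by CreateTask; it receives the user task's constructor arguments.
    class FConstructor
    {
    public:
        template <typename... T>
        MiniGraphTask* ConstructAndDispatchWhenReady(T&&... Args)
        {
            // Stage 2: build the user task in place inside the owner's raw storage.
            new (static_cast<void*>(Owner->TaskStorage)) TTask(std::forward<T>(Args)...);
            Owner->bConstructed = true;
            return Owner; // the engine would now register prerequisites and maybe queue
        }

    private:
        friend class MiniGraphTask;
        explicit FConstructor(MiniGraphTask* InOwner) : Owner(InOwner) {}
        MiniGraphTask* Owner;
    };

    // Stage 1: task-system parameters only (here just a prerequisite count).
    static FConstructor CreateTask(int NumPrereq = 0)
    {
        return FConstructor(new MiniGraphTask(NumPrereq));
    }

    // Runs the embedded task, destroys it, and frees the wrapper (like ExecuteTask).
    void ExecuteAndDestroy()
    {
        assert(bConstructed);
        TTask& Task = *std::launder(reinterpret_cast<TTask*>(TaskStorage));
        Task.DoTask();
        Task.~TTask();
        delete this;
    }

    int GetNumPrereq() const { return NumPrereq; }

private:
    explicit MiniGraphTask(int InNumPrereq) : NumPrereq(InNumPrereq) {}

    alignas(TTask) unsigned char TaskStorage[sizeof(TTask)]; // raw storage, like TAlignedBytes
    bool bConstructed = false;
    int NumPrereq;
};

// Hypothetical user task whose constructor arguments arrive in stage 2.
struct PrintTask
{
    PrintTask(int InCount, std::string InDesc, int* InOut)
        : Count(InCount), Desc(std::move(InDesc)), Out(InOut) {}
    void DoTask() { *Out = Count + static_cast<int>(Desc.size()); }
    int Count;
    std::string Desc;
    int* Out;
};
```

With this split, CreateTask never needs to know PrintTask's constructor signature, and PrintTask never sees the prerequisite arguments.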

Overall architecture

[Figure: overall architecture diagram of the TaskGraph system]
