首页 > 其他分享 >【协程库】协程调度

【协程库】协程调度

时间:2024-12-04 21:29:19浏览次数:9  
标签:协程库 协程 主协程 caller 调度 任务 线程

协程调度是指管理和控制多个协程在程序中的执行顺序和时机的过程。协程是一种轻量级的、用户态的线程,允许在单个线程内实现并发执行。 协程调度器负责决定何时切换协程、哪个协程应该运行,以及如何协调协程之间的执行。 在前面的协程模块中,对于每个协程,都需要用户手动调用协程的resume 方法将协程运行起来,然后等协程运行结束并返回,再运行下⼀个协程。这种运行协程的方式其实是用户自己在挑选协程执行,相当于用户在充当调度器,显然不够灵活。 引入协程调度后,则 可以先创建⼀个协程调度器,然后把这些要调度的协程传递给调度器,由调度器负责把这些协程⼀个⼀个消耗掉 。 进程的调度算法:先来先服务、最短作业优先、最高响应比优先、时间片轮转等,协程的调度也类似,我们可以随便选择调度协程的算法,sylar 里使用的就是最简单的先来先服务。 先展现一个简单的协程调度器,负责以下功能:

  • 通过 std::list<sylar::Fiber::ptr> m_tasks 存储待调度的协程任务。
  • 提供 schedule 方法添加协程任务。
  • 提供 run 方法依次执行调度的协程任务。
#include "sylar/sylar.h"

class Scheduler {
public:
    void schedule(sylar::Fiber::ptr task) {
        m_tasks.push_back(task);
    }

    void run() {
        sylar::Fiber::ptr task;
        auto it = m_tasks.begin();

        while(it != m_tasks.end()) {
            task = *it;
            m_tasks.erase(it++);
            task->resume();
        }
    }

private:
    std::list<sylar::Fiber::ptr> m_tasks;
};

void test_fiber(int i) { //一个简单的协程任务,打印"hello world"及索引
    std::cout << "hello world " << i << std::endl;
}

int main() {
    sylar::Fiber::GetThis(); // 初始化当前线程的主协程
    Scheduler sc; // 创建调度器

    // 添加调度任务
    for(auto i = 0; i < 10; i++) {
        sylar::Fiber::ptr fiber(new sylar::Fiber(
            std::bind(test_fiber, i)
        ));
        sc.schedule(fiber);
    }

    // 执行调度人物
    sc.run();
    return 0;
}

协程调度的细节

上面的实现可以看成sylar的协程调度器的一个特例,当sylar的协程调度器只使用main函数所在的线程进行调度时,它的工作原理和上面的完全一样

首先是关于调度任务的定义,对于协程调度器来说,协程当然可以作为调度任务, 但实际上,函数也应可以,因为函数也是可执行的对象,调度器应当支持直接调度一个函数。 这在代码实现上也很简单,只需要将函数包装成协程即可,协程调度器的实现重点还是以协程为基础。 接下来是多线程,通过前面协程模块的知识我们可以知道, ⼀个线程同⼀时刻只能运行⼀个协程,所以,作为协程调度器,势必要用到多线程来提高调度的效率,因为有多个线程就意味着有多个协程可以同时执行,这显然是要好过单线程的。
Q:既然多线程可以提高协程调度的效率,那么, 能不能把调度器所在的线程(caller线程)也加入进来作为调度线程呢? 比如典型地,在 main 函数中定义的调度器,能不能把 main 函数所在的线程也用来执行调度任务呢? A: 答案是肯定的,在实现相同调度能力的情况下(指能够同时调度的协程数量),线程数越小,线程切换的开销也就越小,效率就更高⼀些,将调度器所在的线程也用于执行调度任务,可以减少总线程数,降低上下文切换的成本。所以, 调度器所在的线程,也应该⽀持⽤来执⾏调度任务。甚至,调度器完全可以不创建新的线程,而只使用caller线程来进行协程调度,比如只使用main函数所在的线程来进行协程调度。

接下来是调度器如何运行,这里可以简单地认为,调度器创建后,内部首先会创建⼀个调度线程池,调度开始后, 所有调度线程按顺序从任务队列里取任务执行,调度线程数越多,能够同时调度的任务也就越多,当所有任务都调度完后,调度线程就停下来等新的任务进来。

接下来是添加调度任务,添加调度任务的本质就是往调度器的任务队列里塞任务, 但是,只添加调度任务是不够的,还应该有⼀种方式用于通知调度线程有新的任务加进来了,因为调度线程并不⼀定知道有新任务进来了。当然调度线程也可以不停地轮询有没有新任务,但是这样CPU占用率会很高。 接下来是调度器的停止。调度器应该支持停止调度的功能,以便回收调度线程的资源,只有当所有的调度线程都结束后,调度器才算真正停止。 通过上面的描述,⼀个协程调度器的⼤概设计也就出炉了: 调度器内部维护一个任务队列和⼀个调度线程池。 开始调度后,线程池从任务队列里按顺序取任务执行。调度线程可以包含caller 线程。当全部任务都执⾏完了,线程池停⽌调度,等新的任务进来。添加新任务后,通知线程池有新的任务进来了,线程池重新开始运⾏调度。停⽌调度时,各调度线程退出,调度器停⽌⼯作。

如左图,当主线程,即调度器线程不参与调度时,即use_caller为false时,就必须要创建其他线程进行协程调度:因为有单独的线程用于协程调度,那么只需要让新线程的入口函数作为调度协程,从任务队列里取任务执行就行了,main函数与调度协程完全不相关,main函数只需要向调度器添加任务,然后在适当的时机停止调度器即可。当调度器停止时,main函数要等待调度线程结束后再退出。

如右图,当主线程参与调度时,use_caller为true,可以是多线程,可以是单线程,多线程时调度线程的协程切换如上,那主线程和单线程的时候协程是怎样切换的呢:

协程有3类:main函数对应的主协程;调度协程;待调度的任务协程

main线程中协程的运行顺序:

  1. main函数主协程运行,创建调度器
  2. 仍然是main函数主协程运行,向调度器添加一些调度任务
  3. 开始协程调度,main函数主协程让出执行权,切换到调度协程,调度协程从任务队列里按执行顺序执行所有的任务
  4. 每次执行一个任务,调度协程都要让出执行权,再切到该任务的协程里取执行,任务执行结束后,还有再切回调度协程,继续下一个任务的调度
  5. 所有任务都执行完后,调度协程还要让出执行权并切回main函数主协程,以保证程序能顺利结束
在具体的实现上,前面提到 sylar 的子协程只能和线程主协程切换,而不能和另⼀个子协程切换。在上面的情况 1  中,线程主协程是main 函数对应的协程,另外的两类协程,也就是调度协程和任务协程,都是子协程,也就是说,调度协程不能直接和任务协程切换,⼀旦切换,程序的main 函数协程就跑飞了。 解决单线程环境下caller线程主协程-调度协程-任务协程之间的上下文切换,是sylar协程调度实现的关键。 其实,子协程和子协程切换导致线程主协程跑飞的 关键原因在于,每个线程只有两个线程局部变量用于保存当前的协程上下文信息。 也就是说线程任何时候都最多只能知道两个协程的上下文,其中一个是当前正在运行协程的上下文,另⼀个是线程主协程的上下文,如果子协程和子协程切换,那这两个上下文都会变成子协程的上下文,线程主协程的上下文丢失了,程序也就跑飞了。如果不改变这种局部,就只能线程主协程去充当调度协程,这就相当于又回到了让用户充当调度器的情况。 总结一下:

问题描述

在单线程环境中,每个线程通常只能拥有两个线程局部变量来保存协程的上下文信息:

  • 当前正在运行的协程上下文
  • 线程主协程的上下文

当存在多个任务协程(子协程)时,频繁在子协程之间切换会导致:

  • 线程主协程上下文丢失:因为每次切换只保留两个上下文,当切换到另一个子协程时,主协程的上下文被覆盖,导致程序运行异常(“跑飞”)。

解决方案

那么,如何改变这种情况呢?其实非常简单,只需要给每个线程增加⼀个线程局部变量用于保存调度协程的上下文就可以了,这样,每个线程可以同时保存三个协程的上下文,

  • 当前正在执行的协程上下文
  • 线程主协程的上下文
  • 调度协程的上下文

有了这三个上下文,协程就可以根据自己的身份来选择和每次和哪个协程进行交换,具体操作如下,具体步骤如下:

下面看看对协程类进行改造,增加m_runInScheduler成员,表示当前协程是否参与调度器调度,在协程的resume和yield时,根据协程的运行环境确定是和线程主协程进行交换还是和调度器进行交换:

1. 新增成员变量
  • m_runInScheduler
    • 类型:bool
    • 作用:标识当前协程是否参与调度器调度。
2. 构造函数
  • 初始化参数
    • cb:协程的回调函数。
    • stacksize:协程的栈大小,若未指定则使用默认值。
    • run_in_scheduler:是否参与调度器调度。
  • 初始化流程
    • 分配唯一的协程ID并递增总协程数。
    • 设置栈大小并分配栈空间。
    • 初始化上下文 (getcontext),配置上下文链接和栈信息。
    • 使用 makecontext 设置协程的主函数为 Fiber::MainFunc
    • 记录协程创建日志。

3. resume 方法
  • 功能:恢复协程的执行。
  • 步骤
    • 断言协程状态不是终止 (TERM) 或正在运行 (RUNNING)。
    • 设置当前协程为执行状态。
    • 根据 m_runInScheduler 决定与调度器主协程或线程主协程交换上下文。
    • 使用 swapcontext 进行上下文切换,若失败则断言错误。

4. yield 方法
  • 功能:挂起当前协程的执行,返回到调度器或主协程。
  • 步骤
    • 断言协程当前状态为运行 (RUNNING) 或终止 (TERM)。
    • 设置当前协程为主协程。
    • 若协程未终止,设置状态为就绪 (READY)。
    • 根据 m_runInScheduler 决定与调度器主协程或线程主协程交换上下文。
    • 使用 swapcontext 进行上下文切换,若失败则断言错误。


调度器类

封装一个N-M的协程调度器,内部有一个线程池,支持协程在线程池里面切换

主要成员变量:

std::string m_name //协程调度器的名称,用于标识当前调度器实例
MutexType m_mutex //用于保护调度任务和其他共享资源的互斥锁。确保在多线程环境下的线程安全。
std::vector<Thread::ptr> m_threads //线程池中的线程对象。Thread::ptr 应该是 Thread 类的智能指针,表示线程池管理的所有线程。
std::list<ScheduleTask> m_tasks //存储调度任务的队列。任务可以是协程对象或者回调函数。
std::vector<int> m_threadIds //线程池中每个线程的ID。用于区分每个线程的标识。
size_t m_threadCount //线程池中的线程数量,不包括主线程(如果 use_caller 为 true 时)。
std::atomic<size_t> m_activeThreadCount //活跃线程的数量,通过原子操作保证线程安全。
std::atomic<size_t> m_idleThreadCount //空闲线程的数量,用于调度器判断是否有可用线程。
bool m_useCaller //是否将当前线程作为调度器线程。若为 true,表示当前线程也参与调度。
Fiber::ptr m_rootFiber //如果 use_caller 为 true,该成员变量表示当前线程的主协程
int m_rootThread //表示主协程所运行的线程ID(如果 use_caller 为 true 时)
bool m_stopping //用于表示调度器是否正在停止。如果是 true,表示调度器正在停止或已停止。

主要成员函数:

Scheduler(size_t threads = 1, bool use_caller = true, const std::string &name = "Scheduler") //构造函数,初始化调度器
virtual ~Scheduler() //析构函数,用于清理调度器资源
const std::string &getName() const //获取调度器的名称
static Scheduler *GetThis() //获取当前线程所属的调度器实例指针。通常用于协程或线程内部访问当前调度器。
static Fiber *GetMainFiber() //获取当前线程的主协程。


//调度任务,可以是协程对象或回调函数。
//此方法会将任务添加到调度队列中,并根据线程编号将任务分配给特定线程。
//如果指定了线程号,任务将被分配给该线程;如果线程号为 -1,则任务可以在任意线程上运行。
template <class FiberOrCb> void schedule(FiberOrCb fc, int thread = -1)

void start() //启动调度器,开始调度协程。
void stop() //停止调度器,在所有任务执行完成后才会返回。通常用于优雅地关闭调度器

virtual void tickle() //用于通知调度器有任务要调度。通常在有新任务时被调用,用于唤醒空闲线程或协程。
void run() //调度器的主调度函数,负责从任务队列中取出任务并执行。
virtual void idle() //当没有任务可以调度时,调度器会进入 idle 状态,执行空闲协程或任务。
virtual bool stopping() //判断调度器是否处于停止状态。用于判断是否可以继续调度新任务。
void setThis() //设置当前线程为调度器的线程,用于当前线程成为调度器的一部分。

//判断是否有空闲线程。
//当调度协程进入 idle 时,空闲线程数会增加;
//当协程从 idle 返回时,空闲线程数减少。
bool hasIdleThreads() 

struct ScheduleTask //这是一个内嵌的结构体,用于表示一个调度任务。它包含以下成员:
- Fiber::ptr fiber:协程对象,表示要调度的协程。
- std::function<void()> cb:回调函数,用于调度的任务。
- int thread:指定该任务在哪个线程上运行(-1 表示任意线程)。
- ScheduleTask(Fiber::ptr f, int thr):构造函数,接收协程和线程ID。
- ScheduleTask(std::function<void()> f, int thr):构造函数,接收回调函数和线程ID。
- void reset():重置任务的成员变量。

下面来看一下协程调度模块的全局变量和线程局部变量,这里只有以下两个线程局部变量:
// 当前线程的调度器,同⼀个调度器下的所有线程指同同⼀个调度器实例
static thread_local Scheduler *t_scheduler = nullptr;
// 当前线程的调度协程,每个线程都独有⼀份,包括caller线程
static thread_local Fiber *t_scheduler_fiber = nullptr;
t_scheduler_fiber 保存当前线程的调度协程,加上前面协程模块的 t_fiber 和 t_thread_fiber ,每个线程总共可以记录三个协程的上下文信息。

调度器的构造方法

这个构造函数有三个参数:线程数 (threads),是否使用调用者线程作为调度线程 (use_caller),以及调度器的名称 (name)。

  • 参数检查:使用SYLAR_ASSERT(thread > 0)来确保传入的线程数至少为1.
  • 成员变量初始化:m_useCaller 初始化为传入的 use_caller 参数;m_name 设置为传入的名称 name。
  • 调整线程数量:如果 use_callertrue,意味着调用者的线程也将被用作调度线程之一。因此,线程数 threads 减一(--threads),以便为调用者线程腾出位置。
  • 设置当前调度器:在 use_callertrue 的情况下,执行以下步骤:sylar::Fiber::GetThis() 被调用,用来获取或初始化当前线程的主协程;SYLAR_ASSERT(GetThis() == nullptr) 确保当前线程还没有关联的调度器。这是为了防止一个线程被多次用作调度线程;t_scheduler = this 将当前调度器实例设置为当前线程的调度器。
  • 初始化调用者的根协程:m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, false)) 创建一个新的协程,它的执行函数是调度器的 run() 方法。这个根协程在调用者线程中执行,但不会被调度器调度。当根协程停止时,它应该返回到调用者线程的主协程。
  • 线程名称和ID设置:sylar::Thread::SetName(m_name) 设置当前线程的名称;t_scheduler_fiber = m_rootFiber.get() 设置全局指针指向根协程,以便在当前线程中访问它;m_rootThread = sylar::GetThreadId() 获取当前线程的ID,并保存;m_threadIds.push_back(m_rootThread) 将当前线程的ID添加到线程ID数组中
  • 其他情况:如果 use_callerfalse,则 m_rootThread 设置为 -1,表示没有使用调用者线程作为调度线程。
  • 最后,设置 m_threadCount 为调整后的线程数量。
/**
* @brief 创建调度器
* @param[in] threads 线程数
* @param[in] use_caller 是否将当前线程也作为调度线程
* @param[in] name 名称
*/
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string &name) {
 SYLAR_ASSERT(threads > 0);
 m_useCaller = use_caller;
 m_name = name;
 if (use_caller) {
 --threads;
 sylar::Fiber::GetThis();
 SYLAR_ASSERT(GetThis() == nullptr);
 t_scheduler = this;
 /**
 * 在user_caller为true的情况下,初始化caller线程的调度协程
 * caller线程的调度协程不会被调度器调度,⽽且,caller线程的调度协程停⽌时,应该返回caller线
程的主协程
 */
 m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, false));
 sylar::Thread::SetName(m_name);
 t_scheduler_fiber = m_rootFiber.get();
 m_rootThread = sylar::GetThreadId();
 m_threadIds.push_back(m_rootThread);
 } else {
 m_rootThread = -1;
 }
 m_threadCount = threads;
}
Scheduler *Scheduler::GetThis() {
 return t_scheduler;
}

调度器的start方法

如果只使⽤ caller 线程进⾏调度,那这个⽅法啥也不做
void Scheduler::start() {
 SYLAR_LOG_DEBUG(g_logger) << "start"; //日志输出调试信息
 MutexType::Lock lock(m_mutex); //加锁保护临界区
 if (m_stopping) { //检查调度器是否已停止
 SYLAR_LOG_ERROR(g_logger) << "Scheduler is stopped";
 return;
 }
 SYLAR_ASSERT(m_threads.empty()); //确认没有线程池,这意味着调度器在启动之前还没有创建线程池。这是为了防止重复创建线程池。
 m_threads.resize(m_threadCount); //调整线程池大小
 for (size_t i = 0; i < m_threadCount; i++) { //循环遍历线程池
//创建线程:对于每个线程,调用 std::bind(&Scheduler::run, this) 将调度器的 run() 方法与当前线程绑定,作为线程的执行入口。
//每个线程都有一个唯一的名称,名称由 m_name + "_" + std::to_string(i) 构成,例如 "Scheduler_0"、"Scheduler_1" 等。
 m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this),
 m_name + "_" + std::to_string(i)));
//保存线程ID:使用 m_threadIds.push_back(m_threads[i]->getId()) 将每个新创建线程的 ID 添加到 m_threadIds 数组中,便于后续管理。
 m_threadIds.push_back(m_threads[i]->getId());
 }
}

调度协程的实现run

内部有一个while(true)循环,不停地从任务队列取任务并执行,由于Fiber类改造过,所以每个被调度器执行的协程在结束时都会回到调度协程。当任务队列为空时,代码会进idle协程,但idle协程啥也不做直接就yield了,状态还是ready,所以这里其实就是个忙等待,CPU占用率爆炸,只有当调度器检测到停⽌标志时,idle协程才会真正结束,调度协程也会检测到idle协程状态为TERM,并且随之退出整个调度协程。这里还可以看出⼀点,对于⼀个任务协程,只要其从resume中返回了,那不管它的状态是TERM还是READY,调度器都不会⾃动将其再次加⼊调度,因为前⾯说过,⼀个成熟的协程是要学会自我管理的。

void Scheduler::run() {
 SYLAR_LOG_DEBUG(g_logger) << "run";

//调用 setThis 方法,将当前线程设置为调度器的执行线程。
//通常,setThis 会记录调度器的当前线程信息,以便之后可以通过 Scheduler::GetThis() 获取当前线程的调度器实例。
 setThis(); 

//如果当前线程的线程ID不是根线程(即不是启动调度器的线程)
//则将当前协程设置为根协程 t_scheduler_fiber,这样当协程运行时会指向当前线程中的调度协程。
 if (sylar::GetThreadId() != m_rootThread) {
 t_scheduler_fiber = sylar::Fiber::GetThis().get();
 }
 Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));
 Fiber::ptr cb_fiber;
 ScheduleTask task;


 while (true) {
 task.reset();
 bool tickle_me = false; // 是否tickle其他线程进⾏任务调度
 {
 MutexType::Lock lock(m_mutex);
 auto it = m_tasks.begin();
 // 遍历所有调度任务
 while (it != m_tasks.end()) {
 if (it->thread != -1 && it->thread != sylar::GetThreadId()) {
 // 指定了调度线程,但不是在当前线程上调度,标记⼀下需要通知其他线程进⾏调度,然
后跳过这个任务,继续下⼀个
 ++it;
 tickle_me = true;
 continue;
 }

 // 找到⼀个未指定线程,或是指定了当前线程的任务 SYLAR_ASSERT(it->fiber || it->cb);
 if (it->fiber) {
 // 任务队列时的协程⼀定是READY状态,谁会把RUNNING或TERM状态的协程加⼊调度呢?
 SYLAR_ASSERT(it->fiber->getState() == Fiber::READY);
 }
 // 当前调度线程找到⼀个任务,准备开始调度,将其从任务队列中剔除,活动线程数加1
 task = *it;
 m_tasks.erase(it++);
 ++m_activeThreadCount;
 break;
 }

 // 当前线程拿完⼀个任务后,发现任务队列还有剩余,那么tickle⼀下其他线程
 tickle_me |= (it != m_tasks.end());
 }
 if (tickle_me) {
 tickle();
 }

 if (task.fiber) {
 // resume协程,resume返回时,协程要么执⾏完了,要么半路yield了,总之这个任务就算完成
了,活跃线程数减⼀
 task.fiber->resume();
 --m_activeThreadCount;
 task.reset();
 } else if (task.cb) {
 if (cb_fiber) {
 cb_fiber->reset(task.cb);
 } else {
 cb_fiber.reset(new Fiber(task.cb));
 }
 task.reset();
 cb_fiber->resume();
 --m_activeThreadCount;
 cb_fiber.reset();
 } else {
 // 进到这个分⽀情况⼀定是任务队列空了,调度idle协程即可
 if (idle_fiber->getState() == Fiber::TERM) {
 // 如果调度器没有调度任务,那么idle协程会不停地resume/yield,不会结束,如果idle协
程结束了,那⼀定是调度器停⽌了
 SYLAR_LOG_DEBUG(g_logger) << "idle fiber term";
 break;
 }
 ++m_idleThreadCount;
 idle_fiber->resume();
 --m_idleThreadCount;
 }
 }
 SYLAR_LOG_DEBUG(g_logger) << "Scheduler::run() exit";
}

1. 日志输出

输出调试日志,表明调度器的 run 方法开始执行。

2. 设置当前线程的调度器

调用 setThis 方法,将当前线程设置为调度器的执行线程。通常,setThis 会记录调度器的当前线程信息,以便之后可以通过 Scheduler::GetThis() 获取当前线程的调度器实例。

3. 检查当前线程是否为根线程

如果当前线程的线程ID不是根线程(即不是启动调度器的线程),则将当前协程设置为根协程 t_scheduler_fiber,这样当协程运行时会指向当前线程中的调度协程。

4. 创建空闲协程

创建一个空闲协程 idle_fiber,它的任务是执行 Scheduler::idle 方法。如果没有任务调度,它将执行 idle 方法。

5. 初始化协程和任务

  • cb_fiber 用来存储回调任务的协程。
  • task 用来存储当前正在执行的调度任务。

6. 主调度循环

  • task.reset() 每次开始一个新的调度周期时重置任务。
  • tickle_me 是一个标志,用于标记是否需要唤醒其他线程进行任务调度。

7. 锁定互斥量并遍历任务队列

  • MutexType::Lock lock(m_mutex)m_mutex 进行加锁,保护 m_tasks 任务队列的并发访问。
  • 遍历任务队列 m_tasks,检查任务是否被分配到当前线程。如果任务指定了线程且不是当前线程,则跳过该任务,并设置 tickle_me = true,表示需要通知其他线程进行任务调度。

8. 处理调度任务

  • 如果任务是一个协程对象,首先确保它的状态是 READY,即协程已经准备好被调度。
  • 将当前任务从队列中移除,存储到 task 中。
  • 增加活跃线程数 m_activeThreadCount,表示当前有一个线程正在处理任务。
  • 跳出任务遍历循环,准备执行任务。

9. 唤醒其他线程调度任务

如果任务队列中仍然有任务,设置 tickle_metrue,表示需要唤醒其他线程来继续处理剩余的任务。

10. 唤醒其他线程

如果 tickle_metrue,调用 tickle() 方法通知其他线程进行调度,可能是唤醒空闲线程执行任务。

11. 执行任务

1. 执行协程任务
  • 如果 task 包含一个协程(task.fiber),则恢复协程执行。
  • 协程执行完后,减少活跃线程数,并重置任务。
2. 执行回调函数
  • 如果任务是一个回调函数(task.cb),则创建一个新的协程来执行回调任务。
  • 如果 cb_fiber 已经创建,则重置它;否则,创建一个新的协程。
  • 执行回调任务后,减少活跃线程数,并重置回调协程。
3. 执行空闲协程
  • 如果任务队列为空,执行空闲协程 idle_fiber
  • 如果空闲协程的状态为 TERM(即结束),则退出调度循环,表示调度器停止。
  • 否则,恢复空闲协程的执行,等待下一次任务调度。

12. 调度器退出日志

当调度器退出时,记录日志信息,表明 run() 方法已经退出。

调度协程的stop方法

在使⽤了 caller 线程的情况下,调度器依赖 stop ⽅法来执⾏ caller 线程的调度协程,如果调度器只使⽤了caller 线程来调度,那调度器真正开始执⾏调度的位置就是这个 stop方法。
void Scheduler::stop() {
    // 记录调度器停止操作的调试日志
    SYLAR_LOG_DEBUG(g_logger) << "stop";

    // 检查调度器是否已经在停止状态,如果是,则无需重复停止
    if (stopping()) {
        return;
    }

    // 设置调度器的停止标志,防止后续继续调度任务
    m_stopping = true;

    /// 如果使用调用者线程作为调度线程,确保停止操作由调用者线程发起
    if (m_useCaller) {
        // 断言当前线程的调度器实例是当前对象,确保由调用者线程调用stop
        SYLAR_ASSERT(GetThis() == this);
    } else {
        // 断言当前线程的调度器实例不是当前对象,确保由调度线程调用stop
        SYLAR_ASSERT(GetThis() != this);
    }

    // 循环调用tickle(),唤醒所有调度线程,确保它们能够检测到停止信号并退出
    for (size_t i = 0; i < m_threadCount; i++) {
        tickle();
    }

    // 如果存在根协程(即使用了调用者线程作为调度线程),再调用一次tickle()
    if (m_rootFiber) {
        tickle();
    }

    /// 在使用调用者线程的情况下,恢复根协程的执行,使其能够检测到停止信号并退出
    if (m_rootFiber) {
        m_rootFiber->resume(); // 恢复根协程的执行
        SYLAR_LOG_DEBUG(g_logger) << "m_rootFiber end"; // 记录根协程结束的日志
    }

    // 创建一个临时线程指针容器,用于存储需要等待结束的线程
    std::vector<Thread::ptr> thrs;
    {
        // 使用互斥锁保护对线程池的访问,确保线程安全
        MutexType::Lock lock(m_mutex);
        thrs.swap(m_threads); // 交换线程池和临时容器,快速获取所有线程指针
    }

    // 遍历临时容器中的所有线程,等待它们完成执行
    for (auto &i : thrs) {
        i->join(); // 调用线程的join方法,阻塞直到线程结束
    }
}

标签:协程库,协程,主协程,caller,调度,任务,线程
From: https://blog.csdn.net/weixin_45962681/article/details/144234794

相关文章

  • 调度器69—ENQUEUE/DEQUEUE flags
    基于msm-4.14一、简介1.在enqueue_task/dequeue_task向就绪队列插入和移除任务的时候,通过flags参数判断是由于什么原因触发的enqueue和dequeue,并进行不同的响应。2.相关函数://kernel/sched/core.cstaticinlinevoidenqueue_task(structrq*rq,structtask_stru......
  • 进程、线程、协程的关系
    系统和多个线程的关系:一个系统内可以创建多个进程,一个进程可以类比为一个应用程序,一个进程内可以创建多个线程,协程是Go语言首创,通过在一个线程内代理当前线程的所有系统的调度权,模拟出多个子线程——称作协程,以达到优化效率的目的。线程和协程的关系:协程Coroutine的精髓......
  • Django-Q设置Django调度和定时任务
    场景说明:Django项目需要实现每30秒刷新一次首页信息 1.安装库pipinstalldjango-q2.添加appINSTALLED_APPS=[#...其他应用程序...'django_q',]3.配置Q_CLUSTER={'name':'project','workers':4,......
  • 不修改内核镜像的情况下,使用内核模块实现高效监控调度时延
    一、背景在之前的博客调度时延的观测_csdn调度时延的观测杰克崔-CSDN博客 里,我们讲了多种监控调度时延的方法,有依靠系统现有节点来监控,但是依赖系统现有节点做不到每个单词调度时延的监控,也讲了通过修改内核代码,在内核计算调度时延的地方加逻辑去监控,这里说的加逻辑也可以......
  • 流水线并行,重计算:GPipe;1F1B(一前一后)调度机制
    目录GPipe一、GPipe的背景与目的二、GPipe的功能与特点三、GPipe的应用与效果四、GPipe的开源与可扩展性1F1B(一前一后)调度机制一、背景与基本概念二、1F1B调度机制的要求三、应用与挑战GPipe是一个基于Lingvo(Lingvo是Google基于TensorFlow二次开发的,重点针对序列......
  • 【微电网】基于改进粒子群算法的微电网优化调度(Matlab代码实现)
    ......
  • golang: 用协程异步写日志
    一,代码1,全局文件://日志消息结构体typeLogMessagestruct{ Levelstring Messagestring}//通道varLogChanchanLogMessage//日志文件句柄varGlobalLogFile*os.File//异步日志函数funcAsyncLog(logChchanLogMessage){ for{ select{ casems......
  • 企业级数据安全-CDH集群-dolphinscheduler海豚调度一站式数据安全技术实战2025
    企业级数据安全-CDH集群-dolphinscheduler海豚调度一站式数据安全技术实战2025,为了配合集团上市数据审计要求,我们在公司内部升级了数据安全等级。一:CDH技术框架,这也是通用企业级的数据平台框架。   2024-11月  测试环境  上 kerberos 认证 1.1 修改拷贝......
  • 装配线调度题解
    装配线调度题解初始化:F[1][1]=e1+a1,1F[2][1]=e2+a2,1对于第i个装配站(3<=i<=n),我们可以考虑两种情况:在第1条装配线上完成第i个装配站的最短总时间:F[1][i]=min(F[1][i-1]+a1,i,F[2][i-1]+a1,i+t2,i-1)在第2条装配线上完成第i个装配站的最短总时间:F[2][......
  • 十一、Kubernetes调度-亲和力与反亲和力-Affinity-AntiAffinity
    亲和力Affinity存在的问题:某些Pod优先选择有ssd=true标签的节点,如果没有在考虑部署到其它节点,如其他没有标签,会pending;某些Pod需要部署在ssd=true和type=physical的节点上,但是优先部署在ssd=true的节点上;会有优先级;同一个应用的Pod不同的副本或者同一个项目的应用尽......