beanstalkd源码解析(1)

作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C

### 简介

Beanstalkd是一个简单、高效的工作队列系统,其最初设计目的是通过在后台异步执行耗时任务来降低高容量Web应用的页面延迟。其简单、轻量、易用的特点,对任务优先级、延时、超时重发等的控制,以及对众多语言客户端的良好支持,使其可以很好地应用在各种需要队列系统的场景中。

其采用的是生产者-消费者模式,借鉴了memcached的设计,协议也很简单。先了解下其基础概念:

##### job - 任务

job是一个需要异步处理的任务,是Beanstalkd中的基本单元,job需要放在一个tube中。Beanstalkd中的任务(job)类似于其它队列系统中的消息(message)的概念。

##### tube - 管道

管道即某一种类型的任务队列,类似于消息队列中的主题(topic),是生产者(Producer)和消费者(Consumer)的操作对象。一个Beanstalkd中可以有多个管道,每个管道都有自己的生产者(Producer)和消费者(Consumer),管道之间互不影响。

##### producer - 生产者

任务(job)的生产者,通过put命令将一个job放到一个tube中。

##### consumer - 消费者

任务(job)的消费者,通过reserve、release、bury、delete命令来获取或改变job的状态。

在网络上找了一幅图,描述的是其状态流转情况(原图缺失,这里用文字概括):put(无延迟)→ READY;put(带延迟)→ DELAYED,延迟到期后 → READY;READY 经 reserve → RESERVED;RESERVED 经 release → READY 或 DELAYED,经 bury → BURIED,经 delete → DELETED;BURIED 经 kick → READY,经 delete → DELETED。

一个Beanstalkd任务可能处于以下状态:

**READY** - 需要立即处理的任务。当producer直接put一个任务时,任务就处于READY状态,等待consumer来处理。延时(DELAYED)任务到期后会自动转为READY状态。

**DELAYED** - 延迟执行的任务。当任务被延时put时,任务就处于DELAYED状态,等待时间过后,任务会被迁移到READY状态。消费者处理任务后,也可以通过release(指定delay参数)将任务再次放回DELAYED队列延迟执行。

**RESERVED** - 已经被消费者获取、正在执行的任务。当consumer获取了当前READY的任务后,该任务的状态就会迁移到RESERVED状态,这时其它consumer就不能再操作该任务。Beanstalkd会检查任务是否在TTR(time-to-run)内完成,超时未完成的任务会被服务器自动放回READY状态。

**BURIED** - 被保留(休眠)的任务,这时任务不会被执行,也不会消失。当consumer处理完该任务后,可以选择delete、release或者bury操作:delete后,任务会被删除,生命周期结束;release操作可以把任务状态迁移回READY状态或DELAYED状态,使其他consumer可以继续获取和执行该任务;bury会把任务置为休眠,等需要该任务时,再将休眠的任务kick回READY;也可以通过delete删除BURIED状态的任务。

**DELETED** - 任务被删除,Beanstalkd不再维护这些任务,即任务生命周期结束。

### 分析

请原谅我啰里啰嗦地说了那么多,现在直接进入主题。我一般看源码都是带着问题去看,这样效率会高很多,这次也不例外,先提出几个问题吧:

- **beanstalkd如何维护job的状态?**
- **delay状态的job如何转为ready?**
- **如何实现优先级?**
- **数据是如何持久化的?**

对于问题1,可以关注use、put、reserve、delete等命令的处理逻辑。

###### use 是生产者命令

use用于选择后续put所使用的tube。从下面的调用栈可以看到,如果tube不存在,会经tube_find_or_make → make_and_insert_tube创建:

(lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step over * frame #0: 0x000000010000c011 beanstalkd`make_and_insert_tube(name="abc") at tube.c:70 frame #1: 0x000000010000bfd9 beanstalkd`tube_find_or_make(name="abc") at tube.c:96 frame #2: 0x00000001000087d0 beanstalkd`dispatch_cmd(c=0x0000000100400430) at prot.c:1554 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100400430) at prot.c:1686 frame #4: 0x00000001000067ca 
beanstalkd`conn_data(c=0x0000000100400430) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100400430) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100400430, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ###### 跟踪put命令 * * (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = breakpoint 6.1 * frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100203b90, delay=0, update_store='\x01') at prot.c:466 frame #1: 0x00000001000070bf beanstalkd`enqueue_incoming_job(c=0x0000000100203850) at prot.c:867 frame #2: 0x0000000100006dfb beanstalkd`maybe_enqueue_incoming_job(c=0x0000000100203850) at prot.c:1160 frame #3: 0x000000010000757f beanstalkd`dispatch_cmd(c=0x0000000100203850) at prot.c:1279 frame #4: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100203850) at prot.c:1686 frame #5: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100203850) at prot.c:1726 frame #6: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100203850) at prot.c:1868 frame #7: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100203850, ev=114) at prot.c:1880 frame #8: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #9: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #10: 0x00007fff6df26085 libdyld.dylib`start + 1 put命令最终调用enqueue_job:delay为0时,把job插入所属tube的ready堆并置为Ready状态;否则先设置deadline_at,再把job插入所属tube的delay堆并置为Delayed状态。 ```c frame #0: 0x0000000100004f99 beanstalkd`enqueue_job(s=0x00000001000104b8, j=0x0000000100500370, delay=0, update_store='\x01') at prot.c:466 463 { 464 int r; 465 -> 466 j->reserver = NULL; 467 if (delay) { 468 //延迟到期时间,到期后job才会转为ready 469 j->r.deadline_at = nanoseconds() + delay; Target 0: (beanstalkd) stopped. 
(lldb) l 470 r = heapinsert(&j->tube->delay, j); 471 if (!r) return 0; 472 //设置为被等待执行状态 473 j->r.state = Delayed; 474 } else {//立即执行的任务就投递到预备队列 475 r = heapinsert(&j->tube->ready, j); 476 if (!r) return 0; (lldb) l 477 //设置状态为预备执行状态 478 j->r.state = Ready; 479 ready_ct++; 480 481 //对于紧急任务,进行额外的跟踪处理 482 if (j->r.pri < URGENT_THRESHOLD) { 483 global_stat.urgent_ct++; (lldb) l 484 j->tube->stat.urgent_ct++; 485 } 486 } ``` ######reserve命令 (lldb) bt * thread #1, queue = 'com.apple.main-thread', stop reason = step in * frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 frame #1: 0x00000001000097f0 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 frame #2: 0x00000001000079cd beanstalkd`dispatch_cmd(c=0x0000000100500420) at prot.c:1358 frame #3: 0x0000000100006ca5 beanstalkd`do_cmd(c=0x0000000100500420) at prot.c:1686 frame #4: 0x00000001000067ca beanstalkd`conn_data(c=0x0000000100500420) at prot.c:1726 frame #5: 0x0000000100006623 beanstalkd`h_conn(fd=6, which=114, c=0x0000000100500420) at prot.c:1868 frame #6: 0x0000000100005968 beanstalkd`prothandle(c=0x0000000100500420, ev=114) at prot.c:1880 frame #7: 0x000000010000bb75 beanstalkd`srvserve(s=0x00000001000104b8) at serv.c:63 frame #8: 0x000000010000e19f beanstalkd`main(argc=1, argv=0x00007ffeefbff370) at main.c:100 frame #9: 0x00007fff6df26085 libdyld.dylib`start + 1 ```c 1344 case OP_RESERVE: /* FALLTHROUGH */ 1345 /* don't allow trailing garbage */ -> 1346 if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) { 1347 return reply_msg(c, MSG_BAD_FORMAT); 1348 } 1349 Target 0: (beanstalkd) stopped. 
(lldb) l 1350 op_ct[type]++; 1351 connsetworker(c); 1352 1353 if (conndeadlinesoon(c) && !conn_ready(c)) { 1354 return reply_msg(c, MSG_DEADLINE_SOON); 1355 } 1356 (lldb) l 1357 /* try to get a new job for this guy */ 1358 wait_for_job(c, timeout); 1359 process_queue(); 1360 break; ``` wait_for_job先通过enqueue_waiting_conn,把当前conn加入到它所watch的每个tube的waiting集合中 ```c * thread #1, queue = 'com.apple.main-thread', stop reason = step over frame #0: 0x00000001000097e7 beanstalkd`wait_for_job(c=0x0000000100500420, timeout=3600) at prot.c:1024 1021 wait_for_job(Conn *c, int timeout) 1022 { 1023 c->state = STATE_WAIT; -> 1024 enqueue_waiting_conn(c); 1025 1026 /* Set the pending timeout to the requested timeout amount */ 1027 c->pending_timeout = timeout; Target 0: (beanstalkd) stopped. (lldb) s Process 40697 stopped * thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x000000010000ac0c beanstalkd`enqueue_waiting_conn(c=0x0000000100500420) at prot.c:687 684 tube t; 685 size_t i; 686 -> 687 global_stat.waiting_ct++; 688 c->type |= CONN_TYPE_WAITING; 689 for (i = 0; i < c->watch.used; i++) { 690 t = c->watch.items[i]; Target 0: (beanstalkd) stopped. (lldb) l 691 t->stat.waiting_ct++; //加入到tube的等待集合中 692 ms_append(&t->waiting, c); 693 } 694 } ``` job状态在reserve_job中从Ready变为Reserved ```c thread #1, queue = 'com.apple.main-thread', stop reason = step in frame #0: 0x0000000100006030 beanstalkd`reserve_job(c=0x0000000100500420, j=0x0000000100203b90) at prot.c:380 377 static void 378 reserve_job(Conn *c, job j) 379 { -> 380 j->r.deadline_at = nanoseconds() + j->r.ttr; 381 global_stat.reserved_ct++; /* stats */ 382 j->tube->stat.reserved_ct++; 383 j->r.reserve_ct++; Target 0: (beanstalkd) stopped. 
(lldb) l //改变状态为reserved 384 j->r.state = Reserved; //插入到conn的reserved_jobs链表中 385 job_insert(&c->reserved_jobs, j); 386 j->reserver = c; 387 c->pending_timeout = -1; 388 if (c->soonest_job && j->r.deadline_at < c->soonest_job->r.deadline_at) { 389 c->soonest_job = j; 390 } (lldb) l 391 return reply_job(c, j, MSG_RESERVED); 392 } ```

###### 简单看下delete操作

```c
    case OP_DELETE:
-> 1363        errno = 0;
   1364        id = strtoull(c->cmd + CMD_DELETE_LEN, &end_buf, 10);
   1365        if (errno) return reply_msg(c, MSG_BAD_FORMAT);
   1366        op_ct[type]++;
Target 0: (beanstalkd) stopped.
(lldb) l
   1367        //查找到job后,从其所在的堆或链表中移除
   1368        j = job_find(id);
   1369        j = remove_reserved_job(c, j) ? :
   1370            remove_ready_job(j) ? :
   1371            remove_buried_job(j) ? :
   1372            remove_delayed_job(j);
   1373
(lldb) l
   1374        if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
   1375
   1376        j->tube->stat.total_delete_ct++;
   1377
   1378        j->r.state = Invalid;
   1379        r = walwrite(&c->srv->wal, j);
   1380        walmaint(&c->srv->wal);
(lldb) l
           //最后进行释放操作
   1381        job_free(j);
```

可以看到,delete会依次尝试把job从reserved、ready、buried、delayed中移除,然后把状态置为Invalid并写入WAL,最后释放job。

###### 对第一个问题做下简单总结:

1. ready堆和delay堆都由tube对象维护(tube.ready、tube.delay),而reserved状态的job则通过job_insert挂到conn对象的reserved_jobs链表上。
2. 执行reserve的客户端连接会先被加入到它所watch的每个tube的等待集合(tube.waiting)中,再由process_queue函数统一为它们分配job。这段是比较关键的部分,用伪代码描述如下:

```text
while (j = next_eligible_job()):      // 在有等待连接的tube中,取ready堆中优先级最高的job
    heapremove(j.tube.ready, j)       // 从该tube的ready堆中移除job
    conn = ms_take(j.tube.waiting)    // 从该tube的等待集合中取出一个等待中的连接
    reserve_job(conn, j)              // 状态置为Reserved,并把job插入该连接的reserved_jobs链表
```

下一篇再说第二个问题...

标签: Beanstalkd, Epoll, 队列, 源码, memcached
评论已关闭