Redis Hash原理 作者: nbboy 时间: 2022-02-27 分类: 默认分类,C 评论 > 分析的Redis版本基于6.0 ## Redis Hash #### 应用场景 哈希列表结构是一般在编程上非常重要的结构,因为它的查询性能接近O(1),所以是一种非常高效的数据结构。我们平时用的字典(dict)往往都是通过哈希列表去实现的,而redis中自己实现的hash结构也非常重要。在redis中所有的键值都是通过hash结构去管理的,比如我们键入命令set a hello时,其实在redis server用结构redisDb->dict来进行维护,其键和值都是redisObject: ```c typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ ... } redisDb; ``` #### Hash结构 redis hash结构和普通的hash结构还是非常像的,但是因为它有一个渐进式hash过程(下面会讲),所以它其实有两个hash table。 ```c //字典采用链表处理冲突,而且在数据过多的时候会自动进行渐进式重新hash typedef struct dict { //一些函数指针 dictType *type; void *privdata; //rehash的oldhash和newhash dictht ht[2]; //rehash的进度(hash槽索引), 如果是-1, 则表示没有进行rehash, 否则就是在rehash long rehashidx; /* rehashing not in progress if rehashidx == -1 */ unsigned long iterators; /* number of iterators currently running */ } dict; typedef struct dictht { //指向hash槽的头部 dictEntry **table; //分配的hash槽总数量 unsigned long size; //计算hash槽的掩码值, 它等于size - 1 unsigned long sizemask; //在用的hash槽数量 unsigned long used; } dictht; //字典的每个元素 typedef struct dictEntry { //元素的健 void *key; //指向的值,或是指向一个对象,或者数值... union { void *val; uint64_t u64; int64_t s64; double d; } v; //采用链表的方式,next指向hash槽内的下个元素 struct dictEntry *next; } dictEntry; ``` 比如size=6的hash表按照字母顺序进行编码(比如a:1,b:2,c:3,d:4,e:5,f:6,g:1...),并且插入到hash列表中后的图示如下:  #### 键冲突处理 我们知道所有的hash列表都会冲突,冲突是由于key被hash到一个bucket(有些资料里叫hash槽)里引起的。Redis采用如下的技术处理键的冲突: - 选择好的Hash函数,尽量让键分布均匀 - 用拉链的方式处理冲突,即使键值过载,也会被分散到bucket内的链表里 - 设定一个阈值,如果元素过载,则会进行自动扩容 散列表的处理冲突方式一般有两种,开放定址和拉链法,而Redis采用的是拉链法,实现上用链表来管理冲突的键值对。Redis在Hash函数的选取上则用的是SipHash,这种算法有较强的安全性,可以用来防御“Hash-flooding”。 #### 扩容/缩容 当随着元素的增加,如果Hash列表不进行扩张,则必然会增加单个Bucket的元素数量,而这会导致命中该Bucket的检索操作退化到查询列表的操作,而这是不能被接受的,所以Redis设计了自动扩容的机制。 扩容是在添加操作完成的,当元素使用比例过了设定的阈值,则进行自动扩容: ```c //超过阈值,开始进行hash扩展,扩展长度为两倍,里面会进行微调整 if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { return dictExpand(d, d->ht[0].used*2); } ``` Redis扩容本身并不复杂,它会把散列的空间扩大到现有元素的2倍左右大小。相比扩容,缩容采用了更加主动的方式,采用定时器进行周期性的空间回收,具体操作在tryResizeHashTables函数,收缩的条件为空间利用率下降为10%,就会进行一次收缩,收缩过后的大小刚好大于原有键值对数量的2的n次。([dictResize](https://github.com/redis/redis/blob/6.0/src/dict.c#L135 "dictResize")) ```c //低于10%的时候,会进行缩容 int htNeedsResize(dict *dict) { long long size, used; size = dictSlots(dict); used = dictSize(dict); //use/size < 0.10 return (size > DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL)); } ``` #### 渐进式Rehash 作为hash列表,已有的功能都是完整的,但是作为高性能的Redis服务器如果只是容许暂停服务进行rehash,那将是不可承受的。Redis在设计上采用了渐进式的结构,也就是在rehash阶段,会有两个散列表存在,一个用来存目前用的健值对ht1,另外一个则是新扩张后的ht2。在这个阶段两个ht都需要去查找和更新,当完成这个阶段只需要考虑ht1。 可以用一个例子来图示一下,假设为了方便演示,resize比例为1的时候就进行扩张,初始化hash大小为2个Bucket,插入3个键值对后的hash状态如下:  这时候,其实hash列表里的键值对已经达到扩容的零界点,当请求插入l=33的时候,根据used/size=4/2=2>1(dict_force_resize_ratio ),所以会分配一个2倍空间(2*used)大小给新的ht1,并且启动**渐进式hash迁移**。而在hash迁移期间,所有的插入请求的键值对都会放到新的ht1。  当我们请求删除e为健的键值对时,其实它会进行两步操作(其实如果一旦进入hash迁移,则对于查找/添加/替换都会有下面的第一步): 1. 依次获取ht0的一个bucket,并且迁移到ht1中 2. 在ht0和ht1中查找是否有健,如果找到的话进行删除  *图画的不是很好,表达的是2个键值对迁移过来后,其中一个被删除(画叉的是被删除,画加号的是被正常迁移过来,顺序是先迁移,后删除)* 这时候如果来个检索操作,比如检索b为健的值是多少,同样检索操作也需要执行一次迁移操作,也即: 1. 依次获取ht0的一个bucket,并且迁移到ht1中 2. 在ht0和ht1中查找是否有健,如果找到的话返回 对于这里的例子,本次操作会迁移Bucket2,操作后会使ht0没有键值对(used==0)。而当ht0为空时候,ht1被作为新的ht0(ht1指针给ht0),ht1清空作为下一次迁移用,本轮迁移标记rehashidx置空。  需要说明的是如果在迁移阶段进行检索,并且第一步之后迁移也没有完成,则查找的时候会对h0和h1两个hash表都进行检索,因为键值有可能在h0也有可能在h1。 最后再补充一点,渐进式迁移不光光在查找/添加/替换等操作里有,而且在定时任务里也会给予一定的时间(1ms)进行迁移。 ### 参考 《算法》散列表部分 《数据结构与算法之美》散列表部分
Redis Sds原理 作者: nbboy 时间: 2022-02-24 分类: 默认分类,C 评论 > 分析的Redis版本基于6.0 ## Redis Sds #### 应用场景 sds会应用在很多地方,可以说在redis中所有需要用到string的地方都会用到sds。特别的,我们会想到字符串命令的key和value底层都是用sds存储的[(stringCommand)][1]。可以先来看一个redis命令: ```bash set a hello ``` 其实该命令会在服务端接受后被存储到robj对象中,key和value都是如此,该结构有如下的结构: ```c /* The actual Redis Object */ #define OBJ_STRING 0 /* String object. */ #define OBJ_LIST 1 /* List object. */ #define OBJ_SET 2 /* Set object. */ #define OBJ_ZSET 3 /* Sorted set object. */ #define OBJ_HASH 4 /* Hash object. */ typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ int refcount; void *ptr; } robj; ``` 其中type就表示的是什么类型,对于字符串命令就是0,当然还有很多类型,定义在OBJ_XXX常量里,这里不做展开。Redis是C语言开发的,而对于C给带来高性能的同时也会带来一些字符串安全的问题,很多0Day都是通过栈空间的溢出来做文章,所以作者在设计sds的时候考虑到了内存安全性,后面会具体看到这部分设计。 #### Sds的结构 当用户键入上面set a hello命令的时候,值会存入sds最终如下图所示:  Buf指向的值才是最终的字符串内容,并且以\0作为结束,这个和C语言的字符串是一样的。Len就是Buf指向的字符串的长度,所以如果计算字符串长度不需要每次调用strlen,而是直接返回Len就可以,计算成本为O(1)!Alloc为sds预分配的长度,一般情况下它比Len要大一点,具体细节下面会说。 sds根据不同的字符串长度,会使用不同的结构去管理,比如用32位可以表示的长度则为sdshdr32,其他结构也是类似的: ```c struct __attribute__ ((__packed__)) sdshdr32 { //字符串分配出去的长度 uint32_t len; /* used */ //分配的总长度 uint32_t alloc; /* excluding the header and null terminator */ //各种类型, sdshdr* unsigned char flags; /* 3 lsb of type, 5 unused bits */ //字符串真正的指向 char buf[]; }; ``` #### Sds的扩容和缩容 Sds的扩容也是会根据字符串的大小进行按需扩容,而且每次扩容都是按两倍大小进行扩容,用伪代码描述为:  当然有扩容也会有缩容,用户可以主动请求缩容,容量会缩小为刚好可以容下字符串大小,其过程类似,它是由sdsRemoveFreeSpace实现。 #### 内存安全性 因为sds不是以\0作为字符串的结束,而是以len来表示字符串的长度,所以它可以包含任何的二进制数据(包括\0)。 ## 参考的资料 《Redis设计与实现》 [1]: https://redis.io/commands#string "string command"
重读《Unix编程艺术》笔记 作者: nbboy 时间: 2021-10-01 分类: 软件架构,软件工程,设计模式,C,Golang 1 条评论 第一次记不清什么时候读的了,大概是在刚毕业那会,也没读懂多少,但是印象中只记得一句话:**一种是设计的极为简洁,以至于无法看到明显的缺陷;另外一种是设计的极为复杂,以至于有缺陷也看不出来。**作者引用这句的意思是尽量把软件设计得简洁,这样才能尽量减少BUG。 作者水平非常牛,可以看下参考资料里的链接,作者写过大量的开源软件,所以他说的有一定的参考意义。对书中我比较感兴趣的是设计原则部分和软件复杂度部分,就针对这两块记录下我的总结和理解。 ### Unix软件设计原则 1. 简洁原则:其实就是我们常说的KISS原则(Keep It Simple, Stupid) 2. 模块原则,组合原则:作者提倡用清晰的接口+简单的模块组合成复杂的软件 3. 清晰原则:有时候,尽量写一些很轻松就可以阅读的代码,而不是过度优化的代码,即使性能比前者好 4. 分离原则:容易变的和不容易变的需要分离 5. 透明原则:给模块或者程序预留测试,统计,监控接口,一切都在掌控之中。 6. 表示原则:尽量用数据结构去驱动,而不是逻辑 7. 通俗原则,默认原则:现在流行的说法就是约定优于配置,Java的SpringBoot就是把这种思想推到了极致,让新手入门比较简单 8. 补救原则:常说的Fail-Fast思想,作者还说了我们应该遵循宽容地收,谨慎地发的思想,这个想法不单单用在网络程序上,我们平时写方法也可以把它作为准则之一。 9. 经济原则:类似清晰原则,不要去写晦涩的优化代码,除非在性能严格场景,或者可以封装复杂度 10. 生成原则:尽量利用自动代码生成来代替手工代码 11. 优化原则:先设计一个简单的可以实现的原型,然后再去优化代码性能 12. 多样原则:更加包容的心态去接受新事物 13. 扩展原则:为将来可能的扩展预留接口 ### 好的程序特征 - **首先应该为程序选择恰当的数据结构,其次是算法,因为数据结构比算法更有表现力。** - 不需要过早的优化。 - 数据结构和算法应该尽量简单,拿不准就用枚举法。 - 紧凑性:减少冗余设计,可靠精简的核心算法,可以避免出现一些边界情况。 - 正交性:只做一件事情,减少副作用。 - SPOT原则:更具体了紧凑原则,对于数据模型尽量要简化,抽象/模型尽量和现实/需求一一对应。 - 分层原则:好的软件是分层设计的,编程习惯上一般可以分为自顶向下和自底向上方式。 - **衡量是否一个好程序,不在无可增加,而在无可删减** - 一个模块不应该暴露过多的API/全局变量,也需要控制函数的体积 - 透明原则:添加过度的抽象层/过度保护底层细节会让实现变得不透明,保持简单是实行透明性的一个方法 ### 复杂度来源 全书说得最多的是简洁性,那对于软件复杂性的来源可以归结为3类: - 接口(可以理解为用户界面)的复杂性 - 实现(代码的实现细节)的复杂性 - 代码行数带来的复杂性 ### 复杂度种类 作者主要认为复杂度主要分为3种,即**偶然复杂度,选择复杂度,本质复杂度**:  偶然复杂度也是最好控制的,属于代码层面的复杂度,利用重构能一定程度减轻偶然复杂度。选择性复杂度主要产生于需要做的功能,比如对于财务报表功能和对于飞机自动化控制程序的功能显然是完全不是一个等级的。本质性复杂度主要根据开发工具,选取的核心数据结构,和真正的功能本身逻辑复杂度都可能影响到。 那这些复杂度怎么破?作者也只是给出了一些指导性建议: - 对于偶然性复杂度,通过设计更加简单的程序(其实之前讲的一些原则都是可以用来消除本类复杂度的)能够带来一些帮助。 - 对于选择性复杂度,通过选择要做哪些功能来权衡功能和复杂度之间的利弊。 - 对于本质性复杂度,只有对要解决的程序问题进行深入思考,设计合适的数据结构才能降低此类复杂度。 ### 总结 编程属于实践性活动,作者的大量经验教训也是在爬了很多坑才总结出来的,即使到现在也在提交代码。所以以上的编程原则,以及对复杂度的讨论,都需要笔者和读者去实践中应用和领悟才行。 ### 参考 [作者简介](https://zh.wikipedia.org/wiki/%E5%9F%83%E9%87%8C%E5%85%8B%C2%B7%E9%9B%B7%E8%92%99 "作者简介") [知乎上的评价](https://zhuanlan.zhihu.com/p/25040637 "知乎上的评价") [作者维护的软件](http://www.catb.org/~esr/software.html "作者维护的软件")
Golang Context 作者: nbboy 时间: 2021-05-10 分类: 软件架构,设计模式,C,Golang 评论 # Context > 使用上下文的一个很好的心理模型是它应该在您的程序中流通,想象一条河或流水。 **分析版本:1.15.2** ### Context context接口只有4个方法,先看下接口定义 ```go type Context interface { //超时时间 Deadline() (deadline time.Time, ok bool) //需要监听的通道 Done() <-chan struct{} //如果没有关闭,则返回nil,否则返回一个错误值 Err() error //指定key的value Value(key interface{}) interface{} } ``` 目前版本中,实现的struct为emptyCtx,cancelCtx,timerCtx,valueCtx,每个ctx对应的应用场景都不一样,先看下最简单的emptyCtx ### emptyCtx ```go var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background } func TODO() Context { return todo } ``` TODO和Background用的都是emptyCtx,Background主要被用来作为其他Ctx的根,而TODO主要可以视为一种nil的Ctx去用,因为在官方的设计中,不允许使用nil作为Ctx的值。emptyCtx的实现非常简单,不做具体介绍,都是空的方法体。 先看下比较常用的cancelCtx的使用方法 ### cancelCtx ```go gen := func(ctx context.Context) <-chan int { dst := make(chan int) n := 1 go func() { for { select { case <-ctx.Done()://取消后从这里返回 fmt.Println("ctx.done") return case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) for n := range gen(ctx) { fmt.Println(n) if n == 5 { //达到目标,取消ctx cancel() break } } time.Sleep(3 * time.Second) ``` cancelCtx主要用来控制goroutine的生命周期,即什么时候结束生命周期,当然这个需要goroutine本身去配合,select Done返回的通道。再看下,cancelCtx的内部结构 ```go type cancelCtx struct { Context mu sync.Mutex // protects following fields 用来保护成员 done chan struct{} // created lazily, closed by first cancel call 就是Done()返回的chan,调用cancel()后就被关闭 children map[canceler]struct{} // set to nil by the first cancel call 子ctx,所有ctx会组成一颗树形结构,而此处指向其孩子节点 err error // set to non-nil by the first cancel call 调用cancel()后,被设置成取消原因 } ``` 这里看下cancelCtx的构建函数 ```go func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } //创建cancelCtx,关联传递进来的ctx c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) } } func propagateCancel(parent Context, child canceler) { //首先检查父ctx是否关闭 done := parent.Done() if done == nil { return // parent is never canceled } select { case <-done: // parent is already canceled // 如果父ctx被取消,则也同步取消子ctx child.cancel(false, parent.Err()) return default: } //如果找到了cancelCtx if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil {//父节点已经被取消 // parent has already been canceled child.cancel(false, p.err) } else {//如果没被取消,则把子节点挂到父节点 if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} } p.mu.Unlock() } else { //如果是自定义的ctx,就会开启一个goroutine去监听父的取消事件,并且取消子ctx atomic.AddInt32(&goroutines, +1) go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } } func parentCancelCtx(parent Context) (*cancelCtx, bool) { done := parent.Done() //如果关闭,则返回false if done == closedchan || done == nil { return nil, false } //看下是否cancelCtx p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) //没找到则返回false if !ok { return nil, false } p.mu.Lock() ok = p.done == done p.mu.Unlock() //如果不一样也返回flse if !ok { return nil, false } //通过深层查找,找到了cancelCxt,则才返回true return p, true } ``` - 父Ctx如果不需要被取消,则直接返回,Background,TODO就是不需要被取消的类型 - 如果父ctx被取消,则也同步取消子ctx - parentCancelCtx会深层次得去找父cancelCtx,这里分两种情况 1)如果是标准(cancelCtx,timerCtx)则会同步父子Ctx的状态(要么都同步取消,要么建立关系) 2)如果是自定义Ctx,就会开启一个goroutine去监听父的取消事件,并且取消子ctx 这里这么做的原因,就是需要把子节点的状态和父节点要同步,调用withCancel()返回的cancel函数其实是调用cancelCtx.cancel()函数 ```go // cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() if c.err != nil { c.mu.Unlock() return // already canceled } c.err = err if c.done == nil { c.done = closedchan } else { //关闭chan close(c.done) } //子ctx依次进行cancel for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() //从根节点里移除c if removeFromParent { removeChild(c.Context, c) } } ``` 注释中说的很明白,会把打开的chan关闭,然后依次调用子ctx的cancel,所以如果我们忘记调用cancel,其实会有大量的chan没被close掉,然后造成资源的浪费! 此处试下级联取消,代码如下 ```go func main() { ctx1, cancel1 := context.WithCancel(context.Background()) defer cancel1() ctx2, cancel2 := context.WithCancel(ctx1) defer cancel2() ctx3, cancel3 := context.WithCancel(ctx2) defer cancel3() go func() { select { case <-time.After(3 * time.Second): cancel1() } }() <-ctx3.Done() } ``` 创建了三个ctx,然后第一个ctx取消后,其下的所有ctx都会取消。需要注意,代码中其实ctx1被cancel了两次,通过了解实现的代码,知道这么写其实并没有什么问题。画一个图,直观了解下3个ctx组成的结构。取消是沿着继承链,从除了根部外(Background不能被取消)一直到所有节点执行取消操作! ┌───────────────────┐ │ ┌───────────────┐ │ │ │ Background │ │ │ └───────────────┘ │ └─────────┬─────────┘ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ │ │ │ Ctx1 │ │ │ │ └───────────────┘ │ │ └─────────┬─────────┘ │ │ │ │ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ Cancel │ │ Ctx2 │ │ │ │ └───────────────┘ │ │ └─────────┬─────────┘ │ │ │ │ │ │ │ │ │ ┌─────────▼─────────┐ │ │ ┌───────────────┐ │ │ │ │ Ctx3 │ │ │ │ └───────────────┘ │ ▼ └───────────────────┘ ### timerCtx 其实Context最牛的功能我觉得还是timerCtx,先来看一下这个功能一个简单的例子。 ```go func main() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() result := make(chan int, 3) for i := 0; i < 10; i++ { go func(i int) { for { select { case <-ctx.Done(): fmt.Println("return") return case result <- i: } } }(i) } for { select { case r := <-result: fmt.Println(r) case <-ctx.Done(): return } } } ``` 其实这个ctx就是定义了一个具有超时功能的上下文,一般可以应用在可能会长时间执行的任务上,如果该任务长时间执行,我们可以设置一个ctx,超时时间到来,goroutine就从该任务返回,不会造成任务失控的情况。继续看下timerCtx的结构 ```go // A timerCtx carries a timer and a deadline. It embeds a cancelCtx to // implement Done and Err. It implements cancel by stopping its timer then // delegating to cancelCtx.cancel. // 通过计时器去实现任务的取消 type timerCtx struct { //内部也是继承了cancelCtx,所以也具有取消的能力 cancelCtx timer *time.Timer // Under cancelCtx.mu. //超时时间点 deadline time.Time } ``` WithTimeout其实内部会调用WithDeadline,我们分析下该方法 ```go func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } //父节点早于子节点指定时间,直接返回父节点,因为后面设置其实没意义 if cur, ok := parent.Deadline(); ok && cur.Before(d) { // The current deadline is already sooner than the new one. return WithCancel(parent) } c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } propagateCancel(parent, c) dur := time.Until(d) //时间已经到了,就直接cancel if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() //用定时器去处理延迟cancel if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } } ``` 这里有3点要注意: 1. 如果子Ctx超过了父Ctx则,直接使用父Ctx 2. 如果时间已经到期,则直接Cancel 3. 否则就注册一个定时器在未来一个时间执行 这里比较关心的是,他内部其实维护了一个定时器,就是那么简单而已!!!在分析一下对应的cancel方法 ```go func (c *timerCtx) cancel(removeFromParent bool, err error) { c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } c.mu.Lock() //关闭定时器 if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() } ``` 比cancelCtx多了停止定时器的操作。 ### valueCtx WithValue很容易建立valueCtx,valueCtx结构如下 ```go // A valueCtx carries a key-value pair. It implements Value for that key and // delegates all other calls to the embedded Context. type valueCtx struct { Context key, val interface{} } func WithValue(parent Context, key, val interface{}) Context { if parent == nil { panic("cannot create context from nil parent") } if key == nil { panic("nil key") } if !reflectlite.TypeOf(key).Comparable() { panic("key is not comparable") } return &valueCtx{parent, key, val} } ``` WithValue只能通过添加的方式把父Ctx和新Ctx建立连接,很显然整个Ctx看起来应该就是一棵树一样。比如如下的代码 ```go type TrackId string func main() { ctx1 := context.WithValue(context.Background(), TrackId("2021"), "123456") ctx2 := context.WithValue(ctx1, TrackId("2020"), "111111") ctx3 := context.WithValue(ctx1, TrackId("2020"), "222222") ctx4 := context.WithValue(ctx2, TrackId("2019"), "333333") ctx5 := context.WithValue(ctx2, TrackId("2019"), "444444") ctx6 := context.WithValue(ctx3, TrackId("2018"), "555555") ctx7 := context.WithValue(ctx3, TrackId("2018"), "666666") var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() fmt.Println("ctx4 ", ctx4.Value(TrackId("2019"))) fmt.Println("ctx5 ", ctx5.Value(TrackId("2019"))) fmt.Println("ctx6 ", ctx6.Value(TrackId("2020"))) fmt.Println("ctx7 ", ctx7.Value(TrackId("2021"))) }() wg.Wait() } ``` 其实就是会建立这样一棵树结构 ``` ┌────────────────────────┐ │ ┌********************┐ │ │ * Background * │ │ └********************┘ │ └────────────┬───────────┘ │ │ ┌────────────▼───────────┐ │ │ │ ctx1(val:123456) ◀──────Step3 │ │ └────────────┬───────────┘ │ ┌──────────────────────────┴──────────────────────────┐ │ │ │ │ ┌────────────▼───────────┐ ┌────────────▼───────────┐ │ │ │ │ │ ctx2(val:111111) │ │ ctx3(val:222222) ◀──────Step2 │ │ │ │ └────────────┬───────────┘ └────────────┬───────────┘ │ │ ┌─────────────┴────────────┐ ┌─────────────┴────────────┐ │ │ │ │ │ │ │ │ ┌────────────▼───────────┐ ┌────────────▼───────────┐ ┌────────────▼───────────┐ ┌────────────▼───────────┐ │ │ │ │ │ │ │ │ │ ctx4(val:333333) │ │ ctx5(val:444444) │ │ ctx6(val:555555) │ │ ctx7(val:666666) ◀──────Step1 │ │ │ │ │ │ │ │ └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ └────────────────────────┘ ``` 当然如果调用挂载的节点越多,这棵树就越大,而遍历这棵树找value信息就越慢,事实上上找value信息就是通过往上递归遍历的方法来查找的。 ```go func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) } ``` 比如ctx7.Value(TrackId("2021"))就需要通过Step1,2,3才能找到最终的value:123456。 ### 总结 timerCtx,cancelCtx可以认为是管理goroutine生命周期的一类Ctx,另外valueCtx是传递参数作用的Ctx,使用的场景其实有区别,比较会误用的是valueCtx。这里摘取了一些使用中容易挖的坑,其实要使用好它,还真不容易!
beanstalkd源码解析(2) 作者: nbboy 时间: 2020-10-23 分类: 软件架构,软件工程,设计模式,C 评论 ###开篇 在开篇先介绍下beanstalkd的一些基础结构,然后分析下第二个问题。 ####基础结构 ```c //最小堆结构(优先队列) struct Heap { int cap; int len; void **data; Less less; Record rec; }; //ADT int heapinsert(Heap *h, void *x);//向堆中插入一个新元素 void* heapremove(Heap *h, int k);//删除堆中的一个元素 ``` ```c //集合结构 struct ms { size_t used, cap, last; void **items; ms_event_fn oninsert, onremove; }; //ADT void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);//初始化 void ms_clear(ms a);//清空 int ms_append(ms a, void *item);//追加 int ms_remove(ms a, void *item);//移除 int ms_contains(ms a, void *item);//是否包含 void *ms_take(ms a);//获取一个 ``` #### 业务结构 ```c //任务对象,客户端可以调度的一个单元 struct job { //持久化字段,这些字段都被写入到WAL日志中 Jobrec r; // persistent fields; these get written to the wal /* bookeeping fields; these are in-memory only */ char pad[6]; //任务属于的管道对象 tube tube; //在链表中的上个和下个任务 job prev, next; /* linked list of jobs */ //在哈希表中的下个任务 job ht_next; /* Next job in a hash table list */ //任务在当前堆中的位置 size_t heap_index; /* where is this job in its current heap */ File *file; job fnext; job fprev; void *reserver; int walresv; int walused; char body[]; // written separately to the wal }; #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0) job allocate_job(int body_size);//分配一个job job make_job_with_id(uint pri, int64 delay, int64 ttr, int body_size, tube tube, uint64 id);//创建job void job_free(job j);//释放job /* Lookup a job by job ID */ job job_find(uint64 job_id);//查找 /* the void* parameters are really job pointers */ void job_setheappos(void*, int); int job_pri_less(void*, void*); int job_delay_less(void*, void*); job job_copy(job j);//拷贝job const char * job_state(job j);//获取job状态 int job_list_any_p(job head); job job_remove(job j);//移除job void job_insert(job head, job j);//插入job /* for unit tests */ size_t get_all_jobs_used(void); ``` ```c //管道对象,用来抽象同一种消息类型 struct tube { //被引用次数,用于监控管道被使用的情况 uint refs; //管道名称 char name[MAX_TUBE_NAME_LEN]; //预备执行队列 Heap ready; //延迟执行队列 Heap delay; //等待的连接集合 struct ms waiting; /* set of conns */ //统计结构,stats系列命令用到该结构 struct stats stat; //正被使用的次数 uint using_ct; //正被监听的次数 uint watching_ct; int64 pause; // int64 deadline_at; // struct job buried; }; tube make_tube(const char *name);//创建一个tube void tube_dref(tube t); void tube_iref(tube t); tube tube_find(const char *name);//查找 tube tube_find_or_make(const char *name); ``` ####服务有关结构 ```c /* 对服务器对象的封装,全局单例 */ struct Server { //端口 char *port; //地址 char *addr; //用户 char *user; //WAL日志对象 Wal wal; //服务器开启的套接字 Socket sock; //客户端连接池 Heap conns; }; void srvserve(Server *srv); void srvaccept(Server *s, int ev); ``` ```c //客户端链接对象 struct Conn { //服务器对象 Server *srv; //Client套接字 Socket sock; char state; char type; //下一个连接对象 Conn *next; //操作的管道 tube use; //运作时间 int64 tickat; // time at which to do more work //在链接池中的索引 int tickpos; // position in srv->conns //最近的任务 job soonest_job; // memoization of the soonest job int rw; // currently want: 'r', 'w', or 'h' //阻塞超时的时间 int pending_timeout; char halfclosed; //命令 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated int cmd_len; int cmd_read; //回复 char *reply; int reply_len; int reply_sent; char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated // How many bytes of in_job->body have been read so far. If in_job is NULL // while in_job_read is nonzero, we are in bit bucket mode and // in_job_read's meaning is inverted -- then it counts the bytes that // remain to be thrown away. int in_job_read; job in_job; // a job to be read from the client job out_job; int out_job_sent; //监听的集合 struct ms watch; //正在被消费的任务列表 struct job reserved_jobs; // linked list header }; int connless(Conn *a, Conn *b); void connrec(Conn *c, int i); void connwant(Conn *c, int rw); void connsched(Conn *c); void connclose(Conn *c); void connsetproducer(Conn *c); void connsetworker(Conn *c); job connsoonestjob(Conn *c); int conndeadlinesoon(Conn *c); int conn_ready(Conn *c); ``` 需要说明的是,文件有关的结构和WAL有关的结构,就不在这里贴出来了,因为我也没整明白。 ###分析 看第二个问题.回顾下刚才代码 ```c //投递任务到队列 static int enqueue_job(Server *s, job j, int64 delay, char update_store) { int r; j->reserver = NULL; if (delay) { //任务开始执行时间 j->r.deadline_at = nanoseconds() + delay; r = heapinsert(&j->tube->delay, j); if (!r) return 0; //设置为被等待执行状态 j->r.state = Delayed; } else {//立即执行的任务就投递到预备队列 ``` 如果是延迟任务,就放入到tube的延迟堆中,且状态是delay 那么什么时候状态变为ready,然后放入ready堆中以供客户端去消费? 客户端投递一个任务后,下一个硬件断点看下值的变化 ```c (gdb) watch all_jobs_init[4].r. body_size bury_ct created_at deadline_at delay id kick_ct pri release_ct reserve_ct state timeout_ct ttr (gdb) watch all_jobs_init[4].r.state Hardware watchpoint 6: all_jobs_init[4].r.state (gdb) info breakpoints Num Type Disp Enb Address What 6 hw watchpoint keep y all_jobs_init[4].r.state ``` 然后客户端开始消费任务,时间到期后,即命中断点 ```c (gdb) c Continuing. Hardware watchpoint 6: all_jobs_init[4].r.state Old value = 4 '\004' New value = 1 '\001' enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 479 ready_ct++; (gdb) bt #0 enqueue_job (s=0x611520 , j=0x62a970, delay=0, update_store=0 '\000') at prot.c:479 #1 0x0000000000409e90 in prottick (s=0x611520 ) at prot.c:1904 #2 0x000000000040b2ee in srvserve (s=0x611520 ) at serv.c:51 #3 0x000000000040d1d4 in main (argc=1, argv=0x7fffffffe488) at main.c:100 ``` 从堆栈中看出,是在prottick中进行调度。截取关键代码片段 ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } ``` 从上面代码可以看出把delay堆中的任务往ready堆中移动的过程,不过这段代码是怎样触发的? 往上翻一下就可以找到答案 ```c for (;;) { period = prottick(s); //sockXXX都是对平台的网络抽象 int rw = socknext(&sock, period); if (rw == -1) { twarnx("socknext"); exit(1); } //调用上面的接收链接回调函数 if (rw) { //accept开始接受网络请求 sock->f(sock->x, rw); } } int socknext(Socket **s, int64 timeout) { int r; struct epoll_event ev; r = epoll_wait(epfd, &ev, 1, (int)(timeout/1000000)); ... ``` 触发的条件有两个要么timeout到期,要么有新事件到来,不明白epoll事件机制的可以复习下epoll,那如何 计算这个超时时间呢? ```c while ((j = delay_q_peek())) { d = j->r.deadline_at - now; if (d > 0) { period = min(period, d); break; } j = delay_q_take(); //从delay里取出最近需要执行的job,放入ready队列里 r = enqueue_job(s, j, 0, 0); if (r < 1) bury_job(s, j, 0); /* out of memory, so bury it */ } for (i = 0; i < tubes.used; i++) { t = tubes.items[i]; d = t->deadline_at - now; if (t->pause && d <= 0) { t->pause = 0; process_queue(); } else if (d > 0) { period = min(period, d); } } while (s->conns.len) { Conn *c = s->conns.data[0]; d = c->tickat - now; if (d > 0) { period = min(period, d); break; } heapremove(&s->conns, 0); conn_timeout(c); } ``` 也就是根据job,tube,conn这几个对象计算一个最小时间戳 总结下第二个问题: 1.通过epoll_wait(其他平台也类似)来等待超时或者网络事件,而超时时候就做定时逻辑,包括移动delay堆中的job到ready堆中 这篇贴的代码有点多,下一篇再说第三个问题…