Go 语言通道在缓存设计中如何避免重复抑制

缓存在现在的大数据量的项目中,可谓无处不在,当用户请求一个数据的时候,不是直接从数据库拿取,而是首先从缓存中拿取,如果缓存不存在的情况下,才会从数据库拿取,并设置缓存,便于下个请求可以直接从缓存中拿取。如果两个请求同时请求一个还未缓存的数据,那么它们将分别前往数据库拿取数据,然后一个再覆盖另一个保存到缓存中,这种情况就是重复抑制(duplicate suppression)。接下来我们就要解决这个问题,同时确保支持高并发,高性能。

我在每个方法,函数,结构体上加了很详细的注释,方便用户阅读。但是光读懂别人的代码还是远远不够的,我们需要读懂代码背后的设计思想,读懂设计逻辑,去认真思考设计者是如何一步一步的设计出来的。下面的这个包很具有代表性,它的设计思想,值得我们来学习学习。

当我们自己在做一个缓存层的时候,我们首先会想到的就是一系列的方法和函数,一系列的实体类(结构)。没错的,那么我们的获取数据的方法就出来了,它差不多就是一个这样的方法:
func Get(key string) (interface{}, error) {}

我们还会想起,如果缓存不存在的情况下,我们得去数据源获取数据,那么我们会有一个函数,它对应我们后面的httpGetBody函数:
func FromSoure(key string) (interface{}, error) {}

接下来就是我们的缓存数据了,它是什么格式呢,在Go语言中,当然毫无疑问,就是结构体了:
type result struct {}

前面我们提到,要解决重复抑制问题,那么我们得有一个状态标示它,数据已经在缓存,或者不在,那么我们的result结构体必须得到一个包装,这个结构体必须包含一个result,同时加一个状态,这个状态用通道类型,因为通道可以实现线程同步:
type entry struct {result, ready}

基于Go语言强大的通道能力,我们将获取缓存的请求的处理细节用另一个goroutine来处理,使用通道来通信,这样获取数据的Get方法就变得简洁了,这个设计是不大容易想到,但是我坚信,所有的灵感都源自勤学苦练。于是乎这里就有一个专门的方法server来处理请求,同时还有一个请求结构体request,请求必然会包容一个缓存的key,与一个响应response,response必然是一个result类型的通道,这个与前面相呼应,因为请求与处理请求是在不同的goroutine,使用通道来传输数据是自然而然的事情:
func server(f Func) {}
type request struct {key, response}

到这里,我们设计的60%上下已经都出来了,接下来就是组织代码,开始撸码了~~~至于后面方法,call(从数据源获取数据,并关闭对应通道),deliver(发送数据到响应通道),这些是使得设计更加优雅,不是设计的核心部分,能想到自然多多益善~~~。

package memo

// Func 是一个函数类型
// 从数据源取数据的函数就必须符合这个函数类型的签名
type Func func(key string) (interface{}, error)

// 这个结构是缓存的数据,value存放用户需要的缓存数据
type result struct {
    value interface{}
    err error
}

// 缓存实体,包含一个缓存对象和一个结构通道
// 当数据还未加载到缓存时,在这个通道上接收数据将被阻塞
// 当数据已经加入到缓存后,这个通道将关闭
// 通道关闭后,任何在这个通道上接收数据将立刻得到一个零值,而不会阻塞线程
type entry struct {
    res result
    ready chan struct{}
}

// 请求结构体,这个设计非常优雅,不愧是大师的手笔
// key是缓存的键值,response是一个缓存对象的通道
type request struct {
    key string
    response chan<- result
}

// Memo 是缓存结构,我们在使用时就是用这个结构体来初始化
// requests通道巧妙的设计,使得可以使用独立线程来处理请求
// 后面的server方法就是专门用来处理请求
type Memo struct {
    requests chan request
}

// New 是初始化函数,当缓存不存在时,这个传入的函数用来向数据源取数据
func New(f Func) *Memo {
    memo := &Memo{requests: make(chan request)}
    go memo.server(f)
    return memo
}

// Get 是获取缓存的方法,如果数据还未加入缓存,将用Func对应的函数取数据,然后存入缓存
func (memo *Memo) Get(key string) (interface{}, error) {
    response := make(chan result)
    memo.requests <- request{key, response}

    // 接收通道,当数据还未载入缓存,将在这里阻塞,直到数据发送到这个通道
    res := <-response
    return res.value, res.err
}

// Close 是关闭请求
func (memo *Memo) Close() {
    close(memo.requests)
}

// 缓存数据请求的处理,这里有两个独立线程
// 一个是向数据源取数据,如果数据已经在缓存中,那么这个if内的代码将不会执行
// 一个是将缓存数据发送到request结构体的response通道,与Get方法的接受通道对应
func (memo *Memo) server(f Func) {
    cache := make(map[string]*entry)
    for req := range memo.requests {
        e := cache[req.key]
        if e == nil {
            e = &entry{ready: make(chan struct{})}
            cache[req.key] = e
            go e.call(f, req.key)
        }
        go e.deliver(req.response)
    }
}

// 从数据源取数据,取完后关闭通道,于是这个通道将只允许接收数据,不允许发送数据
func (e *entry) call(f Func, key string) {
    e.res.value, e.res.err = f(key)
    close(e.ready)
}

// 将缓存数据发送到request结构体的response通道
func (e *entry) deliver(response chan<- result) {
    <-e.ready
    response <- e.res
}

上面是全部的缓存核心代码,它是基本不变的,所以在实际的项目中可以作为一个独立的包。下面我们写一个demo来使用这个包:
package main

import (
    "fmt"
    "io/ioutil"
    "log"
    "net/http"

    memo "hrefs.cn/cache"
)

// 当缓存不存在的情况下,获取源数据的函数
// 这个是demo,通过url获取一个网页的html代码
// url就是缓存的key,html就是缓存的内容
func httpGetBody(url string) (interface{}, error) {
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return ioutil.ReadAll(resp.Body)
}

func main() {
    // 初始化memo
    m := memo.New(httpGetBody)
    url := "https://www.hrefs.cn"
    body, err := m.Get(url)
    if err != nil {
        log.Print(err)
    }

    fmt.Printf("%s, %d \n", url, len(body.([]byte)))
}

执行如下
d:\gowork\demo\cache>go run main.go
https://www.hrefs.cn, 80736
Posted by 何敏
on 2019/03/06 01:34:14
Copyright ©2018 程序员网址导航 粤ICP备14091834号