网址分类缓存库
大约 3 分钟
Gin+gredis+gorm开发一个网址分类缓存库
1. 需求
- 第三方网址分类有限额,而网址的分类基本不发生变动,故需要做一个中间件对网址分类进行缓存,减少请求第三方次数
- 请求的域名需要进行清洗,去除协议头、路径和IP,仅保留域名部分
- 需要能够导入csv原网址库,并遍历该库进行分类
2. 整体架构
2.1 routers
路由部分,使用的gin框架,主要包括两大块:
- 域名相关: 提供单域名请求的接口,返回分类
- csv部分: 提供csv上传的接口,进行批量分类;提供csv下载的接口,导出网址库
其中,在csv的上传部分实现了同步和异步两个接口
因该部分往往由运维通过curl调用,故同步的接口中的日志重定向到了c.Writer中,方便观察进度,主要代码如下:
func (a Mock) BatchSyncRequest(fn string, c *gin.Context) {
logger := log.New(c.Writer, "", log.LstdFlags)
logger.Printf("上传文件成功")
// 其他逻辑
// 不断分批读取csv,并处理
logger.Printf("%d/%d", rows, totalRows)
}
2.2 service
服务部分,主要包括两大块:
- domain: 首先在redis中进行查找,如果没有则在mysql中进行查找,如果没有则调用第三方请求;最后无论是mysql还是第三方的结果都放入redis缓存。
- mock: 导入的csv的行数可能非常大,故按批次对其进行处理,主要代码如下:
func (a Mock) BatchSyncRequest(fn string, c *gin.Context) {
// 打开 CSV 文件
file, _ := os.Open(fn)
defer file.Close()
// 创建 CSV 读取器
reader := csv.NewReader(file)
// 设置批次大小
batchSize := 100000
// 读取并处理文件
var records []string
for {
// 读取一行数据
record, err := reader.Read()
if err != nil {
// 处理结尾不满足一个批次的数据
batchExecute(records)
break
}
// 将数据添加到批次中
records = append(records, record[0])
if len(records) == batchSize {
batchExecute(records)
records = nil
}
}
}
其中batchExecute中对url进行了清洗,并最后使用协程进行了调用,代码如下:
func batchFetch(urls []string) {
var wg sync.WaitGroup
wg.Add(len(urls))
for _, url := range urls {
s := Domain{}
go func(url string, wg *sync.WaitGroup) {
defer wg.Done()
s.GetCategory(url)
}(url, &wg)
}
// 等待全部执行完
wg.Wait()
}
2.3 pkg
最后一个值得记录的是第三方的登陆、刷新Token相关的封装,并暴露给调用处唯一接口,这一部分主要是维护了一个类似状态模式的机制 首先鉴权的状态6个,相关作用如下:
const (
Login AUTHSTATE = iota + 1 // 查询AToken
AToken // 查询RToken
RToken // 刷新A、RToken
Refresh // 无AToken、无RToken
Finish // 结束
ERROR // 异常
)
当调用方法时,会根据不同的结果反馈,将设置下一状态,并继续执行,直到Finish(成功拿到token)或ERROR(内部错误)为止
type Tokens RefreshReq
func GetToken() string {
var state = AToken
context := NewContext(state)
accessToken := gredis.Get(constants.RedisKeyAccessToken)
if accessToken != "" {
state = Finish
}
refreshToken := gredis.Get(constants.RedisKeyRefreshToken)
tokens := Tokens{AccessToken: accessToken, RefreshToken: refreshToken}
for {
if state == Finish {
return tokens.AccessToken
} else if state == ERROR {
panic("hit service error")
return ""
} else {
state, tokens = context.execute.execute(tokens)
context.SetExecute(state)
}
}
}
type Execute interface {
execute(tokens Tokens) (AUTHSTATE, Tokens)
}
type LoginExe struct{}
func (c *LoginExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
fmt.Println("LoginExe")
result, _ := login()
if result.Status {
tokens.AccessToken = result.AccessToken
tokens.RefreshToken = result.RefreshToken
gredis.Set(constants.RedisKeyAccessToken, tokens.AccessToken, constants.AccessTokenTimeout)
gredis.Set(constants.RedisKeyRefreshToken, tokens.RefreshToken, constants.RefreshTokenTimeout)
// 登录成功
return Finish, tokens
} else {
// 登录失败
return ERROR, tokens
}
}
type ATokenExe struct{}
func (c *ATokenExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
if tokens.AccessToken == "" {
// 没有缓存, 刷新Token
return RToken, tokens
} else {
// 获取缓存成功
return Finish, tokens
}
}
type RTokenExe struct{}
func (c *RTokenExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
if tokens.RefreshToken == "" {
// 没有缓存, 登录
return Login, tokens
} else {
// 获取缓存成功, 执行刷新
return Refresh, tokens
}
}
type RefreshExe struct{}
func (c *RefreshExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
result, _ := refresh(RefreshReq{
AccessToken: tokens.AccessToken,
RefreshToken: tokens.RefreshToken,
})
if result.Status {
gredis.Set(constants.RedisKeyAccessToken, tokens.AccessToken, constants.AccessTokenTimeout)
gredis.Set(constants.RedisKeyRefreshToken, tokens.RefreshToken, constants.RefreshTokenTimeout)
// 刷新成功
return Finish, tokens
} else {
// 刷新失败, 登录
return Login, tokens
}
}