跳至主要內容

网址分类缓存库

pptg大约 3 分钟

Gin+gredis+gorm开发一个网址分类缓存库

1. 需求

  • 第三方网址分类有限额,而网址的分类基本不发生变动,故需要做一个中间件对网址分类进行缓存,减少请求第三方次数
  • 请求的域名需要进行清洗,去除协议头、路径和IP,仅保留域名部分
  • 需要能够导入csv原网址库,并遍历该库进行分类

2. 整体架构

网址分类缓存库架构
网址分类缓存库架构

2.1 routers

路由部分,使用的gin框架,主要包括两大块:

  • 域名相关: 提供单域名请求的接口,返回分类
  • csv部分: 提供csv上传的接口,进行批量分类;提供csv下载的接口,导出网址库

其中,在csv的上传部分实现了同步和异步两个接口

因该部分往往由运维通过curl调用,故同步的接口中的日志重定向到了c.Writer中,方便观察进度,主要代码如下:

func (a Mock) BatchSyncRequest(fn string, c *gin.Context) {
	logger := log.New(c.Writer, "", log.LstdFlags)
	logger.Printf("上传文件成功")
	// 其他逻辑
	// 不断分批读取csv,并处理
	logger.Printf("%d/%d", rows, totalRows)
}

2.2 service

服务部分,主要包括两大块:

  • domain: 首先在redis中进行查找,如果没有则在mysql中进行查找,如果没有则调用第三方请求;最后无论是mysql还是第三方的结果都放入redis缓存。
  • mock: 导入的csv的行数可能非常大,故按批次对其进行处理,主要代码如下:
func (a Mock) BatchSyncRequest(fn string, c *gin.Context) {
	// 打开 CSV 文件
	file, _ := os.Open(fn)
	defer file.Close()
	
	// 创建 CSV 读取器
	reader := csv.NewReader(file)
	
	// 设置批次大小
	batchSize := 100000

	// 读取并处理文件
	var records []string
	for {
		// 读取一行数据
		record, err := reader.Read()
		if err != nil {
			// 处理结尾不满足一个批次的数据
			batchExecute(records)
			break
		}

		// 将数据添加到批次中
		records = append(records, record[0])
		if len(records) == batchSize {
			batchExecute(records)
			records = nil
		}
	}
}

其中batchExecute中对url进行了清洗,并最后使用协程进行了调用,代码如下:

func batchFetch(urls []string) {
	var wg sync.WaitGroup
	wg.Add(len(urls))
	
	for _, url := range urls {
		s := Domain{}
		go func(url string, wg *sync.WaitGroup) {
			defer wg.Done()
			s.GetCategory(url)
		}(url, &wg)
	}
	
	// 等待全部执行完
	wg.Wait()
}

2.3 pkg

最后一个值得记录的是第三方的登陆、刷新Token相关的封装,并暴露给调用处唯一接口,这一部分主要是维护了一个类似状态模式的机制 首先鉴权的状态6个,相关作用如下:

const (
	Login   AUTHSTATE = iota + 1 // 查询AToken
	AToken                       // 查询RToken
	RToken                       // 刷新A、RToken
	Refresh                      // 无AToken、无RToken
	Finish                       // 结束
	ERROR                        // 异常
)

当调用方法时,会根据不同的结果反馈,将设置下一状态,并继续执行,直到Finish(成功拿到token)或ERROR(内部错误)为止

type Tokens RefreshReq

func GetToken() string {
	var state = AToken
	context := NewContext(state)
	accessToken := gredis.Get(constants.RedisKeyAccessToken)
	if accessToken != "" {
		state = Finish
	}
	refreshToken := gredis.Get(constants.RedisKeyRefreshToken)
	tokens := Tokens{AccessToken: accessToken, RefreshToken: refreshToken}
	for {
		if state == Finish {
			return tokens.AccessToken
		} else if state == ERROR {
			panic("hit service error")
			return ""
		} else {
			state, tokens = context.execute.execute(tokens)
			context.SetExecute(state)
		}
	}
}

type Execute interface {
	execute(tokens Tokens) (AUTHSTATE, Tokens)
}

type LoginExe struct{}

func (c *LoginExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
	fmt.Println("LoginExe")
	result, _ := login()
	if result.Status {
		tokens.AccessToken = result.AccessToken
		tokens.RefreshToken = result.RefreshToken
		gredis.Set(constants.RedisKeyAccessToken, tokens.AccessToken, constants.AccessTokenTimeout)
		gredis.Set(constants.RedisKeyRefreshToken, tokens.RefreshToken, constants.RefreshTokenTimeout)
		// 登录成功
		return Finish, tokens
	} else {
		// 登录失败
		return ERROR, tokens
	}
}

type ATokenExe struct{}

func (c *ATokenExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
	if tokens.AccessToken == "" {
		// 没有缓存, 刷新Token
		return RToken, tokens
	} else {
		// 获取缓存成功
		return Finish, tokens
	}
}

type RTokenExe struct{}

func (c *RTokenExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
	if tokens.RefreshToken == "" {
		// 没有缓存, 登录
		return Login, tokens
	} else {
		// 获取缓存成功, 执行刷新
		return Refresh, tokens
	}
}

type RefreshExe struct{}

func (c *RefreshExe) execute(tokens Tokens) (AUTHSTATE, Tokens) {
	result, _ := refresh(RefreshReq{
		AccessToken:  tokens.AccessToken,
		RefreshToken: tokens.RefreshToken,
	})
	if result.Status {
		gredis.Set(constants.RedisKeyAccessToken, tokens.AccessToken, constants.AccessTokenTimeout)
		gredis.Set(constants.RedisKeyRefreshToken, tokens.RefreshToken, constants.RefreshTokenTimeout)
		// 刷新成功
		return Finish, tokens
	} else {
		// 刷新失败, 登录
		return Login, tokens
	}
}