NIUCLOUD是一款SaaS管理后台框架多应用插件+云编译。上千名开发者、服务商正在积极拥抱开发者生态。欢迎开发者们免费入驻。一起助力发展! 广告
>[info] mr(并发) go-zero 框架提供的一个并发处理工具包,提供了多种并发处理模式。 * **安装:** ~~~ go get "github.com/zeromicro/go-zero/core/mr" ~~~ * **mr.ForEach** > * 使用 mr.ForEach(data, fn) 并发处理集合中的每个元素 > * 对集合中的每个元素应用指定函数 ~~~ fmt.Println("=== mr.ForEach(开始) ===") numbers := []int{1, 2, 3, 4, 5} // mr.ForEach 需要一个生成器函数和处理函数,这里我们使用无缓冲通道来演示 mr.ForEach(func(source chan<- int) { for _, num := range numbers { source <- num } }, func(num int) { logx.Infof("处理数字: %d", num) }) fmt.Println("=== mr.ForEach(结束) ===") ~~~ ``` === mr.ForEach(开始) === {"@timestamp":"2025-12-23T10:39:06.817+08:00","caller":"src/test.go:130","content":"处理数字: 2","level":"info"} {"@timestamp":"2025-12-23T10:39:06.817+08:00","caller":"src/test.go:130","content":"处理数字: 1","level":"info"} {"@timestamp":"2025-12-23T10:39:06.817+08:00","caller":"src/test.go:130","content":"处理数字: 3","level":"info"} {"@timestamp":"2025-12-23T10:39:06.817+08:00","caller":"src/test.go:130","content":"处理数字: 5","level":"info"} {"@timestamp":"2025-12-23T10:39:06.817+08:00","caller":"src/test.go:130","content":"处理数字: 4","level":"info"} === mr.ForEach(结束) === ``` * **mr.Finish** > * 使用 mr.Finish(funcs...) 并发执行多个有返回值的任务 > * 返回所有任务的结果和错误信息 ~~~ fmt.Println("=== mr.Finish(并发执行多个有返回值的任务)(开始) ===") err = mr.Finish( func() (err error) { logx.Info("任务1完成") return nil }, func() (err error) { logx.Info("任务2完成") return fmt.Errorf("任务2执行失败") }) if err != nil { logx.Info("mr.Finish 执行错误") } fmt.Println("=== mr.Finish(并发执行多个有返回值的任务)(结束) ===") ~~~ ``` === mr.Finish(并发执行多个有返回值的任务)(开始) === {"@timestamp":"2025-12-23T10:59:00.591+08:00","caller":"src/test.go:136","content":"任务1完成","level":"info"} {"@timestamp":"2025-12-23T10:59:00.591+08:00","caller":"src/test.go:140","content":"任务2完成","level":"info"} {"@timestamp":"2025-12-23T10:59:00.591+08:00","caller":"src/test.go:145","content":"mr.Finish 执行错误","level":"info"} === mr.Finish(并发执行多个有返回值的任务)(结束) === ``` * **mr.FinishVoid** > * 使用 mr.FinishVoid(funcs...) 并发执行多个无返回值的任务 > * 所有任务执行完毕后返回错误信息 ~~~ fmt.Println("=== mr.FinishVoid(进行无返回值的并发执行)(开始) ===") mr.FinishVoid( func() { logx.Info("无返回值任务1完成") }, func() { logx.Info("无返回值任务2完成") }, ) fmt.Println("=== mr.FinishVoid(进行无返回值的并发执行)(结束) ===") ~~~ ``` === mr.FinishVoid(进行无返回值的并发执行)(开始) === {"@timestamp":"2025-12-23T11:07:07.742+08:00","caller":"src/test.go:155","content":"无返回值任务2完成","level":"info"} {"@timestamp":"2025-12-23T11:07:07.742+08:00","caller":"src/test.go:152","content":"无返回值任务1完成","level":"info"} === mr.FinishVoid(进行无返回值的并发执行)(结束) === ``` * **mr.MapReduce** > * 使用 mr.MapReduce(mapFn, reduceFn) 进行 map-reduce 操作 > * generateFn:数据生产 > * mapFn: 对每个元素进行处理并写入管道 > * reduceFn: 从管道读取数据并进行聚合操作 > * 返回最终结果和错误 ~~~ fmt.Println("=== mr.MapReduce(分布式环境中并行处理数据)(开始) ===") // generateFn:数据生产 // mapperFn:数据加工 // reducerFn:数据聚合 type StockInfoScm struct { InventoryAll string // 完整库位号,例如:01-04-A-04-E2601001 UniqueCode string // 唯一码,仓库码+货架码 StockName string // 库位码,例如:E2601001 StockStr string // 校验码,例如:169 49 14 50 1 27 } skuIds := []uint64{1} stockInfo, err := mr.MapReduce(func(source chan<- uint64) { for _, skuId := range skuIds { source <- skuId } }, func(skuId uint64, writer mr.Writer[StockInfoScm], cancel func(error)) { writer.Write(StockInfoScm{ InventoryAll: "01-04-A-04-E2601001", UniqueCode: "库码+货架码", StockName: "E2601001", StockStr: "169 49 14 50 1 27", }) }, func(pipe <-chan StockInfoScm, writer mr.Writer[[]StockInfoScm], cancel func(error)) { stockInfoScm := make([]StockInfoScm, 0, len(skuIds)) for p := range pipe { stockInfoScm = append(stockInfoScm, p) } writer.Write(stockInfoScm) }) logx.Infof("%+v", stockInfo) fmt.Println("=== mr.MapReduce(分布式环境中并行处理数据)(结束) ===") ~~~ ``` === mr.MapReduce(分布式环境中并行处理数据)(开始) === {"@timestamp":"2025-12-30T17:43:12.641+08:00","caller":"src/test.go:202","content":"[{InventoryAll:01-04-A-04-E2601001 UniqueCode:库码+货架码 StockName:E2601001 StockStr:169 49 14 50 1 27}]","level":"info"} === mr.MapReduce(分布式环境中并行处理数据)(结束) === ```