news 2026/6/25 15:17:04

go.dev博客阅读-pipelines

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
go.dev博客阅读-pipelines

这篇文章 2014年3月13日发表,作者 Sameer Ajmani

通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等

一些开源类库也沿用了其思想,例如MapReduces、并行处理等

这篇博客要以MapReduces或者生产消费模型的思想去阅读

博客开头的示例

一个比较基础的管道使用

将一组整数通过管道依次平方,最终输出结果

// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81

过程中的一些说明

  1. gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回
  2. 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
  3. sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次
  4. gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)

这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程

并行处理

官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现

依旧是求平方的案例

// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81

说明:

  1. c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
  2. merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
  3. ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞

带取消功能的 chan

并行处理的代码改进,在每个方法中都引入done

funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}
  1. 在每个方法中,都加入了done,内部使用select来监听是否关闭,并return 释放协程
  2. 如果chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏

但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放

如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力

// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()

额外注意的点

在多任务消费读取生产数据时

funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}

这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快

gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定

🧠🧠🧠🧠

对官方这篇博客,我的理解是

  1. 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
  2. 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
  3. 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
  4. 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
  5. 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close

原文出处 https://go.dev/blog/pipelines

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/24 20:44:06

Clion如何添加后退或前进按钮在工具栏上方

原来在刚配置好的clion中编写我的第一个程序的时候&#xff0c;跳转到程序的位置后&#xff0c;想要回到原来的位置发现没有对应的按钮 然后在网上找有什么办法能够进行回退&#xff0c;然后在上午搜索有什么办法回退后搜索到一个方法就是同时按下这三个键ctrlalt⬅&#xff0c…

作者头像 李华
网站建设 2026/6/25 11:20:02

Home Assistant Android客户端连接故障终极排查指南

Home Assistant Android客户端连接故障终极排查指南 【免费下载链接】android :iphone: Home Assistant Companion for Android 项目地址: https://gitcode.com/gh_mirrors/android5/android 在智能家居生态系统中&#xff0c;Home Assistant Android客户端作为核心控制…

作者头像 李华
网站建设 2026/6/22 20:31:48

智能家居新篇章:让南方电网电费管理变得如此简单

智能家居新篇章&#xff1a;让南方电网电费管理变得如此简单 【免费下载链接】china_southern_power_grid_stat 项目地址: https://gitcode.com/gh_mirrors/ch/china_southern_power_grid_stat 你是否曾经为忘记缴纳电费而遭遇突然停电的尴尬&#xff1f;或者想要了解家…

作者头像 李华
网站建设 2026/6/17 0:12:42

好 Prompt vs 坏 Prompt:同一个 UI,生成结果差多远

很多人对 Prompt 的差异没有直觉。这一篇&#xff0c;我们用同一个 UI 需求&#xff0c;对比「坏 Prompt」和「好 Prompt」&#xff0c;看看结果到底能差多远。 场景设定 目标&#xff1a;生成一个轻量级客户管理小程序的客户列表页。用户&#xff1a;非技术背景的销售人员。 《…

作者头像 李华
网站建设 2026/6/25 7:17:21

免费终极指南:用Arcade-plus快速打造专业级音乐谱面

免费终极指南&#xff1a;用Arcade-plus快速打造专业级音乐谱面 【免费下载链接】Arcade-plus A better utility used to edit and preview aff files 项目地址: https://gitcode.com/gh_mirrors/ar/Arcade-plus 还在为复杂的音乐谱面编辑而头疼吗&#xff1f;想要一款既…

作者头像 李华