在其他编程语言中,如果想要在线程中通信,最常用的手段是共享内存。然而考虑到线程冲突问题,不得不考虑加锁,以保证并发安全,加锁也一定会带来额外的开销,对性能产生影响。
CSP模型 #
在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,也就是通信顺序进程(Communicating sequential processes,CSP)1。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。
使用示例 #
在使用之前,需要对channel有个整体的印象:
- FIFO (First In First Out)
- 分为有缓冲和无缓冲两种
- 在使用过程中会阻塞(无缓冲时,只操作读或写;有缓冲已满时,只操作读或者写)
- 接受者和发送者都是goroutine
参考下图:
func main() {
// Q: 有缓冲和无缓冲在使用上有什么区别?
// ch := make(chan int) // 无缓冲
ch := make(chan int, 1) // 有缓冲,大小为1
// 发送
ch <- 1
fmt.Println(<-ch)
}
注意事项 #
在使用channel时,需要注意一下事项:
操作\CH状态 | ch为空 | ch已关闭 | ch正常 |
---|---|---|---|
发送 ch <- | 死锁 | panic | 成功或阻塞 |
接收 <-ch | 死锁 | 成功或空值 | 成功或阻塞 |
关闭 close(ch) | panic | panic | 成功 |
Q: 这里考虑下如何优雅的关闭channel (避免panic)?
// A: 代码上从发送端控制channel的关闭,同时为了避免重复关闭,使用sync.Once来协助。
// create
ch := make(chan int, 233)
// sending
ch <- 1
// close from sender
once.Do(close(ch))
Make 详解 #
channel通过make关键字创建,并分配缓冲区大小。
// runtime/chan.go#makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// ...
// elem.size * size 是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
// 大意是是说,当存储在buf的元素中不包含指针时,gc对此不感兴趣?(留坑)因此才会将buf和hchan分配在同一片内存空间。
var c *hchan
switch {
case mem == 0:
// 无缓冲(只分配hchan)
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 不含指针(hchan和缓冲区在一起)
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// buf的起始地址 = 基地址 + hchan结构体大小偏移量
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 有缓冲,含指针(hchan和缓冲区不在一起)
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
// ...
return c
}
Close 详解 #
close(ch)
可以关闭channel。已经关闭的channel不可以发送,也不能再被关闭,但是还能接收。如果缓冲数据被处理完那么会接受到空值。
// runtime/chan.go#closechan
func closechan(c *hchan) {
if c == nil {
// 如果channel为空,则会触发panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
// 关闭已经关闭的channel,也会触发panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// ...
// 设置已关闭标志位
c.closed = 1
var glist gList
// 释放所有的recvq中的g
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 清空数据
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
// ...
glist.push(gp)
}
// 释放所有的sendq中的g
for {
// ... 与接受者处理类似
}
unlock(&c.lock)
// 让所有的读/写g就绪
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
Send 详解 #
向channel发送最常见的就是如下2种方式:
// case 1
ch <- val
// case 2
select {
case ch <- 2:
// foo
default:
// bar
}
在编译时都会被变成成调用func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
的函数,但是两者在对block的赋值上并不相同,如下:
// case 1
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// case 2
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
也就是说在case2中,channel发送并不会阻塞当前的goroutine,就算是无缓冲,或者缓冲已满的情况。
发送期间处理流程,如下图:
Recv 详解 #
从channel中接收数据,也有两种方式 (普通用法和非阻塞用法):
// case 1
val := <- ch
// case 2
select {
case val := <- ch:
// foo
default:
// bar
}
// 此外接收,还有另外一个参数,ok表示已接收
val, ok := <- ch
在编译时都会被变成成调用func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
的函数,同样的两者在对block的赋值上并不相同,如下:
// case 1
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// 多返回值
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// case 2
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
// 多返回值
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
// TODO(khr): just return 2 values from this function, now that it is in Go.
selected, *received = chanrecv(c, elem, false)
return
}
相较于send场景,recv场景稍微复杂一些,但是大体流程上保持一致,如下图:
简单总结的说,就是recv为了实现在发送者阻塞
的场景下的FIFO,做了特殊处理。
使用场景和模式 #
下面列举了一些比较常用的channel使用模式及相关场景。这里主要参考了advanced-go-programming-book。
如有遗漏,敬请补充
Synchronous #
一般来说在需要同步goroutine的场景下,会使用sync.Mutex或者sync.WaitGroup来同步。这里用channel来实现一下goroutine同步。
func main() {
done := make(chan int, 1)
go func(){
println("i'm running")
done <- 1
}()
// 如果有多个goroutine, 可以对done接收计数,或者检测goroutine关闭来判断同步完成
<-done
println("main goroutine done")
}
Producer-Consumer #
func Producer(ch chan int) {
for {
ch <- 1
}
}
func Consumer(ch chan int) {
for {
v <- ch
println(v)
}
}
func main() {
ch := make(chan int, 2)
go Producer(ch)
go Producer(ch)
go Consumer(ch)
go Consumer(ch)
select{}
}
Pub-Sub #
发布订阅与生产者消费者类似,唯一区别在于发布订阅模型中,发布者不关心有多少个订阅者即每个订阅者都应该收到消息,而在生产者消费者模型中,消费者是集群消费(集群消费是说每一条消息只有一个消费者可以消费)。使用channel也可以很简单的实现Pub/Sub这种模型:
type topic struct{
chPub chan int
chSub []chan int
bufsize int
}
func newTopic(bufsize int) *topic {
return &topic{
bufsize: bufsize,
chPub: make(chan int, bufsize),
chSub: make([]chan int, 0, 5),
}
}
// 这里不是并发安全的!!!
func (t *topic) Sub() chan int {
ch := make(chan int, t.bufsize)
t.chSub = append(t.chSub, ch)
return ch
}
func (t *topic) Pub(v int) {
t.chPub <- v
// 分发
go func() {
v2 := <- t.chPub
for idx := range t.chSub {
t.chSub[idx] <- v2
}
}()
}
// Q:如何实现topic的关闭方法
func main() {
topic := newTopic(topic)
// 开启5个订阅
for i := 0; i < 5; i++ {
go func() {
ch := topic.Sub()
for v := range ch {
println("recv topic", v)
}
}()
}
// 发布消息
for v := 10; v < 100; v++{
topic.Pub(v)
time.Sleep(100 *time.Millisecond)
}
}
Concurrent Pool #
并发控制也是channel的重要应用场景,利用channel会阻塞的特性,得以控制最大goroutine数。在Web应用中可以用于实现限流。下面实现的方式,类似与令牌桶,另外推荐一个chanpool的实现以供参考。
type obj int
func factory() obj {
return "newobj"
}
type pool struct {
ch chan obj
}
func newpool() pool {
p := pool{
ch: make(chan obj, 10),
}
for i := 0; i < 10; i++ {
p.put(obj(i))
}
}
// 需要增加并发安全!!!
func (p pool) put(obj obj) {
p.ch <- obj
}
// 需要增加并发安全!!!
func (p pool) get() obj {
v := <- p.ch
return v
}
func main() {
pool := newpool()
go func() {
for {
println("num of running g:", runtime.NumGoroutine())
time.Sleep(100 * time.Millisecond)
}
}()
for i := 0; i < 100; i++ {
obj := pool.get()
go func() {
defer pool.put(obj)
// do some work
time.Sleep(100 *time.Millisecond)
}()
}
}
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
总结 #
掌握channel的原理,能帮助我们在实际应用中快速定位和分析并发带来的问题。也能在高并发场景下,合理利用channel。
水平有限,如有错误,欢迎勘误指正🙏。
参考 #
- https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/
- https://github.com/golang/go/tree/release-branch.go1.14
- https://github.com/eapache/channels
- https://colobu.com/2018/03/26/channel-patterns/
- https://juejin.im/post/5decff136fb9a016544bce67
- https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-06-goroutine.html