Channel in Go小结

Channel in Go小结

在其他编程语言中,如果想要在线程中通信,最常用的手段是共享内存。然而考虑到线程冲突问题,不得不考虑加锁,以保证并发安全,加锁也一定会带来额外的开销,对性能产生影响。

CSP模型 #

在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,也就是通信顺序进程(Communicating sequential processes,CSP)1。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Go 语言中的 Goroutine 会通过 Channel 传递数据。

使用示例 #

在使用之前,需要对channel有个整体的印象:

  • FIFO (First In First Out)
  • 分为有缓冲和无缓冲两种
  • 在使用过程中会阻塞(无缓冲时,只操作读或写;有缓冲已满时,只操作读或者写)
  • 接受者和发送者都是goroutine

参考下图:

func main() {
    // Q: 有缓冲和无缓冲在使用上有什么区别?
    // ch := make(chan int) // 无缓冲
    ch := make(chan int, 1) // 有缓冲,大小为1

    // 发送
    ch <- 1

    fmt.Println(<-ch)
}

注意事项 #

在使用channel时,需要注意一下事项:

操作\CH状态 ch为空 ch已关闭 ch正常
发送 ch <- 死锁 panic 成功或阻塞
接收 <-ch 死锁 成功或空值 成功或阻塞
关闭 close(ch) panic panic 成功

Q: 这里考虑下如何优雅的关闭channel (避免panic)?

// A: 代码上从发送端控制channel的关闭,同时为了避免重复关闭,使用sync.Once来协助。
// create
ch := make(chan int, 233)
// sending
ch <- 1
// close from sender
once.Do(close(ch))

Make 详解 #

channel通过make关键字创建,并分配缓冲区大小。

// runtime/chan.go#makechan
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
    // ...

    // elem.size * size 是否溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    // 大意是是说,当存储在buf的元素中不包含指针时,gc对此不感兴趣?(留坑)因此才会将buf和hchan分配在同一片内存空间。
	var c *hchan
	switch {
	case mem == 0:
		// 无缓冲(只分配hchan)
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 不含指针(hchan和缓冲区在一起)
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // buf的起始地址 = 基地址 + hchan结构体大小偏移量
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 有缓冲,含指针(hchan和缓冲区不在一起)
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
    // ...
	return c
}

Close 详解 #

close(ch) 可以关闭channel。已经关闭的channel不可以发送,也不能再被关闭,但是还能接收。如果缓冲数据被处理完那么会接受到空值。

// runtime/chan.go#closechan
func closechan(c *hchan) {
	if c == nil {
		// 如果channel为空,则会触发panic
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		// 关闭已经关闭的channel,也会触发panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	// ...
	// 设置已关闭标志位
	c.closed = 1

	var glist gList

	// 释放所有的recvq中的g
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		// 清空数据
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		// ...
		glist.push(gp)
	}

	// 释放所有的sendq中的g
	for {
		// ... 与接受者处理类似
	}
	unlock(&c.lock)

	// 让所有的读/写g就绪
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

Send 详解 #

向channel发送最常见的就是如下2种方式:

// case 1
ch <- val

// case 2 
select {
    case ch <- 2:
        // foo
    default:
        // bar
}

在编译时都会被变成成调用func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool的函数,但是两者在对block的赋值上并不相同,如下:

// case 1
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

// case 2
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

也就是说在case2中,channel发送并不会阻塞当前的goroutine,就算是无缓冲,或者缓冲已满的情况。

发送期间处理流程,如下图:

Recv 详解 #

从channel中接收数据,也有两种方式 (普通用法和非阻塞用法):

// case 1
val := <- ch

// case 2 
select {
    case val := <- ch:
        // foo
    default:
        // bar
}

// 此外接收,还有另外一个参数,ok表示已接收
val, ok := <- ch

在编译时都会被变成成调用func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)的函数,同样的两者在对block的赋值上并不相同,如下:

// case 1
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
// 多返回值
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// case 2
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}
// 多返回值
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(c, elem, false)
	return
}

相较于send场景,recv场景稍微复杂一些,但是大体流程上保持一致,如下图:

简单总结的说,就是recv为了实现在发送者阻塞的场景下的FIFO,做了特殊处理。

使用场景和模式 #

下面列举了一些比较常用的channel使用模式及相关场景。这里主要参考了advanced-go-programming-book

如有遗漏,敬请补充

Synchronous #

一般来说在需要同步goroutine的场景下,会使用sync.Mutex或者sync.WaitGroup来同步。这里用channel来实现一下goroutine同步。

func main() {
    done := make(chan int, 1)

    go func(){
        println("i'm running")
        done <- 1
    }()

    // 如果有多个goroutine, 可以对done接收计数,或者检测goroutine关闭来判断同步完成
    <-done
    println("main goroutine done")
}

Producer-Consumer #

func Producer(ch chan int) {
    for {
        ch <- 1
    }
}

func Consumer(ch chan int) {
    for {
        v <- ch
        println(v)
    }
}

func main() {
    ch := make(chan int, 2)
	go Producer(ch)
	go Producer(ch)
	go Consumer(ch)
	go Consumer(ch)

    select{}
}

Pub-Sub #

发布订阅与生产者消费者类似,唯一区别在于发布订阅模型中,发布者不关心有多少个订阅者即每个订阅者都应该收到消息,而在生产者消费者模型中,消费者是集群消费(集群消费是说每一条消息只有一个消费者可以消费)。使用channel也可以很简单的实现Pub/Sub这种模型:

type topic struct{
	chPub chan int
	chSub []chan int
	bufsize int
}

func newTopic(bufsize int) *topic {
	return &topic{
		bufsize: bufsize,
		chPub:   make(chan int, bufsize),
		chSub:   make([]chan int, 0, 5),
	}
}

// 这里不是并发安全的!!!
func (t *topic) Sub() chan int  {
	ch := make(chan int, t.bufsize)
	t.chSub = append(t.chSub, ch)
	return ch
}

func (t *topic) Pub(v int) {
	t.chPub <- v
	// 分发
	go func() {
		v2 := <- t.chPub
		for idx := range t.chSub {
			t.chSub[idx] <- v2
		}
	}()
}

// Q:如何实现topic的关闭方法
func main() {
	topic := newTopic(topic)
	// 开启5个订阅
	for i := 0; i < 5; i++ {
		go func() {
			ch := topic.Sub()
			for v := range ch {
				println("recv topic", v)
			}
		}()
	}
	// 发布消息
	for v := 10; v < 100; v++{
		topic.Pub(v)
		time.Sleep(100 *time.Millisecond)
	}
}

Concurrent Pool #

并发控制也是channel的重要应用场景,利用channel会阻塞的特性,得以控制最大goroutine数。在Web应用中可以用于实现限流。下面实现的方式,类似与令牌桶,另外推荐一个chanpool的实现以供参考。

type obj int

func factory() obj {
	return "newobj"
}

type pool struct {
	ch chan obj
}

func newpool() pool {
	p := pool{
		ch: make(chan obj, 10),
	}
	for i := 0; i < 10; i++ {
		p.put(obj(i))
	} 
}

// 需要增加并发安全!!!
func (p pool) put(obj obj)  {
	p.ch <- obj
}

// 需要增加并发安全!!!
func (p pool) get() obj {
	v := <- p.ch
	return v
}

func main() {
	pool := newpool()

	go func() {
		for {
			println("num of running g:", runtime.NumGoroutine())
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for i := 0; i < 100; i++ {
		obj := pool.get()
		go func() {
			defer pool.put(obj)
			// do some work
			time.Sleep(100 *time.Millisecond)			
		}()
	}
}

// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12
// num of running g: 12

总结 #

掌握channel的原理,能帮助我们在实际应用中快速定位和分析并发带来的问题。也能在高并发场景下,合理利用channel。

水平有限,如有错误,欢迎勘误指正🙏。

参考 #

访问量 访客数