疯狂编程网

  • 首页
  • 后端
    • GOLANG
    • PHP
  • 前端
  • 客户端
  • 服务器
  • AIGC
  • 开发工具
  • 代码人生
  • 关于本站
    • 联系我们
    • 免责声明
  1. 首页
  2. 后端
  3. GOLANG
  4. 正文

go channel原理及使用场景

2023年5月10日 472点热度 0人点赞 0条评论

转载自:go channel原理及使用场景

源码解析

type hchan struct {
	qcount   uint           // Channel 中的元素个数
	dataqsiz uint           // Channel 中的循环队列的长度
	buf      unsafe.Pointer // Channel 的缓冲区数据指针
	elemsize uint16 // 当前 Channel 能够收发的元素大小
	closed   uint32
	elemtype *_type // 当前 Channel 能够收发的元素类型
	sendx    uint   // Channel 的发送操作处理到的位置
	recvx    uint   // Channel 的接收操作处理到的位置
  recvq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)
	sendq    waitq  // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

https://imgs.icrazycode.com/website/20230510/021640/3537785/AAmdBF6jy4zffaBhjB0oNxT0ed1h5p9Y.png

创建channel

channel的初始化有2种,一种是没有缓冲区的channel,一种是有缓冲区的channel。对应的初始化之后hchan也是有区别的。

无缓冲区的channel,初始化的时候只为channel分配内存,缓冲区dataqsiz的长度为0

有缓冲的channel,初始化时会为channel和缓冲区分配内存,dataqsiz长度大于0

同时channel的元素大小和缓冲区的长度都是有大小限制的

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

  // 如果内存超了,或者分配的内存大于channel最大分配内存,或者分配的size小于0,直接Panic
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// 如果没有缓冲区,分配一段内存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 有缓冲时,如果元素不包含指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 有缓冲区,且元素包含指针类型,channel和buf数组各自分配内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

  // 元素大小,元素类型,循环数组长度,更新到channel
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

发送数据(ch ← i)

  • 发送数据前会加锁,防止多个线程并发修改数据。如果channel已经关闭,直接Panic

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      lock(&c.lock)
    
      if c.closed != 0 {
      	unlock(&c.lock)
      	panic(plainError("send on closed channel"))
      }
    
  • 当存在等待的接收者时,通过 runtime.send 直接将数据发送给阻塞的接收者

    当channel的recvq队列不为空,而且channel是没有数据数据写入的。这个时候如果有数据写入,会直接把数据拷贝到接收者变量所在的内存地址上。即使这是一个有缓冲的channel,当有等待的接收者时,也是直接给接收者,不会先保存到循环队列

    // 如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据
    if sg := c.recvq.dequeue(); sg != nil {
      	send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      	return true
      }
    
    // 
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      if sg.elem != nil {
        // 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
      	sendDirect(c.elemtype, sg, ep)
      	sg.elem = nil
      }
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      // 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
      // 需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine
      goready(gp, skip+1)
    }
    
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // 如果当前元素数小于循环队列的长度
      if c.qcount < c.dataqsiz {
        // 使用 runtime.chanbuf 计算出下一个可以存储数据的位置
      	qp := chanbuf(c, c.sendx)
        // 将发送的数据拷贝到缓冲区中
      	typedmemmove(c.elemtype, qp, ep)
        // 发送的位置索引+1
      	c.sendx++
        // 如果循环队列满了就从0开始
        // 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置
      	if c.sendx == c.dataqsiz {
      		c.sendx = 0
      	}
        // 增加当前元素数
      	c.qcount++
      	unlock(&c.lock)
      	return true
      }
      ...
    }
    
  • 当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据

    当因为不存在缓冲区或者缓冲区已满无法写入时,会构造sudog等待执行的gorutine结构,放到hchan的等待队列中,直到被唤醒,把数据放到缓冲区或者直接拷贝给接收者

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // 使用 select 关键字可以向 Channel 非阻塞地发送消息
      if !block {
      	unlock(&c.lock)
      	return false
      }
    
      // 获取发送数据使用的 Goroutine
      gp := getg()
      //  获取 runtime.sudog 结构
      mysg := acquireSudog()
      // 设置待发送数据的内存地址
      mysg.elem = ep
      // 设置发送数据的goroutine
      mysg.g = gp
      mysg.isSelect = false
      // 设置发送的channel
      mysg.c = c
      // 设置到goroutine的waiting上
      gp.waiting = mysg
      // 加入到发送等待队列
      c.sendq.enqueue(mysg)
      // 阻塞等待唤醒
      atomic.Store8(&gp.parkingOnChan, 1)
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
      KeepAlive(ep)
    
      // someone woke us up.
      if mysg != gp.waiting {
      	throw("G waiting list is corrupted")
      }
      gp.waiting = nil
      gp.activeStackChans = false
      closed := !mysg.success
      gp.param = nil
      if mysg.releasetime > 0 {
      	blockevent(mysg.releasetime-t0, 2)
      }
      mysg.c = nil
      releaseSudog(mysg)
      if closed {
      	if c.closed == 0 {
      		throw("chansend: spurious wakeup")
      	}
      	panic(plainError("send on closed channel"))
      }
      return true
    }
    

接收数据(<- ch)

  • 从一个空 Channel 接收数据

    goroutine会让出使用权,并阻塞等待

      if c == nil {
      	if !block {
      		return
      	}
        // 让出使用权
      	gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      	throw("unreachable")
      }
    
      // 不获取锁的情况下,检查失败的非阻塞操作
      if !block && empty(c) {
      	// 显示未关闭,继续返回false,因为channel不会重新打开
      	if atomic.Load(&c.closed) == 0 {
      		return
      	}
    
      	if empty(c) {
      		// The channel is irreversibly closed and empty.
      		if raceenabled {
      			raceacquire(c.raceaddr())
      		}
          // Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回
      		if ep != nil {
      			typedmemclr(c.elemtype, ep)
      		}
      		return true, false
      	}
      }
    
      var t0 int64
      if blockprofilerate > 0 {
      	t0 = cputicks()
      }
    
      lock(&c.lock)
    
      if c.closed != 0 && c.qcount == 0 {
      	if raceenabled {
      		raceacquire(c.raceaddr())
      	}
      	unlock(&c.lock)
        // Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回
      	if ep != nil {
      		typedmemclr(c.elemtype, ep)
      	}
      	return true, false
      }
    
  • 当存在等待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据

    如果是无缓冲的channel,当有接收者进来时,会直接从阻塞的发送者拷贝数据

    如果是有缓冲的channel,当有接收者进来时,会先从缓冲区拿数据,接着等待的发送者会把数据拷贝到缓冲区

    注意这个时候并没有直接去唤醒发送者,而是放到下次p的执行队列中中,下次调度时会唤醒发送者,发送者会做一些释放资源的操作

    if sg := c.sendq.dequeue(); sg != nil {
      	recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      	return true, true
      }
    
    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      if c.dataqsiz == 0 {
      	if raceenabled {
      		racesync(c, sg)
      	}
      	if ep != nil {
      		// 如果无缓存,直接从发送者拷贝数据
      		recvDirect(c.elemtype, sg, ep)
      	}
      } else {
      	// 由于队列已满,接收数据的索引和发送数据的索引一致
        qp := chanbuf(c, c.recvx)
      	if raceenabled {
      		racenotify(c, c.recvx, nil)
      		racenotify(c, c.recvx, sg)
      	}
      	// 数据从队列拷贝到目标内存地址
      	if ep != nil {
      		typedmemmove(c.elemtype, ep, qp)
      	}
      	// 数据从发送者拷贝到缓冲区
      	typedmemmove(c.elemtype, qp, sg.elem)
      	c.recvx++
      	if c.recvx == c.dataqsiz {
      		c.recvx = 0
      	}
      	c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
      sg.elem = nil
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      sg.success = true
      if sg.releasetime != 0 {
      	sg.releasetime = cputicks()
      }
      // 无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
      goready(gp, skip+1)
    }
    
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据

    if c.qcount > 0 {
      	// 直接从队列取数据
      	qp := chanbuf(c, c.recvx)
      	if raceenabled {
      		racenotify(c, c.recvx, nil)
      	}
      	// 放到目标内存
      	if ep != nil {
      		typedmemmove(c.elemtype, ep, qp)
      	}
      	// 清空队列中对应的元素
      	typedmemclr(c.elemtype, qp)
      	// 接收索引+1
      	c.recvx++
      	if c.recvx == c.dataqsiz {
      		c.recvx = 0
      	}
      	// 队列元素-1
      	c.qcount--
      	unlock(&c.lock)
      	return true, true
      }
    
  • 当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据

    if !block {
      	unlock(&c.lock)
      	return false, false
      }
    
      // no sender available: block on this channel.
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
      	mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      gp.waiting = mysg
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.param = nil
      c.recvq.enqueue(mysg)
      // 阻塞等待,让出使用权
      atomic.Store8(&gp.parkingOnChan, 1)
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
      // 唤醒之后清空sudog
      if mysg != gp.waiting {
      	throw("G waiting list is corrupted")
      }
      gp.waiting = nil
      gp.activeStackChans = false
      if mysg.releasetime > 0 {
      	blockevent(mysg.releasetime-t0, 2)
      }
      success := mysg.success
      gp.param = nil
      mysg.c = nil
      releaseSudog(mysg)
      return true, success
    

关闭channel

  • 当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常

    func closechan(c *hchan) {
      if c == nil {
      	panic(plainError("close of nil channel"))
      }
    
      lock(&c.lock)
      if c.closed != 0 {
      	unlock(&c.lock)
      	panic(plainError("close of closed channel"))
      }
    
  • 将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素

    c.closed = 1
    
      var glist gList
    
      // release all readers
      for {
      	sg := c.recvq.dequeue()
      	if sg == nil {
      		break
      	}
      	if sg.elem != nil {
      		typedmemclr(c.elemtype, sg.elem)
      		sg.elem = nil
      	}
      	if sg.releasetime != 0 {
      		sg.releasetime = cputicks()
      	}
      	gp := sg.g
      	gp.param = unsafe.Pointer(sg)
      	sg.success = false
      	if raceenabled {
      		raceacquireg(gp, c.raceaddr())
      	}
      	glist.push(gp)
      }
    
      // release all writers (they will panic)
      for {
      	sg := c.sendq.dequeue()
      	if sg == nil {
      		break
      	}
      	sg.elem = nil
      	if sg.releasetime != 0 {
      		sg.releasetime = cputicks()
      	}
      	gp := sg.g
      	gp.param = unsafe.Pointer(sg)
      	sg.success = false
      	if raceenabled {
      		raceacquireg(gp, c.raceaddr())
      	}
      	glist.push(gp)
      }
      unlock(&c.lock)
    
      // 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。
      for !glist.empty() {
      	gp := glist.pop()
      	gp.schedlink = 0
      	goready(gp, 3)
      }
    

使用场景

报错情形

  • 往一个关闭的channel发送数据会报错:panic: send on closed channel
  • 关闭一个nil的chan会报错:panic: close of nil channel
  • 关闭一个已经关闭的channel报错:panic: close of closed channel

1、一个经典的算法题

有4个goroutine,编号为1、2、3、4。每秒钟会有一个goroutine打印出自己的编号,要求写一个程序,让输出的编号总是按照1、2、3、4、1、2、3、4...的顺序打印出来

package main

import (
	"fmt"
	"time"
)

func main() {
  // 4个channel
	chs := make([]chan int, 4)
	for i, _ := range chs {
		chs[i] = make(chan int)
    // 开4个协程
		go func(i int) {
			for {
        // 获取当前channel值并打印
				v := <-chs[i]
				fmt.Println(v + 1)
				time.Sleep(time.Second)
        // 把下一个值写入下一个channel,等待下一次消费
				chs[(i+1)%4] <- (v + 1) % 4
			}

		}(i)
	}

  // 往第一个塞入0
	chs[0] <- 0
	select {}
}

2、限流器

package main

import (
	"fmt"
	"time"
)

func main() {
	// 每次处理3个请求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		go func(i int) {
			fmt.Println("下游服务处理逻辑...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
		}(i)
	}
	time.Sleep(30 * time.Second)
}

如果觉得sleep太丑太暴力,可以用waitGroup控制结束时机

package main

import (
	"fmt"
	"sync"
	"time"
)

var wg sync.WaitGroup

func main() {
	// 每次处理3个请求
	chLimit := make(chan struct{}, 3)
	for i := 0; i < 20; i++ {
		chLimit <- struct{}{}
		wg.Add(1)
		go func(i int) {
			fmt.Println("下游服务处理逻辑...", i)
			time.Sleep(time.Second * 3)
			<-chLimit
			wg.Done()
		}(i)
	}
	wg.Wait()
}

3、优雅退出

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	var closing = make(chan struct{})
	var closed = make(chan struct{})

	go func() {
		for {
			select {
			case <-closing:
				return
			default:
				fmt.Println("业务逻辑...")
				time.Sleep(1 * time.Second)
			}
		}
	}()

	termChan := make(chan os.Signal)
  // 监听退出信号
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	<-termChan

  // 退出中
	close(closing)

	// 退出之前清理一下
	go doCleanup(closed)

	select {
	case <-closed:
	case <-time.After(time.Second):
		log.Println("清理超时不等了")
	}

	log.Println("优雅退出")
}

func doCleanup(closed chan struct{}) {
	time.Sleep(time.Minute)
  // 清理完后退出
	close(closed)
}

4、实现互斥锁

初始化一个缓冲区为1的channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,释放锁的时候再把这个元素放回channel

package main

import (
	"log"
	"time"
)

type Mutex struct {
	ch chan struct{}
}

// 初始化锁
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

// 加锁,阻塞获取
func (m *Mutex) Lock()  {
	<- m.ch
}

// 释放锁
func (m *Mutex) Unlock()  {
	select {
    // 成功写入channel代表释放成功
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:

	}
	return false
}

func (m *Mutex) LockTimeout(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)

	select {
	case <-m.ch:
    // 成功获取锁关闭定时器
		timer.Stop()
		return true
	case <-timer.C:

	}
  // 获取锁超时
	return false
}

// 是否上锁
func (m *Mutex) IsLocked() bool {
	return len(m.ch) == 0
}


func main()  {
	m := NewMutex()
	ok := m.TryLock()
	log.Printf("locked v %v\n", ok)
	ok = m.TryLock()
	log.Printf("locked v %v\n", ok)

	go func() {
		time.Sleep(5*time.Second)
		m.Unlock()
	}()

	ok = m.LockTimeout(10*time.Second)
	log.Printf("LockTimeout v %v\n", ok)
}

参考:

极刻时间《go 并发编程实战》

标签: golang
最后更新:2023年5月10日

大明

靠写代码养家的开发者。

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

COPYRIGHT © 2023 疯狂编程网. ALL RIGHTS RESERVED.

京ICP备2022013580号-1