本文基于 Go1.14

源码阅读

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
package runtime

// 此文件实现了 Go 的 channel

// 变种:
//  c.sendq 和 c.recvq 中至少一个为空,除非是 unbuffered channle 和单个 goroutine
//  阻塞在 select 语句中同时使用发送和接受的这种情况。这时 c.sendq 和 c.recvq 的长度由
//  select 语句的大小限制。
//
// 对于 buffered channel,同样:
//  c.qcount > 0 隐含 c.recvq 为空
//  c.qcount < c.dataqsiz 隐含 c.sendq 为空

import (
	"runtime/internal/atomic"
	"runtime/internal/math"
	"unsafe"
)

const (
	maxAlign  = 8                                                                           // 按 8 字节对齐
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) // 计算 hchan 占用内存大小,并对齐内存
)

// channel 结构体
type hchan struct {
	qcount   uint           // chan 里元素数量
	dataqsiz uint           // chan 底层环形队列的大小
	buf      unsafe.Pointer // chan 中指向底层环形队列的指针
	elemsize uint16         // chan 中元素的大小
	closed   uint32         // chan 是否关闭的标识,0 未关闭,1 关闭
	elemtype *_type         // chan 中元素类型
	sendx    uint           // 发送索引
	recvx    uint           // 接收索引
	recvq    waitq          // 接收队列
	sendq    waitq          // 发送队列
	lock     mutex          // 保护 hchan 中的字段
}

// waitq 是 sudog 的一个双向链表
type waitq struct {
	first *sudog // 队首指针
	last  *sudog // 队尾指针
}

// sudog 表示了一个等待队列中的 g,例如在一个 channel 中进行发送和接受
// sudog 是必要的,因为 g <-> 同步对象之间的关系是多对多。一个 g 可以在多个等待列表上,
// 因此可以有很多的 sudog 为一个 g 服务;并且很多 g 可能在等待同一个同步对象,
// 因此也会有很多 sudog 为一个同步对象服务。
// 所有的 sudog 分配在一个特殊的池中。使用 acquireSudog 和 releaseSudog 来分配并释放它们。
type sudog struct {
	// 下面的字段由这个 sudog 阻塞的通道的 hchan.lock 进行保护。
	// shrinkstack 依赖于它服务于 sudog 相关的 channel 操作。

	g           *g             // 当前 goroutine
	next        *sudog         // 后一个 sudog
	prev        *sudog         // 前一个 sudog
	elem        unsafe.Pointer // 数据元素
	acquiretime int64          // 获取时间
	releasetime int64          // 释放时间
	ticket      uint32         // 信号量中使用
	isSelect    bool           // isSelect 表示 g 正在参与一个 select,因此 g.selectDone 必须以 CAS 的方式来避免唤醒时候的 data race。
	parent      *sudog         // semaRoot 二叉树
	waitlink    *sudog         // g.waiting 列表或 semaRoot
	waittail    *sudog         // semaRoot
	c           *hchan         // 当前 channel
}

//go:linkname reflect_makechan reflect.makechan
// 对应 reflect 包里面的 makechan 方法,这里用到了 go:linkname 指令
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

// 本质还是调用 makechan
func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

// 通用初始化 channel 方法
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// 编译器检查
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	// 计算存放数据元素的内存大小以及是否溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0: // 不带缓存,只需要申请 hchanSize 大小的内存即可
		c = (*hchan)(mallocgc(hchanSize, nil, true)) // 申请内存
		c.buf = c.raceaddr()                         // race 检查使用此位置进行同步
	case elem.ptrdata == 0: // 存放的元素不包含指针,只需要申请 hchanSize+mem 大小的内存即可
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 申请内存
		c.buf = add(unsafe.Pointer(c), hchanSize)        // 计算 buf 指向的位置
	default: // 存放的元素包含指针,这里需要两次申请内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)   // 元素大小赋值
	c.elemtype = elem                // 元素类型赋值
	c.dataqsiz = uint(size)          // 缓存大小赋值
	lockInit(&c.lock, lockRankHchan) // 初始化锁

	return c
}

// chanbuf 获取 i 索引位置对应的缓存数据,类似 c 里面的指针操作
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// full reports whether a send on c would block (that is, the channel is full).
// It uses a single word-sized read of mutable state, so although
// the answer is instantaneously true, the correct answer may have changed
// by the time the calling function receives the return value.
func full(c *hchan) bool {
	// c.dataqsiz is immutable (never written after the channel is created)
	// so it is safe to read at any time during channel operation.
	if c.dataqsiz == 0 {
		// Assumes that a pointer read is relaxed-atomic.
		return c.recvq.first == nil
	}
	// Assumes that a uint read is relaxed-atomic.
	return c.qcount == c.dataqsiz
}

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

// channel 发送数据
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil { // 判断 channel 是否为 nil
		if !block { // 如果非阻塞,直接返回 false
			return false
		}

		// 当向 nil channel 发送数据时,会调用 gopark
		// 而 gopark 会将当前的 goroutine 休眠,并用过第一个参数的 unlockf 来回调唤醒
		// 但此处传递的参数为 nil,因此向 channel 发送数据的 goroutine 和接收数据的 goroutine 都会阻塞,进而死锁
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 忽略 race 检查...

	// 对于不阻塞的 send,快速检测失败场景
	// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
	// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
	// 2. channel 是缓冲型的,但循环数组已经装满了元素
	// 主要用于 select 语句中,涉及到指令重排队+可观测性,不展开讲...
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// 加锁
	lock(&c.lock)

	// 再次检查 channel 是否已关闭,不允许向关闭的 channel 发送数据
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 从 recvq 队首取出一个接受者,如果接受者不是 nil,就绕过环形队列直接把 ep 拷贝给 sg,并释放锁
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 如果环形队列还未满
	if c.qcount < c.dataqsiz {
		// 拿到 sendx 索引的位置
		qp := chanbuf(c, c.sendx)

		// 直接把数据从 qp 拷贝到 qp
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// 非阻塞,直接返回错误
	if !block {
		unlock(&c.lock)
		return false
	}

	gp := getg()           // 获取当前 goroutine
	mysg := acquireSudog() // 从对象池获取 sudog
	mysg.elem = ep         // 保存 send 数据
	mysg.g = gp            // 当前协程
	mysg.isSelect = false  // 是否在 select 中
	mysg.c = c             // 保存当前 channel
	gp.waiting = mysg      // 保存当前 sudog,下面要用到做校验
	gp.param = nil         // 这里为 nil,唤醒时保存传递的参数
	c.sendq.enqueue(mysg)  // 存入队列

	// 挂起当前的 g,将当前的 g 移出调度器的队列
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

	// 当 g 被重新调度时,从这里继续往下执行
	KeepAlive(ep)

	// 检验是否为当前的 sudog
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil

	// 正常唤醒时 param 保存的是传递的参数,为 nil 时则直接 panic
	if gp.param == nil {
		// 如果 channel 已被关闭,则为虚假唤醒
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	// 参数置空
	gp.param = nil

	// 取消与之前阻塞的 channel 的关联
	mysg.c = nil

	// 将 sudog 放回对象池
	releaseSudog(mysg)
	return true
}

// send 函数处理向一个空的 channel 发送操作
// ep 指向被发送的元素,会被直接拷贝到接收的 goroutine
// 之后,接收的 goroutine 会被唤醒
// c 必须是空的(因为等待队列里有 goroutine,肯定是空的)
// c 必须被上锁,发送操作执行完后,会使用 unlockf 函数解锁
// sg 必须已经从等待队列里取出来了
// ep 必须是非空,并且它指向堆或调用者的栈
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 忽略 race 检查
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // 直接拷贝到接受者内存,使用写屏障
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	// 唤醒接收者的 goroutine. skip 和打印栈相关,先忽略
	goready(gp, skip+1)
}

// 向一个非缓冲型的 channel 发送数据、从一个无元素的(非缓冲型或缓冲型但空)的 channel
// 接收数据,都会导致一个 goroutine 直接操作另一个 goroutine 的栈
// 由于 GC 假设对栈的写操作只能发生在 goroutine 正在运行中并且由当前 goroutine 来写
// 所以这里实际上违反了这个假设。可能会造成一些问题,所以需要用到写屏障来规避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
	// 直接进行内存"搬迁"
	// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
	// 就不能修改真正的 dst 位置的值了
	// 因此需要在读和写之前加上一个屏障
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// 拷贝内存
	memmove(dst, src, t.size)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

// 关闭 channel
func closechan(c *hchan) {
	// 判断 channel 是否为 nil
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	// 加锁保护
	lock(&c.lock)

	// 判断 channel 是否已关闭,重复关闭报 panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	// 关闭 channel
	c.closed = 1

	// gList 是通过 g.schedlink 链接 G 的列表,一个 G 只能是一次在一个 gQueue 或 gList 上
	// gList 模拟的是栈操作(FILO)
	// gQueue 模拟的是队列操作(FIFO)
	var glist gList

	// 释放所有接收者
	for {
		sg := c.recvq.dequeue()
		// sg == nil,表示接收队列已为空,跳出循环
		if sg == nil {
			break
		}

		// 如果 elem 不为空说明未忽略接收值,赋值为该类型的零值
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = nil
		glist.push(gp) // 放入 glist 队列
	}

	// 释放所有的发送者(它们将会 panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		gp := sg.g
		gp.param = nil
		glist.push(gp)
	}

	// 释放锁
	unlock(&c.lock)

	// 循环读取 glist 里面的数据
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// 唤醒 goroutine 等待调度
		goready(gp, 3)
	}
}

// empty reports whether a read from c would block (that is, the channel is
// empty).  It uses a single atomic read of mutable state.
func empty(c *hchan) bool {
	// c.dataqsiz is immutable.
	if c.dataqsiz == 0 {
		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
	}
	return atomic.Loaduint(&c.qcount) == 0
}

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

// 对应 v,ok := <- c
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 忽略 debug ...

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 这块主要用在 select 语句中,先大概了解下,比较难懂。。。
	// 快速路径: 在不需要锁的情况下检查失败的非阻塞操作
	// 注意到 channel 不能由已关闭转换为未关闭,则失败的条件是:
	// 1. channel 是非缓冲型的,recvq 队列为空
	// 2. channel 是缓冲型的,buf 为空
	if !block && empty(c) {
		// 此处的 c.closed 必须在条件判断之后进行验证,
		if atomic.Load(&c.closed) == 0 {
			// 因为指令重排后,如果先判断 c.closed,得出 channel 未关闭,无法判断失败条件中
			// channel 是已关闭还是未关闭(从而需要 atomic 操作)
			return
		}

		// 再次检查 channel 是否为空
		if empty(c) {
			// 接受者不为 nil 时返回该类型的零值
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			// (true,fasle)里面的 true:表示被 select case 选中,fasle 表示是否正常收到数据
			return true, false
		}
	}

	// 加锁
	lock(&c.lock)

	// 处理 channel 关闭 && 无缓存元素的情况
	if c.closed != 0 && c.qcount == 0 {
		// race 检查...
		unlock(&c.lock)
		if ep != nil { // 从已关闭的 channel 继续接收值
			typedmemclr(c.elemtype, ep) // 清理该值的内存数据,返回该类型的零值
		}
		return true, false
	}

	// 发送队列不为空
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 如果环形队列不为空,直接从队列接收数据
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx) // 获取索引位置
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) //内存拷贝
		}
		typedmemclr(c.elemtype, qp) // 内存数据清理
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// 如果是非阻塞,直接返回 fasle, false
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 读取不到数据,就阻塞当前 goroutine
	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	mysg.g = gp
	gp.waiting = mysg
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg) // 写入接收队列
	// 挂起当前 goroutine,使其不再被调度
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// 唤醒后,继续往下执行

	// 检查是否是当前 fgoroutine
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	// 清理数据
	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil

	// 将 sudog 放回对象池
	releaseSudog(mysg)
	return true, !closed
}

// 等待发送队列里有 goroutine 存在,说明 buf 是满的
// 这有可能是:
// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,直接进行内存拷贝
// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 非缓冲型的 channel
	if c.dataqsiz == 0 {
		if ep != nil {
			// 直接进行内存拷贝
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// 缓冲型的 channel
		qp := chanbuf(c, c.recvx)

		// 从队列 recvx 位置拷贝数据到接收者
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 从发送者把数据写入 recvx,然后修改 recvx 位置
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	// 唤醒发送者 goroutine
	goready(gp, skip+1)
}

func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	gp.activeStackChans = true
	unlock((*mutex)(chanLock))
	return true
}

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbrecv(&v, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
	selected, _ = chanrecv(c, elem, false)
	return
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if c != nil && selectnbrecv2(&v, &ok, c) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
	// TODO(khr): just return 2 values from this function, now that it is in Go.
	selected, *received = chanrecv(c, elem, false)
	return
}

//go:linkname reflect_chansend reflect.chansend
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}

//go:linkname reflect_chanrecv reflect.chanrecv
// 对应 reflect 包里面的 chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}

//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.qcount)
}

//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.qcount)
}

//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	if c == nil {
		return 0
	}
	return int(c.dataqsiz)
}

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}

// 放回对象池
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// 从对象池取出 sudog
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudog)
		}

		// 这里涉及到 select,可先忽略这块。。。
		//
		// 如果一个 goroutine 由于一个 select 被放到此队列,则在 goroutine 唤醒前
		// 如果由于 select 而将 goroutine 放在此队列中,则在由不同的情况唤醒的 goroutine 之间
		// 存在一个小窗口并且它获取 channel lock。 一旦锁定,它会将自己从队列中删除,
		// 因此我们不会在那之后看到它。
		// 我们在 G 结构中使用一个标志来告诉我们其他人何时获得了这个竞争条件以告知这个 goroutine,
		// 但是 goroutine 还没有将自己从队列中移除。
		// 正在参与一个 select,如果将 g 标记为已完成失败,则说明此 sgp 已经不在队列中,继续循环并重新出队
		if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
			continue
		}

		return sgp
	}
}

参考资料

深度解密Go语言之channel

曹大 golang notes

Gopher 2019 Go并发编程的分享