go的channel_go channel原理

go的channel_go channel原理Gochannel系列:"channel入门""为select设置超时时间""nilchannel用法示例""双层c

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

Go channel系列

channel基础

channel用于goroutines之间的通信,让它们之间可以进行数据交换。像管道一样,一个goroutine_A向channel_A中放数据,另一个goroutine_B从channel_A取数据

channel是指针类型的数据类型,通过make来分配内存。例如:

ch := make(chan int)

这表示创建一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另一端只能从此channel中读出int类型的值。

需要注意,chan TYPE才表示channel的类型。所以其作为参数或返回值时,需指定为xxx chan int类似的格式。

向ch这个channel放数据的操作形式为:

ch <- VALUE

从ch这个channel读数据的操作形式为:

<-ch             // 从ch中读取一个值
val = <-ch
val := <-ch      // 从ch中读取一个值并保存到val变量中
val,ok = <-ch    // 从ch读取一个值,判断是否读取成功,如果成功则保存到val变量中

其实很简单,当ch出现在<-的左边表示send,当ch出现在<-的右边表示recv。

例如:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	go sender(ch)         // sender goroutine
	go recver(ch)         // recver goroutine
	time.Sleep(1e9)
}

func sender(ch chan string) {
	ch <- "malongshuai"
	ch <- "gaoxiaofang"
	ch <- "wugui"
	ch <- "tuner"
}

func recver(ch chan string) {
	var recv string
	for {
		recv = <-ch
		fmt.Println(recv)
	}
}

输出结果:

malongshuai
gaoxiaofang
wugui
tuner

上面激活了一个goroutine用于执行sender()函数,该函数每次向channel ch中发送一个字符串。同时还激活了另一个goroutine用于执行recver()函数,该函数每次从channel ch中读取一个字符串。

注意上面的recv = <-ch,当channel中没有数据可读时,recver goroutine将会阻塞在此行。由于recver中读取channel的操作放在了无限for循环中,表示recver goroutine将一直阻塞,直到从channel ch中读取到数据,读取到数据后进入下一轮循环由被阻塞在recv = <-ch上。直到main中的time.Sleep()指定的时间到了,main程序终止,所有的goroutine将全部被强制终止。

因为receiver要不断从channel中读取可能存在的数据,所以receiver一般都使用一个无限循环来读取channel,避免sender发送的数据被丢弃。

channel的属性和分类

channel的3种操作

每个channel都有3种操作:send、receive和close

  • send:表示sender端的goroutine向channel中投放数据
  • receive:表示receiver端的goroutine从channel中读取数据
  • close:表示关闭channel
    • 关闭channel后,send操作将导致painc
    • 关闭channel后,recv操作将返回对应类型的0值以及一个状态码false
    • close并非强制需要使用close(ch)来关闭channel,在某些时候可以自动被关闭
    • 如果使用close(),建议条件允许的情况下加上defer
    • 只在sender端上显式使用close()关闭channel。因为关闭通道意味着没有数据再需要发送

例如,判断channel是否被关闭:

val, ok := <-counter
if ok {
    fmt.Println(val)
}

因为关闭通道也会让recv成功读取(只不过读取到的值为类型的空值),使得原本阻塞在recv操作上的goroutine变得不阻塞,借此技巧可以实现goroutine的执行先后顺序。具体示例见后文:指定goroutine的执行顺序

channel的两种分类

channel分为两种:unbuffered channel和buffered channel

  • unbuffered channel:阻塞、同步模式
    • sender端向channel中send一个数据,然后阻塞,直到receiver端将此数据receive
    • receiver端一直阻塞,直到sender端向channel发送了一个数据
  • buffered channel:非阻塞、异步模式
    • sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞
    • receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据

可以认为阻塞和不阻塞是由channel控制的,无论是send还是recv操作,都是在向channel发送请求

  • 对于unbuffered channel,sender发送一个数据,channel暂时不会向sender的请求返回ok消息,而是等到receiver准备接收channel数据了,channel才会向sender和receiver双方发送ok消息。在sender和receiver接收到ok消息之前,两者一直处于阻塞。
  • 对于buffered channel,sender每发送一个数据,只要channel容量未满,channel都会向sender的请求直接返回一个ok消息,使得sender不会阻塞,直到channel容量已满,channel不会向sender返回ok,于是sender被阻塞。对于receiver也一样,只要channel非空,receiver每次请求channel时,channel都会向其返回ok消息,直到channel为空,channel不会返回ok消息,receiver被阻塞。

buffered channel的两个属性

buffered channel有两个属性:容量和长度:和slice的capacity和length的概念是一样的

  • capacity:表示bufffered channel最多可以缓冲多少个数据
  • length:表示buffered channel当前已缓冲多少个数据
  • 创建buffered channel的方式为make(chan TYPE,CAP)

unbuffered channel可以认为是容量为0的buffered channel,所以每发送一个数据就被阻塞。注意,不是容量为1的buffered channel,因为容量为1的channel,是在channel中已有一个数据,并发送第二个数据的时候才被阻塞。

换句话说,send被阻塞的时候,其实是没有发送成功的,只有被另一端读走一个数据之后才算是send成功。对于unbuffered channel来说,这是send/recv的同步模式。而buffered channel则是在每次发送数据到通道的时候,(通道)都向发送者返回一个消息,容量未满的时候返回成功的消息,发送者因此而不会阻塞,容量已满的时候因为已满而迟迟不返回消息,使得发送者被阻塞

实际上,当向一个channel进行send的时候,先关闭了channel,再读取channel时会发现错误在send,而不是recv。它会提示向已经关闭了的channel发送数据。

func main() {
	counter := make(chan int)
	go func() {
		counter <- 32
	}()
	close(counter)
	fmt.Println(<-counter)
}

输出报错:

panic: send on closed channel

所以,在Go的内部行为中,send和recv是一个整体行为,数据未读就表示未send成功

两种特殊的channel

有两种特殊的channel:nil channel和channal类型的channel。

当未为channel分配内存时,channel就是nil channel,例如var ch1 chan int。nil channel会永远阻塞对该channel的读、写操作。

nil channel在某些时候有些妙用,例如在select(关于select,见后文)的某个case分支A将其它某case分支B所操作的channel突然设置为nil,这将会禁用case分支B。

当channel的类型为一个channel时,就是channel的channel,也就是双层通道。例如:

var chch1 chan chan int

channel的channel是指通道里的数据是通道,可以认为通道里面嵌套了一个或多个通道:只能将整个通道发送到外层通道,读取外层通道时获取到的是内层通道,然后可以操作内层通道。

go的channel_go channel原理

// 发送通道给外层通道
chch1 <-ch1
chch1 <-ch2

// 从外层通道取出内层通道
c <-chch1

// 操作取出的内层通道
c <-123
val := <-c

channel of channel的妙用之一是将外层通道作为通道的加工厂:在某个goroutine中不断生成通道,在其它goroutine可以不断取出通道来操作。

死锁(deadlock)

当channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作时,也就是说两端都因为对方而使得自己当前处于阻塞状态,这时将会出现死锁问题。

更通俗地说,只要所有goroutine都被阻塞,就会出现死锁

比如,在main函数中,它有一个默认的goroutine,如果在此goroutine中创建一个unbuffered channel,并在main goroutine中向此channel中发送数据并直接receive数据,将会出现死锁:

package main 

import (
	"fmt"
)

func main (){
	goo(32)
}

func goo(s int) {
	counter := make(chan int)
	counter <- s
	fmt.Println(<-counter)
}

在上面的示例中,向unbuffered channel中send数据的操作counter <- s是在main goroutine中进行的,从此channel中recv的操作<-counter也是在main goroutine中进行的。send的时候会直接阻塞main goroutine,使得recv操作无法被执行,go将探测到此问题,并报错:

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:

要修复此问题,只需将send操作放在另一个goroutine中执行即可:

package main

import (
	"fmt"
)

func main() {
	goo(32)
}

func goo(s int) {
	counter := make(chan int)
	go func() {
		counter <- s
	}()
	fmt.Println(<-counter)
}

或者,将counter设置为一个容量为1的buffered channel:

counter := make(chan int,1)

这样放完一个数据后send不会阻塞(被recv之前放第二个数据才会阻塞),可以执行到recv操作。

unbuffered channel同步通信示例

下面通过sync.WaitGroup类型来等待程序的结束,分析多个goroutine之间通信时状态的转换。因为创建的channel是unbuffered类型的,所以send和recv都是阻塞的。

package main

import (
	"fmt"
	"sync"
)

// wg用于等待程序执行完成
var wg sync.WaitGroup

func main() {
	count := make(chan int)

	// 增加两个待等待的goroutines
	wg.Add(2)
	fmt.Println("Start Goroutines")

	// 激活一个goroutine,label:"Goroutine-1"
	go printCounts("Goroutine-1", count)
	// 激活另一个goroutine,label:"Goroutine-2"
	go printCounts("Goroutine-2", count)

	fmt.Println("Communication of channel begins")
	// 向channel中发送初始数据
	count <- 1

	// 等待goroutines都执行完成
	fmt.Println("Waiting To Finish")
	wg.Wait()
	fmt.Println("\nTerminating the Program")
}
func printCounts(label string, count chan int) {
	// goroutine执行完成时,wg的计数器减1
	defer wg.Done()
	for {
		// 从channel中接收数据
		// 如果无数据可recv,则goroutine阻塞在此
		val, ok := <-count
		if !ok {
			fmt.Println("Channel was closed:",label)
			return
		}
		fmt.Printf("Count: %d received from %s \n", val, label)
		if val == 10 {
			fmt.Printf("Channel Closed from %s \n", label)
			// Close the channel
			close(count)
			return
		}
		// 输出接收到的数据后,加1,并重新将其send到channel中
		val++
		count <- val
	}
}

上面的程序中,激活了两个goroutine,激活这两个goroutine后,向channel中发送一个初始数据值1,然后main goroutine将因为wg.Wait()等待2个goroutine都执行完成而被阻塞。

再看这两个goroutine,这两个goroutine执行完全一样的函数代码,它们都接收count这个channel的数据,但可能是goroutine1先接收到channel中的初始值1,也可能是goroutine2先接收到初始值1。接收到数据后输出值,并在输出后对数据加1,然后将加1后的数据再次send到channel,每次send都会将自己这个goroutine阻塞(因为unbuffered channel),此时另一个goroutine因为等待recv而执行。当加1后发送给channel的数据为10之后,某goroutine将关闭count channel,该goroutine将退出,wg的计数器减1,另一个goroutine因等待recv而阻塞的状态将因为channel的关闭而失败,ok状态码将让该goroutine退出,于是wg的计数器减为0,main goroutine因为wg.Wait()而继续执行后面的代码。

使用for range迭代channel

前面都是在for无限循环中读取channel中的数据,但也可以使用range来迭代channel,它会返回每次迭代过程中所读取的数据,直到channel被关闭。必须注意,只要channel未关闭,range迭代channel就会一直被阻塞。

例如,将上面示例中的printCounts()改为for-range的循环形式。

func printCounts(label string, count chan int) {
	defer wg.Done()
	for val := range count {
		fmt.Printf("Count: %d received from %s \n", val, label)
		if val == 10 {
			fmt.Printf("Channel Closed from %s \n", label)
			close(count)
			return
		}
		val++
		count <- val
	}
}

多个”管道”:输出作为输入

channel是goroutine与goroutine之间通信的基础,一边产生数据放进channel,另一边从channel读取放进来的数据。可以借此实现多个goroutine之间的数据交换,例如goroutine_1->goroutine_2->goroutine_3,就像bash的管道一样,上一个命令的输出可以不断传递给下一个命令的输入,只不过golang借助channel可以在多个goroutine(如函数的执行)之间传,而bash是在命令之间传。

以下是一个示例,第一个函数getRandNum()用于生成随机整数,并将生成的整数放进第一个channel ch1中,第二个函数addRandNum()用于接收ch1中的数据(来自第一个函数),将其输出,然后对接收的值加1后放进第二个channel ch2中,第三个函数printRes接收ch2中的数据并将其输出。

如果将函数认为是Linux的命令,则类似于下面的命令行:ch1相当于第一个管道,ch2相当于第二个管道

getRandNum | addRandNum | printRes

以下是代码部分:

package main

import (
	"fmt"
	"math/rand"
	"sync"
)

var wg sync.WaitGroup

func main() {
	wg.Add(3)
    // 创建两个channel
	ch1 := make(chan int)
	ch2 := make(chan int)

    // 3个goroutine并行
	go getRandNum(ch1)
	go addRandNum(ch1, ch2)
	go printRes(ch2)

	wg.Wait()
}

func getRandNum(out chan int) {
	// defer the wg.Done()
	defer wg.Done()

	var random int
    // 总共生成10个随机数
	for i := 0; i < 10; i++ {
        // 生成[0,30)之间的随机整数并放进channel out
		random = rand.Intn(30)
		out <- random
	}
	close(out)
}

func addRandNum(in,out chan int) {
	defer wg.Done()
	for v := range in {
        // 输出从第一个channel中读取到的数据
        // 并将值+1后放进第二个channel中
		fmt.Println("before +1:",v)
		out <- (v + 1)
	}
	close(out)
}

func printRes(in chan int){
	defer wg.Done()
	for v := range in {
		fmt.Println("after +1:",v)
	}
}

指定channel的方向

上面通过两个channel将3个goroutine连接起来,其中起连接作用的是第二个函数addRandNum()。在这个函数中使用了两个channel作为参数:一个channel用于接收、一个channel用于发送。

其实channel类的参数变量可以指定数据流向:

  • in <-chan int:表示channel in通道只用于接收数据
  • out chan<- int:表示channel out通道只用于发送数据

go的channel_go channel原理

只用于接收数据的通道<-chan不可被关闭,因为关闭通道是针对发送数据而言的,表示无数据再需发送。对于recv来说,关闭通道是没有意义的。

所以,上面示例中三个函数可改写为:

func getRandNum(out chan<- int) {
    ...
}

func addRandNum(in <-chan int, out chan<- int) {
    ...
}

func printRes(in <-chan int){
    ...
}

buffered channel异步队列请求示例

下面是使用buffered channel实现异步处理请求的示例。

在此示例中:

  • 有(最多)3个worker,每个worker是一个goroutine,它们有worker ID。
  • 每个worker都从一个buffered channel中取出待执行的任务,每个任务是一个struct结构,包含了任务id(JobID),当前任务的队列号(ID)以及任务的状态(worker是否执行完成该任务)。
  • 在main goroutine中将每个任务struct发送到buffered channel中,这个buffered channel的容量为10,也就是最多只允许10个任务进行排队。
  • worker每次取出任务后,输出任务号,然后执行任务(run),最后输出任务id已完成。
  • 每个worker执行任务的方式很简单:随机睡眠0-1秒钟,并将任务标记为完成。

以下是代码部分:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	ID         int
	JobID      int
	Status     string
	CreateTime time.Time
}

func (t *Task) run() {
	sleep := rand.Intn(1000)
	time.Sleep(time.Duration(sleep) * time.Millisecond)
	t.Status = "Completed"
}

var wg sync.WaitGroup

// worker的数量,即使用多少goroutine执行任务
const workerNum = 3

func main() {
	wg.Add(workerNum)

	// 创建容量为10的buffered channel
	taskQueue := make(chan *Task, 10)

	// 激活goroutine,执行任务
	for workID := 0; workID <= workerNum; workID++ {
		go worker(taskQueue, workID)
	}
	// 将待执行任务放进buffered channel,共15个任务
	for i := 1; i <= 15; i++ {
		taskQueue <- &Task{
			ID:         i,
			JobID:      100 + i,
			CreateTime: time.Now(),
		}
	}
	close(taskQueue)
	wg.Wait()
}

// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
	defer wg.Done()
	for v := range in {
		fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
		v.run()
		fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
	}
}

select多路监听

很多时候想要同时操作多个channel,比如从ch1、ch2读数据。Go提供了一个select语句块,它像switch一样工作,里面放一些case语句块,用来轮询每个case语句块的send或recv情况。

select

用法格式示例:

select {
	// ch1有数据时,读取到v1变量中
	case v1 := <-ch1:
		...
	// ch2有数据时,读取到v2变量中
	case v2 := <-ch2:
		...
	// 所有case都不满足条件时,执行default
	default:
		...
}

defalut语句是可选的,不允许fall through行为,但允许case语句块为空块。select会被return、break关键字中断:return是退出整个函数,break是退出当前select。

select的行为模式主要是对channel是否可读进行轮询,但也可以用来向channel发送数据。它的行为如下:

  • 如果所有的case语句块评估时都被阻塞,则阻塞直到某个语句块可以被处理
  • 如果多个case同时满足条件,则随机选择一个进行处理,对于这一次的选择,其它的case都不会被阻塞,而是处理完被选中的case后进入下一轮select(如果select在循环中)或者结束select(如果select不在循环中或循环次数结束)
  • 如果存在default且其它case都不满足条件,则执行default。所以default必须要可执行而不能阻塞

如果有所疑惑,后文的”select超时时间”有更有助于理解select的说明和示例。

所有的case块都是按源代码书写顺序进行评估的。当select未在循环中时,它将只对所有case评估一次,这次结束后就结束select。某次评估过程中如果有满足条件的case,则所有其它case都直接结束评估,并退出此次select

其实如果注意到select语句是在某一个goroutine中评估的,就不难理解只有所有case都不满足条件时,select所在goroutine才会被阻塞,只要有一个case满足条件,本次select就不会出现阻塞的情况。

需要注意的是,如果在select中执行send操作,则可能会永远被send阻塞。所以,在使用send的时候,应该也使用defalut语句块,保证send不会被阻塞。如果没有default,或者能确保select不阻塞的语句块,则迟早会被send阻塞。在后文有一个select中send永久阻塞的分析:双层channel的一个示例

一般来说,select会放在一个无限循环语句中,一直轮询channel的可读事件。

下面是一个示例,pump1()和pump2()都用于产生数据(一个产生偶数,一个产生奇数),并将数据分别放进ch1和ch2两个通道,suck()则从ch1和ch2中读取数据。然后在无限循环中使用select轮询这两个通道是否可读,最后main goroutine在1秒后强制中断所有goroutine。

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	go pump1(ch1)
	go pump2(ch2)
	go suck(ch1, ch2)
	time.Sleep(1e9)
}
func pump1(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 0 {
			ch <- i
		}
	}
}
func pump2(ch chan int) {
	for i := 0; i <= 30; i++ {
		if i%2 == 1 {
			ch <- i
		}
	}
}
func suck(ch1 chan int, ch2 chan int) {
	for {
		select {
		case v := <-ch1:
			fmt.Printf("Recv on ch1: %d\n", v)
		case v := <-ch2:
			fmt.Printf("Recv on ch2: %d\n", v)
		}
	}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/167632.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 深度学习:文本CNN-textcnn

    深度学习:文本CNN-textcnn对于文本分类问题,常见的方法无非就是抽取文本的特征,比如使用doc2evc或者LDA模型将文本转换成一个固定维度的特征向量,然后在基于抽取的特征训练一个分类器。然而研究证明,TextCnn在文本分类问题上有着更加卓越的表现。从直观上理解,TextCNN通过一维卷积来获取句子中n-gram的特征表示。TextCNN对文本浅层特征的抽取能力很强,在短文本领域如搜索、对话领域专注…

  • 计算机病毒的算法,计算机病毒从算法划分为几个类型

    计算机病毒的算法,计算机病毒从算法划分为几个类型随着计算机应用的日益普及,计算机病毒也开始入侵并呈不断蔓延趋势,对计算机的正常运行造成了威胁。那么,计算机病毒从算法划分为几个类型那?就让佰佰安全网的小编和你一起去了解一下吧!计算机病毒从算法划分为以下几个类型:1、伴随型病毒,这一类病毒并不改变文件本身,它们根据算法产生EXE文件的伴随体,具有同样的名字和不同的扩展名(COM),例如:XCOPY.EXE的伴随体是XCOPY.COM。病毒把自身写入…

  • 使用nodejs进行微信公众号网页开发(一)验证服务器「建议收藏」

    使用nodejs进行微信公众号网页开发(一)验证服务器「建议收藏」微信公众号网页开发(一)验证服务器前言一、pandas是什么?二、使用步骤1.引入库2.读入数据总结前言微信公众号网页开发第一步是验证服务器这一步是必不可少的。我是用的是liunx系统搭配宝塔面板,基于node.js+nginx进行开发的。一、pandas是什么?示例:pandas是基于NumPy的一种工具,该工具是为了解决数据分析任务而创建的。二、使用步骤1.引入库代码如下(示例):importnumpyasnpimportpandasaspdimportmatpl

  • Mac 如何强制关机?「建议收藏」

    Mac 如何强制关机?「建议收藏」在通常情况下,MacOSX是非常稳定的,但是它偶尔也会发点小脾气,出现应用程序没有响应的情况。如果你正在运行的应用程序失去响应,强制退出一般都能解决,但是偶尔也会出现整个系统都失去响应,鼠标不能用,这时候你只能强制关机了。楼主使用Mac2年多了,只遇到过一次死机哈。下面介绍两种强制关机的解决办法:1、不用学就明白的,跟windows一样的,长按电源键不放,五秒之后电脑就会强行切断电源。不过它有个坏处,就是可能会损坏系统文件,所以建议不要使用这种方法。2、同时按住control+.

  • setAttribute改变属性,动态改变类

    setAttribute改变属性,动态改变类

  • 独有系统的两个潜在风险

    独有系统的两个潜在风险

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号