Go笔记5-Go并发编程

关于并发和多线程,我反而不想在这里写很多东西。为什么呢? 相信了解到Go语言并且愿意看到这里的人,应该是对各种语言的多线程的管理和锁机制应该都是有所感触,甚至对Java这类语言的多线程管理的复杂性和效率问题感到厌烦了。而近些年来越来越被开发者们对消息机制和分布式消息机制更加宠幸,因为多线程的开销太大,锁机制容易出现问题和效能的低下,CPU时间片的等待和浪费,让消息机制显出他的高效、non-blocking的本领。

其中最具典型的无非就是各种开源消息队列工具了,比如Scala的Akka,Kafka等。而消息机制的核心,就是通过互相隔离的线程、隔离的内存、隔离业务,实现对高性能高吞吐业务的解耦,以及高效能执行组件的分离。我们可以看出来消息机制本质是一种消费者生产者模式,而他们将不像是原来线程机制下共享内存,共享变量,加锁的方式来对变量做互斥处理,甚至不再共享一个线程,不再共享进程,再甚至可以不在一个主机上,当实现了进程、或者主机业务的解耦时,他们就成为了分布式组件,而让他们之间互相通讯的,就是消息。消息在组件之间通讯,这就是消息机制,它可以是分布式的。

注:实际上消息机制也是一种很古老的计算机编程模型。它异步的提交一个内存更改任务到另一个线程持有的队列当中,当队列被取出任务时,开始执行。我们可以在所有的操作系统上的任意组件,都可以看到消息机制的影子。分布式消息队列、分布式缓存,是后端开发者们毫无疑问经常用到的。在显示器上,Windows窗口的组件刷新,是窗口主线程的消息队列所管理的。Android系统的Handler消息处理系统,手机上的触摸事件系统,生产者消费者模型等。甚至生活中亦是如此。

有时任务繁多,在内核数相同的线程池中,每个线程持有一个可以执行的对象,当线程空闲时可以立即使用。最大化的使用CPU效能,这种方式比起线程任务的等待更加轻量和节省CPU等待时间,节省了CPU对线程上下文切换的开销,使得CPU的任务执行体更加轻量,这被称作协程。协程可以更加轻量的被创建、执行和调度。在Go语言中,提供了go关键字和channel关键字来实现对轻量级执行体goroutine和消息通道机制的实现。而在Java等语言中,只有库的方式的实现。然而在go语言当中,go关键字实际上使用非常简单。在方法前加go关键字,就可以实现并发执行。

go语言一书当中使用了一个使用了同步锁的机制的例子来对比。

go关键字

现在我们直接来看go关键字的例子:

package main

import (
    "fmt"
    "time"
)
//异步地准备,服务员在做的事情。
func prepareAsync(what string, seconds int) {
    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println(what, "上好了!")
}

func main() {
    //我在等饮料
    fmt.Println("Waiting..")
    //服务员与我是异步的,不共享任何量,无锁,所以服务员做什么事与我无关。
    go prepareAsync("红茶", 3)    //3 秒后上来
    go prepareAsync("牛奶", 5)    //5 秒后上来
    time.Sleep(6 * time.Second) //我要等待6 秒
    fmt.Println("所有饮料上齐了")      //6秒时 喊 上齐了
}

运行结果是:

Waiting..       [直接打印]
红茶 上好了!     [等待3秒后打印]
牛奶 上好了!     [等待5秒后打印]
所有饮料上齐了    [等待6秒后打印]

如果主函数不对goroutine等待,程序会立即结束,不会理会正在运行的goroutine,也就是说,他们不会在执行行完就结束,而是直接结束。然而我们不会总是使用sleep这种笨办法去控制goroutine。这时候channel就派上用场了。我们需要channel来让goroutine之间相互发送消息进行通讯。 跟创建数组切片map和一样,我们需要用make函数来创建channel:

var cchan1 chan int = make(chan int) //这样写未免太麻烦,不如下面的写法简练
chStr := make(chan string)
chInt := make(chan int)
chAny := make(chan interface{})

channel机制chan关键字

channel像一个UNIX通道,他为不同的goroutine之间传递消息进行通讯。一个channel跟一个变量区别不大,它也可以被赋值,但是它可以像消息一样被发送和接收数据。

我们看一下如何接收和发送的语法是如何:

cchan1 <- 1      //发送整数1到channel量到cchan1
<-cchan1            //从channel量cchan1中接收值
i := <-cchan1      //从channel量cchan1中接收值并且赋值给i

我们结合go关键字和chan关键字,看看如何实现通道为goroutine通讯:

// goroutine
package main

import (
    "fmt"
    "time"
)

var c chan int //传递杯数的通道量
var i int      //缓存通道接收的数据并显示

//异步地准备,服务员在做的事情。
func prepareAsync(what string, seconds int) {
    i = i + 1
    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println(what, "上好了!")
    //发给顾客的第几杯
    c <- i
}

func main() {
    i = 0
    c = make(chan int)
    //我在等饮料
    fmt.Println("Waiting..")
    num := 0
    //服务员与我是异步的,不共享任何量,无锁,所以服务员做什么事与我无关。
    go prepareAsync("红茶", 3) //服务员需要 3 秒准备
    num = <-c                //客户等待直到第二杯上来
    fmt.Println("第", num, "杯")

    go prepareAsync("牛奶", 5) //服务员需要 5 秒准备
    num = <-c                //客户等待直到第二杯上来
    fmt.Println("第", num, "杯")

    fmt.Println("所有饮料上齐了") //告知上齐了

}

执行结果:

Waiting..
红茶 上好了!   [3秒后显示]
第 1 杯
牛奶 上好了!   [5秒后显示]
第 2 杯
所有饮料上齐了

select关键字

我们可以看到,这样做其实并不太好,因为无法确定在什么时候会阻塞等待通道量传入,我们可以用过select case 把通道量的选取收集起来一起使用。这看起来很像是消息循环触发的特定的消息做法。当然只是形式上很像而已。然而实际上并不是这么回事。 select 实际上就是把通道操作当成switch的类似的方式进行操作的一个关键字当你使用了很多的go关键字运行了很多goroutine,然后使用了很多chan通道量的时候,要对不同的goroutine传递的通道量进行不同的操作,这时候select就派上用场了。 当然,我们还用上面的例子来改。不过上面的例子没有体现多通道量的操作。

// goroutine
package main

import (
    "fmt"
    "time"
)

var c chan int //传递杯数的通道量
var i int      //缓存通道接收的数据并显示

//异步地准备,服务员在做的事情。
func prepareAsync(what string, seconds int) {

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println(what, "上好了!")
    //发给顾客的第几杯
    c <- i
}

func main() {
    i = 0
    c = make(chan int)
    //我在等饮料
    fmt.Println("Waiting..")

    //服务员与我是异步的,不共享任何量,无锁,所以服务员做什么事与我无关。
    go prepareAsync("红茶", 3) //服务员需要 3 秒准备
    go prepareAsync("牛奶", 5) //服务员需要 5 秒准备
L:
    for {
        select {
        case <-c:
            i = i + 1
            fmt.Println("第", i, "杯")
            if i > 1 {
                fmt.Println("所有饮料上齐了") //告知上齐了
                break L
            }
        }
    }

}

最后的运行效果仍然和上例的运行效果一样。 当然,如果是多个chan量,也可以增加case:

// goroutine
package main

import (
    "fmt"
    "time"
)

var c chan int         //传递杯数的通道量
var smokingOK chan int //抽烟时间
var i int              //缓存通道接收的数据并显示

//异步地准备,服务员在做的事情。
func prepareAsync(what string, seconds int) {

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println(what, "上好了!")
    //发给顾客的第几杯
    c <- i //i在这里其实看起来没啥用了,只是起了一个通知的作用
}

//自己抽烟去
func smoking(seconds int) {
    fmt.Println("点完饮料开始抽烟")
    time.Sleep(time.Duration(seconds) * time.Second)
    smokingOK <- seconds
}

func main() {
    i = 0
    c = make(chan int)
    smokingOK = make(chan int)
    //我在等饮料
    go smoking(4)
    go prepareAsync("红茶", 3) //服务员需要 3 秒准备
    go prepareAsync("牛奶", 5) //服务员需要 5 秒准备
L:
    for {
        select {
        case <-c:
            i = i + 1
            fmt.Println("第", i, "杯")
            if i > 1 {
                fmt.Println("所有饮料上齐了") //告知上齐了
                break L
            }
        case timeDuring := <-smokingOK:
            fmt.Println("抽烟结束回到卡座,用时:", timeDuring)
        }
    }
}

运行结果:

点完饮料开始抽烟
红茶 上好了!
第 1 杯
抽烟结束回到卡座,用时: 4
牛奶 上好了!
第 2 杯
所有饮料上齐了

注: 这里可以配合runtime下的几个函数去根据不同的机器去优化性能,去使用一些常用的功能。比如: - runtime.Goexit() 退出当前goroutine - runtime.Gosched() 让渡当前Goroutine,延期继续执行 - runtime.NumCPU() 返回 CPU 核数量 - runtime.NumGoroutine() 返回一共多少goroutine - runtime.GOMAXPROCS() 设置可并行的CPU逻辑核心数量

case可以设置运行超时执行的方式:

case <- time.After(5 * time.Second)

上例可以再进行一点小修改:

// goroutine
package main

import (
    "fmt"
    "runtime"
    "time"
)

var c chan int         //传递杯数的通道量
var smokingOK chan int //抽烟时间
var i int              //缓存通道接收的数据并显示

//异步地准备,服务员在做的事情。
func prepareAsync(what string, seconds int) {

    time.Sleep(time.Duration(seconds) * time.Second)
    fmt.Println(what, "上好了!")
    //发给顾客的第几杯
    c <- i //i在这里其实看起来没啥用了,只是起了一个通知的作用
}

//自己抽烟去
func smoking(seconds int) {
    fmt.Println("点完饮料开始抽烟")
    time.Sleep(time.Duration(seconds) * time.Second)
    smokingOK <- seconds
}

func main() {
    i = 0
    c = make(chan int)
    smokingOK = make(chan int)
    fmt.Println("CPUs,", runtime.NumCPU())
    //我在等饮料
    go smoking(4)
    go prepareAsync("红茶", 3) //服务员需要 3 秒准备
    go prepareAsync("牛奶", 5) //服务员需要 5 秒准备
L:
    for {
        select {
        case <-c:
            i = i + 1
            fmt.Println("第", i, "杯")
            if i > 1 {
                fmt.Println("所有饮料上齐了") //告知上齐了
                break L
            }
        case timeDuring := <-smokingOK:
            fmt.Println("抽烟结束回到卡座,用时:", timeDuring)
        case <-time.After(2 * time.Second):
            fmt.Println("大喊一声:老子的牛奶怎么还没上?")
        }
    }
}

运行结果:

CPUs, 8
点完饮料开始抽烟
大喊一声:老子的牛奶怎么还没上?
红茶 上好了!
第 1 杯
抽烟结束回到卡座,用时: 4
牛奶 上好了!
第 2 杯
所有饮料上齐了

channel的缓冲机制

make函数实际上是内置的内存分配函数,这同样适用于chan通道量类型的变量。 这让一个通道量自身有了缓冲区大小,在这个缓冲区大小写满以前,写入不会被阻塞。这与无缓冲不一样,无缓冲是写入一个值立即通知通道量读取。 带缓冲的通道量定义:

c := make(chan int, 32)

读取时要读取所有的缓冲的数据:

for i := range c {
 fmt.Println("Received:",i)
}

举个小栗子验证用法:

package main

import (
    "fmt"
    "time"
)

var buffer chan int

func main() {
    buffer = make(chan int, 32)
    go readBuff()
    putBuff()

}
func putBuff() {
    for i := 0; i < 34; i++ {
        time.Sleep(time.Duration(1) * time.Second)
        buffer <- i
    }
}

func readBuff() {
    for c := range buffer {
        fmt.Println("Rec", c)
    }
}

如果你希望约束一个channel为单向数据通道,我们可以定义为单向读取还是单向写入。

var ch1 chan int // ch1是一个正常的channel,不是单向的
var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
var ch3 <-chan int // ch3是单向channel,只用于读取int数据

那么单向channel如何初始化呢?之前我们已经提到过, channel是一个原生类型,因此不仅支持被传递,还支持类型转换。只有在介绍了单向channel的概念后,读者才会明白类型转换对于channel的意义:就是在单向channel和双向channel之间进行转换。示例如下:

ch4 := make(chan int)
ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
ch6 := chan<- int(ch4) // ch6 是一个单向的写入channe

最后,关闭我们的channel,用一个内置函数close()即可。

同步锁

Go当中也提供了显式同步锁(Java中有隐式同步锁,Go里没有) sync.Mutex和sync.RWMutex。 Mutex是最简单的一种锁,RWMutex是读写锁。