Skip to content

Go 语言实战 第 6 章 并发

🏷️ Go 《Go 语言实战》

6.1 并发和并行

Go 语言里的并发指的是能让某个函数独立于其它函数的能力。

Go 语言的并发同步模型来自一个叫做 通信顺序进程(Communicating Sequential Processes,CSP 的泛型(paradigm)。
CSP 是一种消息传递模型,通过在 goroutine 之间传递数据来传递消息,而不是对数据进行加锁来实现同步访问。
用于在 goroutine 之间同步和传递数据的关键数据类型叫做 通道(channel

线程(thread & 进程(process

当运行一个应用程序时,操作系统会为这个应用程序启动一个进程。
可以将这个进程看作一个包含了应用程序在运行时需要用到和维护的各种资源的容器。

一个线程是一个执行空间,这个空间会被操作系统调度来运行函数中所写的代码。
每个进程至少包含一个线程,每个进程的初始线程被称作主线程
当主线程终止时,应用程序也会终止。
操作系统将线程调度到处理器上运行,这个处理器并不一定是进程所在的处理器。

操作系统会在物理处理器上调度线程来运行,而 Go 语言的运行时会在逻辑处理器上调度 goroutine 来运行。
每个逻辑处理器都分别绑定到单个操作系统线程。
在 1.5 版本上,Go 语言运行时默认会为每个物理处理器分配一个逻辑处理器。
在 1.5 版本之前的版本中,默认给整个应用程序分配一个逻辑处理器。
这些逻辑处理器会用于执行所有被创建的 goroutine
即便只有一个逻辑处理器,Go 也可以以神奇的效率和性能,并发调度无数个 goroutine

正在运行的 goroutine 需要执行一个阻塞的系统调用时,线程和 goroutine 会从逻辑处理器上分离,该线程会继续阻塞,等待系统调用的返回。
与此同时,这个逻辑处理器就失去了用来运行的线程。
调度器会创建一个新线程,并将其绑定到该逻辑处理器上。
之后,调度器会从本地运行队列里选择另一个 goroutine 来运行。
一旦被阻塞的系统调用执行完成并返回,对应的 goroutine 会放回到本地运行队列,而且之前的线程会保存好,以便之后可以继续使用。

如果一个 goroutine 需要做一个网络 I/O 调用,流程上会有些不一样。
此时,goroutine 会和逻辑处理器分离,并移到集成了网络轮询器的运行时。
一旦该轮询器指示某个网络读或者写操作已经就绪,对应的 goroutine 就会重新分配到逻辑处理器上来完成操作。

调度器对可以创建的逻辑处理器的数量没有限制,但语言运行时默认限制每个程序最多创建 10000 个线程。
这个限制值可以通过调用 runtime/debug 包的 SetMaxThreads 方法来更改。如果程序试图使用更多的线程,就会崩溃。

JiaJia:

逻辑处理器的数量不是默认为物理处理器的数量么?
根据之前的说明,逻辑处理器的数量和线程应该不是一对一的关系才对。
是我理解错了?还是翻译问题?

并发(concurrency不是并行(parallelism
并行是让不同的代码片段同时在不同的物理处理器上执行。
并行的关键是同时做很多事情,而并发是指同时管理很多事情,这些事情可能只做了一半就被暂停去做别事情了。
在很多情况下,并发的效果比并行好,因为操作系统和硬件的总资源一半很少,但能支持系统同时做很多事情。

6.2 goroutine

go
// This sample program demonstrates how to create goroutines and
// how the scheduler behaves.
package main

import (
    "fmt"
    "runtime"
    "sync"
)

// main is the entry point for all Go programs.
func main() {
    // Allocate 1 logical processor for the scheduler to use.
    runtime.GOMAXPROCS(1)

    // wg is used to wait for the program to finish.
    // Add a count of two, one for each goroutine.
    var wg sync.WaitGroup
    wg.Add(2)

    fmt.Println("Start Goroutines")

    // Declare an anonymous function and create a goroutine.
    go func() {
        // Schedule the call to Done to tell main we are done.
        defer wg.Done()

        // Display the alphabet three times
        for count := 0; count < 3; count++ {
            for char := 'a'; char < 'a'+26; char++ {
                fmt.Printf("%c ", char)
            }
        }
    }()

    // Declare an anonymous function and create a goroutine.
    go func() {
        // Schedule the call to Done to tell main we are done.
        defer wg.Done()

        // Display the alphabet three times
        for count := 0; count < 3; count++ {
            for char := 'A'; char < 'A'+26; char++ {
                fmt.Printf("%c ", char)
            }
        }
    }()

    // Wait for the goroutines to finish.
    fmt.Println("Waiting To Finish")
    wg.Wait()

    fmt.Println("\nTerminating Program")
}

WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine 。如果 WaitGroup 的值大于 0,Wait 方法就会阻塞。

基于调度器的内部算法,一个正运行的 goroutine 在工作结束前,可以被停止并重新调度。
调度器这样做的目的是防止某个 goroutine 长时间占用逻辑处理器。当 goroutine 占用时间过长时,调度器会停止当前正运行的 goroutine,并给其它可运行的 goroutine 运行的机会。

Go 标准库的 runtime 包里有一个名为 GOMAXPROCS 的函数,通过它可以指定调度器可用的逻辑处理器的数量。
用这函数,可以给每个可用的物理处理器在运行的时候分配一个逻辑处理器。

go
import "runtime"

// 给每个可用的核心分配一个逻辑处理器
runtime.GOMAXPROCS(runtime.NumCPU())

需要强调的是,使用多个逻辑处理器并不意味着性能更好。
在修改任何语言运行时配置参数时,都需要配合基准测试来评估程序的运行效果。

6.3 竞争状态

如果两个或多个 goroutine 在没有互相同步的情况下,访问某个共享资源,并试图同时读和写这个资源,就处于互相竞争的状态,这种情况被称作竞争状态(race condition
对一个共享资源的读和写操作必须是原子化的,换句话说,同一时刻只能有一个 goroutine 对共享资源进行读和写操作。

这是一个存在竞争并导致运行结果不及预期的示例:

go
// This sample program demonstrates how to create race
// conditions in our programs. We don't want to do this.
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    // counter is a variable incremented by all goroutines.
    counter int

    // wg is used to wait for the program to finish.
    wg sync.WaitGroup
)

// main is the entry point for all Go programs.
func main() {
    // Add a count of two, one for each goroutine.
    wg.Add(2)

    // Create two goroutines.
    go incCounter(1)
    go incCounter(2)

    // Wait for the goroutines to finish.
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}

// incCounter increments the package level counter variable.
func incCounter(id int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()

    for count := 0; count < 2; count++ {
        // Capture the value of Counter.
        value := counter

        // Yield the thread and be placed back in queue.
        runtime.Gosched()

        // Increment our local value of Counter.
        value++

        // Store the value back into Counter.
        counter = value
    }
}

可以通过 go build -race 来检测是否存在竞争。

JiaJia:

执行了但是没有效果,没有显示错误消息

6.4 锁住共享资源

Go 语言提供了传统的同步 goroutine 机制,就是对共享资源加锁。

原子函数能够以很底层的加锁机制来同步访问整型变量和指针。

go
import "sync/atomic"

var counter int64

atomic.AddInt64(&counter, 1)

AddInt64 方法强制同一时刻只能有一个 goroutine 运行并完成这个加法操作。

另外两个有用的原子函数是 LoadInt64StoreInt64
这两个函数提供了一种安全地读和写一个整型值的方式。

另一种同步访问共享资源的方式是使用互斥锁(mutex
互斥锁用户在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界区代码。

go
// This sample program demonstrates how to use a mutex
// to define critical sections of code that need synchronous
// access.
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    // counter is a variable incremented by all goroutines.
    counter int

    // wg is used to wait for the program to finish.
    wg sync.WaitGroup

    // mutex is used to define a critical section of code.
    mutex sync.Mutex
)

// main is the entry point for all Go programs.
func main() {
    // Add a count of two, one for each goroutine.
    wg.Add(2)

    // Create two goroutines.
    go incCounter(1)
    go incCounter(2)

    // Wait for the goroutines to finish.
    wg.Wait()
    fmt.Printf("Final Counter: %d\n", counter)
}

// incCounter increments the package level Counter variable
// using the Mutex to synchronize and provide safe access.
func incCounter(id int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()

    for count := 0; count < 2; count++ {
        // Only allow one goroutine through this
        // critical section at a time.
        mutex.Lock()
        {
            // Capture the value of counter.
            value := counter

            // Yield the thread and be placed back in queue.
            runtime.Gosched()

            // Increment our local value of counter.
            value++

            // Store the value back into counter.
            counter = value
        }
        mutex.Unlock()
        // Release the lock and allow any
        // waiting goroutine through.
    }
}

mutex.Lock()mutex.Unlock() 之间的代码即是临界区。
大括号是为了让临界区看起来更清晰,并不是必需的。

6.5 通道

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。
声明通道时,需要指定将要被共享的数据的类型。
可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。

使用 make 方法创建通道:

go
// 创建无缓冲的整型通道
unbuffered := make(chan int)

// 创建有缓冲的字符串通道
buffered := make(chan string, 10)

向通道发送值(<-):

go
// 创建有缓冲的字符串通道
buffered := make(chan string, 10)

// 通过通道发送一个字符串
buffered <- "JiaJia"

从通道里接受值:

go
// 从通道里接收一个字符串
value := <-buffered

无缓冲的通道(unbuffered channel 是指在接收前没有能力保存任何值的通道。
这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接受操作。
如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接受操作的 goroutine 阻塞等待。
这种对通道进行发送和接受的交互行为本身就是同步的。
其中任意一个操作都无法离开另一个操作单独存在。

go
// This sample program demonstrates how to use an unbuffered
// channel to simulate a relay race between four goroutines.
package main

import (
    "fmt"
    "sync"
    "time"
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

// main is the entry point for all Go programs.
func main() {
    // Create an unbuffered channel.
    baton := make(chan int)

    // Add a count of one for the last runner.
    wg.Add(1)

    // First runner to his mark.
    go Runner(baton)

    // Start the race.
    baton <- 1

    // Wait for the race to finish.
    wg.Wait()
}

// Runner simulates a person running in the relay race.
func Runner(baton chan int) {
    var newRunner int

    // Wait to receive the baton.
    runner := <-baton

    // Start running around the track.
    fmt.Printf("Runner %d Running With Baton\n", runner)

    // New runner to the line.
    if runner != 4 {
        newRunner = runner + 1
        fmt.Printf("Runner %d To The Line\n", newRunner)
        go Runner(baton)
    }

    // Running around the track.
    time.Sleep(100 * time.Millisecond)

    // Is the race over.
    if runner == 4 {
        fmt.Printf("Runner %d Finished, Race Over\n", runner)
        wg.Done()
        return
    }

    // Exchange the baton for the next runner.
    fmt.Printf("Runner %d Exchange With Runner %d\n",
        runner,
        newRunner)

    baton <- newRunner
}

有缓冲的通道(buffered channel 是一种在被接收前能存储一个或者多个值的通道。
这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。
通道会阻塞发送和接收动作的条件也会不同。
只有在通道中没有要接收的值时,接收动作才会阻塞。
只有在通道没有可用缓存区容纳被发送的值时,发送动作才会阻塞。

无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;
有缓冲的通道没有这种保证。

通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。
如果在获取通道时还加入了可选的标志,就能得到通道的状态信息。

go
// This sample program demonstrates how to use a buffered
// channel to work on multiple tasks with a predefined number
// of goroutines.
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

const (
    numberGoroutines = 4  // Number of goroutines to use.
    taskLoad         = 10 // Amount of work to process.
)

// wg is used to wait for the program to finish.
var wg sync.WaitGroup

// init is called to initialize the package by the
// Go runtime prior to any other code being executed.
func init() {
    // Seed the random number generator.
    rand.Seed(time.Now().Unix())
}

// main is the entry point for all Go programs.
func main() {
    // Create a buffered channel to manage the task load.
    tasks := make(chan string, taskLoad)

    // Launch goroutines to handle the work.
    wg.Add(numberGoroutines)
    for gr := 1; gr <= numberGoroutines; gr++ {
        go worker(tasks, gr)
    }

    // Add a bunch of work to get done.
    for post := 1; post <= taskLoad; post++ {
        tasks <- fmt.Sprintf("Task : %d", post)
    }

    // Close the channel so the goroutines will quit
    // when all the work is done.
    close(tasks)

    // Wait for all the work to get done.
    wg.Wait()
}

// worker is launched as a goroutine to process work from
// the buffered channel.
func worker(tasks chan string, worker int) {
    // Report that we just returned.
    defer wg.Done()

    for {
        // Wait for work to be assigned.
        task, ok := <-tasks
        if !ok {
            // This means the channel is empty and closed.
            fmt.Printf("Worker: %d : Shutting Down\n", worker)
            return
        }

        // Display we are starting the work.
        fmt.Printf("Worker: %d : Started %s\n", worker, task)

        // Randomly wait to simulate work time.
        sleep := rand.Int63n(100)
        time.Sleep(time.Duration(sleep) * time.Millisecond)

        // Display we finished the work.
        fmt.Printf("Worker: %d : Completed %s\n", worker, task)
    }
}

6.6 小结

  • 并发是指 goroutine 运行的时候是相互独立的。
  • 使用关键字 go 创建 goroutine 来运行函数。
  • goroutine 在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列。
  • 竞争状态是指两个或多个 goroutine 试图访问同一个资源。
  • 原子函数和互斥锁提供了一种防止出现竞争状态的方法。
  • 无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证。