Go 言語による並行処理 読書メモ

Nov 24, 2019 ( Feb 11, 2022 更新 )

Go 言語による並行処理 の読書メモです。 読書メモなので私の独自解釈も入ります。気になった方は原著を買ってください。

1章 並行処理入門

競合状態

  • データ競合
    • ある処理がデータを読み取ろうとしている時に、別の処理が書き込みをしようとしている
  • アトミック性
  • 操作対象が特定のコンテキストの中で分割不可能、中断不可能である
  • コンテキスト
    • 別の言い方でスコープ
      • ある操作がアトミックであると言える境界
    • OS のコンテキスト、プログラムのコンテキストは異なる。プログラムではアトミックな操作も OS のコンテキストではアトミックではないかもしれない
  • アトミック
    • i++ では「i の値を取得する」「i の値を 1 増やす」「i の値を保存する」という単体ではアトミックな操作は、組み合わせるとアトミックでは無くなる
    • アトミックな操作を組み合わせても大きなアトミックな操作を作ることはできない
    • 操作がアトミックになるかはコンテキストに依存する
  • 排他的なアクセスが必要な箇所を「クリティカルセクション」と呼ぶ
    • 書籍の例では goroutine, if, print
  • ある種の問題はメモリへの並行アクセスで解決することができるが、パフォーマンスに悪影响がある。次のようなことを考えなければいけなくなる:
    • クリティカルセクションが繰り返されていないか
    • クリティカルセクションの大きさはどのくらいに留めるべきか

デッドロック

  • すべての並行なプログラムがお互いの処理を待っている状態
  • デッドロックが発生するために存在しなければならない条件は、「Coffman 条件」として発表されている
    • 相互排他
      • ある並行プロセスが、リソースに対して排他的な権利をどの時点においても保持している
    • 条件待ち
      • ある並行プロセスは、リソースの保持と追加のリソース待ちを同時に行わなければいけない
    • 横取り不可
      • ある並行プロセスが保持しているリソースは、そのプロセスによってのみ開放される
    • 循環待ち
      • ある並行プロセス P1 は、別の並行プロセス P2 を待たなければいけない。そして、 P2 は P1 を待っている
  • Coffman 条件の少なくとも 1 つが真にならなければデッドロックの発生を防げる

ライブロック

  • 並行操作を待っているが、その操作はプログラムの状態をまったく進めていないプログラム
  • 廊下ですれ違おうとしている 2 人が、お互い避けようとして永遠に続くような状態

リソース枯渇

  • 貪欲にワークグループ全体で共有ロックを保持すると、他のプロセスが効率的に仕事をできなくなる。あるいは完全に仕事を邪魔されている状態になる
  • リソース枯渇は CPU, memory などにも適用される

  • 並行処理を使う時は問題空間(問題を解決するにあたり考察しなければいけない範囲)を定める
    • コメントを書く。以下の3つについて書くと良い:
      • 誰が並行処理を担っているか
      • 問題空間がどのように並行処理と関わっているか
      • 誰が同期処理を担っているか
    • シグネチャでわかりやすくする
      • 戻り値を chan にする
        • 他の goroutine を用意する必要がないことがシグネチャから読み取れる

2章 並行性を同モデル化するか: CSP とは何か

  • 並行性はコードの性質
  • 並列性は動作しているプログラムの性質
  • ※ exploit; エクスプロイト = 脆弱性を利用するツールのこと
  • Go は、 OS のスレッドの上にさらに抽象化層を追加するのではなく、新たなプリミティブとして goroutine を定義している
    • 問題空間を考えるときに OS スレッドについて考える必要がほとんどないことを意味する

CSP; Communicating Sequential Processes

  • メッセージパッシングとガード
    • Go の channel, select にあたる
    • ※ Erlang とも少し似ていると思った(wikipedia)

ある言語に、並列処理に関する懸念を抽象化により見えないようにしてくれるフレームワークがある というだけで、この、並行処理を自然に書ける性質がどうでも良くなるわけではありません!誰かがそ のフレームワークを書かなければなりませんし、あなたのコードはそのフレームワーク作者が取り組む 複雑さの上に成り立っているのです。複雑さが見えないところに隠されているからといって、存在しな いというわけではありません。そして複雑さはバグの温床です。 (p.30 より引用)

  • Go のランタイムは goroutine を OS スレッドへ自動的にマルチプレキシングし、そのスケジューリングもする
    • goroutine が IO 待ち等をしていることをランタイムが認識して、 IO 待ちをしていない OS スレッドをその goroutine に再割当てするなどの最適化が行われている

Go の並行処理における哲学

  • Go では、低水準の API も用意しているものの、特に必要でなければ高水準の API を使いましょうという話

syncパッケージでは排他制御といった基本的な同期のためのプリミティブを提供します。 Once型 と WaitGroup型以外は、低水準ライブラリ内で利用されることを想定しています。高水準の同期 はチャネルや通信によって行われたほうが良いでしょう。 (p.32 より引用)

  • 「通信によってメモリを共有する」という哲学がある

ミューテックスに関して言えば、syncパッケージがそれを実装していますが、Go のプログラミン グスタイルでは、高水準の技術を使うことを推奨しています。特に、プログラムを書く際にはある 瞬間にただ 1 つのゴルーチンがある特定のデータの責任を持つように心がけてください。 メモリを共有することで通信してはいけません。かわりに、通信することでメモリを共有しましょう。 (p.32 より引用)


Go の Wiki には以下のような指針が書かれている:

Go のモットーの 1 つに「通信によってメモリを共有し、メモリの共有によって通信してはいけない」 というものがあります。 とは言っても、Go では syncパッケージで伝統的なロック機構を提供しています。ロックに関する たいていの問題はチャネルか伝統的なロックを用いることで解決します。 ではどちらを使うべきなのでしょう。 最も適切に表現できて、かつ、あるいはまたは、最も簡潔に書けるならどちらでも構いません。 (p.32, 33 より引用)

筆者はこれは良いアドバイスだが少し曖昧だと書いている。また、以下の決定木を書いている1:

chart

@startuml
hide empty description

[*] --> 1
1 --> 2: no
1 --> primitive: yes
2 --> 3: no
2 --> chan: yes
3 --> 4: no
3 --> primitive: yes
4 --> primitive: no
4 --> chan: yes

state "パフォーマンスクリティカルセクションか?" as 1
state "データの所有権を移動しようとしているか?" as 2
state "構造体の内部の状態を保護しようとしているか?" as 3
state "複数のロジックを協調させようとしているか?" as 4
state "チャネルを使う" as chan
state "プリミティブを使う" as primitive
@enduml

データの所有権を移動しようとしているか?

  • 何かしらの処理の結果を生成するコードがあり、その結果を別のコードに共有したい場合、これはデータの所有権の移動ということになる
  • データには所有権がある。並行プログラムを安全にする方法の1つとしては、一度に一つの並行処理のコンテキストのみがデータの所有権を持つようにすることが挙げられる
  • バッファ付きチャネルを作成して、インメモリのキューを実装し Producer, Consumer を切り離すことができる

構造体の内部の状態を保護しようとしているか?

  • = メモリアクセス同期を使うか?
  • メモリアクセス同期を使うことで、クリティカルセクションをロックする複雑な実装を隠蔽することができる
type Counter struct {
  mu sync.Mutex
  value int
}

// Increment 呼び出し側はロック処理について気にかける必要がない
func (c *Counter) Increment() {
  c.mu.Lock()
  defer c.mu.Unlock()
  c.value++
}
  • 型を越えてロックを公開すると危険。ロックは小さなレキシカルスコープ内に制限するべき

複数のロジックを協調させようとしているか?

並行なコードがどのように動作しているか、なぜデッドロックや競合が発生しているか、なぜプリミティブを使っているのかがわからなくて苦戦しているのであれば、おそらくチャネルを使うべき良いサインです。 (p.34 より引用)

パフォーマンスクリティカルセクションか?

  • まずプログラムを再設計できないか考える
  • プロファイルを取って、該当箇所が他の箇所よりもオーダー数桁遅いのであれば、メモリアクセス同期のプリミティブを使うことで負荷がかかってもよりよく動作する

  • 問題空間を goroutine に当てはめられるようにすると並行処理が書きやすくなる

Go の並行処理における哲学は次のようにまとめられます。簡潔さを求め、チャネルをできる限り使い、ゴルーチンを湯水のように使いましょう。 (p.35 より引用)

3章 Go における並行処理の構成要素

goroutine

  • goroutine は OS スレッドでもグリーンスレッド(言語のランタイムにより管理されるスレッド)ではない
  • goroutine は coroutine として知られる高水準の抽象化
    • preemptive ではない並行処理。つまり、割り込みをされることがない
    • preemption とは、マルチタスクのコンピュータシステムが実行中のタスクを一時的に中断する動作
  • coroutine が暗黙的に並列というわけではない
  • goroutine をホストする機構では M:N スケジューラという実装を使っている。 M 個のグリーンスレッドを N 個の OS スレッドに対応させる
    • goroutine はグリーンスレッドにスケジュールされる
  • Go は fork-join モデルにしたがう
    • fork は親から派生した子の処理を分岐させて、親と並行に実行すること
    • join は分岐した時点から先で、分岐がふたたび合流すること

chart

sayHello := func() {
  fmt.Println("hello")
}
go sayHello()

// ...something...

上記のコードでは、並行処理の合流ポイントがないため、 “hello” の出力がないまま main goroutine が終了してしまう。(終了を待つ保証がない)

var wg sync.WaitGroup
sayHello := func() {
  defer wg.Done()
  fmt.Println("hello")
}
wg.Add(1)
go sayHello()

上記のコードでは sync.WaitGroup を使って同期処理を実装している。 sayHello 関数をホストしている goroutine が終了するまで main goroutine をブロックし、 2つの goroutine の間に合流ポイントを持つ。

var wg sync.WaitGroup
salutation := "hello"
wg.Add(1)
go func() {
  defer wg.Done()
  salutation = "welcome"
}()
wg.Wait()
fmt.Println(salutation)

上記のコードでは

welcome

を表示して終了する。 goroutine は同じアドレス空間を共有している。

var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
  wg.Add(1)
  go func() {
    fmt.Println(salutation)
  }()
}
wg.Wait()

上記の例では以下のように出力される結果となる:

good day
good day
good day

goroutine は未来の任意のタイミングにスケジュールされる。 そのため、 goroutine が開始する前に range のループが終了してしまい、変数 salutation に assign された値が “good day” の状態で goroutine が実行されている。

このような振る舞いを防ぎ、 range ループで変数を変えながら goroutine を実行したい場合はクロージャに salutation のコピーを渡して、 goroutine が実行される前に変数にそれぞれの値が assign されているように書く必要がある:

var wg sync.WaitGroup
for _, salutation := range []string{"hello", "greetings", "good day"} {
  wg.Add(1)
  go func(salutation string) {
    defer wg.Done()
    fmt.Println(salutation)
  }(salutation) // クロージャに range でループ中に取得できる salutation を渡す
}
wg.Wait()

上記のコードを実行すると、以下のように string の slice の文字列がランダムな順に表示される:

good day
hello
greetings

goroutine のガベージコレクターは、破棄された(ブロックした状態になっている) goroutine を回収するようなことはしない。

go func () {
  // 永久にブロックする操作
}
// 何かの処理

上記のようなコードを書くと、この例の goroutine はプロセスが終了するまで存在し続ける。

終了しない goroutine をずっと起動させていても、メモリ使用量、コンテキストスイッチのコストは小さい。 1件の goroutine で使用するメモリは約 2KB である。 コンテキストスイッチにかかる時間も非常に短い。書籍の例ではベンチ結果は 0.255 マイクロ秒となっていた。

sync パッケージ

WaitGroup

  • ひとまとまりの並行処理があったとき、その結果を気にしない、もしくは他に結果を取得する手段がある場合に有効。それ以外の場合は select を使う
  • Add の呼び出しは goroutine の外側で行わないと競合状態を起こしてしまう
    • goroutine のスケジュールされるタイミングに関しては何の保証もないため
      • goroutine が開始する前に Wait の呼び出しが起こることがある
  • Add の呼び出しはできる限り監視対象の goroutine の開始直前に書く慣習がある

以下の例では、まとめて監視している:

hello := func(wg *sync.WaitGroup, id int) {
  defer wg.Done()
  fmt.Printf("Hello from %v!\n", id)
}

const numGreeters = 5
var wg sync.WaitGroup
wg.Add(numGreeters)

for i := 0; i < numGreeters; i++ {
  go hello(&wg, i+1)
}
wg.Wait()

Mutex と RWMutex

Mutex
  • Mutex は相互排他 = mutual exclusion の略
    • プログラムのクリティカルセクションを保護する方法のひとつ
  • チャネルは通信によりメモリを共有し、 Mutex は開発者がメモリの同期アクセスの慣習を踏襲することでメモリを共有する(開発者が自分でメモリアクセスを調整する)

以下の例では、 Mutex を使って変数へのアクセスを同期する:

https://play.golang.org/p/XyYVIpDVSjg

package main

import (
	"fmt"
	"sync"
)

func main() {

	var count int
	var lock sync.Mutex

	increment := func() {
		lock.Lock()
		defer lock.Unlock()
		count++
		fmt.Printf("Incrementing: %d\n", count)
	}

	decrement := func() {
		lock.Lock()
		defer lock.Unlock()
		count--
		fmt.Printf("Decrementing: %d\n", count)
	}

	var arithmetic sync.WaitGroup
	// it starts goroutine to increment count
	for i := 0; i <= 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			increment()
		}()
	}
	// it starts goroutine to decrement count
	for i := 0; i <= 5; i++ {
		arithmetic.Add(1)
		go func() {
			defer arithmetic.Done()
			decrement()
		}()
	}
	arithmetic.Wait()

	fmt.Println("Arithmetic complete.")
}

出力:

Incrementing: 1
Decrementing: 0
Incrementing: 1
Incrementing: 2
Decrementing: 1
Decrementing: 0
Decrementing: -1
Decrementing: -2
Decrementing: -3
Incrementing: -2
Incrementing: -1
Incrementing: 0
Arithmetic complete.
  • Unlock の呼び出しを常に defer の中で行っているのは、 panic になったときも確実に Unlock してデッドロックを回避するため
RWMutex
  • クリティカルセクションへの出入りはコストが高いので、一般的にはクリティカルセクションで消費される時間を極力短くする
  • 複数の並行処理でメモリの読み込みと書き込み両方を必要とする場合は sync.RWMutex を使う

※ playground だと sleep がうまくできないのでローカルで実行 ※ これは書籍の例のような値にならなかったのでよくわからなかった…助けて…

package main

import (
	"fmt"
	"math"
	"os"
	"sync"
	"text/tabwriter"
	"time"
)

func main() {
	// sync.Locker interface は Mutex, RWMutex 型に対応する
	
	producer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()

		for i := 5; i > 0; i-- {
			l.Lock()
			l.Unlock()
			time.Sleep(500 * time.Millisecond) // producer は observer よりも非活発
		}
	}

	observer := func(wg *sync.WaitGroup, l sync.Locker) {
		defer wg.Done()
		l.Lock()
		defer l.Unlock()
	}

	test := func(count int, mutex, rwMutex sync.Locker) time.Duration {
		var wg sync.WaitGroup
		wg.Add(count + 1)
		beginTestTime := time.Now()
		go producer(&wg, mutex)
		for i := count; i > 0; i-- {
			go observer(&wg, rwMutex)
		}

		wg.Wait()
		return time.Since(beginTestTime)
	}

	tw := tabwriter.NewWriter(os.Stdout, 0, 1, 2, ' ', 0)
	defer tw.Flush()

	var m sync.RWMutex
	fmt.Fprintf(tw, "Readers\tRWMutex\tMutex\n")
	for i := 0; i < 20; i++ {
		count := int(math.Pow(2, float64(i)))
		fmt.Fprintf(
			tw,
			"%d\t%v\t%v\n",
			count,
			test(count, &m, m.RLocker()),
			test(count, &m, &m),
		)
	}
}

実行結果は以下のようになる:

Readers  RWMutex       Mutex
1        2.513963092s  2.51326826s
2        2.520748098s  2.515845614s
4        2.525559604s  2.516974614s
8        2.511131417s  2.524797169s
16       2.512699531s  2.510748044s
32       2.51328422s   2.522571567s
64       2.520854595s  2.524809336s
128      2.520614031s  2.516868729s
256      2.514211922s  2.508447365s
512      2.521896006s  2.523828742s
1024     2.520091659s  2.52464577s
2048     2.523356004s  2.521354608s
4096     2.511012844s  2.516553132s
8192     2.512248693s  2.515231684s
16384    2.521550223s  2.515150217s
32768    2.5025898s    2.504263721s
65536    2.506279475s  2.512076756s
131072   2.522027041s  2.518880735s
262144   2.51496602s   2.507557332s
524288   2.516493778s  2.510624845s

Cond

  • sync.Cond は goroutine の待機やイベントの発生を知らせるために使える。
  • Wait の呼び出しはブロックするだけでなく、現在の goroutine を一時停止する。そうすることで、他の goroutine が同じ OS スレッド上で動作できるようになる
  • Wait を呼び出すと Cond の引数である Locker の Unlock が呼ばれる
  • Wait から抜けると、Lock の Lock が呼ばれる

goroutine 上で処理をするときに、イベントを受け取るために待機するときに以下のようなコードを書くことがある:

for conditionTrue() == false {
  time.Sleep(1*time.Millisecond)
}

これは、以下のコードで書き換えることができる:

c := sync.NewCond(&sync.Mutex{}) // Cond のインスタンスを作る。引数として sync.Locker interface を満たす型を取る
c.L.Lock() // Cond で wait しているときにロックする
for conditionTrue() == false {
  c.Wait() // ロックした状態で待つ
}
c.L.Unlock() // ロック解除

書籍の例: https://play.golang.org/p/kbQlXC6Mcve

独自解釈すると、以下の for とフラグを使って待つコードは、

https://play.golang.org/p/EovrRliiFJC

package main

import (
	"fmt"
	"time"
)

func main() {
	var count int
	said := false

	sayFunc := func() {
		fmt.Printf("hello %d\n", count)
		said = true
	}

	for i := 0; i < 10; i++ {
		count++
		go sayFunc()
		for !said {
			time.Sleep(1 * time.Second)
		}
		said = false
	}
}

以下のように sync.Cond を使って書き換えることができる:

https://play.golang.org/p/KJi6ckGSVkE

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	c := sync.NewCond(&sync.Mutex{})
	var count int

	sayFunc := func() {
		time.Sleep(1 * time.Second)
		c.L.Lock()
		fmt.Printf("hello %d\n", count)
		c.L.Unlock()
		c.Signal()
	}

	for i := 0; i < 10; i++ {
		c.L.Lock()
		count++
		go sayFunc()
		c.Wait()
		c.L.Unlock()
	}
}

以下は Broadcast の例:

https://play.golang.org/p/9NeOfTSXcO_S

package main

import (
	"fmt"
	"sync"
)

func main() {
	type Button struct {
		Clicked *sync.Cond
	}
	button := Button{
		Clicked: sync.NewCond(&sync.Mutex{}),
	}

	subscribe := func(c *sync.Cond, fn func()) {
		var goroutineRunning sync.WaitGroup
		goroutineRunning.Add(1)
		go func() {
			goroutineRunning.Done()
			c.L.Lock()
			defer c.L.Unlock()
			c.Wait() // broadcast を待つ
			fn()     // clickRegistered.Done が実行される
		}()
		goroutineRunning.Wait() // goroutine の発火まで待つ
	}

	var clickRegistered sync.WaitGroup // subscribe を待つための WaitGroup
	clickRegistered.Add(3)

	subscribe(button.Clicked, func() {
		fmt.Println("Maximizing window.")
		clickRegistered.Done()
	})
	subscribe(button.Clicked, func() {
		fmt.Println("Displaying annoying dialog box!")
		clickRegistered.Done()
	})
	subscribe(button.Clicked, func() {
		fmt.Println("Mouse clicked.")
		clickRegistered.Done()
	})

	button.Clicked.Broadcast()
	clickRegistered.Wait() // subscribe 実行を待つ
}

Cond の使い所は、条件のスコープを cond 変数の参照のみに制限できるので、複雑な条件がある場合にそれを待つ側が cond の参照以外考慮しなくてよい(処理側のスコープに限定できる)ことだと思った。

Once

  • sync.Once.Do の処理が1回しか実行されないことを保証する

Pool

  • Get, PutNew を使って必要なときに安全に New する仕組みをつくることができる

以下は sync.Pool を使っている場合と使っていない場合の比較。 どちらも go test -benchtime=10s -bench=. で計測。

  • サーバへの接続を time.Sleep で表現している
  • sync.Pool を使った場合は毎回新規接続で1秒使うことがないので、それは早くなりますよね…という話ではある
package pool

import (
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"sync"
	"testing"
	"time"
)

// daemon が起動完了するまで待つ
func init() {
	daemonStarted := startNetworkDaemon()
	daemonStarted.Wait()
}

func connectToService() interface{} {
	time.Sleep(1 * time.Second)
	return struct{}{}
}

func startNetworkDaemon() *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		server, err := net.Listen("tcp", "localhost:8080")
		if err != nil {
			log.Fatalf("cannot listen: %v\n", err)
		}
		defer server.Close()

		wg.Done() // 起動完了

		for {
			conn, err := server.Accept()
			if err != nil {
				log.Printf("cannnot accept connection: %v\n", err)
				continue
			}
			connectToService() // 1 秒間待つ
			fmt.Fprintln(conn, "")
			conn.Close()
		}
	}()
	return &wg
}

func BenchmarkNetworkRequest(b *testing.B) {
	for i := 0; i < b.N; i++ {
		conn, err := net.Dial("tcp", "localhost:8080")
		if err != nil {
			b.Fatalf("cannot dial host: %v", err)
		}
		if _, err := ioutil.ReadAll(conn); err != nil {
			b.Fatalf("cannot read: %v", err)
		}
		conn.Close()
	}
}
BenchmarkNetworkRequest-8             10        1003391695 ns/op

以下は sync.Pool を使っている場合のコードとベンチ結果:

package pool2

import (
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"sync"
	"testing"
	"time"
)

// daemon が起動完了するまで待つ
func init() {
	daemonStarted := startNetworkDaemonWithCache()
	daemonStarted.Wait()
}

func connectToService() interface{} {
	time.Sleep(1 * time.Second)
	return struct{}{}
}

func warmServiceConnCache() *sync.Pool {
	p := &sync.Pool{
		New: connectToService,
	}
	for i := 0; i < 10; i++ {
		p.Put(p.New())
	}
	return p
}

func startNetworkDaemonWithCache() *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		connPool := warmServiceConnCache()

		server, err := net.Listen("tcp", "localhost:8080")
		if err != nil {
			log.Fatalf("cannot listen: %v", err)
		}
		defer server.Close()

		wg.Done()

		for {
			conn, err := server.Accept()
			if err != nil {
				log.Printf("cannot accept connection: %v", err)
				continue
			}
			svcConn := connPool.Get()
			fmt.Fprintln(conn, "")
			connPool.Put(svcConn)
			conn.Close()
		}
	}()
	return &wg
}

func BenchmarkNetworkRequestWithCache(b *testing.B) {
	for i := 0; i < b.N; i++ {
		conn, err := net.Dial("tcp", "localhost:8080")
		if err != nil {
			b.Fatalf("cannot dial host: %v", err)
		}
		if _, err := ioutil.ReadAll(conn); err != nil {
			b.Fatalf("cannot read: %v", err)
		}
		conn.Close()
	}
}
BenchmarkNetworkRequestWithCache-8          4713           2465680 ns/op

Pool を使うときの注意点:

  • スレッドセーフな New メンバー変数を用意する
  • Get で対象を取得するときに、受け取るものに関しての状態の想定をしてはいけない
    • 状態を考えなくてもよい実装にすべき
  • Get で取得したものの利用が終わったら Put を確実に呼ぶこと。通常は defer を使う
  • プール内のオブジェクトはおよそ均質なものであるべき
    • ※いろんなものを突っ込むなという意図と受け取った

channel

var c chan interface{}
c := make(chan interface{})

送信専用 channel の場合は:

var c chan<- interface{}
c := make(chan<- interface{})

受信専用 channel の場合は:

var c <-chan interface{}
c := make(chan<- interface{})

Go は channel を必要に応じて一方向 channel に変換することができる。一方向 channel は、関数の引数や戻り値の型として使われる。

以下のような書き方が可能:

var receiveChan <-chan interface{}
var sendChan chan<- interface{}

dataStream := make(chan interface{})

channel はブロックする。 キャパシティがいっぱいの channel に書き込もうとする goroutine は書き込めるまでブロックする。 空の channel から読み込もうとしている goroutine は少なくとも要素が 1 つ入るまではブロックする。

以下の例は、 ch から文字列を読み込むまでブロックするため、必ず “hello” を出力する。

package channel

import "testing"

func TestReceive(t *testing.T) {
	ch := make(chan string)

	go func() {
		ch <- "hello"
	}()

	println(<-ch)
}

プログラムを正しく書かないと、 goroutine のデッドロックを引き起こしてしまう。 以下の例では、(無意味な condition を加えていることによって)デッドロックを起こす:

package channel

import "testing"

func TestReceive(t *testing.T) {
	ch := make(chan string)

	go func() {
		if true {
			return
		}
		ch <- "hello"
	}()

	println(<-ch)
}
fatal error: all goroutines are asleep - deadlock!

Go はすべての goroutine が休止しているのを検知し、デッドロックを報告して終了する。

以下の例では、受信 channel から2つの変数を受け取る:

ch := make(chan string)

go func() {
	ch <- "hello"
}()

salutation, ok := <-ch
fmt.Printf("(%v): %v\n", ok, salutation)

2つ目の変数 ok には、

  • channel から受信できたら true の bool
  • 閉じた channel から生成されたデフォルト値

が代入される。

閉じた channel

channel は close キーワードで閉じることができる。 閉じた channel から読み込むことはできない。2つの変数に channel から読み込んだときは、 false と型の初期値が返る。

ch := make(chan int)

go func() {
  ch <- 100
}()

close(ch)
salutation, ok := <-ch
fmt.Printf("(%v): %v\n", ok, salutation)
(false): 0
ch := make(chan int)

close(ch)
go func() {
  ch <- 100
}()

salutation, ok := <-ch
fmt.Printf("(%v): %v\n", ok, salutation)
(false): 0

たとえ channel から読み込みが終わっていない値があっても、 close されたらそれ以降は読み込むことができなくなる:

ch := make(chan int)

go func() {
  ch <- 100
  close(ch)
}()

salutation, ok := <-ch
fmt.Printf("(%v): %v\n", ok, salutation)
(false): 0

前述した sync.Cond の broadcast のときのような、複数の goroutine を解法する例は channel の close を使うことによって書くこともできる2:

beginCh := make(chan interface{})
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		<-beginCh // channel から読み込みできるようになる(close される)まで待機する
		fmt.Printf("%v has bigun\n", i)
	}(i)
}
fmt.Println("unblocking goroutine...")
close(beginCh) // close して他の goroutine がブロックをやめるようになる
wg.Wait()

バッファ付き channel

バッファ付き channel はバッファに空きが出るまで送信側の goroutine をブロックする

var stdBuf bytes.Buffer
defer stdBuf.WriteTo(os.Stdout) // 終了前にバッファから stdout に出力

intCh := make(chan int, 4) // キャパシティが 4 の channel を作成
go func() {
	defer close(intCh)
	defer fmt.Fprintln(&stdBuf, "Producer Done.")
	for i := 0; i < 5; i++ {
		fmt.Fprintf(&stdBuf, "Sending: %d\n", i) // stdout バッファに溜める
		intCh <- i
	}
}()

for integer := range intCh { // バッファに溜まった int を読み込む
	fmt.Fprintf(&stdBuf, "Received: %v.\n", integer)
}
Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Producer Done.
Received: 0.
Received: 1.
Received: 2.
Received: 3.
Received: 4.

nil channel

nil channel への送信、受信はどちらも panic になる。また、 nil channel を close することも panic を引き起こす。

var nilCh chan int
<-nilCh // panic
var nilCh chan interface
nilCh <- struct{}{} // panic
var nilCh chan interface
close(nilCh) // panic

異なる型の channel をどう組み合わせるか

channel を正しいコンテキストにいれるために始めるべきことは、 channel の所有権を割り振ること

ここで言う「所有権」とは、 channel に対して以下の操作を行う goroutine として定義する:

  • 初期化する
  • 送信する
  • 閉じる

  • ガベージコレクションがない言語でのメモリのように、プログラムで論理的に判断していくためには、どの goroutine が channel を所有しているかはっきりさせることが重要
  • 単方向 channel は、以下のように goroutine の channel に対する所有権を区別するのに有用:
    • channel を所有する goroutine
    • channel を利用する goroutine
  • channel を所有する goroutine と、利用する goroutine で責任を分ける
channel の所有者

channel を所有する goroutine は以下の手順を踏むべき:

  • channel を初期化する
  • 送信を行うか、他の goroutine に所有権を渡す
  • channel を閉じる
  • 上記の 3 つの操作をカプセル化して受信 channel を経由して公開する

これらの責任を channel の所有者に与えることで、以下のように安全な操作が行える:

  • channel を初期化する gorotine なので nil channel に送信してデッドロックする危険がなくなる
  • channel を初期化する gorotine なので nil channel を閉じることによって起こる panic の危険がなくなる
  • channel を閉じるタイミングを決める goroutine なので、 channel を 2 回以上閉じてしまうことによって起こる panic の危険がなくなる
  • コンパイル時に型チェックを行って、 channel に対する不適切な書き込みを防ぐ
channel の消費者

受信側は以下のことに気をつける:

  • channel がいつ閉じられたかを把握する
    • 受信の演算子(:= など)を使って、 2 つ目の値を確認する
  • ブロックする操作は責任を持って扱う
    • 受信時の操作はブロックしうる

select

select は

  • 受信の場合は channel に送信があったか閉じられたか
  • 送信の場合はキャパシティいっぱいになっていないものがないか

を確認する。 どの channel も準備できていない場合は、 select 全体がブロックする。 channel が 1 つでも準備完了したらその操作が行われ、対応する文が実行される。

以下の例は、5 秒間ののちに channel を閉じている。 select は、 c から受信(close されているため) できるようになるまでブロックする。

start := time.Now()
c := make(chan interface{})

go func() {
	time.Sleep(5 * time.Second)
	close(c)
}()

fmt.Println("blocking on read...")
select {
case <-c:
	fmt.Printf("unblocked %v latter\n", time.Since(start))
}
blocking on read...
unblocked 5.004482617s latter

以下の例は、同時に 2 つの channel が select の条件に入ったときの例である。 select は、ランダムにどちらかの条件を取るようになっている。

c1 := make(chan interface{})
close(c1)
c2 := make(chan interface{})
close(c2)

var c1Count, c2Count int
for i := 1000; i >= 0; i-- {
	select {
	case <-c1:
		c1Count++
	case <-c2:
		c2Count++
	}
}
fmt.Printf("c1Count: %d\nc2Count: %d\n", c1Count, c2Count)
c1Count: 479
c2Count: 522

1 つも受信ができなかった場合はタイムアウトさせるやり方がある。 time.After は与えた期間経過後の現在時刻を送信する channel を返す。

c := make(<-chan int)
select {
case <-c:
case <-time.After(5 * time.Second):
	fmt.Println("timed out")
}
timed out

以下の例は、 for と select を組み合わせて、 goroutine の結果の報告を待つ前に別の処理を行う。 この場合、ループは何かの仕事をしていて、 select によりときおりそれを止めるかどうか確認している。

done := make(chan interface{})
go func() {
	time.Sleep(5 * time.Second)
	close(done)
}()

workCounter := 0
loop:
for {
	select {
	case <-done: // done が close した場合に break する
		break loop
	default: // done が close していない場合は以下に続く
	}
	// simulate work
	workCounter++
	time.Sleep(1 * time.Second)
}
fmt.Printf("archived %v cycles of work before signalled to stop\n", workCounter)
archived 5 cycles of work before signalled to stop

空 select 文

以下は、 case 節がなく、永久にブロックする select 文である。

select {}

GOMAXPROCS

  • 通常、 GOMAXPROCS はホストマシンの論理CPUの数に設定されていてそれを調整する必要はない
  • しかし、競合状態を発生させやすくするために GOMAXPROCS の値を大きくしてテストを実行することにより、問題の再現がしやすくなる場合がある
  • GOMAXPROCS を調整することで性能を向上させるということは、コードのコミット、マシン、Go のバージョンが変わるごとに調整が必要ということになる
    • それは抽象化と長期的な性能の安定性を犠牲にする場合がある

4章 Go での並行処理パターン

拘束

並行なコードを扱う時に、操作を安全にするには

  • メモリを安全に使うための同期のプリミティブを使う(Mutex)
  • 通信による動機(channel)を使う

ことができるが、暗黙的に安全な方法がほかにもある:

  • イミュータブルなデータ
    • 各並行プロセスは、同じデータに対して操作できるが、それを変更できないようにする
    • Go の場合は、メモリ内の値へのポインタの代わりに値のコピーを使うようにすることで実現できる
  • 拘束によって保護されたデータ
    • アドホックとレキシカルの 2 種類がある
      • アドホック拘束は、拘束を規約によって達成する場合を指す

アドホック拘束の例

// data はメイン goroutine からもアクセスできるが、規約により loopData からのみアクセスすることにしている
data := make([]int, 4)

loopData := func(handleData chan<- int) {
	defer close(handleData)
	for i := range data {
		handleData <- data[i]
	}
}

handleData := make(chan int)
go loopData(handleData)

for num := range handleData {
	fmt.Println(num)
}
0
0
0
0

レキシカル拘束の例

レキシカルスコープを使って、channel を必要とする並行プロセスに channel への読み書きのうち必要な権限のみを渡す。

printData := func(wg *sync.WaitGroup, data []byte) {
	defer wg.Done()

	var buff bytes.Buffer
	for _, b := range data {
		fmt.Fprintf(&buff, "%c", b)
	}
	fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3])
go printData(&wg, data[3:])

wg.Wait()

このようにすることによって、パフォーマンスの向上と可読性の向上がねらえる。 クリティカルセクションを持たなければ、同期のコストをかける必要がなくなる。

for-select ループ

for select ループを使うのは以下のような場合である:

channel から繰り返し変数を送出する

for _, s := range []string{"a", "b", "c"} {
	select {
	case <- done:
		return
	case stringCh <- s:
	}
}

停止シグナルを持つ無限ループ

done := make(chan struct{})
 for {
 	select {
 	case <-done:
 		return
 	default:
 	}
 	// done が閉じられていなければ処理をする
 }

あるいは、

done := make(chan struct{})
for {
	select {
	case <-done:
		return
	default:
		// done が閉じられていなければ処理をする
	}
}

goroutine のリークを避ける

goroutine はランタイムによってガベージコレクションされない。 子の goroutine を生成した親の goroutine で終了にも責任を持つのが望ましい。

goroutine の親から子に、 channel を使ってキャンセルのシグナルを送る例は以下。

newRandChan := func(done <-chan interface{}) <- chan int {
	randChan := make(chan int)
	go func() {
		defer fmt.Println("rand chan closure exited")
		defer close(randChan)

		for {
			select {
			case randChan <- rand.Int(): // 送信可能時に rand.Int() を送信する
			case <- done: // done が受信されたら終了する
				return
			}

		}
	}()

	return randChan
}

done := make(chan interface{})
randChan := newRandChan(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
	fmt.Printf("%d: %d\n", i, <-randChan) // randChan から受信
}
close(done) // goroutine を終了

// なにかの処理を継続する
time.Sleep(1*time.Second)

or channel

// or は引数に取った channel のどれか1つが close したら他の channel もクローズする
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)

		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], orDone)...):
			}
		}
	}()
	return orDone
}

sig := func(after time.Duration) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		time.Sleep(after)
	}()
	return c
}

start := time.Now()
<-or(
	sig(2*time.Hour),
	sig(5*time.Minute),
	sig(1*time.Second),
	sig(1*time.Hour),
	sig(1*time.Minute),
)
fmt.Printf("done after %v\n", time.Since(start))

エラーハンドリング

  • 複数の goroutine の結果とエラーを対にして返すようにする
type Result struct {
	Error error
	Response *http.Response
}

checkStatus := func(done <-chan interface{}, urls ...string) <- chan Result {
	results := make(chan Result)
	go func() {
		defer close(results)

		for _, url := range urls {
			var result Result
			resp, err := http.Get(url)
			result = Result{Error: err, Response: resp}
			// done がクローズされるまで results に結果を送信し続ける
			select {
			case <-done:
				return
			case results <- result:
			}
		}
	}()
	return results
}

done := make(chan interface{})
defer close(done)

urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
	if result.Error != nil {
		fmt.Printf("error: %v\n", result.Error)
		continue
	}
	fmt.Printf("Response: %v\n", result.Response.Status)
}

https://tour.golang.org/concurrency/4

The loop for i := range c receives values from the channel repeatedly until it is closed.

channel の range による読み込みは、 channel が closes するまでブロックする。


パイプライン

パイプラインのステージの性質は次のようになる:

  • 受け取るものと返すものが同じ型

  • ステージは引き回せるように具体化されていなければいけない。(Go の関数は具体化されているのでこの条件に適合する)

    • 具体化とは、言語が開発はに概念を公開して直接扱えるようにするという意味。例えば、関数シグネチャの型を持つ変数を定義し、それをプログラムの中で変数として扱える場合、具体化できているといえる
  • バッチ処理とストリーム処理

    • バッチ処理: データの塊をいっぺんに処理する
    • ストリーム処理: ステージが要素を1つずつ受け取って、1つずつ渡していく

パイプライン構築のためのベストプラクティス


  1. これは私が解釈したものを書き直したものなので、正確な図は原著を参照してください。 ↩︎

  2. 書いてて思ったけど、順序性が必要な goroutine の実行は sync.Cond のほうがわかりやすく書けそうな気がする…。 ↩︎

Return to top