歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Go 語言的分布式讀寫互斥

Go 語言的分布式讀寫互斥

日期:2017/3/1 9:29:33   编辑:Linux編程

Go語言默認的 sync.RWMutex 實現在多核環境中表現並不佳,因為所有的讀者在進行原子增量操作時,會搶占相同的內存地址。該文探討了一種 n-way RWMutex,也可以稱為“大讀者(big reader)”鎖,它可以為每個 CPU 內核分配獨立的 RWMutex。讀者僅需在其核心中處理讀鎖,而寫者則須依次處理所有鎖。

查找當前 CPU

讀者使用 CPUID 指令來決定使用何種鎖,該指令僅需返回當前活動 CPU 的 APICID,而不需要發出系統調用指令抑或改變運行時。這在 Intel 或 AMD 處理器上均是可以的;ARM 處理器則需要使用 CPU ID 寄存器 。 對於超過 256 個處理器的系統,必須使用 x2APIC, 另外除了 CPUID 還要用到帶有EAX=0xb 的 EDX 寄存器。程序啟動時,會構建(通過 CPU 親和力系統調用) APICID 到 CPU 索引的映射, 該映射在處理器的整個生命周期中靜態存在。由於 CPUID 指令的開銷可能相當昂貴,goroutine 將只在其運行的內核中定期地更新狀態結果。頻繁更新可以減少內核鎖阻塞,但同時也會導致花在加鎖過程中的 CPUID 指令時間增加。

陳舊的 CPU 信息。如果加上鎖運行 goroutine 的 CPU 信息可能會是過時的 (goroutine 會轉移到另一個核心)。在 reader 記住哪個是上鎖的前提下,這只會影響性能,而不會影響准確性,當然,這樣的轉移也是不太可能的,就像操作系統內核嘗試在同一個核心保持線程來改進緩存命中率一樣。

性能

這個模式的性能特征會被大量的參數所影響。特別是 CPUID 檢測頻率,readers 的數量,readers 和 writers 的比率,還有 readers 持有鎖的時間,這些因素都非常重要。當在這個時間有且僅有一個 writer 活躍的時候,這個 writer 持有鎖的時期不會影響 sync.RWMutex 和 DRWMutex 之間的性能差異。

實驗證明DRWMutex表現勝過多核系統,特別writer小於1%的時候,CPUID會在最多每10個鎖之間被調用(這種變化取決於鎖被持有的持續時間)。甚至在少核的情況下,DRWMutex也在普遍選擇通過sync.Mutex使用sync.RWMutex的應用程序的情況下表現好過sync.RWMutex.

下圖顯示核數量使用增加每10個的平均性能:

drwmutex -i 5000 -p 0.0001 -w 1 -r 100 -c 100

錯誤條表示第25和第75個百分位。注意每第10核的下降;這是因為10個核組成一個運行標准檢查系統的機器上的NUMA節點, 所以一旦增加一個NUMA節點,跨線程通信量變得更加寶貴。對於DRWMutex來說,由於對比sync.RWMutex更多的reader能夠並行工作,所以性能也隨之提升。

查看go-nuts tread進一步討論

cpu_amd64.s

#include "textflag.h"

// func cpu() uint64
TEXT 路cpu(SB),NOSPLIT,$0-8
MOVL $0x01, AX // version information
MOVL $0x00, BX // any leaf will do
MOVL $0x00, CX // any subleaf will do

// call CPUID
BYTE $0x0f
BYTE $0xa2

SHRQ $24, BX // logical cpu id is put in EBX[31-24]
MOVQ BX, ret+0(FP)
RET

main.go

package main

import (
"flag"
"fmt"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"sync"
"syscall"
"time"
"unsafe"
)

func cpu() uint64 // implemented in cpu_amd64.s

var cpus map[uint64]int

// determine mapping from APIC ID to CPU index by pinning the entire process to
// one core at the time, and seeing that its APIC ID is.
func init() {
cpus = make(map[uint64]int)

var aff uint64
syscall.Syscall(syscall.SYS_SCHED_GETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))

n := 0
start := time.Now()
var mask uint64 = 1
Outer:
for {
for (aff & mask) == 0 {
mask <<= 1
if mask == 0 || mask > aff {
break Outer
}
}

ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(mask), uintptr(unsafe.Pointer(&mask)))
if ret != 0 {
panic(err.Error())
}

// what CPU do we have?
<-time.After(1 * time.Millisecond)
c := cpu()

if oldn, ok := cpus[c]; ok {
fmt.Println("cpu", n, "==", oldn, "-- both have CPUID", c)
}

cpus[c] = n
mask <<= 1
n++
}

fmt.Printf("%d/%d cpus found in %v: %v\n", len(cpus), runtime.NumCPU(), time.Now().Sub(start), cpus)

ret, _, err := syscall.Syscall(syscall.SYS_SCHED_SETAFFINITY, uintptr(0), unsafe.Sizeof(aff), uintptr(unsafe.Pointer(&aff)))
if ret != 0 {
panic(err.Error())
}
}

type RWMutex2 []sync.RWMutex

func (mx RWMutex2) Lock() {
for core := range mx {
mx[core].Lock()
}
}

func (mx RWMutex2) Unlock() {
for core := range mx {
mx[core].Unlock()
}
}

func main() {
cpuprofile := flag.Bool("cpuprofile", false, "enable CPU profiling")
locks := flag.Uint64("i", 10000, "Number of iterations to perform")
write := flag.Float64("p", 0.0001, "Probability of write locks")
wwork := flag.Int("w", 1, "Amount of work for each writer")
rwork := flag.Int("r", 100, "Amount of work for each reader")
readers := flag.Int("n", runtime.GOMAXPROCS(0), "Total number of readers")
checkcpu := flag.Uint64("c", 100, "Update CPU estimate every n iterations")
flag.Parse()

var o *os.File
if *cpuprofile {
o, _ := os.Create("rw.out")
pprof.StartCPUProfile(o)
}

readers_per_core := *readers / runtime.GOMAXPROCS(0)

var wg sync.WaitGroup

var mx1 sync.RWMutex

start1 := time.Now()
for n := 0; n < runtime.GOMAXPROCS(0); n++ {
for r := 0; r < readers_per_core; r++ {
wg.Add(1)
go func() {
defer wg.Done()
r := rand.New(rand.NewSource(rand.Int63()))
for n := uint64(0); n < *locks; n++ {
if r.Float64() < *write {
mx1.Lock()
x := 0
for i := 0; i < *wwork; i++ {
x++
}
_ = x
mx1.Unlock()
} else {
mx1.RLock()
x := 0
for i := 0; i < *rwork; i++ {
x++
}
_ = x
mx1.RUnlock()
}
}
}()
}
}
wg.Wait()
end1 := time.Now()

t1 := end1.Sub(start1)
fmt.Println("mx1", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t1.Seconds(), t1)

if *cpuprofile {
pprof.StopCPUProfile()
o.Close()

o, _ = os.Create("rw2.out")
pprof.StartCPUProfile(o)
}

mx2 := make(RWMutex2, len(cpus))

start2 := time.Now()
for n := 0; n < runtime.GOMAXPROCS(0); n++ {
for r := 0; r < readers_per_core; r++ {
wg.Add(1)
go func() {
defer wg.Done()
c := cpus[cpu()]
r := rand.New(rand.NewSource(rand.Int63()))
for n := uint64(0); n < *locks; n++ {
if *checkcpu != 0 && n%*checkcpu == 0 {
c = cpus[cpu()]
}

if r.Float64() < *write {
mx2.Lock()
x := 0
for i := 0; i < *wwork; i++ {
x++
}
_ = x
mx2.Unlock()
} else {
mx2[c].RLock()
x := 0
for i := 0; i < *rwork; i++ {
x++
}
_ = x
mx2[c].RUnlock()
}
}
}()
}
}
wg.Wait()
end2 := time.Now()

pprof.StopCPUProfile()
o.Close()

t2 := end2.Sub(start2)
fmt.Println("mx2", runtime.GOMAXPROCS(0), *readers, *locks, *write, *wwork, *rwork, *checkcpu, t2.Seconds(), t2)
}

Ubuntu 14.04 上搭建 Golang 開發環境配置 http://www.linuxidc.com/Linux/2015-02/113977.htm

Linux系統入門學習-在Linux中安裝Go語言 http://www.linuxidc.com/Linux/2015-02/113159.htm

Ubuntu 安裝Go語言包 http://www.linuxidc.com/Linux/2013-05/85171.htm

《Go語言編程》高清完整版電子書 http://www.linuxidc.com/Linux/2013-05/84709.htm

Go語言並行之美 -- 超越 “Hello World” http://www.linuxidc.com/Linux/2013-05/83697.htm

我為什麼喜歡Go語言 http://www.linuxidc.com/Linux/2013-05/84060.htm

Go語言內存分配器的實現 http://www.linuxidc.com/Linux/2014-01/94766.htm

Copyright © Linux教程網 All Rights Reserved