Concurrency
Do not communicate by sharing memory; instead, share memory by communicating.
通过通讯来共享内存,而不是通过共享内存来通讯
Goroutines
go的协程并发在同一地址空间, 更轻量级。
package goroutines_test

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// wg counts the goroutines started by Announce so that callers can wait
// for all of them with wg.Wait.
var wg sync.WaitGroup

// Announce prints message on a new goroutine after sleeping for delay.
// It returns immediately; the goroutine is registered on wg before it starts.
func Announce(message string, delay time.Duration) {
	wg.Add(1)
	go func() {
		// Register Done first so the counter is released on every exit path,
		// including a panic in the body.
		defer wg.Done()
		time.Sleep(delay)
		fmt.Println(message)
	}()
}

// TestAnnouce starts five announcements concurrently and waits until every
// one of them has been printed.
func TestAnnouce(t *testing.T) {
	messages := []string{
		"helloworld",
		"helloweida",
		"hellosanding",
		"hellogarlic",
		"helloding",
	}
	for _, m := range messages {
		Announce(m, time.Second)
	}
	wg.Wait()
}
Channels
可以通过make创建通道,可以通过第二个参数设置是否带有缓冲
ch:=make(chan int, 0) ch:=make(chan int, 100)
可以通过chan完成协程等待: 对于无缓冲通道, 发送方会一直阻塞到接收方取走数据(接收方同样会阻塞到有数据可取); 对于有缓冲通道, 发送方只有在缓冲区被填满时才开始等待。
package channel_test

import (
	"fmt"
	"sort"
	"testing"
)

// TestChannel sorts a slice on a separate goroutine and uses an unbuffered
// channel as a completion signal.
func TestChannel(t *testing.T) {
	list := []int{11, 2, 3, 44, 0}
	c := make(chan int)
	go func() {
		sort.Ints(list)
		// Print before signalling: once the receiver gets the value the test
		// may return, and anything after the send might never run.
		fmt.Println(list)
		c <- 1
	}()
	<-c // wait for the sort (and the print) to finish
}
带缓冲的通道可以被用作信号量。
- 版本1: 为每一个请求分配一个协程
可以看到sem缓冲区为10, 控制了同时运行的协程数量,当并行大于10请求到来时,协程还是会创建仅是进入等待状态,这将消耗大量资源。
package channel_test

import (
	"fmt"
	"sync"
	"testing"
)

// MaxOutstanding is the maximum number of requests processed at the same time.
var MaxOutstanding int = 10

// Request is the payload handed to the server.
type Request int

// sem is a counting semaphore: its buffer size bounds how many calls to
// process may run concurrently.
var sem = make(chan int, MaxOutstanding)

// process handles one request; here it only prints the pointer it was given.
func process(r *Request) {
	fmt.Println(r)
}

// handle acquires a semaphore slot, processes r, and releases the slot.
func handle(r *Request) {
	sem <- 1 // blocks while MaxOutstanding handlers are already running
	process(r)
	<-sem
}

// Serve starts one goroutine per request read from queue. The semaphore only
// limits how many of them run process at once; the goroutines themselves are
// still created for every request.
//
// Serve returns once queue has been closed and every handler has finished.
// If queue is never closed it keeps serving forever.
func Serve(queue chan *Request) {
	var handlers sync.WaitGroup
	for req := range queue {
		handlers.Add(1)
		go func(r *Request) {
			defer handlers.Done()
			handle(r)
		}(req)
	}
	handlers.Wait()
}

// TestServe queues 100 requests, closes the queue, and serves them.
func TestServe(t *testing.T) {
	queue := make(chan *Request, 100)
	var r Request
	for i := 0; i < 100; i++ {
		queue <- &r
	}
	// Without the close, Serve would block on the empty queue and the runtime
	// would abort with "all goroutines are asleep - deadlock!".
	close(queue)
	Serve(queue)
}
- 版本2:在额定阈值(信号量的值)内,为每一个请求创建一个协程
下面的写法编译器会有一个警告:
loop variable req captured by func literal (loopclosure)
func Serve2(queue chan *Request) { for req := range queue { sem <- 1 //reqnew := req //req := req go func() { process(req) <-sem }() } } func TestServe2(t *testing.T) { queue := make(chan *Request, 100) var r Request r = 1 for i := 0; i < 100; i++ { queue <- &r } Serve2(queue) }
- 版本3:修复第二个版本将 req 赋值为局部变量
func Serve2(queue chan *Request) { for req := range queue { sem <- 1 //reqnew := req req := req go func() { process(req) <-sem }() } } func TestServe2(t *testing.T) { queue := make(chan *Request, 100) var r Request r = 1 for i := 0; i < 100; i++ { queue <- &r } Serve2(queue) }
- 版本4:修复第二个版本将 req 做为参数传入实现参数复制
func Serve3(queue chan *Request) { for req := range queue { sem <- 1 go func(req *Request) { process(req) <-sem }(req) } } func TestServe3(t *testing.T) { queue := make(chan *Request, 100) var r Request r = 1 for i := 0; i < 100; i++ { queue <- &r } Serve3(queue) }
- 版本5:启动固定的协程,接收通道中数据, 当收到退出信号后退出。
func handle2(queue chan *Request) { for r := range queue { process(r) } } func Serve4(clientRequests chan *Request, quit chan bool) { for i := 0; i < MaxOutstanding; i++ { go handle2(clientRequests) } <-quit } func TestServe4(t *testing.T) { queue := make(chan *Request, 100) quit := make(chan bool) var wg sync.WaitGroup go func(queue chan *Request) { wg.Add(1) var r Request r = 1 for i := 0; i < 11; i++ { queue <- &r } defer wg.Done() }(queue) go func(quit chan bool) { wg.Wait() quit <- true }(quit) Serve4(queue, quit) }
Channels of channels
定义一个请求结构体并定义回复客户端使用的应答channel
// Request2 is a request that carries its own reply channel.
type Request2 struct {
	args       []int           // arguments passed to f
	f          func([]int) int // function the server applies to args
	resultChan chan int        // channel on which the server sends f(args)
}

// sum returns the sum of all elements of a; it is 0 for an empty slice.
func sum(a []int) (s int) {
	for _, v := range a {
		s += v
	}
	return
}

// handle3 serves requests until queue is closed: for each request it computes
// f(args) and sends the answer on that request's resultChan.
func handle3(queue chan *Request2) {
	for req := range queue {
		req.resultChan <- req.f(req.args)
	}
}

// TestServe5 runs one server and one client. The client sends a request and
// prints the answer it receives on the request's own channel.
func TestServe5(t *testing.T) {
	clientRequests := make(chan *Request2)
	done := make(chan struct{})

	// Server side.
	go handle3(clientRequests)

	// Client side.
	go func() {
		defer close(done)
		request := &Request2{
			args:       []int{3, 4, 5},
			f:          sum,
			resultChan: make(chan int),
		}
		clientRequests <- request
		fmt.Printf("answer :%d\n", <-request.resultChan)
	}()

	// Wait for the client instead of blocking forever with select {}, which
	// ends in "all goroutines are asleep - deadlock!".
	<-done
	close(clientRequests) // lets handle3 return
}
Parallelization
parallelization & concurrency: 并发更多依赖于设计, 因为它本来就要同时处理很多事情; 而并行更依赖于系统的处理能力, 大多数情况下它做的事情很单一。
package para_test

import "testing"

// Vector is a slice of float64 whose elements can be updated in parallel.
type Vector []float64

// Op is the per-element operation. It ignores both the receiver and s and
// always yields 0.1.
func (v Vector) Op(s float64) (r float64) {
	return r + 0.1
}

// DoSome applies u.Op to the elements v[i:n], adding each result to the
// element in place, and then reports completion by sending on c.
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
	for k := i; k < n; k++ {
		v[k] += u.Op(v[k])
	}
	c <- 1 // signal that this piece is done
}

// numCPU is the number of pieces the work is split into.
const numCPU = 4

// DoAll splits v into numCPU contiguous pieces, runs DoSome on each piece in
// its own goroutine, and returns once every piece has reported completion.
func (v Vector) DoAll(u Vector) {
	// Buffered so that no worker blocks on its completion signal.
	c := make(chan int, numCPU)
	size := len(v)
	for part := 0; part < numCPU; part++ {
		lo := part * size / numCPU
		hi := (part + 1) * size / numCPU
		go v.DoSome(lo, hi, u, c)
	}
	// Drain one signal per worker.
	for part := 0; part < numCPU; part++ {
		<-c
	}
}

// TestPara runs DoAll on a five-element vector and logs its length and the
// resulting values.
func TestPara(t *testing.T) {
	v := Vector{0.1, 1.1, 2.2, 3.3, 4.4}
	t.Log(len(v))
	v.DoAll(v)
	t.Log(v)
}
另外可以通过 runtime.NumCPU 获取CPU信息
缓存泄露
client处理足够快的话, 而server较慢情况下,会一直创建buffer,如果server发送到freelist请求缓冲通道数据已满,导致协程处于hanging状态,这时垃圾回收器对buffer回收也是无能为力。(这块原文是说由于缓冲满了导致垃圾回收造成泄露,理解是由于hanging无法回收造成泄露)
import ( "fmt" "testing" ) type Buffer []int var freeList = make(chan *Buffer, 100) var serverChan = make(chan *Buffer) func load(b *Buffer) { fmt.Println(b, "load b ...") } func client() { for { var b *Buffer select { case b = <-freeList: fmt.Println(" Got one; nothing more to do.") default: fmt.Println(" None free, so allocate a new one") b = new(Buffer) } load(b) serverChan <- b } } func process(b *Buffer) { fmt.Println(b, "process b ...") } func server() { for { b := <-serverChan process(b) select { case freeList <- b: fmt.Println(" Buffer on free list; nothing more to do.") default: fmt.Println(" Free list full, just carry on.") } } } func TestFreelist(t *testing.T) { go server() client() }
Errors
go支持多返回值,非常方便的可以返回相关错误信息,而不需要单独设置变量来区分具体错误。
error
error类型是一个内置的接口类型
// error is the built-in interface for reporting failures: any type with an
// Error method that returns the message as a string satisfies it.
type error interface {
	Error() string
}
开发者可以实现更丰富的错误信息
// PathError records an error and the operation and
// file path that caused it.
type PathError struct {
	Op   string // "open", "unlink", etc.
	Path string // The associated file.
	Err  error  // Returned by the system call.
}

// Error formats the error as "<Op> <Path>: <underlying error>".
// Err must be non-nil.
func (e *PathError) Error() string {
	return e.Op + " " + e.Path + ": " + e.Err.Error()
}
open /etc/passwx: no such file or directory
当文件未找到时可以显示出更加详细的信息。比如在错误信息中输出包名及详细的错误信息。
另外可以通过详细信息判断是否可以重试, 当然如果是我设计的话,不会这么处理,一般清理文件由单独应用处理。
// Try to create the file at most twice: if the first attempt fails because
// the device is out of space, free some space and try once more.
for try := 0; try < 2; try++ {
	file, err = os.Create(filename)
	if err == nil {
		return
	}
	// Only a *os.PathError that wraps ENOSPC is worth retrying.
	if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
		deleteTempFiles() // Recover some space.
		continue
	}
	return
}
另外这里使用了类型断言 (type assertion): 如果断言失败, ok 为 false, e 为 nil; 如果 ok 为 true, 再比较具体的错误码。
panic
应用无法继续运行的时候使用panic
// A toy implementation of cube root using Newton's method. func CubeRoot(x float64) float64 { z := x/3 // Arbitrary initial value for i := 0; i < 1e6; i++ { prevz := z z -= (z*z*z-x) / (3*z*z) if veryClose(z, prevz) { return z } } // A million iterations has not converged; something is wrong. panic(fmt.Sprintf("CubeRoot(%g) did not converge", x)) }
在程序无法得到收敛的时候选择调用panic, 当然如果是库函数还是避免panic。一般panic场景是无法继续工作。如果必须这么做的话可以通过另外一个程序根据需要重新唤起应用。
另外执行panic退出时,执行defer函数,os.Exit 退出则不会。
package panic_test

import (
	_ "errors"
	"fmt"
	"os"
	"testing"
)

// TestPanicVsExit shows that os.Exit terminates the process immediately:
// the deferred function is NOT run, so "Finally" is never printed.
func TestPanicVsExit(t *testing.T) {
	defer func() {
		fmt.Println("Finally")
	}()
	fmt.Println("Start")
	//panic(errors.New("Something wrong!"))
	os.Exit(-1)
}

// Output:
// === RUN TestPanicVsExit
// Start
// FAIL loop/panic_test 0.139s
panic输出还是打印堆栈信息
package panic_test

import (
	"errors"
	"fmt"
	_ "os"
	"testing"
)

// TestPanicVsExit shows that a panic still runs deferred functions while it
// unwinds: "Finally" is printed before the stack trace.
func TestPanicVsExit(t *testing.T) {
	defer func() {
		fmt.Println("Finally")
	}()
	fmt.Println("Start")
	panic(errors.New("Something wrong!"))
	//os.Exit(-1)
}

// Output:
// === RUN TestPanicVsExit
// Start
// Finally
// --- FAIL: TestPanicVsExit (0.00s)
// panic: Something wrong! [recovered]
// 	panic: Something wrong!
//
// goroutine 19 [running]:
// testing.tRunner.func1.2(0xd45be0, 0xc000088520)
// 	C:/Program Files/Go/src/testing/testing.go:1144 +0x345
// testing.tRunner.func1(0xc000084480)
// 	C:/Program Files/Go/src/testing/testing.go:1147 +0x4b6
// panic(0xd45be0, 0xc000088520)
// 	C:/Program Files/Go/src/runtime/panic.go:965 +0x1c7
// loop/panic_test_test.TestPanicVsExit(0xc000084480)
// 	f:/GO/go_test/panic_test/panic_test.go:16 +0xe5
// testing.tRunner(0xc000084480, 0xd76068)
// 	C:/Program Files/Go/src/testing/testing.go:1194 +0xef
// created by testing.(*T).Run
// 	C:/Program Files/Go/src/testing/testing.go:1239 +0x2b3
// FAIL loop/panic_test 0.701s
recover
- recover需要在defer中调用,
- 调用recover后将不再打印相关堆栈信息
- recover需要和panic在一个协程中才起作用
package recover_test

import (
	"fmt"
	"runtime/debug"
	"sync"
	"testing"
)

// wg counts the workers started by server and servernoblock; each worker
// releases its count in safelyDo.
var wg sync.WaitGroup

// server starts one worker goroutine per item until workChan is closed.
func server(workChan <-chan *Work) {
	for work := range workChan {
		//fmt.Printf("go work %+v\n", work)
		// Add before starting the goroutine so wg.Wait cannot miss it.
		wg.Add(1)
		go safelyDo(work)
	}
}

// servernoblock starts one worker goroutine for every item that is currently
// available on workChan and returns as soon as the channel is empty.
func servernoblock(workChan <-chan *Work) {
	var work *Work
	for {
		select {
		case work = <-workChan:
			// Add before starting the goroutine so wg.Wait cannot miss it.
			wg.Add(1)
			go safelyDo(work)
		default:
			fmt.Println("quit")
			return
		}
	}
}

// Work is one unit of work; do panics when stat is 0.
type Work struct {
	id   int
	stat int
}

// do runs work: it panics when work.stat is 0 and prints a message otherwise.
func do(work *Work) {
	if work.stat == 0 {
		fmt.Printf("pannic work... %+v\n", work)
		s := fmt.Sprintf("panic work %+v ", work)
		panic(s)
	}
	fmt.Printf("normal work... %+v\n", work)
}

// safelyDo runs do(work) and recovers from a panic in it, so one failing
// work item does not take down the whole program. The caller must have
// called wg.Add(1) for this worker; safelyDo calls wg.Done when it finishes.
func safelyDo(work *Work) {
	// recover only has an effect inside a deferred function running on the
	// goroutine that panicked.
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("work failed: ", err)
			debug.PrintStack()
		}
		wg.Done()
	}()
	do(work)
}

// TestRecover queues three work items (the first one panics), starts a worker
// for each, and waits for all of them.
func TestRecover(t *testing.T) {
	work1 := Work{id: 1, stat: 0}
	work2 := Work{id: 2, stat: 1}
	work3 := Work{id: 3, stat: 1}

	workchan := make(chan *Work, 100)
	workchan <- &work1
	workchan <- &work2
	workchan <- &work3

	servernoblock(workchan)
	wg.Wait()
	close(workchan)
}
由于recover,panic的堆栈信息未打印, 可以通过debug.PrintStack进行打印。 通过printstack重新调整了一下调用栈的打印。
package recover_test

import (
	"bufio"
	"fmt"
	"runtime"
	"strings"
	"sync"
	"testing"
)

// wg counts the workers started by server and servernoblock; each worker
// releases its count in safelyDo.
var wg sync.WaitGroup

// server starts one worker goroutine per item until workChan is closed.
func server(workChan <-chan *Work) {
	for work := range workChan {
		//fmt.Printf("go work %+v\n", work)
		// Add before starting the goroutine so wg.Wait cannot miss it.
		wg.Add(1)
		go safelyDo(work)
	}
}

// servernoblock starts one worker goroutine for every item that is currently
// available on workChan and returns as soon as the channel is empty.
func servernoblock(workChan <-chan *Work) {
	var work *Work
	for {
		select {
		case work = <-workChan:
			// Add before starting the goroutine so wg.Wait cannot miss it.
			wg.Add(1)
			go safelyDo(work)
		default:
			fmt.Println("quit")
			return
		}
	}
}

// Work is one unit of work; do panics when stat is 0.
type Work struct {
	id   int
	stat int
}

// do runs work: it panics when work.stat is 0 and prints a message otherwise.
func do(work *Work) {
	if work.stat == 0 {
		fmt.Printf("pannic work... %+v\n", work)
		s := fmt.Sprintf("panic work %+v ", work)
		panic(s)
	}
	fmt.Printf("normal work... %+v\n", work)
}

// safelyDo runs do(work) and recovers from a panic in it, printing a filtered
// stack trace. The caller must have called wg.Add(1) for this worker;
// safelyDo calls wg.Done when it finishes.
func safelyDo(work *Work) {
	// recover only has an effect inside a deferred function running on the
	// goroutine that panicked.
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("work failed: ", err)
			//debug.PrintStack()
			buf := make([]byte, 1<<16)
			stackSize := runtime.Stack(buf, true) // true: include all goroutines
			printoriginal(string(buf[0:stackSize]))
		}
		wg.Done()
	}()
	do(work)
}

// printoriginal prints a filtered version of the stack dump s.
//
// Every line starting with "goroutine" (case-insensitive) is printed and
// switches output off. Output is switched back on after a line starting with
// "panic" has been seen, and the line following it is skipped as well. The
// effect is that the frames listed before and including the panic call are
// hidden.
func printoriginal(s string) {
	scanner := bufio.NewScanner(strings.NewReader(s))
	printing := true
	for scanner.Scan() {
		line := scanner.Text()
		lower := strings.ToLower(line)
		if strings.HasPrefix(lower, "goroutine") {
			fmt.Println(line)
			printing = false
		}
		if printing {
			fmt.Println(line)
		}
		if !printing && strings.HasPrefix(lower, "panic") {
			printing = true
			scanner.Scan() // skip the line that follows the panic line
		}
	}
}

// TestRecover queues three work items (the first one panics), starts a worker
// for each, and waits for all of them.
func TestRecover(t *testing.T) {
	work1 := Work{id: 1, stat: 0}
	work2 := Work{id: 2, stat: 1}
	work3 := Work{id: 3, stat: 1}

	workchan := make(chan *Work, 100)
	workchan <- &work1
	workchan <- &work2
	workchan <- &work3

	servernoblock(workchan)
	wg.Wait()
	close(workchan)
}
A web server
示例是通过调用google服务QR码的功能。
package webserver_test

import (
	"flag"
	"html/template"
	"log"
	"net/http"
	"testing"
)

// addr is the address the HTTP server listens on (flag -addr).
var addr = flag.String("addr", ":1718", "http service address")

// templ is the parsed page template; template.Must panics at start-up if
// templateStr does not parse.
var templ = template.Must(template.New("qr").Parse(templateStr))

// TestWebserver registers QR for "/" on the default mux and serves on *addr.
// ListenAndServe only returns on failure, so the test runs until interrupted.
func TestWebserver(t *testing.T) {
	flag.Parse()
	http.Handle("/", http.HandlerFunc(QR))
	// Report the failure through t rather than log.Fatal, which would exit
	// the whole test binary.
	if err := http.ListenAndServe(*addr, nil); err != nil {
		t.Fatal("ListenAndServe:", err)
	}
}

// QR renders the page, using the form value named "s" as the template data.
func QR(w http.ResponseWriter, req *http.Request) {
	if err := templ.Execute(w, req.FormValue("s")); err != nil {
		log.Printf("QR: executing template: %v", err)
	}
}

// templateStr is the page: the block between {{if .}} and {{end}} is only
// rendered when the template data is non-empty.
const templateStr = `
<html>
<head>
<title>QR Link Generator</title>
</head>
<body>
{{if .}}
<img src="http://chart.apis.google.com/chart?chs=300x300&cht=qr&choe=UTF-8&chl={{.}}" />
<br>
{{.}}
<br>
<br>
{{end}}
<form action="/" name=f method="GET">
    <input maxLength=1024 size=70 name=s value="" title="Text to QR Encode">
    <input type=submit value="Show QR" name=qr>
</form>
</body>
</html>
`
通过 addr 设置服务监听端口。QR 只是接收包含表单数据的请求, 并以名为 s 的表单值作为数据执行模板。templateStr 中 {{if .}} 到 {{end}} 之间的代码, 仅在 {{.}} 非空时执行。
参考及引用
Photo by Bruno Henrique from Pexels
https://golang.org/doc/effective_go
Comments are closed.