Effective Go-5

Concurrency

Share by communicating

Do not communicate by sharing memory; instead, share memory by communicating.

通过通讯来共享内存,而不是通过共享内存来通讯

Goroutines

go的协程并发在同一地址空间, 更轻量级。

package goroutines_test

import (
    "fmt"
    "sync"
    "testing"
    "time"
)

// wg tracks the goroutines started by Announce so callers can wait for them.
var wg sync.WaitGroup

// Announce prints message on a new goroutine once delay has elapsed.
//
// The goroutine is registered with wg before it is started, so a caller can
// use wg.Wait to block until every announcement has been printed.
func Announce(message string, delay time.Duration) {
    wg.Add(1)
    go func() {
        // Deferred first so the counter is released even if the body panics.
        defer wg.Done()
        time.Sleep(delay)
        fmt.Println(message)
    }()
}

// TestAnnouce starts five announcements, each delayed by one second, and
// blocks until all of them have been printed.
func TestAnnouce(t *testing.T) {
    greetings := []string{
        "helloworld",
        "helloweida",
        "hellosanding",
        "hellogarlic",
        "helloding",
    }
    for _, greeting := range greetings {
        Announce(greeting, time.Second)
    }

    wg.Wait()
}

Channels

可以通过make创建通道,可以通过第二个参数设置是否带有缓冲

ch:=make(chan int, 0)
ch:=make(chan int, 100)

可以通过chan完成协程等待, 如果无缓冲通道, 接收者会立即引发等待, 有缓冲通道,数据填满缓冲区时开始等待。

package channel_test

import (
    "fmt"
    "sort"
    "testing"
)

// TestChannel sorts a slice on a separate goroutine and uses an unbuffered
// channel as the completion signal.
func TestChannel(t *testing.T) {
    list := []int{11, 2, 3, 44, 0}
    done := make(chan int)
    go func() {
        sort.Ints(list)
        // Print before signalling: once the receiver is released the test
        // may return, and output written after that point can be lost.
        fmt.Println(list)
        done <- 1
    }()
    <-done // Wait for the sort and the print to finish.
}

带缓冲的通道可以被用作信号量。

  • 版本1: 为每一个请求分配一个协程

可以看到sem缓冲区为10, 控制了同时运行的协程数量,当并行大于10请求到来时,协程还是会创建仅是进入等待状态,这将消耗大量资源。

package channel_test

import (
    "fmt"
    "sync"
    "testing"
)

// MaxOutstanding is the capacity given to sem, i.e. the number of sem slots
// that can be held at the same time.
var MaxOutstanding int = 10

// Request is the unit of work passed through the request queue.
type Request int

// sem is a buffered channel used as a counting semaphore: a send takes a
// slot and a receive gives it back.
var sem = make(chan int, MaxOutstanding)

// process stands in for the real work: it writes r, followed by a newline,
// to standard output using the default format.
func process(r *Request) {
    fmt.Printf("%v\n", r)
}

// handle processes a single request while holding one sem slot.
func handle(r *Request) {
    sem <- 1   // Take a slot; blocks while the sem buffer is full.
    process(r) // Do the work while the slot is held.
    <-sem      // Give the slot back so another handler can proceed.
}

// Serve receives requests from queue forever and starts a new goroutine
// running handle for each one. It never returns.
func Serve(queue chan *Request) {
    for {
        // The receive happens on this goroutine; only handle runs on the
        // new one.
        go handle(<-queue)
    }
}

// TestServe fills a buffered queue with 100 pointers to the same Request and
// hands the queue to Serve. Serve loops forever, so this call does not
// return.
func TestServe(t *testing.T) {
    const pending = 100
    queue := make(chan *Request, pending)
    r := Request(0)
    for n := 0; n < pending; n++ {
        queue <- &r
    }
    Serve(queue)
}
  • 版本2:在额定阈值(信号量的值)内,为每一个请求创建一个协程

下面的写法编译器会有一个警告:

loop variable req captured by func literal (loopclosure)

// Serve2 starts one goroutine per request, taking a sem slot before each
// goroutine is launched so that at most cap(sem) of them hold a slot at once.
//
// The closure refers to the loop variable req directly, which is what the
// loopclosure diagnostic reports. Before Go 1.22 the loop variable was shared
// by all iterations, so a goroutine could observe a later request.
func Serve2(queue chan *Request) {
    for req := range queue {
        sem <- 1
        //reqnew := req
        //req := req
        go func() {
            process(req) // Captures the loop variable, not a copy of it.
            <-sem
        }()
    }
}

// TestServe2 queues 100 requests, lets Serve2 process them and waits for
// every handler goroutine to finish before returning.
func TestServe2(t *testing.T) {
    queue := make(chan *Request, 100)
    r := Request(1)
    for i := 0; i < 100; i++ {
        queue <- &r
    }
    // Close the queue so the range loop in Serve2 ends once it is drained;
    // otherwise Serve2 blocks forever on the empty channel.
    close(queue)
    Serve2(queue)

    // Serve2 takes a sem slot before starting each goroutine and the
    // goroutine gives it back when it is done, so filling every slot here
    // waits for all outstanding handlers.
    for i := 0; i < cap(sem); i++ {
        sem <- 1
    }
    // Hand the slots back so sem is empty again for later users.
    for i := 0; i < cap(sem); i++ {
        <-sem
    }
}

 

  • 版本3:修复第二个版本将 req 赋值为局部变量
// Serve2 starts one goroutine per request, taking a sem slot before each
// goroutine is launched. Each goroutine works on its own copy of the request
// pointer rather than on the loop variable.
func Serve2(queue chan *Request) {
    for req := range queue {
        sem <- 1
        current := req // Per-iteration copy for the closure below.
        go func() {
            process(current)
            <-sem
        }()
    }
}

// TestServe2 queues 100 requests, lets Serve2 process them and waits for
// every handler goroutine to finish before returning.
func TestServe2(t *testing.T) {
    queue := make(chan *Request, 100)
    r := Request(1)
    for i := 0; i < 100; i++ {
        queue <- &r
    }
    // Close the queue so the range loop in Serve2 ends once it is drained;
    // otherwise Serve2 blocks forever on the empty channel.
    close(queue)
    Serve2(queue)

    // Serve2 takes a sem slot before starting each goroutine and the
    // goroutine gives it back when it is done, so filling every slot here
    // waits for all outstanding handlers.
    for i := 0; i < cap(sem); i++ {
        sem <- 1
    }
    // Hand the slots back so sem is empty again for later users.
    for i := 0; i < cap(sem); i++ {
        <-sem
    }
}

 

  • 版本4:修复第二个版本将 req 做为参数传入实现参数复制
// Serve3 starts one goroutine per request, taking a sem slot before each
// goroutine is launched. The request is passed as an argument, so every
// goroutine receives its own copy of the pointer.
func Serve3(queue chan *Request) {
    run := func(r *Request) {
        process(r)
        <-sem
    }
    for req := range queue {
        sem <- 1
        go run(req)
    }
}
// TestServe3 queues 100 requests, lets Serve3 process them and waits for
// every handler goroutine to finish before returning.
func TestServe3(t *testing.T) {
    queue := make(chan *Request, 100)
    r := Request(1)
    for i := 0; i < 100; i++ {
        queue <- &r
    }
    // Close the queue so the range loop in Serve3 ends once it is drained;
    // otherwise Serve3 blocks forever on the empty channel.
    close(queue)
    Serve3(queue)

    // Serve3 takes a sem slot before starting each goroutine and the
    // goroutine gives it back when it is done, so filling every slot here
    // waits for all outstanding handlers.
    for i := 0; i < cap(sem); i++ {
        sem <- 1
    }
    // Hand the slots back so sem is empty again for later users.
    for i := 0; i < cap(sem); i++ {
        <-sem
    }
}
  • 版本5:启动固定的协程,接收通道中数据, 当收到退出信号后退出。
// handle2 is a worker loop: it processes requests received from queue one at
// a time and returns when queue is closed and empty.
func handle2(queue chan *Request) {
    for {
        r, open := <-queue
        if !open {
            return
        }
        process(r)
    }
}

// Serve4 starts MaxOutstanding handle2 workers that all receive from
// clientRequests, then blocks until a value arrives on quit.
func Serve4(clientRequests chan *Request, quit chan bool) {
    workers := 0
    for workers < MaxOutstanding {
        go handle2(clientRequests)
        workers++
    }
    <-quit // Wait to be told to exit.
}

// TestServe4 runs Serve4 with a producer goroutine that queues 11 requests
// and a second goroutine that sends the quit signal once the producer is
// done.
//
// NOTE: quit reports that production has finished, not that every queued
// request has already been processed.
func TestServe4(t *testing.T) {
    queue := make(chan *Request, 100)
    quit := make(chan bool)

    var wg sync.WaitGroup

    // Register the producer before it starts. Calling Add inside the
    // goroutine races with wg.Wait below, which could then return early.
    wg.Add(1)
    go func(queue chan *Request) {
        defer wg.Done()
        r := Request(1)
        for i := 0; i < 11; i++ {
            queue <- &r
        }
        // Nothing more will be sent; closing lets receivers that range over
        // the queue finish.
        close(queue)
    }(queue)

    go func(quit chan bool) {
        wg.Wait()
        quit <- true
    }(quit)

    Serve4(queue, quit)
}
Channels of channels

定义一个请求结构体并定义回复客户端使用的应答channel

// Request2 is a request that carries its own reply channel.
type Request2 struct {
    args       []int           // Arguments passed to f.
    f          func([]int) int // Function applied to args.
    resultChan chan int        // Channel on which the result of f(args) is sent back.
}

// sum returns the total of all elements of a; the total of an empty or nil
// slice is 0.
func sum(a []int) (s int) {
    for i := 0; i < len(a); i++ {
        s += a[i]
    }
    return s
}

// handle3 serves requests from queue until it is closed: for each request it
// applies req.f to req.args and sends the result on req.resultChan.
func handle3(queue chan *Request2) {
    for {
        req, open := <-queue
        if !open {
            return
        }
        reply := req.resultChan
        reply <- req.f(req.args)
    }
}

// TestServe5 sends one Request2 to a handle3 server, prints the answer that
// comes back on the request's own reply channel, and then shuts down.
func TestServe5(t *testing.T) {
    clientRequests := make(chan *Request2)
    done := make(chan struct{})

    // Client: submit the request and wait for the reply.
    go func() {
        defer close(done)
        request := &Request2{[]int{3, 4, 5}, sum, make(chan int)}
        clientRequests <- request
        fmt.Printf("answer :%d\n", <-request.resultChan)
    }()

    // Server.
    go handle3(clientRequests)

    // Wait for the client instead of blocking forever with an empty select,
    // then close the request channel so the server loop can end.
    <-done
    close(clientRequests)
}
Parallelization 

parallelization & concurrency:并发更多依赖于设计,因为本来就要同时处理很多事情;而并行更依赖于系统的处理能力,大多数情况下它做的事情很单一。

package para_test

import "testing"

// Vector is a slice of float64 values whose elements can be updated in
// several pieces at once.
type Vector []float64

// Op returns the increment applied to an element. Neither the receiver nor s
// is consulted; the result is always 0.1.
func (v Vector) Op(s float64) (r float64) {
    return 0.1
}

// DoSome adds u.Op(v[k]) to every element v[k] with i <= k < n and then
// sends on c to report that this piece is finished.
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for k := i; k < n; k++ {
        v[k] += u.Op(v[k])
    }
    c <- 1
}

// numCPU is the number of pieces DoAll splits the work into.
const numCPU = 4

// DoAll splits v into numCPU contiguous pieces, runs DoSome on each piece in
// its own goroutine and returns once every piece has reported completion.
func (v Vector) DoAll(u Vector) {
    done := make(chan int, numCPU)
    size := len(v)
    for part := 0; part < numCPU; part++ {
        lo := part * size / numCPU
        hi := (part + 1) * size / numCPU
        go v.DoSome(lo, hi, u, done)
    }

    // Drain one completion signal per piece.
    for part := 0; part < numCPU; part++ {
        <-done
    }
}

// TestPara logs the length of a five-element Vector, updates it in place
// with DoAll (using the vector as its own operand) and logs the result.
func TestPara(t *testing.T) {
    v := Vector{0.1, 1.1, 2.2, 3.3, 4.4}
    t.Log(len(v))
    v.DoAll(v)
    t.Log(v)
}

另外可以通过 runtime.NumCPU 获取CPU信息

缓存泄露

client处理足够快的话, 而server较慢情况下,会一直创建buffer,如果server发送到freelist请求缓冲通道数据已满,导致协程处于hanging状态,这时垃圾回收器对buffer回收也是无能为力。(这块原文是说由于缓冲满了导致垃圾回收造成泄露,理解是由于hanging无法回收造成泄露)

import (
    "fmt"
    "testing"
)

// Buffer is the data exchanged between client and server.
type Buffer []int

// freeList holds up to 100 buffers that the server has finished with and
// that the client may reuse.
var freeList = make(chan *Buffer, 100)

// serverChan is the unbuffered channel on which the client hands buffers to
// the server.
var serverChan = make(chan *Buffer)

// load stands in for filling a buffer: it only prints b followed by a marker
// text.
func load(b *Buffer) {
    fmt.Printf("%v %s\n", b, "load b ...")
}

// client loops forever: it obtains a buffer, loads it and sends it to the
// server over serverChan.
func client() {
    // next returns a recycled buffer when one is waiting on freeList and a
    // freshly allocated one otherwise; it never blocks.
    next := func() *Buffer {
        select {
        case recycled := <-freeList:
            fmt.Println(" Got one; nothing more to do.")
            return recycled
        default:
            fmt.Println(" None free, so allocate a new one")
            return new(Buffer)
        }
    }

    for {
        buf := next()
        load(buf)
        serverChan <- buf
    }
}

// process stands in for consuming a buffer: it only prints b followed by a
// marker text.
func process(b *Buffer) {
    fmt.Printf("%v %s\n", b, "process b ...")
}

func server() {
    for {
        b := <-serverChan
        process(b)
        select {
        case freeList <- b:
            fmt.Println(" Buffer on free list; nothing more to do.")
        default:
            fmt.Println(" Free list full, just carry on.")

        }
    }
}

// TestFreelist runs server on its own goroutine and client on the test
// goroutine. Both loop forever, so this function does not return.
func TestFreelist(t *testing.T) {
    go server()
    client()
}

 

Errors

go支持多返回值,非常方便的可以返回相关错误信息,而不需要单独设置变量来区分具体错误。

error

error类型是一个内置的接口类型

// error is the built-in interface for reporting failures; any type with an
// Error() string method satisfies it.
type error interface {
    Error() string
}

开发者可以实现更丰富的错误信息

// PathError describes a failed file operation: the operation that was
// attempted, the path it was attempted on, and the underlying error.
type PathError struct {
    Op   string // "open", "unlink", etc.
    Path string // The associated file.
    Err  error  // Returned by the system call.
}

// Error formats the failure as "<Op> <Path>: <Err>". Err must be non-nil.
func (e *PathError) Error() string {
    prefix := e.Op + " " + e.Path
    return prefix + ": " + e.Err.Error()
}
open /etc/passwx: no such file or directory

当文件未找到时可以显示出更加详细的信息。比如在错误信息中输出包名及详细的错误信息。

另外可以通过详细信息判断是否可以重试, 当然如果是我设计的话,不会这么处理,一般清理文件由单独应用处理。

// Attempt the create at most twice: once normally and, if the disk was
// full, once more after freeing some space.
for try := 0; try < 2; try++ {
    file, err = os.Create(filename)
    if err == nil {
        return
    }
    // Retry only when the failure is a *os.PathError whose underlying error
    // is ENOSPC; the type assertion yields ok == false for any other type.
    if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
        deleteTempFiles()  // Recover some space.
        continue
    }
    return // Any other error is not retried.
}

另外使用到类型断言 type assertion 如果失败ok将返回false, e为nil, 如果ok返回true那么比较一下错误码

panic

应用无法继续运行的时候使用panic

// CubeRoot is a toy implementation of cube root using Newton's method.
//
// It returns x unchanged when x is zero and panics if the iteration has not
// converged after a million steps.
func CubeRoot(x float64) float64 {
    if x == 0 {
        // The initial guess x/3 would be 0, which makes the first Newton
        // step 0/0 = NaN; the loop would then run to the panic below.
        return x
    }
    z := x / 3 // Arbitrary initial value
    for i := 0; i < 1e6; i++ {
        prevz := z
        z -= (z*z*z - x) / (3 * z * z)
        if veryClose(z, prevz) {
            return z
        }
    }
    // A million iterations has not converged; something is wrong.
    panic(fmt.Sprintf("CubeRoot(%g) did not converge", x))
}

在程序无法得到收敛的时候选择调用panic, 当然如果是库函数还是避免panic。一般panic场景是无法继续工作。如果必须这么做的话可以通过另外一个程序根据需要重新唤起应用。

另外执行panic退出时,执行defer函数,os.Exit 退出则不会。

package panic_test

import (
    _ "errors"
    "fmt"
    "os"
    "testing"
)


// TestPanicVsExit shows that os.Exit ends the process without running
// deferred calls: "Start" is printed, "Finally" is not.
func TestPanicVsExit(t *testing.T) {
    defer fmt.Println("Finally")

    fmt.Println("Start")
    //panic(errors.New("Something wrong!"))
    os.Exit(-1)
}

//输出

=== RUN TestPanicVsExit
Start
FAIL loop/panic_test 0.139s

panic输出还是打印堆栈信息

package panic_test

import (
    "errors"
    "fmt"
    _ "os"
    "testing"
)

// TestPanicVsExit shows that deferred calls still run when a function
// panics: "Start" is printed, then "Finally", and the panic carries on up
// the stack.
func TestPanicVsExit(t *testing.T) {
    defer fmt.Println("Finally")

    fmt.Println("Start")
    failure := errors.New("Something wrong!")
    panic(failure)
    //os.Exit(-1)
}


=== RUN   TestPanicVsExit
Start
Finally
--- FAIL: TestPanicVsExit (0.00s)
panic: Something wrong! [recovered]
    panic: Something wrong!

goroutine 19 [running]:
testing.tRunner.func1.2(0xd45be0, 0xc000088520)
    C:/Program Files/Go/src/testing/testing.go:1144 +0x345
testing.tRunner.func1(0xc000084480)
    C:/Program Files/Go/src/testing/testing.go:1147 +0x4b6
panic(0xd45be0, 0xc000088520)
    C:/Program Files/Go/src/runtime/panic.go:965 +0x1c7
loop/panic_test_test.TestPanicVsExit(0xc000084480)
    f:/GO/go_test/panic_test/panic_test.go:16 +0xe5
testing.tRunner(0xc000084480, 0xd76068)
    C:/Program Files/Go/src/testing/testing.go:1194 +0xef
created by testing.(*T).Run
    C:/Program Files/Go/src/testing/testing.go:1239 +0x2b3
FAIL    loop/panic_test 0.701s

 

recover
  • recover需要在defer中调用,
  • 调用recover后将不再打印相关堆栈信息
  • recover需要和panic在一个协程中才起作用
package recover_test

import (
    "fmt"
    "runtime/debug"
    "sync"
    "testing"
)

// wg counts the safelyDo goroutines that have been started and have not yet
// finished.
var wg sync.WaitGroup

// server starts a safelyDo goroutine for every item received from workChan
// and returns when workChan is closed.
func server(workChan <-chan *Work) {
    for work := range workChan {
        // Count the goroutine before starting it so that a concurrent
        // wg.Wait cannot miss it.
        wg.Add(1)
        go safelyDo(work)
    }
}

// servernoblock starts a safelyDo goroutine for every item already queued on
// workChan. It prints "quit" and returns as soon as a receive would block or
// the channel has been closed.
func servernoblock(workChan <-chan *Work) {
    for {
        select {
        case work, open := <-workChan:
            if !open {
                // A closed channel is always ready to receive; stop instead
                // of spinning on nil items.
                fmt.Println("quit")
                return
            }
            wg.Add(1)
            go safelyDo(work)
        default:
            fmt.Println("quit")
            return
        }
    }
}

// Work is one unit of work; do panics for items whose stat is 0.
type Work struct {
    id   int
    stat int
}

// do simulates processing work: it panics when work.stat is 0 and prints a
// confirmation otherwise.
func do(work *Work) {
    if work.stat == 0 {
        fmt.Printf("pannic work... %+v\n", work)
        panic(fmt.Sprintf("panic work %+v ", work))
    }
    fmt.Printf("normal work... %+v\n", work)
}

// safelyDo runs do(work) and recovers from a panic raised by it, printing
// the panic value and the current stack.
//
// The caller must call wg.Add(1) before starting safelyDo; safelyDo calls
// wg.Done when it finishes. Calling Add here, on the new goroutine, would
// race with wg.Wait in the caller.
func safelyDo(work *Work) {
    defer wg.Done()
    defer func() {
        // recover only has an effect directly inside a deferred function on
        // the goroutine that panicked.
        if err := recover(); err != nil {
            fmt.Println("work failed: ", err)
            debug.PrintStack()
        }
    }()
    do(work)
}

// TestRecover queues three work items, the first of which makes do panic,
// starts them through servernoblock, waits on wg and finally closes the
// channel.
func TestRecover(t *testing.T) {
    jobs := []Work{
        {id: 1, stat: 0},
        {id: 2, stat: 1},
        {id: 3, stat: 1},
    }

    workchan := make(chan *Work, 100)
    for i := range jobs {
        workchan <- &jobs[i]
    }

    servernoblock(workchan)
    wg.Wait()
    close(workchan)
}

由于recover,panic的堆栈信息未打印, 可以通过debug.PrintStack进行打印。 通过printstack重新调整了一下调用栈的打印。

package recover_test

import (
    "bufio"
    "fmt"
    "runtime"
    "strings"
    "sync"
    "testing"
)

// wg counts the safelyDo goroutines that have been started and have not yet
// finished.
var wg sync.WaitGroup

// server starts a safelyDo goroutine for every item received from workChan
// and returns when workChan is closed.
func server(workChan <-chan *Work) {
    for work := range workChan {
        // Count the goroutine before starting it so that a concurrent
        // wg.Wait cannot miss it.
        wg.Add(1)
        go safelyDo(work)
    }
}

// servernoblock starts a safelyDo goroutine for every item already queued on
// workChan. It prints "quit" and returns as soon as a receive would block or
// the channel has been closed.
func servernoblock(workChan <-chan *Work) {
    for {
        select {
        case work, open := <-workChan:
            if !open {
                // A closed channel is always ready to receive; stop instead
                // of spinning on nil items.
                fmt.Println("quit")
                return
            }
            wg.Add(1)
            go safelyDo(work)
        default:
            fmt.Println("quit")
            return
        }
    }
}

// Work is one unit of work; do panics for items whose stat is 0.
type Work struct {
    id   int
    stat int
}

// do simulates processing work: it panics when work.stat is 0 and prints a
// confirmation otherwise.
func do(work *Work) {
    if work.stat == 0 {
        fmt.Printf("pannic work... %+v\n", work)
        panic(fmt.Sprintf("panic work %+v ", work))
    }
    fmt.Printf("normal work... %+v\n", work)
}

// safelyDo runs do(work) and recovers from a panic raised by it, printing
// the panic value and a filtered dump of the goroutine stacks.
//
// The caller must call wg.Add(1) before starting safelyDo; safelyDo calls
// wg.Done when it finishes. Calling Add here, on the new goroutine, would
// race with wg.Wait in the caller.
func safelyDo(work *Work) {
    defer wg.Done()
    defer func() {
        // recover only has an effect directly inside a deferred function on
        // the goroutine that panicked.
        if err := recover(); err != nil {
            fmt.Println("work failed: ", err)
            //debug.PrintStack()
            // Capture the stacks of all goroutines (second argument true)
            // into a 64 KiB buffer; longer dumps are truncated.
            buf := make([]byte, 1<<16)
            stackSize := runtime.Stack(buf, true)
            printoriginal(string(buf[0:stackSize]))
        }
    }()
    do(work)
}

// printoriginal prints a filtered copy of the stack dump s to standard
// output, one line at a time.
//
// Lines are echoed until a line starting with "goroutine" (compared
// case-insensitively) is seen. That line is printed and echoing is switched
// off. While echoing is off, a line starting with "panic" switches it back
// on; that line and the one following it are not printed.
func printoriginal(s string) {
    lines := bufio.NewScanner(strings.NewReader(s))
    echo := true
    for lines.Scan() {
        line := lines.Text()
        lowered := strings.ToLower(line)
        switch {
        case strings.HasPrefix(lowered, "goroutine"):
            // Goroutine headers are always shown.
            fmt.Println(line)
            echo = false
        case echo:
            fmt.Println(line)
        case strings.HasPrefix(lowered, "panic"):
            echo = true
            lines.Scan() // Drop the line that follows the panic line.
        }
    }
}

// TestRecover queues three work items, the first of which makes do panic,
// starts them through servernoblock, waits on wg and finally closes the
// channel.
func TestRecover(t *testing.T) {
    jobs := []Work{
        {id: 1, stat: 0},
        {id: 2, stat: 1},
        {id: 3, stat: 1},
    }

    workchan := make(chan *Work, 100)
    for i := range jobs {
        workchan <- &jobs[i]
    }

    servernoblock(workchan)
    wg.Wait()
    close(workchan)
}

 

A web server

示例是通过调用google服务QR码的功能。

package webserver_test

import (
    "flag"
    "html/template"
    "log"
    "net/http"
    "testing"
)

// addr is the -addr command-line flag: the address the HTTP server listens
// on, ":1718" by default.
var addr = flag.String("addr", ":1718", "http service address")

// templ is the page template parsed from templateStr; template.Must panics
// during package initialisation if the template does not parse.
var templ = template.Must(template.New("qr").Parse(templateStr))

// TestWebserver registers QR as the handler for "/" on the default mux and
// serves HTTP on *addr. ListenAndServe blocks while the server is running;
// if it returns an error the process is terminated through log.Fatal.
func TestWebserver(t *testing.T) {
    flag.Parse()
    http.HandleFunc("/", QR)
    if err := http.ListenAndServe(*addr, nil); err != nil {
        log.Fatal("ListenAndServe:", err)
    }
}

// QR renders the page template with the value of the "s" form field, which
// is empty when the field is absent.
func QR(w http.ResponseWriter, req *http.Request) {
    // Execute can fail part-way through, for example when the client goes
    // away; part of the response may already be written, so the failure is
    // logged instead of being dropped silently.
    if err := templ.Execute(w, req.FormValue("s")); err != nil {
        log.Printf("QR: executing template: %v", err)
    }
}

// templateStr is the HTML page served by QR. The section between {{if .}}
// and {{end}} is rendered only when the template data is non-empty; it embeds
// the data in a chart image URL and echoes it as text. The form submits the
// field "s" back to "/" with GET.
const templateStr = `
<html>
<head>
<title>QR Link Generator</title>
</head>
<body>
{{if .}}
<img src="http://chart.apis.google.com/chart?chs=300x300&cht=qr&choe=UTF-8&chl={{.}}" />
<br>
{{.}}
<br>
<br>
{{end}}
<form action="/" name=f method="GET">
    <input maxLength=1024 size=70 name=s value="" title="Text to QR Encode">
    <input type=submit value="Show QR" name=qr>
</form>
</body>
</html>
`

通过addr 设置服务监听端口, QR只是接收包含表单数据的请求,并以名为s的表单值对数据执行模板。templateStr 中 {{if .}} 到 {{end}} 中间部分代码, {{.}}非空时执行

 

参考及引用

Photo by Bruno Henrique from Pexels

https://golang.org/doc/effective_go

 

Comments are closed.