Concurrency
Do not communication by sharing memory; instead,share memory by communicating
通过通讯来共享内存,而不是通过共享内存来通讯
Goroutines
go的协程并发在同一地址空间, 更轻量级。
package goroutines_test
import (
"fmt"
"sync"
"testing"
"time"
)
var wg sync.WaitGroup
func Announce(message string, delay time.Duration) {
wg.Add(1)
go func() {
time.Sleep(delay)
fmt.Println(message)
defer wg.Done()
}()
}
func TestAnnouce(t *testing.T) {
Announce("helloworld", time.Second)
Announce("helloweida", time.Second)
Announce("hellosanding", time.Second)
Announce("hellogarlic", time.Second)
Announce("helloding", time.Second)
wg.Wait()
}
Channels
可以通过make创建通道,可以通过第二个参数设置是否带有缓冲
ch:=make(chan int, 0) ch:=make(chan int, 100)
可以通过chan完成协程等待, 如果无缓冲通道, 接收者会立即引发等待, 有缓冲通道,数据填满缓冲区时开始等待。
package channel_test
import (
"fmt"
"sort"
"testing"
)
func TestChannel(t *testing.T) {
list := []int{11, 2, 3, 44, 0}
c := make(chan int)
go func() {
sort.Ints(list)
c <- 1
fmt.Println(list)
}()
<-c
}
带缓冲的通道可以被用作信号量。
- 版本1: 为每一个请求分配一个协程
可以看到sem缓冲区为10, 控制了同时运行的协程数量,当并行大于10请求到来时,协程还是会创建仅是进入等待状态,这将消耗大量资源。
package channel_test
import (
"fmt"
"sync"
"testing"
)
var MaxOutstanding int = 10
type Request int
var sem = make(chan int, MaxOutstanding)
func process(r *Request) {
fmt.Println(r)
}
func handle(r *Request) {
sem <- 1
process(r)
<-sem
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req)
}
}
func TestServe(t *testing.T) {
queue := make(chan *Request, 100)
var r Request
r = 0
for i := 0; i < 100; i++ {
queue <- &r
}
Serve(queue)
}
- 版本2:在额定阈值(信号量的值)内,为每一个请求创建一个协程
下面的写法编译器会有一个警告:
loop variable req captured by func literalloopclosure
func Serve2(queue chan *Request) {
for req := range queue {
sem <- 1
//reqnew := req
//req := req
go func() {
process(req)
<-sem
}()
}
}
func TestServe2(t *testing.T) {
queue := make(chan *Request, 100)
var r Request
r = 1
for i := 0; i < 100; i++ {
queue <- &r
}
Serve2(queue)
}
- 版本3:修复第二个版本将 req 赋值为局部变量
func Serve2(queue chan *Request) {
for req := range queue {
sem <- 1
//reqnew := req
req := req
go func() {
process(req)
<-sem
}()
}
}
func TestServe2(t *testing.T) {
queue := make(chan *Request, 100)
var r Request
r = 1
for i := 0; i < 100; i++ {
queue <- &r
}
Serve2(queue)
}
- 版本4:修复第二个版本将 req 做为参数传入实现参数复制
func Serve3(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
func TestServe3(t *testing.T) {
queue := make(chan *Request, 100)
var r Request
r = 1
for i := 0; i < 100; i++ {
queue <- &r
}
Serve3(queue)
}
- 版本5:启动固定的协程,接收通道中数据, 当收到退出信号后退出。
func handle2(queue chan *Request) {
for r := range queue {
process(r)
}
}
func Serve4(clientRequests chan *Request, quit chan bool) {
for i := 0; i < MaxOutstanding; i++ {
go handle2(clientRequests)
}
<-quit
}
func TestServe4(t *testing.T) {
queue := make(chan *Request, 100)
quit := make(chan bool)
var wg sync.WaitGroup
go func(queue chan *Request) {
wg.Add(1)
var r Request
r = 1
for i := 0; i < 11; i++ {
queue <- &r
}
defer wg.Done()
}(queue)
go func(quit chan bool) {
wg.Wait()
quit <- true
}(quit)
Serve4(queue, quit)
}
Channels of channels
定义一个请求结构体并定义回复客户端使用的应答channel
type Request2 struct {
args []int
f func([]int) int
resultChan chan int
}
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
func handle3(queue chan *Request2) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
func TestServe5(t *testing.T) {
clientRequests := make(chan *Request2)
var request *Request2
go func(request *Request2) {
request = &Request2{[]int{3, 4, 5}, sum, make(chan int)}
clientRequests <- request
fmt.Printf("answer :%d\n", <-request.resultChan)
}(request)
go func(clientRequests chan *Request2) {
handle3(clientRequests)
}(clientRequests)
select {}
}
Parallelization
parallization & concurrency 并发更多依赖于设计因为本要处理很多事情, 而并行更依赖已系统处理能力,大多情况下他做的事情很单一。
package para_test
import "testing"
type Vector []float64
func (v Vector) Op(s float64) (r float64) {
r += 0.1
return r
}
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1
}
const numCPU = 4
func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU)
for i := 0; i < numCPU; i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
for i := 0; i < numCPU; i++ {
<-c
}
}
func TestPara(t *testing.T) {
var v Vector
v = Vector{0.1, 1.1, 2.2, 3.3, 4.4}
t.Log(len(v))
v.DoAll(v)
t.Log(v)
}
另外可以通过 runtime.NumCPU 获取CPU信息
缓存泄露
client处理足够快的话, 而server较慢情况下,会一直创建buffer,如果server发送到freelist请求缓冲通道数据已满,导致协程处于hanging状态,这时垃圾回收器对buffer回收也是无能为力。(这块原文是说由于缓冲满了导致垃圾回收造成泄露,理解是由于hanging无法回收造成泄露)
import (
"fmt"
"testing"
)
type Buffer []int
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)
func load(b *Buffer) {
fmt.Println(b, "load b ...")
}
func client() {
for {
var b *Buffer
select {
case b = <-freeList:
fmt.Println(" Got one; nothing more to do.")
default:
fmt.Println(" None free, so allocate a new one")
b = new(Buffer)
}
load(b)
serverChan <- b
}
}
func process(b *Buffer) {
fmt.Println(b, "process b ...")
}
func server() {
for {
b := <-serverChan
process(b)
select {
case freeList <- b:
fmt.Println(" Buffer on free list; nothing more to do.")
default:
fmt.Println(" Free list full, just carry on.")
}
}
}
func TestFreelist(t *testing.T) {
go server()
client()
}
Errors
go支持多返回值,非常方便的可以返回相关错误信息,而不需要单独设置变量来区分具体错误。
error
error类型是一个内置的接口类型
type error interface {
Error() string
}
开发者可以实现更丰富的错误信息
// PathError records an error and the operation and
// file path that caused it.
type PathError struct {
Op string // "open", "unlink", etc.
Path string // The associated file.
Err error // Returned by the system call.
}
func (e *PathError) Error() string {
return e.Op + " " + e.Path + ": " + e.Err.Error()
}
open /etc/passwx: no such file or directory
当文件未找到时可以显示出更加详细的信息。比如在错误信息中输出包名及详细的错误信息。
另外可以通过详细信息判断是否可以重试, 当然如果是我设计的话,不会这么处理,一般清理文件由单独应用处理。
for try := 0; try < 2; try++ {
file, err = os.Create(filename)
if err == nil {
return
}
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
deleteTempFiles() // Recover some space.
continue
}
return
}
另外使用到类型断言 type assertion 如果失败ok将返回false, e为nil, 如果ok返回true那么比较一下错误码
panic
应用无法继续运行的时候使用panic
// A toy implementation of cube root using Newton's method.
func CubeRoot(x float64) float64 {
z := x/3 // Arbitrary initial value
for i := 0; i < 1e6; i++ {
prevz := z
z -= (z*z*z-x) / (3*z*z)
if veryClose(z, prevz) {
return z
}
}
// A million iterations has not converged; something is wrong.
panic(fmt.Sprintf("CubeRoot(%g) did not converge", x))
}
在程序无法得到收敛的时候选择调用panic, 当然如果是库函数还是避免panic。一般panic场景是无法继续工作。如果必须这么做的话可以通过另外一个程序根据需要重新唤起应用。
另外执行panic退出时,执行defer函数,os.Exit 退出则不会。
package panic_test
import (
_ "errors"
"fmt"
"os"
"testing"
)
func TestPanicVsExit(t *testing.T) {
defer func(){
fmt.Println("Finally")
}()
fmt.Println("Start")
//panic(errors.New("Something wrong!"))
os.Exit(-1)
}
//输出
=== RUN TestPanicVsExit
Start
FAIL loop/panic_test 0.139s
panic输出还是打印堆栈信息
package panic_test
import (
"errors"
"fmt"
_ "os"
"testing"
)
func TestPanicVsExit(t *testing.T) {
defer func() {
fmt.Println("Finally")
}()
fmt.Println("Start")
panic(errors.New("Something wrong!"))
//os.Exit(-1)
}
=== RUN TestPanicVsExit
Start
Finally
--- FAIL: TestPanicVsExit (0.00s)
panic: Something wrong! [recovered]
panic: Something wrong!
goroutine 19 [running]:
testing.tRunner.func1.2(0xd45be0, 0xc000088520)
C:/Program Files/Go/src/testing/testing.go:1144 +0x345
testing.tRunner.func1(0xc000084480)
C:/Program Files/Go/src/testing/testing.go:1147 +0x4b6
panic(0xd45be0, 0xc000088520)
C:/Program Files/Go/src/runtime/panic.go:965 +0x1c7
loop/panic_test_test.TestPanicVsExit(0xc000084480)
f:/GO/go_test/panic_test/panic_test.go:16 +0xe5
testing.tRunner(0xc000084480, 0xd76068)
C:/Program Files/Go/src/testing/testing.go:1194 +0xef
created by testing.(*T).Run
C:/Program Files/Go/src/testing/testing.go:1239 +0x2b3
FAIL loop/panic_test 0.701s
recover
- recover需要在defer中调用,
- 调用recover后将不再打印相关堆栈信息
- recover需要和panic在一个协程中才起作用
package recover_test
import (
"fmt"
"runtime/debug"
"sync"
"testing"
)
var wg sync.WaitGroup
func server(workChan <-chan *Work) {
for work := range workChan {
//fmt.Printf("go work %+v\n", work)
go safelyDo(work)
}
}
func servernoblock(workChan <-chan *Work) {
var work *Work
for {
select {
case work = <-workChan:
go safelyDo(work)
default:
fmt.Println("quit")
return
}
}
}
type Work struct {
id int
stat int
}
func do(work *Work) {
if work.stat == 0 {
fmt.Printf("pannic work... %+v\n", work)
s := fmt.Sprintf("panic work %+v ", work)
panic(s)
} else {
fmt.Printf("normal work... %+v\n", work)
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
fmt.Println("work failed: ", err)
debug.PrintStack()
}
wg.Done()
}()
wg.Add(1)
do(work)
}
func TestRecover(t *testing.T) {
var (
work1 Work
work2 Work
work3 Work
)
work1.id = 1
work1.stat = 0
work2.id = 2
work2.stat = 1
work3.id = 3
work3.stat = 1
workchan := make(chan *Work, 100)
workchan <- &work1
workchan <- &work2
workchan <- &work3
servernoblock(workchan)
wg.Wait()
close(workchan)
}
由于recover,panic的堆栈信息未打印, 可以通过debug.PrintStack进行打印。 通过printstack重新调整了一下调用栈的打印。
package recover_test
import (
"bufio"
"fmt"
"runtime"
"strings"
"sync"
"testing"
)
var wg sync.WaitGroup
func server(workChan <-chan *Work) {
for work := range workChan {
//fmt.Printf("go work %+v\n", work)
go safelyDo(work)
}
}
func servernoblock(workChan <-chan *Work) {
var work *Work
for {
select {
case work = <-workChan:
go safelyDo(work)
default:
fmt.Println("quit")
return
}
}
}
type Work struct {
id int
stat int
}
func do(work *Work) {
if work.stat == 0 {
fmt.Printf("pannic work... %+v\n", work)
s := fmt.Sprintf("panic work %+v ", work)
panic(s)
} else {
fmt.Printf("normal work... %+v\n", work)
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
fmt.Println("work failed: ", err)
//debug.PrintStack()
buf := make([]byte, 1<<16)
stackSize := runtime.Stack(buf, true)
printoriginal(string(buf[0:stackSize]))
}
wg.Done()
}()
wg.Add(1)
do(work)
}
func printoriginal(s string) {
scanner := bufio.NewScanner(strings.NewReader(s))
flag := true
for scanner.Scan() {
s := strings.ToLower(scanner.Text())
if strings.HasPrefix(s, "goroutine") {
fmt.Println(scanner.Text())
flag = false
}
if flag {
fmt.Println(scanner.Text())
}
if flag == false && strings.HasPrefix(s, "panic") {
flag = true
scanner.Scan()
}
}
}
func TestRecover(t *testing.T) {
var (
work1 Work
work2 Work
work3 Work
)
work1.id = 1
work1.stat = 0
work2.id = 2
work2.stat = 1
work3.id = 3
work3.stat = 1
workchan := make(chan *Work, 100)
workchan <- &work1
workchan <- &work2
workchan <- &work3
servernoblock(workchan)
wg.Wait()
close(workchan)
}
A web server
示例是通过调用google服务QR码的功能。
package webserver_test
import (
"flag"
"html/template"
"log"
"net/http"
"testing"
)
var addr = flag.String("addr", ":1718", "http service address")
var templ = template.Must(template.New("qr").Parse(templateStr))
func TestWebserver(t *testing.T) {
flag.Parse()
http.Handle("/", http.HandlerFunc(QR))
err := http.ListenAndServe(*addr, nil)
if err != nil {
log.Fatal("ListenAndServe:", err)
}
}
func QR(w http.ResponseWriter, req *http.Request) {
templ.Execute(w, req.FormValue("s"))
}
const templateStr = `
<html>
<head>
<title>QR Link Generator</title>
</head>
<body>
{{if .}}
<img src="http://chart.apis.google.com/chart?chs=300x300&cht=qr&choe=UTF-8&chl={{.}}" />
<br>
{{.}}
<br>
<br>
{{end}}
<form action="/" name=f method="GET">
<input maxLength=1024 size=70 name=s value="" title="Text to QR Encode">
<input type=submit value="Show QR" name=qr>
</form>
</body>
</html>
`
通过addr 设置服务监听端口, QR只是接收包含表单数据的请求,并以名为s的表单值对数据执行模板。templateStr 中 {{if .}} 到 {{end}} 中间部分代码, {{.}}非空时执行
参考及引用
Photo by Bruno Henrique from Pexels
https://golang.org/doc/effective_go


Comments are closed.