go 并发编程

概念

  • 协程与传统的系统级线程和进程相比,协程的最大优势在于其“轻量级”,可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。这也是协程也叫轻量级线程的原因。多数语言不支持携程,而是通过库的方式支持,这样如果调用同一个io如本地文件读写都会阻塞其他并发执行轻量级线程。

  • go在语言级别支持协程,叫goroutine。

  • go 关键词后面的语句会以一个新的线程去运行。

goroutine

func main() {
    go fmt.Println("Hello from another goroutine”)// 线程
    fmt.Println("Hello from main goroutine")

    // 至此,程序运行结束,
    // 所有活跃的goroutine被杀死
}
// 函数Publish在给定时间过期后打印text字符串到标准输出
// 该函数并不会阻塞而是立即返回
func Publish(text string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
    }()    // 注意这里的括号。必须调用匿名函数
}
func main() {
    Publish("A goroutine starts a new thread of execution.", 5*time.Second)
    fmt.Println("Let’s hope the news will published before I leave.")
    // 等待发布新闻
    time.Sleep(10 * time.Second)
    fmt.Println("Ten seconds later: I’m leaving now.")
}



输出:
Let’s hope the news will published before I leave.
BREAKING NEWS: A goroutine starts a new thread of execution.
Ten seconds later: I’m leaving now.

channel(通道)

  • channel是Go语言在语言级别提供的goroutine间的通信方式。

  • channel是类型相关的。也就是说,一个channel只能传递一种类型的值,这个类型需要在声明channel时指定。

  • 声明格式:var chanName chan ElementType

通道发送数据的格式

通道变量 <- 值

通道接收数据

  • 通道接收同样使用<-操作符
  • 阻塞接收数据,阻塞模式接收数据时,将接收变量作为<-操作符的左值 data := <-ch
  • 非阻塞接收数据,data, ok := <-ch data:表示接收到的数据。 ok:表示是否接收到数据。
  • 接收任意数据,忽略接收的数据<-ch
func sum(s []int, c chan int) {
        sum := 0
        for _, v := range s {
                sum += v
        }
        c <- sum // 把 sum 发送到通道 c
}
func main() {
        s := []int{7, 2, 8, -9, 4, 0}
        c := make(chan int)
        go sum(s[:len(s)/2], c)
        go sum(s[len(s)/2:], c)
        x, y := <-c, <-c // 从通道 c 中接收
        fmt.Println(x, y, x+y)
}
输出结果为:
-5 17 12
注意:默认情况下,通道是不带缓冲区的。发送端发送数据,同时必须又接收端相应的接收数据。

通道缓冲区

通道可以设置缓冲区,通过 make 的第二个参数指定缓冲区大小 :ch := make(chan int, 100)

func main() {
    // 这里我们定义了一个可以存储整数类型的带缓冲通道
        // 缓冲区大小为2
        ch := make(chan int, 2)
        // 因为 ch 是带缓冲的通道,我们可以同时发送两个数据
        // 而不用立刻需要去同步读取数据
        ch <- 1
        ch <- 2
        // 获取这两个数据
        fmt.Println(<-ch)
        fmt.Println(<-ch)
}

输出结果为:
1
2

Range

func main() {
    go func() {
        time.Sleep(1 * time.Hour)
    }()
    c := make(chan int)
    go func() {
        for i := 0; i < 10; i = i + 1 {
            c <- i
        }
        close(c)
    }()
    for i := range c {
        fmt.Println(i)
    }
    fmt.Println("Finished")
}

range c产生的迭代值为Channel中发送的值,它会一直迭代直到channel被关闭。上面的例子中如果把close(c)注释掉,程序会一直阻塞在for …… range那一行。

select

  • select语句选择一组可能的send操作和receive操作去处理。它类似switch,但是只是用来处理通讯(communication)操作。
  • 最多允许有一个default case,它可以放在case列表的任何位置,尽管我们大部分会将它放在最后。
  • 如果有同时多个case去处理,比如同时有多个channel可以接收数据,那么Go会伪随机的选择一个case处理(pseudo-random)。如果没有case需要处理,则会选择default去处理,如果default case存在的情况下。如果没有default case,则select语句会阻塞,直到某个case需要处理。
  • select语句和switch语句一样,它不是循环,它只会选择一个case来处理,如果想一直处理channel,你可以在外面加一个无限的for循环
import "fmt"
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

timeout

select有很重要的一个应用就是超时处理。 因为上面我们提到,如果没有case需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

下面这个例子我们会在2秒后往channel c1中发送一个数据,但是select设置为1秒超时,因此我们会打印出timeout 1,而不是result 1

func main() {
    c1 := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

它利用的是time.After方法,它返回一个类型为<-chan Time的单向的channel,在指定的时间发送一个当前时间给返回的channel中。

Timer和Ticker

timer是一个定时器,代表未来的一个单一事件,你可以告诉timer你要等待多长时间,它提供一个Channel,在将来的那个时间那个Channel提供了一个时间值。下面的例子中第二行会阻塞2秒钟左右的时间,直到时间到了才会继续执行。还可以使用timer.Stop来停止计时器。

timer2 := time.NewTimer(time.Second)
go func() {
    <-timer2.C
    fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
    fmt.Println("Timer 2 stopped")
}

ticker是一个定时触发的计时器,它会以一个间隔(interval)往Channel发送一个事件(当前时间),而Channel的接收者可以以固定的时间间隔从Channel中读取事件。

ticker := time.NewTicker(time.Millisecond * 500)
go func() {
    for t := range ticker.C {
        fmt.Println("Tick at", t)
    }
}()

类似timer, ticker也可以通过Stop方法来停止。

close

内建的close方法可以用来关闭channel。

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) //1
fmt.Println(<-c) //2
fmt.Println(<-c) //0
fmt.Println(<-c) //0

但是从这个关闭的channel中不但可以读取出已发送的数据,还可以不断的读取零值

同步

channel可以用在goroutine之间的同步。

下面的例子中main goroutine通过done channel等待worker完成任务。 worker做完任务后只需往channel发送一个数据就可以通知main goroutine任务完成。

func worker(done chan bool) {
    time.Sleep(time.Second)
    // 通知任务已完成
    done <- true
}
func main() {
    done := make(chan bool, 1)
    go worker(done)
    // 等待任务完成
    <-done

互斥锁

func main() {
var mutex sync.Mutex
count := 0

for r := 0; r < 50; r++ {
   go func() {
           mutex.Lock()
           count += 1
           mutex.Unlock()
       }()
   }

   time.Sleep(time.Second)
   fmt.Println("the count is : ", count)
}
当执行了 mutex.Lock() 操作后,如果有另外一个 goroutine 又执行了上锁操作,那么该操作被被阻塞,直到该互斥锁恢复到解锁状态。

WaitGroup

Go语言提供同步包(sync),Sync.WaitGroup是一种实现并发控制方式,WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。

  • Add(n) 把计数器设置为n
  • Done() 每次把计数器-1
  • wait() 会阻塞代码的运行,直到计数器地值减为0。
package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   //定义一个WaitGroup
   var wg sync.WaitGroup
   //计数器设置为2
   wg.Add(2)
   go func() {
      time.Sleep(2 * time.Second)
      fmt.Println("goroutineA finish")
      //计数器减1
      wg.Done()
   }()
   go func() {
      time.Sleep(2 * time.Second)
      fmt.Println("goroutineB finish")
      //计数器减1
      wg.Done()
   }()
   //会阻塞代码的运行,直到计数器地值减为0。
   wg.Wait()
   time.Sleep(2 * time.Second)
   fmt.Println("main fun exit")
}

Context

package main

import (
   "fmt"
   "time"
   "golang.org/x/net/context"
)

func main() {
   //创建一个可取消子context,context.Background():返回一个空的Context,这个空的Context一般用于整个Context树的根节点。
   ctx, cancel := context.WithCancel(context.Background())
   go func(ctx context.Context) {
      for {
         select {
         //使用select调用<-ctx.Done()判断是否要结束
         case <-ctx.Done():
            fmt.Println("goroutine exit")
            return
         default:
            fmt.Println("goroutine running.")
            time.Sleep(2 * time.Second)
         }
      }
   }(ctx)

   time.Sleep(10 * time.Second)
   fmt.Println("main fun exit")
   //取消context
   cancel()
   time.Sleep(5 * time.Second)

}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×