合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# 通道 * * * * * --: 作者:Mick 时间:2018年11月21日 * * * * * * [ ] 什么是通道 * [ ] 通道是解决什么问题的 * [ ] 通道的分类 * [ ] 通道的使用 ### 思考问题 1:并发读取文件夹下面的文件,假设文件夹下有9文件,每个文件都会进行大量处理(使用沉睡1秒的方式模拟处理),怎9个文件处理完毕需要9秒,问如何使用并行+goroutine缩短这时间 2:受线程数量的影响goroutinue的数量超过线程则是并发,要求是goroutinue的数量和线程数一直量处理文件 ``` package main import ( "path/filepath" "os" "time" "io/ioutil" "fmt" "sync" ) var wg sync.WaitGroup // 问题1 var filePath string = "./static/file" var files []string func init(){ err := filepath.Walk(filePath,func(path string,info os.FileInfo,err error)error{ if info.IsDir() { return nil } files = append(files,path) return nil }) if err != nil{ panic(err) } } func main(){ questionTwo() } // 如何控制goroutinue的数量,来并发并行的计算 func questionTwo(){ fileChan := make(chan string) fileNum := len(files) wg.Add(fileNum) for i:=0;i<fileNum;i++{ go ReadFileChan(fileChan) } for _,v := range files { fileChan <- v } wg.Wait() close(fileChan) } func ReadFileChan(fileChan chan string){ defer wg.Done() filePath := <- fileChan time.Sleep(time.Second) data,err := ioutil.ReadFile(filePath) fmt.Println(string(data),err) } //采用一个文件一个goroutinue的方式,如果文件是几十万个,则开启几十万个goroutinue func questionOne(){ //wg.Add(len(files)) 问题1 for _,v := range files { //go ReadFile(v) 问题1 ReadFile(v) } //wg.Wait() 问题1 } func ReadFile(filePath string){ //defer wg.Done() 问题1 time.Sleep(time.Second) data,err := ioutil.ReadFile(filePath) fmt.Println(string(data),err) } ``` ### 无缓冲通道 示例1:打网球示例 ``` func PlayerExam(){ court := make(chan int) wg.Add(2) go Player("Jack",court) go Player("Tom",court) court <- 1 wg.Wait() } func Player(name string,court chan int){ defer wg.Done() for{ ball,ok := <- court if !ok { fmt.Printf("Player %s Win\n",name) return } n := rand.Intn(100) if n %13 == 0{ fmt.Printf("Player %s Missed %d\n",name,ball) close(court) return } fmt.Printf("Player %s Hit %d\n",name,ball) ball ++ court <- ball } } ``` 示例2:接力赛示例 ``` func init(){ rand.Seed(time.Now().UnixNano()) } var wg sync.WaitGroup func main(){ baton := make(chan int) wg.Add(1) go Runner(baton) baton <- 1 wg.Wait() } func Runner(b chan int){ var newRunner int runner := <- b fmt.Printf("Runner %d Running With Baton\n",runner) time.Sleep(time.Millisecond*500) if runner == 4{ fmt.Printf("Runner %d Finish Race\n",runner) wg.Done() }else{ newRunner = runner + 1 fmt.Printf("Runner %d To The Line\n",newRunner) go Runner(b) fmt.Printf("Runner %d Exchange With Runner %d\n",runner,newRunner) b <- newRunner } } ``` ### 有缓冲通道 示例1:并发工作 ``` package main import ( "fmt" "sync" "time" ) type Work struct{ tasks chan string wg sync.WaitGroup } func(w *Work) Producter(n int){ w.tasks = make(chan string,n) } func(w *Work) Worker(id int,task chan string){ defer w.wg.Done() for v := range task{ fmt.Printf("Worker %d Begin %s\n",id,v) time.Sleep(time.Millisecond * 200) fmt.Printf("Worker %d Finish %s\n",id,v) } } func(w *Work) ShutDown(){ w.wg.Wait() fmt.Println("Work Over") } func main(){ goNum := 10 w := Work{} w.Producter(goNum) w.wg.Add(goNum) for i:=0;i<10;i++{ go w.Worker(i,w.tasks) } for i:=0;i<100;i++{ w.tasks <- fmt.Sprintf("Task %d",i) } close(w.tasks) w.wg.Wait() } ``` ### 小结 **并发是指 goroutine 运行的时候是相互独立的。 使用关键字 go 创建 goroutine 来运行函数。 goroutine 在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列。 竞争状态是指两个或者多个 goroutine 试图访问同一个资源。 原子函数和互斥锁提供了一种防止出现竞争状态的办法。 通道提供了一种在两个 goroutine 之间共享数据的简单方法。 无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证。**