Original URL: http://stackoverflow.com/questions/18405023/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
How would you define a pool of goroutines to be executed at once?
Asked by tux21b
TL;DR: Please just go to the last part and tell me how you would solve this problem.
I started using Go this morning, coming from Python. I want to call a closed-source executable from Go several times, with a bit of concurrency and different command-line arguments. My resulting code works well, but I'd like your input to improve it. Since I'm at an early learning stage, I'll also explain my workflow.
For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command-line tool that can display graphical message boxes.
Calling an executable file from Go
So, in Go, I would go like this:
package main

import "os/exec"

func main() {
	// exec.Command does not use a shell, so the text needs no extra quotes.
	cmd := exec.Command("zenity", "--info", "--text=Hello World")
	cmd.Run()
}
This works as expected. Note that .Run() is functionally equivalent to .Start() followed by .Wait(). This is great, but if I only wanted to run this program once, writing a Go program for it would hardly be worth it. So let's do it multiple times.
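To make the Run versus Start/Wait equivalence concrete, here is a small sketch (not code from the question) with two helpers, runDirect and runStartWait. Both names are hypothetical. Each helper returns the error instead of discarding it. Start returns as soon as the process has been launched, and Wait blocks until it exits.

```go
package main

import (
	"fmt"
	"os/exec"
)

// runDirect starts the command and waits for it in a single call.
func runDirect(name string, args ...string) error {
	return exec.Command(name, args...).Run()
}

// runStartWait does the same work in two steps: Start launches the
// process without blocking, Wait blocks until it exits.
func runStartWait(name string, args ...string) error {
	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return err // e.g. the executable was not found in $PATH
	}
	// Other work could happen here while the process runs.
	return cmd.Wait()
}

func main() {
	if err := runStartWait("zenity", "--info", "--text=Hello World"); err != nil {
		fmt.Println("zenity failed:", err)
	}
}
```

The split form is only useful when something has to happen between launching the process and collecting its exit status.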
Calling an executable multiple times
Now that this works, I'd like to call my program multiple times with custom command-line arguments (here just i, for the sake of simplicity).
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8 // Number of times the external program is called
	for i := 0; i < NumEl; i++ {
		cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
		cmd.Run()
	}
}
Ok, we did it! But I still can't see the advantage of Go over Python… This code actually runs serially. I have a multi-core CPU and I'd like to take advantage of it, so let's add some concurrency with goroutines.
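A note on the argument strings used in these loops. exec.Command does not go through a shell, so each string becomes exactly one argument, spaces included, and no quote processing happens. Shell-style quotes such as --text='...' would therefore reach zenity literally. The sketch below builds the arguments without them. buildArgs is a hypothetical helper, and main only prints cmd.Args instead of running anything.

```go
package main

import (
	"fmt"
	"os/exec"
)

// buildArgs returns zenity's arguments for iteration i. Each slice element
// becomes exactly one argv entry, spaces included, so no quoting is needed.
func buildArgs(i int) []string {
	return []string{"--info", fmt.Sprintf("--text=Hello from iteration n.%d", i)}
}

func main() {
	for i := 0; i < 8; i++ {
		cmd := exec.Command("zenity", buildArgs(i)...)
		// cmd.Args holds the program name followed by the arguments;
		// cmd.Run() would pass exactly these to zenity.
		fmt.Printf("%q\n", cmd.Args)
	}
}
```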
Goroutines, or a way to make my program parallel
a) First attempt: just add "go"s everywhere
Let's rewrite our code to make things easier to call and reuse, and add the famous go keyword:
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8
	for i := 0; i < NumEl; i++ {
		go callProg(i) // <--- There!
	}
}

func callProg(i int) {
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
Nothing happens! What is the problem? All the goroutines are launched at once. I don't really know why zenity is not executed, but AFAIK the Go program exits before the external zenity program can even be started. Adding a time.Sleep confirmed this: waiting a couple of seconds was enough to let the 8 instances of zenity launch. I don't know if this can be considered a bug, though.
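The Go specification accounts for this behaviour. When main returns, the program exits, and it does not wait for other goroutines to complete. A sync.WaitGroup can keep main alive until every goroutine has finished, with no need to guess a time.Sleep duration. In this sketch, runAll is a hypothetical function, and job stands in for the zenity call. The sketch does not limit concurrency yet: all n jobs still start at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll launches n goroutines that each call job(i) and blocks until
// all of them have returned. It reports how many jobs completed.
func runAll(n int, job func(i int)) int64 {
	var wg sync.WaitGroup
	var done int64
	for i := 0; i < n; i++ {
		wg.Add(1) // register before starting the goroutine
		go func(i int) {
			defer wg.Done()
			job(i)
			atomic.AddInt64(&done, 1)
		}(i)
	}
	wg.Wait() // main stays alive until every job is finished
	return atomic.LoadInt64(&done)
}

func main() {
	n := runAll(8, func(i int) {
		// In the question this would be exec.Command("zenity", ...).Run().
		fmt.Println("job", i)
	})
	fmt.Println("completed:", n)
}
```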
To make it worse, the real program I'd like to call takes a while to run. If I execute 8 instances of it in parallel on my 4-core CPU, won't it waste time on a lot of context switching? I don't know how plain Go goroutines behave, but exec.Command will launch zenity 8 times, as 8 separate processes. To make it even worse, I want to run this program more than 100,000 times, and doing all of that at once in goroutines won't be efficient at all. Still, I'd like to leverage my 4-core CPU!
b) Second attempt: use pools of goroutines
Online resources tend to recommend sync.WaitGroup for this kind of work. The problem with that approach is that you end up working with batches of goroutines: if I create a WaitGroup of 4 members, the Go program waits for all 4 external programs to finish before starting a new batch of 4. This is not efficient: CPU time is wasted, once again.
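The sketch below shows the batching pattern being criticised here. Jobs run in groups of batchSize, and the next group starts only after wg.Wait() sees the whole previous group finish. One slow job therefore leaves the other slots idle. The batching comes from where Wait is called, not from WaitGroup itself. runInBatches is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runInBatches runs jobs 0..n-1 in consecutive batches of batchSize.
// Each batch must finish completely before the next one starts.
// It returns the number of batches that were run.
func runInBatches(n, batchSize int, job func(i int)) int {
	batches := 0
	for start := 0; start < n; start += batchSize {
		end := start + batchSize
		if end > n {
			end = n
		}
		var wg sync.WaitGroup
		for i := start; i < end; i++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				job(i)
			}(i)
		}
		wg.Wait() // the slowest job in this batch holds up the next batch
		batches++
	}
	return batches
}

func main() {
	b := runInBatches(10, 4, func(i int) { fmt.Println("job", i) })
	fmt.Println("batches:", b)
}
```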
Some other resources recommended the use of a buffered channel to do the work:
package main

import (
	"os/exec"
	"strconv"
)

func main() {
	NumEl := 8   // Number of times the external program is called
	NumCore := 4 // Number of available cores
	c := make(chan bool, NumCore-1)
	for i := 0; i < NumEl; i++ {
		go callProg(i, c)
		c <- true // At the NumCore-th iteration, c is blocking
	}
}

func callProg(i int, c chan bool) {
	defer func() { <-c }()
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
This seems ugly. Channels were not intended for this purpose: I'm exploiting a side effect. I love the concept of defer, but I hate having to declare a function (even a lambda) just to pop a value out of the dummy channel I created. And of course, using a dummy channel is ugly in itself.
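The sketch below keeps the buffered-channel idea but wraps it in a small named type, so defer can take a method value and no closure is needed. The semaphore type, newSemaphore, Acquire, Release and runLimited are all hypothetical names, not standard-library APIs. A slot is acquired before each goroutine starts, so the channel capacity equals the concurrency limit directly. The code above instead needs a capacity of NumCore-1 to get NumCore concurrent runs.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// semaphore is a counting semaphore built on a buffered channel.
// The channel's capacity is the maximum number of concurrent holders.
type semaphore chan struct{}

func newSemaphore(n int) semaphore { return make(semaphore, n) }

// Acquire blocks while all slots are taken.
func (s semaphore) Acquire() { s <- struct{}{} }

// Release frees one slot; as a method value it works directly with defer.
func (s semaphore) Release() { <-s }

// runLimited runs job(0..n-1) with at most limit jobs in flight and
// returns the highest number of jobs it saw running at the same time.
func runLimited(n, limit int, job func(i int)) int64 {
	sem := newSemaphore(limit)
	var wg sync.WaitGroup
	var running, peak int64
	for i := 0; i < n; i++ {
		sem.Acquire() // take a slot before starting the goroutine
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer sem.Release() // no lambda needed
			cur := atomic.AddInt64(&running, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			job(i)
			atomic.AddInt64(&running, -1) // decremented before the slot is released
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	peak := runLimited(20, 4, func(i int) { fmt.Println("job", i) })
	fmt.Println("peak concurrency:", peak)
}
```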
c) Third attempt: die when all the children are dead
Now we are nearly finished. I just have to take into account yet another side effect: the Go program exits before all the zenity pop-ups are closed. This is because once the loop finishes (after the 8th iteration), nothing prevents the program from exiting. This time, sync.WaitGroup will be useful.
package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	NumEl := 8   // Number of times the external program is called
	NumCore := 4 // Number of available cores
	c := make(chan bool, NumCore-1)
	wg := new(sync.WaitGroup)
	wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
	for i := 0; i < NumEl; i++ {
		go callProg(i, c, wg)
		c <- true // At the NumCore-th iteration, c is blocking
	}
	wg.Wait() // Wait for all the children to die
	close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
	defer func() {
		<-c
		wg.Done() // Decrease the number of alive goroutines
	}()
	cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	cmd.Run()
}
Done.
My questions
- Do you know any other proper way to limit the number of goroutines executed at once?
I don't mean threads; how Go manages goroutines internally is not relevant. I really mean limiting the number of goroutines launched at once: each call to exec.Command(...).Run() starts a new process, so I should control how many of those calls run at the same time.
- Does that code look fine to you?
- Do you know how to avoid the use of a dummy channel in that case?
I can't convince myself that such dummy channels are the way to go.
Answer by tux21b
I would spawn 4 worker goroutines that read the tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more tasks from this channel. In addition, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just the creation of the tasks. You can see an example implementation of that approach here:
package main

import (
	"os/exec"
	"strconv"
	"sync"
)

func main() {
	tasks := make(chan *exec.Cmd, 64)

	// spawn four worker goroutines
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			for cmd := range tasks {
				cmd.Run()
			}
			wg.Done()
		}()
	}

	// generate some tasks
	for i := 0; i < 10; i++ {
		tasks <- exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
	}
	close(tasks)

	// wait for the workers to finish
	wg.Wait()
}
There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.
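One possible extension of this worker pool is sketched here; it is not part of the answer itself. The pool is sized with runtime.NumCPU() instead of a hard-coded 4, and each task's error goes back on a results channel so failures are not silently dropped. The result type and runPool are hypothetical. In the question's setting, run would wrap exec.Command(...).Run().

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// result pairs a task index with the error its run returned.
type result struct {
	index int
	err   error
}

// runPool feeds n task indices to `workers` goroutines over a shared
// channel and collects one result per task.
func runPool(n, workers int, run func(i int) error) []result {
	if workers < 1 {
		workers = 1 // zero workers would deadlock on the first send
	}
	tasks := make(chan int)
	results := make(chan result, n) // buffered so workers never block on send

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range tasks { // faster workers simply take more tasks
				results <- result{index: i, err: run(i)}
			}
		}()
	}

	for i := 0; i < n; i++ {
		tasks <- i
	}
	close(tasks) // lets the workers' range loops end
	wg.Wait()
	close(results)

	out := make([]result, 0, n)
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	res := runPool(10, runtime.NumCPU(), func(i int) error {
		if i%4 == 3 {
			return fmt.Errorf("task %d failed", i) // simulated failure
		}
		return nil
	})
	for _, r := range res {
		if r.err != nil {
			fmt.Println(r.err)
		}
	}
}
```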
Answer by zzzz
A simple approach to throttling (execute f() N times, but at most maxConcurrency of them concurrently), just a scheme:
package main

import (
	"sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
	const N = 100 // for example
	var wg sync.WaitGroup
	for i := 0; i < N; i++ {
		throttle <- 1 // whatever number
		wg.Add(1)
		go f(i, &wg, throttle)
	}
	wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
	defer wg.Done()
	// whatever processing
	println(i)
	<-throttle
}
I probably wouldn't call the throttle channel "dummy". IMHO it's an elegant way (not my invention, of course) to limit concurrency.
BTW: Please note that you're ignoring the error returned by cmd.Run().
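Here is a minimal sketch of the same throttle pattern that keeps the errors instead of discarding them. Each goroutine writes only to its own slot of a pre-sized slice, so no mutex is needed. runThrottled is a hypothetical name, and run stands in for cmd.Run().

```go
package main

import (
	"fmt"
	"sync"
)

// runThrottled calls run(i) for i in [0, n) with at most maxConcurrency
// calls in flight, and returns the error from each call by index.
func runThrottled(n, maxConcurrency int, run func(i int) error) []error {
	throttle := make(chan struct{}, maxConcurrency)
	errs := make([]error, n) // one slot per call: no two goroutines share an index
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		throttle <- struct{}{} // blocks once maxConcurrency calls are running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-throttle }()
			errs[i] = run(i)
		}(i)
	}
	wg.Wait() // also makes the writes to errs visible here
	return errs
}

func main() {
	errs := runThrottled(8, 4, func(i int) error {
		// In the question: return exec.Command("zenity", "--info", "--text=...").Run()
		if i == 5 {
			return fmt.Errorf("iteration %d failed", i)
		}
		return nil
	})
	for i, err := range errs {
		if err != nil {
			fmt.Println(i, err)
		}
	}
}
```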
Answer by korovkin
Try this: https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
	zenity(...)
})
limiter.Wait()
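For readers who would rather avoid a dependency, here is a rough stdlib-only sketch of a limiter with the same shape as the snippet above. It is not the korovkin/limiter implementation. The names ConcurrencyLimiter, NewConcurrencyLimiter, Execute and Wait only mirror the snippet, so check that library's own documentation for its actual API.

```go
package main

import (
	"fmt"
	"sync"
)

// ConcurrencyLimiter runs submitted functions with a bounded number
// in flight. Hypothetical type, written only for this sketch.
type ConcurrencyLimiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

// NewConcurrencyLimiter allows at most limit jobs to run at once.
func NewConcurrencyLimiter(limit int) *ConcurrencyLimiter {
	return &ConcurrencyLimiter{slots: make(chan struct{}, limit)}
}

// Execute blocks until a slot is free, then runs job in a new goroutine.
func (l *ConcurrencyLimiter) Execute(job func()) {
	l.slots <- struct{}{}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }()
		job()
	}()
}

// Wait blocks until every submitted job has returned.
func (l *ConcurrencyLimiter) Wait() { l.wg.Wait() }

func main() {
	limiter := NewConcurrencyLimiter(10)
	for i := 0; i < 25; i++ {
		i := i // capture the loop variable (needed before Go 1.22)
		limiter.Execute(func() { fmt.Println("job", i) })
	}
	limiter.Wait()
}
```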