在 Java 中实现协程

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2846664/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 13:34:23  来源:igfitidea点击:

Implementing coroutines in Java

javakotlin-coroutines

提问by JUST MY correct OPINION

This question is related to my question on existing coroutine implementations in Java. If, as I suspect, it turns out that there is no full implementation of coroutines currently available in Java, what would be required to implement them?

这个问题与我关于Java 中现有协程实现的问题有关。如果,正如我所怀疑的,事实证明 Java 中目前没有可用的协程的完整实现,那么实现它们需要什么?

As I said in that question, I know about the following:

正如我在那个问题中所说,我知道以下几点:

  1. You can implement "coroutines" as threads/thread pools behind the scenes.
  2. You can do tricksy things with JVM bytecode behind the scenes to make coroutines possible.
  3. The so-called "Da Vinci Machine" JVM implementation has primitives that make coroutines doable without bytecode manipulation.
  4. There are various JNI-based approaches to coroutines also possible.
  1. 您可以在幕后将“协程”实现为线程/线程池。
  2. 你可以在幕后用 JVM 字节码做一些棘手的事情,使协程成为可能。
  3. 所谓的“达芬奇机器”JVM 实现具有使协同程序无需字节码操作即可执行的原语。
  4. 还有各种基于 JNI 的协程方法也是可能的。

I'll address each one's deficiencies in turn.

我将依次解决每个人的不足之处。

Thread-based coroutines

基于线程的协程

This "solution" is pathological. The whole point of coroutines is to avoidthe overhead of threading, locking, kernel scheduling, etc. Coroutines are supposed to be light and fast and to execute only in user space. Implementing them in terms of full-tilt threads with tight restrictions gets rid of all the advantages.

这种“解决方案”是病态的。协程的全部意义在于避免线程、锁定、内核调度等的开销。协程应该是轻量级和快速的,并且只在用户空间中执行。在严格限制的全倾斜螺纹方面实施它们可以消除所有优点。

JVM bytecode manipulation

JVM 字节码操作

This solution is more practical, albeit a bit difficult to pull off. This is roughly the same as jumping down into assembly language for coroutine libraries in C (which is how many of them work) with the advantage that you have only one architecture to worry about and get right.

这个解决方案更实用,虽然有点难以实现。这与跳到 C 中协程库的汇编语言(这是其中多少工作)大致相同,优点是您只需要担心和正确使用一种架构。

It also ties you down to only running your code on fully-compliant JVM stacks (which means, for example, no Android) unless you can find a way to do the same thing on the non-compliant stack. If you do find a way to do this, however, you have now doubled your system complexity and testing needs.

它还使您只能在完全兼容的 JVM 堆栈(这意味着,例如,没有 Android)上运行您的代码,除非您可以找到一种方法在不兼容的堆栈上执行相同的操作。但是,如果您确实找到了一种方法来执行此操作,那么您现在的系统复杂性和测试需求就会增加一倍。

The Da Vinci Machine

达芬奇机器

The Da Vinci Machine is cool for experimentation, but since it is not a standard JVM its features aren't going to be available everywhere. Indeed I suspect most production environments would specifically forbid the use of the Da Vinci Machine. Thus I could use this to make cool experiments but not for any code I expect to release to the real world.

Da Vinci Machine 非常适合用于实验,但由于它不是标准的 JVM,它的功能不会随处可用。事实上,我怀疑大多数生产环境会明确禁止使用达芬奇机器。因此,我可以用它来做很酷的实验,但不能用于我希望发布到现实世界的任何代码。

This also has the added problem similar to the JVM bytecode manipulation solution above: won't work on alternative stacks (like Android's).

这也增加了类似于上面的 JVM 字节码操作解决方案的问题:不适用于替代堆栈(如 Android 的)。

JNI implementation

JNI 实现

This solution renders the point of doing this in Java at all moot. Each combination of CPU and operating system requires independent testing and each is a point of potentially frustrating subtle failure. Alternatively, of course, I could tie myself down to one platform entirely but this, too, makes the point of doing things in Java entirely moot.

该解决方案使在 Java 中执行此操作的意义完全没有实际意义。CPU 和操作系统的每种组合都需要独立测试,并且每种组合都是潜在的令人沮丧的细微故障点。或者,当然,我可以将自己完全束缚在一个平台上,但这也使得用 Java 做事的意义完全没有意义。

So...

所以...

Is there any way to implement coroutines in Java without using one of these four techniques? Or will I be forced to use the one of those four that smells the least (JVM manipulation) instead?

有没有办法在不使用这四种技术之一的情况下在 Java 中实现协程?或者我是否会被迫使用这四种气味最少的一种(JVM 操作)来代替?



Edited to add:

编辑添加:

Just to ensure that confusion is contained, this is a relatedquestion to my other one, but not the same. That one is looking for an existingimplementation in a bid to avoid reinventing the wheel unnecessarily. This one is a question relating to how one would go about implementing coroutines in Java should the other prove unanswerable. The intent is to keep different questions on different threads.

只是为了确保包含混淆,这是我的另一个相关的问题,但不一样。那个人正在寻找现有的实现,以避免不必要地重新发明轮子。这是一个问题,如果另一个问题被证明无法回答,将如何在 Java 中实现协程。目的是在不同的线程上保留不同的问题。

采纳答案by luke

I would take a look at this: http://www.chiark.greenend.org.uk/~sgtatham/coroutines.html, its pretty interesting and should provide a good place to start. But of course we are using Java so we can do better (or maybe worse because there are no macros :))

我会看看这个:http: //www.chiark.greenend.org.uk/~sgtatham/coroutines.html,它非常有趣,应该提供一个很好的起点。但是当然我们正在使用 Java,所以我们可以做得更好(或者可能更糟,因为没有宏:))

From my understanding with coroutines you usually have a producerand a consumercoroutine (or at least this is the most common pattern). But semantically you don't want the producer to call the consumer or visa-versa because this introduces an asymmetry. But given the way stack based languages work we will need to have someone do the calling.

根据我对协程的理解,您通常有一个生产者和一个消费者协程(或者至少这是最常见的模式)。但从语义上讲,您不希望生产者调用消费者或反之亦然,因为这会引入不对称性。但是考虑到基于堆栈的语言的工作方式,我们需要有人来调用。

So here is a very simple type hierarchy:

所以这是一个非常简单的类型层次结构:

public interface CoroutineProducer<T>
{
    public T Produce();
    public boolean isDone();
}

public interface CoroutineConsumer<T>
{
    public void Consume(T t);
}

public class CoroutineManager
{
    public static Execute<T>(CoroutineProducer<T> prod, CoroutineConsumer<T> con)
    {
        while(!prod.IsDone()) // really simple
        {
            T d = prod.Produce();
            con.Consume(d);
        }
    }
}

Now of course the hard part is implementingthe interfaces, in particular it is difficult to break a computation into individual steps. For this you would probably want a whole other set of persistent control structures. The basic idea is that we want to simulate non-local transfer of control (in the end its kinda like we're simulating a goto). We basically want to move away from using the stack and the pc(program-counter) by keeping the state of our current operations in the heap instead of on the stack. Therefore we are going to need a bunch of helper classes.

现在当然最困难的部分是实现接口,特别是很难将计算分解为单独的步骤。为此,您可能需要一整套其他持久控制结构。基本思想是我们想要模拟非本地控制转移(最终它有点像我们模拟 a goto)。我们基本上希望pc通过将当前操作的状态保持在堆中而不是堆栈中来摆脱使用堆栈和(程序计数器)。因此,我们将需要一堆辅助类。

For example:

例如:

Let's say that in an ideal world you wanted to write a consumer that looked like this (psuedocode):

假设在理想的世界中,您希望编写一个如下所示的使用者(伪代码):

boolean is_done;
int other_state;
while(!is_done)
{
    //read input
    //parse input
    //yield input to coroutine
    //update is_done and other_state;
}

we need to abstract the local variable like is_doneand other_stateand we need to abstract the while loop itself because our yieldlike operation is not going to be using the stack. So let's create a while loop abstraction and associated classes:

我们需要抽象局部变量 likeis_done并且other_state我们需要抽象 while 循环本身,因为我们的yieldlike 操作不会使用堆栈。因此,让我们创建一个 while 循环抽象和关联的类:

enum WhileState {BREAK, CONTINUE, YIELD}
abstract class WhileLoop<T>
{
    private boolean is_done;
    public boolean isDone() { return is_done;}
    private T rval;
    public T getReturnValue() {return rval;} 
    protected void setReturnValue(T val)
    {
        rval = val;
    }


    public T loop()
    {
        while(true)
        {
            WhileState state = execute();
            if(state == WhileState.YIELD)
                return getReturnValue();
            else if(state == WhileState.BREAK)
                    {
                       is_done = true;
                return null;
                    }
        }
    }
    protected abstract WhileState execute();
}

The Basic trick here is to move localvariables to be classvariables and turn scope blocks into classes which gives us the ability to 're-enter' our 'loop' after yielding our return value.

这里的基本技巧是将局部变量移动为变量并将作用域块转换为类,这使我们能够在产生返回值后“重新进入”我们的“循环”。

Now to implement our producer

现在来实现我们的生产者

public class SampleProducer : CoroutineProducer<Object>
{
    private WhileLoop<Object> loop;//our control structures become state!!
    public SampleProducer()
    {
        loop = new WhileLoop()
        {
            private int other_state;//our local variables become state of the control structure
            protected WhileState execute() 
            {
                //this implements a single iteration of the loop
                if(is_done) return WhileState.BREAK;
                //read input
                //parse input
                Object calcluated_value = ...;
                //update is_done, figure out if we want to continue
                setReturnValue(calculated_value);
                return WhileState.YIELD;
            }
        };
    }
    public Object Produce()
    {
        Object val = loop.loop();
        return val;
    }
    public boolean isDone()
    {
        //we are done when the loop has exited
        return loop.isDone();
    }
}

Similar tricks could be done for other basic control flow structures. You would ideally build up a library of these helper classes and then use them to implement these simple interfaces which would ultimately give you the semantics of co-routines. I'm sure everything I've written here can be generalized and expanded upon greatly.

类似的技巧可以用于其他基本的控制流结构。理想情况下,您将构建这些帮助程序类的库,然后使用它们来实现这些简单的接口,最终为您提供协同程序的语义。我确信我在这里写的所有内容都可以进行广泛的概括和扩展。

回答by Howard Lovatt

I have a Coroutine class that I use in Java. It is based on threads and using threads has the advantage of allowing parallel operation, which on multicore machines can be an advantage. Therefore you might want to consider a thread based approach.

我有一个在 Java 中使用的 Coroutine 类。它基于线程,使用线程的优点是允许并行操作,这在多核机器上可能是一个优势。因此,您可能需要考虑基于线程的方法。

回答by Bunny83

I just came across this question and just want to mention that i think it might be possible to implement coroutines or generators in a similar way C# does. That said i don't actually use Java but the CIL has quite similar limitations as the JVM has.

我刚刚遇到了这个问题,只想提一下,我认为有可能以与 C# 类似的方式实现协程或生成器。也就是说,我实际上并不使用 Java,但 CIL 与 JVM 具有非常相似的限制。

The yield statementin C# is a pure language feature and is not part of the CIL bytecode. The C# compiler just creates a hidden private class for each generator function. If you use the yield statement in a function it has to return an IEnumerator or an IEnumerable. The compiler "packs" your code into a statemachine-like class.

C# 中的yield 语句是一种纯语言特性,不是 CIL 字节码的一部分。C# 编译器只是为每个生成器函数创建一个隐藏的私有类。如果在函数中使用 yield 语句,它必须返回 IEnumerator 或 IEnumerable。编译器将您的代码“打包”到一个类似状态机的类中。

The C# compiler might use some "goto's" in the generated code to make the conversion into a statemachine easier. I don't know the capabilities of Java bytecode and if there's something like a plain unconditional jump, but at "assembly level" it's usually possible.

C# 编译器可能会在生成的代码中使用一些“goto”,以便更轻松地转换为状态机。我不知道 Java 字节码的功能,如果有类似无条件跳转之类的东西,但在“程序集级别”通常是可能的。

As already mentioned this feature has to be implemented in the compiler. Because i have only little knowledge about Java and it's compiler i can't tell if it's possible to alter / extend the compiler, maybe with a "preprocessor" or something.

如前所述,此功能必须在编译器中实现。因为我对 Java 和它的编译器知之甚少,所以我不知道是否可以改变/扩展编译器,也许使用“预处理器”或其他东西。

Personally i love coroutines. As a Unity games developer i use them quite often. Because i play alot of Minecraft with ComputerCraft i was curious why coroutines in Lua (LuaJ) are implemented with threads.

我个人喜欢协程。作为 Unity 游戏开发人员,我经常使用它们。因为我用 ComputerCraft 玩了很多 Minecraft 我很好奇为什么 Lua (LuaJ) 中的协程是用线程实现的。

回答by Roman Elizarov

I'd suggest to look at Kotlin coroutines on JVM. It falls into a different category, though. There is no byte-code manipulation involved and it works on Android, too. However, you will have to write your coroutines in Kotlin. The upside is that Kotlin is designed for interoperability with Java in mind, so you can still continue to use all your Java libraries and freely combine Kotlin and Java code in the same project, even putting them side-by-side in the same directories and packages.

我建议在 JVM 上查看Kotlin 协程。不过,它属于不同的类别。不涉及字节码操作,它也适用于 Android。但是,您必须在 Kotlin 中编写协程。好处是 Kotlin 在设计时考虑到了与 Java 的互操作性,因此您仍然可以继续使用所有 Java 库并在同一个项目中自由组合 Kotlin 和 Java 代码,甚至将它们并排放在相同的目录和包。

This Guide to kotlinx.coroutinesprovides many more examples, while the coroutines designdocument explains all the motivation, use-cases and implementation details.

kotlinx.coroutines 指南提供了更多示例,而协程设计文档解释了所有动机、用例和实现细节。

回答by gooboo

Kotlin uses the following approach for co-routines
(from https://kotlinlang.org/docs/reference/coroutines.html):

Kotlin 对协程使用以下方法
(来自https://kotlinlang.org/docs/reference/coroutines.html):

Coroutines are completely implemented through a compilation technique (no support from the VM or OS side is required), and suspension works through code transformation. Basically, every suspending function (optimizations may apply, but we'll not go into this here) is transformed to a state machine where states correspond to suspending calls. Right before a suspension, the next state is stored in a field of a compiler-generated class along with relevant local variables, etc. Upon resumption of that coroutine, local variables are restored and the state machine proceeds from the state right after suspension.

A suspended coroutine can be stored and passed around as an object that keeps its suspended state and locals. The type of such objects is Continuation, and the overall code transformation described here corresponds to the classical Continuation-passing style. Consequently, suspending functions take an extra parameter of type Continuation under the hood.

协程完全通过编译技术实现(不需要VM或OS端的支持),暂停通过代码转换工作。基本上,每个挂起函数(可能会应用优化,但我们不会在这里讨论)被转换为状态机,其中状态对应于挂起调用。就在暂停之前,下一个状态与相关局部变量等一起存储在编译器生成的类的字段中。在恢复该协程时,局部变量被恢复,状态机从暂停后的状态继续进行。

挂起的协程可以作为保持其挂起状态和局部变量的对象进行存储和传递。此类对象的类型是 Continuation,这里描述的整体代码转换对应于经典的 Continuation-passing 风格。因此,挂起函数在幕后采用了一个 Continuation 类型的额外参数。

Check out the design document at https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md

https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md查看设计文档

回答by John Lee

There's an another choice is here for Java6+

Java6+还有另一个选择

A pythonic coroutine implementation:

一个pythonic协程实现:

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CorRunRAII {
    private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>();

    public CorRunRAII add(CorRun resource) {
        if (resource == null) {
            return this;
        }
        resources.add(new WeakReference<>(resource));

        return this;
    }

    public CorRunRAII addAll(List<? extends CorRun> arrayList) {
        if (arrayList == null) {
            return this;
        }
        for (CorRun corRun : arrayList) {
            add(corRun);
        }

        return this;
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        for (WeakReference<? extends CorRun> corRunWeakReference : resources) {
            CorRun corRun = corRunWeakReference.get();
            if (corRun != null) {
                corRun.stop();
            }
        }
    }
}

class CorRunYieldReturn<ReceiveType, YieldReturnType> {
    public final AtomicReference<ReceiveType> receiveValue;
    public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue;

    CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        this.receiveValue = receiveValue;
        this.yieldReturnValue = yieldReturnValue;
    }
}

interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> {
    boolean start();
    void stop();
    void stop(final Throwable throwable);
    boolean isStarted();
    boolean isEnded();
    Throwable getError();

    ReceiveType getReceiveValue();
    void setResultForOuter(YieldReturnType resultForOuter);
    YieldReturnType getResultForOuter();

    YieldReturnType receive(ReceiveType value);
    ReceiveType yield();
    ReceiveType yield(YieldReturnType value);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another);
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value);
}

abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private ReceiveType receiveValue;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>();

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Throwable error;

    private YieldReturnType resultForOuter;

    @Override
    public boolean start() {

        boolean isStarted = this.isStarted.getAndSet(true);
        if ((! isStarted)
                && (! isEnded())) {
            receive(null);
        }

        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(Throwable throwable) {
        isEnded.set(true);
        if (throwable != null) {
            error = throwable;
        }

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                child.stop();
            }
        }
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    @Override
    public boolean isEnded() {
        return isEnded.get();
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public ReceiveType getReceiveValue() {
        return receiveValue;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter = resultForOuter;
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return resultForOuter;
    }

    @Override
    public synchronized YieldReturnType receive(ReceiveType value) {
        receiveValue = value;

        run();

        return getResultForOuter();
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(YieldReturnType value) {
        resultForOuter = value;
        return receiveValue;
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            boolean isStarted = another.start();
            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            return another.receive(value);
        }
    }

    @Override
    public void run() {
        try {
            this.call();
        }
        catch (Exception e) {
            e.printStackTrace();

            stop(e);
            return;
        }
    }
}

abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

    private final ExecutorService childExecutorService = newExecutorService();
    private ExecutorService executingOnExecutorService;

    private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>());

    private final CorRun<ReceiveType, YieldReturnType> self;
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList;
    private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn;

    private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue;

    // Outside

    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AtomicBoolean isEnded = new AtomicBoolean(false);
    private Future<YieldReturnType> future;
    private Throwable error;

    private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>();

    CorRunThread() {
        executingOnExecutorService = childExecutorService;

        receiveQueue = new LinkedBlockingDeque<>();
        potentialChildrenCoroutineList = new ArrayList<>();

        self = this;
    }

    @Override
    public void run() {
        try {
            self.call();
        }
        catch (Exception e) {
            stop(e);
            return;
        }

        stop();
    }

    @Override
    public abstract YieldReturnType call();

    @Override
    public boolean start() {
        return start(childExecutorService);
    }

    protected boolean start(ExecutorService executorService) {
        boolean isStarted = this.isStarted.getAndSet(true);
        if (!isStarted) {
            executingOnExecutorService = executorService;
            future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self);
        }
        return isStarted;
    }

    @Override
    public void stop() {
        stop(null);
    }

    @Override
    public void stop(final Throwable throwable) {
        if (throwable != null) {
            error = throwable;
        }
        isEnded.set(true);

        returnYieldValue(null);
        // Do this for making sure the coroutine has checked isEnd() after getting a dummy value
        receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN);

        for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
            CorRun child = weakReference.get();
            if (child != null) {
                if (child instanceof CorRunThread) {
                    ((CorRunThread)child).tryStop(childExecutorService);
                }
            }
        }

        childExecutorService.shutdownNow();
    }

    protected void tryStop(ExecutorService executorService) {
        if (this.executingOnExecutorService == executorService) {
            stop();
        }
    }

    @Override
    public boolean isEnded() {
        return isEnded.get() || (
                future != null && (future.isCancelled() || future.isDone())
                );
    }

    @Override
    public boolean isStarted() {
        return isStarted.get();
    }

    public Future<YieldReturnType> getFuture() {
        return future;
    }

    @Override
    public Throwable getError() {
        return error;
    }

    @Override
    public void setResultForOuter(YieldReturnType resultForOuter) {
        this.resultForOuter.set(resultForOuter);
    }

    @Override
    public YieldReturnType getResultForOuter() {
        return this.resultForOuter.get();
    }

    @Override
    public YieldReturnType receive(ReceiveType value) {

        LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>();

        offerReceiveValue(value, yieldReturnValue);

        try {
            AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take();
            return takeValue == null ? null : takeValue.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    @Override
    public ReceiveType yield() {
        return yield(null);
    }

    @Override
    public ReceiveType yield(final YieldReturnType value) {
        returnYieldValue(value);

        return getReceiveValue();
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) {
        return yieldFrom(another, null);
    }

    @Override
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) {
        if (another == null || another.isEnded()) {
            throw new RuntimeException("Call null or isEnded coroutine");
        }

        boolean isStarted = false;
        potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

        synchronized (another) {
            if (another instanceof CorRunThread) {
                isStarted = ((CorRunThread)another).start(childExecutorService);
            }
            else {
                isStarted = another.start();
            }

            boolean isJustStarting = ! isStarted;
            if (isJustStarting && another instanceof CorRunSync) {
                return another.getResultForOuter();
            }

            TargetYieldReturnType send = another.receive(value);
            return send;
        }
    }

    @Override
    public ReceiveType getReceiveValue() {

        setLastCorRunYieldReturn(takeLastCorRunYieldReturn());

        return lastCorRunYieldReturn.receiveValue.get();
    }

    protected void returnYieldValue(final YieldReturnType value) {
        CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn;
        if (corRunYieldReturn != null) {
            corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value));
        }
    }

    protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
        receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue));
    }

    protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() {
        try {
            return receiveQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }

    protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) {
        this.lastCorRunYieldReturn = lastCorRunYieldReturn;
    }

    protected ExecutorService newExecutorService() {
        return Executors.newCachedThreadPool(getThreadFactory());
    }

    protected ThreadFactory getThreadFactory() {
        return new ThreadFactory() {
            @Override
            public Thread newThread(final Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread thread, Throwable throwable) {
                        throwable.printStackTrace();
                        if (runnable instanceof CorRun) {
                            CorRun self = (CorRun) runnable;
                            self.stop(throwable);
                            thread.interrupt();
                        }
                    }
                });
                return thread;
            }
        };
    }
}

Now you can use pythonic coroutines in this way (e.g. fibonacci numbers)

现在你可以以这种方式使用 pythonic 协程(例如斐波那契数)

Thread Version:

线程版本:

class Fib extends CorRunThread<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();
        do {
            int a = 1, b = 1;
            for (int i = 0; times != null && i < times; i++) {
                int temp = a + b;
                a = b;
                b = temp;
            }
            // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller
            times = yield(a);
        } while (! isEnded());

        setResultForOuter(Integer.MAX_VALUE);
        return getResultForOuter();
    }
}

class MainRun extends CorRunThread<String, String> {

    @Override
    public String call() {

        // The fib coroutine would be recycled by its parent
        // (no requirement to call its start() and stop() manually)
        // Otherwise, if you want to share its instance and start/stop it manually,
        // please start it before being called by yieldFrom() and stop it in the end.
        Fib fib = new Fib();
        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current`
            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;

        }

        setResultForOuter(result);

        return result;
    }
}

Sync(non-thread) version:

同步(非线程)版本:

class Fib extends CorRunSync<Integer, Integer> {

    @Override
    public Integer call() {
        Integer times = getReceiveValue();

        int a = 1, b = 1;
        for (int i = 0; times != null && i < times; i++) {
            int temp = a + b;
            a = b;
            b = temp;
        }
        yield(a);

        return getResultForOuter();
    }
}

class MainRun extends CorRunSync<String, String> {

    @Override
    public String call() {

        CorRun<Integer, Integer> fib = null;
        try {
            fib = new Fib();
        } catch (Exception e) {
            e.printStackTrace();
        }

        String result = "";
        Integer current;
        int times = 10;
        for (int i = 0; i < times; i++) {

            current = yieldFrom(fib, i);

            if (fib.getError() != null) {
                throw new RuntimeException(fib.getError());
            }

            if (current == null) {
                continue;
            }

            if (i > 0) {
                result += ",";
            }
            result += current;
        }

        stop();
        setResultForOuter(result);

        if (Utils.isEmpty(result)) {
            throw new RuntimeException("Error");
        }

        return result;
    }
}


Execution(Both versions will work):

执行(两个版本都可以):

// Run the entry coroutine
MainRun mainRun = new MainRun();
mainRun.start();

// Wait for mainRun ending for 5 seconds
long startTimestamp = System.currentTimeMillis();
while(!mainRun.isEnded()) {
    if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) {
        throw new RuntimeException("Wait too much time");
    }
}
// The result should be "1,1,2,3,5,8,13,21,34,55"
System.out.println(mainRun.getResultForOuter());

回答by OlliP

There is also Quasarfor Java and Project Loomat Oracle where extensions are made to the JVM for fibers and continuations. Here is a presentation of Loomon Youtoube. There are several more. Easy to find with a little searching.

Oracle还有Quasarfor Java 和Project Loom,其中对 JVM 进行了光纤和延续的扩展。这是Youtoube上 Loom介绍。还有几个。稍加搜索即可轻松找到。