Java 线程乒乓示例

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/12785238/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 10:12:43  来源:igfitidea点击:

Java Thread Ping Pong example

javamultithreadingconcurrency

提问by Kummo

I'm trying to understand thread basics, and as a first example I create two thread that write a String on the stdout. As I know the scheduler allows to execute the threads using a round robin schedule. Thats why I got:

我正在尝试了解线程基础知识,作为第一个示例,我创建了两个在 stdout 上写入字符串的线程。据我所知,调度程序允许使用循环调度执行线程。这就是为什么我得到:

PING PING pong pong pong PING PING PING pong pong

乒乓乒乓乒乓乒乓乒乓

Now I want to use a shared variable, so every thread will know if its your turn:

现在我想使用一个共享变量,所以每个线程都会知道是否轮到你了:

public class PingPongThread extends Thread {
private String msg;
private static String turn;

public PingPongThread(String msg){
    this.msg = msg;
}
@Override
public void run() {
    while(true) {
        playTurn();
    }

}
public synchronized void playTurn(){
    if (!msg.equals(turn)){
        turn=msg;
        System.out.println(msg);
    }
}
}

Main class:

主类:

public class ThreadTest {
    public static void main(String[] args) {
        PingPongThread thread1 = new PingPongThread("PING");
        PingPongThread thread2 = new PingPongThread("pong");
        thread1.start();
        thread2.start();
    }
}

I synchronized the "turn manager" but I still get something like:

我同步了“turn manager”,但我仍然得到类似的信息:

PING PING pong pong pong PING PING PING pong pong

乒乓乒乓乒乓乒乓乒乓

Can someone explains what I am missing, and Why I'm not getting Ping pong... ping pong. Thanks!

有人可以解释我缺少什么,以及为什么我没有得到 Ping pong... ping pong。谢谢!

回答by linski

This line:

这一行:

public synchronized void playTurn(){
    //code
}

is equivalent in behavior to

在行为上等价于

public void playTurn() {
    synchronized(this) {
         //code
    }
}

that's why no synchronization is occuring, because as Brian Agnew pointed out, the threads are synchronizing on two different objects (thread1, thread2), each on it's own instance resulting in no effective synchronization.

这就是没有发生同步的原因,因为正如 Brian Agnew 所指出的,线程在两个不同的对象(thread1、thread2)上进行同步,每个对象都在它自己的实例上导致没有有效的同步。

If you would use your turn variable for synchronization, e.g.:

如果您将使用转弯变量进行同步,例如:

private static String turn = ""; // must initialize or you ll get an NPE

public void playTurn() {
    synchronized(turn) {
         //...
         turn = msg; // (1)
         //...
    }
}

then the situation is a lot better (run multiple times to verify), but there is also no 100% synchronization. In the beggining (mostly) you get a double ping and double pong, and afterwards they look synchronized, but you still can get double pings/pongs.

那么情况会好很多(运行多次验证),但也没有100%同步。在开始时(大多数情况下)您会得到双乒乓和双乒乓,之后它们看起来是同步的,但您仍然可以获得双乒乓/乒乓。

The synchronized block locks upon value(see this great answer) and not the reference to that value. (see EDIT)

同步块锁定(参见这个很好的答案)而不是对该值引用。(见编辑)

So let's take a look at one possible scenario:

那么让我们来看看一种可能的情况:

thread1 locks on ""
thread2 blocks on ""
thread1 changes the value of turn variable to "PING" - thread2 can continue since "" is no longer locked 

To verify that I tried putting

为了验证我是否尝试放置

try {
    Thread.currentThread().sleep(1000); // try with 10, 100 also multiple times
 } 
 catch (InterruptedException ex) {}

before and after

之前和之后

turn = msg;

and it looks synchronized?! But, if you put

看起来是同步的?!但是,如果你把

 try {
    Thread.yield();
    Thread.currentThread().sleep(1000); //  also  try multiple times
 } 
 catch (InterruptedException ex) {}

after few seconds you'll see double pings/pongs. Thread.yield()essenitally means "I'm done with the processor, put some else thread to work". This is obviously system thread scheduler implementation on my OS.

几秒钟后,您会看到双 pings/pongs。Thread.yield()本质上的意思是“我已经完成了处理器,让其他线程工作”。这显然是我的操作系统上的系统线程调度程序实现。

So, to synchronize correctly we must remove line

因此,要正确同步,我们必须删除行

    turn = msg;

so that threads could always synchronize on the same value - not really :) As explained in the great answergiven above - Strings (immutable objects) are dangerous as locks - because if you create String "A" on 100 places in your program all 100 references(variables) will point to the same "A" in memory - so you could oversynchronize.

这样线程就可以始终在相同的值上同步 - 不是真的:) 正如上面给出的很好的答案所解释的那样- 字符串(不可变对象)作为锁是危险的 - 因为如果你在程序的 100 个地方创建字符串“A”,所有 100引用(变量)将指向内存中相同的“A” - 所以你可以过度同步.

So, to answer your original question, modify your code like this:

因此,要回答您的原始问题,请像这样修改您的代码:

 public void playTurn() {
    synchronized(PingPongThread.class) {
         //code
    }
}

and the parallel PingPong example will be 100% correctly implemented (see EDIT^2).

并且并行 PingPong 示例将 100% 正确实现(请参阅 EDIT^2)。

The above code is equivalent to:

上面的代码等价于:

 public static synchronized void playTurn() {
     //code
 }

The PingPongThread.class is a Class object, e.g. on every instance you can call getClass()which always has only one instance.

PingPongThread.class 是一个Class 对象,例如在每个实例上你可以调用getClass(),它总是只有一个实例。

Also you could do like this

你也可以这样做

 public static Object lock = new Object();

 public void playTurn() {
    synchronized(lock) {
         //code
    }
}

Also, read and program examples(running multiple times whenever neccessary) this tutorial.

此外,阅读本教程并编写示例(必要时运行多次)。

EDIT:

编辑:

To be technically correct:

技术上讲是正确的

The synchronized method is the same as synchronized statement locking upon this. Let's call the argument of the synchronized statement "lock" - as Marko pointed out, "lock" is a variable storing a reference to an object/instance of a class. To quote the spec:

同步方法与同步语句锁定相同。让我们称同步语句的参数为“lock”——正如 Marko 所指出的,“lock”是一个变量,存储对类的对象/实例的引用。引用规范:

The synchronized statement computes a reference to an object; it then attempts to perform a lock action on that object's monitor..

同步语句计算对象的引用;然后它尝试在该对象的监视器上执行锁定操作。

So the synchronizaton is not acutally made upon value- the object/class instance, but upon the object monitor associatedwith that instance/value. Because

因此,同步并不是根据(对象/类实例)实际进行的,而是根据与该实例/值关联对象监视器进行的。因为

Each object in Java is associated with a monitor..

Java 中的每个对象都与一个监视器相关联。

the effect remains the same.

效果保持不变。

EDIT^2:

编辑^2:

Following up on the comments remarks: "and the parallel PingPong example will be 100% correctly implemented" - meaning, the desired behavior is achieved (without error).

跟进评论评论:“并且并行 PingPong 示例将 100% 正确实现” - 意思是,实现了所需的行为(没有错误)。

IMHO, a solution is correct if the result is correct. There are many ways of solving the problem, so the next criteria would be simplicity/elegance of the solution - the phaser solution is better approach, because as Marko said in other words in some comment there is a lot lesser chance of making error using phaser object than using synchronized mechanism - which can be seen from all the (non)solution variants in this post. Notable to see is also comparison of code size and overall clarity.

恕我直言,如果结果正确,则解决方案是正确的。有很多方法可以解决这个问题,所以下一个标准是解决方案的简单性/优雅 - 移相器解决方案是更好的方法,因为正如 Marko 在某些评论中所说的那样,使用移相器出错的可能性要小得多对象而不是使用同步机制 - 这可以从本文中的所有(非)解决方案变体中看出。值得注意的是代码大小和整体清晰度的比较。

To conclude, this sort of constructsshould be used whenever they are applicable to the problem in question.

总之,这类型的结构中,只要它们适用于所讨论的问题应该被使用。

回答by Marko Topolnik

In conclusion to my discussion with Brian Agnew, I submit this code that uses java.util.concurrent.Phaserto coordinate your ping-pong threads:

作为我与 Brian Agnew 讨论的结论,我提交了用于java.util.concurrent.Phaser协调您的乒乓线程的代码:

static final Phaser p = new Phaser(1);
public static void main(String[] args) {
  t("ping");
  t("pong");
}
private static void t(final String msg) {
  new Thread() { public void run() {
    while (true) {
      System.out.println(msg);
      p.awaitAdvance(p.arrive()+1);
    }
  }}.start();
}

The key difference between this solution and the one you attempted to code is that your solution busy-checks a flag, thereby wasting CPU time (and energy!). The correct approach is to use blocking methods that put a thread to sleep until it is notified of the relevant event.

此解决方案与您尝试编码的解决方案之间的主要区别在于,您的解决方案忙于检查标志,从而浪费 CPU 时间(和精力!)。正确的方法是使用阻塞方法让线程休眠,直到它收到相关事件的通知。

回答by Brian Agnew

Each instance of the PingPongThreadis synchronising on itself and noton a shared resource. In order to control the message passing you'll need to synchronise on a shared resource (e.g. your turnvariable ?)

的每个实例PingPongThread都在其自身上进行同步,而不是在共享资源上进行同步。为了控制消息传递,您需要在共享资源上进行同步(例如您的turn变量?)

However I don't think this is really going to work. I think you should check out wait()and notify()to do this (if you want to understand the threading primitives). See thisfor an example.

但是我不认为这真的会奏效。我认为您应该检查wait()notify()执行此操作(如果您想了解线程原语)。请参阅示例。

回答by DomenicoC

My solution is this :

我的解决方案是这样的:

public class InfinitePingPong extends Thread  {

    private static final Object lock= new Object();

private String toPrintOut;

    public InfinitePingPong(String s){
        this.toPrintOut = s;
    }


    public void run(){
        while (true){
            synchronized(lock){
                System.out.println(this.toPrintOut +" -->"+this.getId()); 
                lock.notifyAll();

                try {
                    lock.wait();
                } catch (InterruptedException e) {}
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {


        InfinitePingPong a = new InfinitePingPong("ping");
        InfinitePingPong b = new InfinitePingPong("pong");


        a.start();
        b.start();

        b.wait();

        try {
            a.join();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }








}
}

回答by Priyesh Jaiswal

One option is using SynchronousQueue .

一种选择是使用 SynchronousQueue 。

import java.util.concurrent.SynchronousQueue;

public class PingPongPattern {

    private SynchronousQueue<Integer> q = new SynchronousQueue<Integer>();
    private Thread t1 = new Thread() {

        @Override
        public void run() {
            while (true) {

                // TODO Auto-generated method stub
                super.run();
                try {

                    System.out.println("Ping");
                    q.put(1);
                    q.put(2);
                } catch (Exception e) {

                }
            }
        }

    };

    private Thread t2 = new Thread() {

        @Override
        public void run() {

            while (true) {
                // TODO Auto-generated method stub
                super.run();
                try {
                    q.take();
                    System.out.println("Pong");
                    q.take();

                } catch (Exception e) {

                }

            }

        }

    };

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        PingPongPattern p = new PingPongPattern();
        p.t1.start();
        p.t2.start();
    }
}

回答by drlolly

Here is a Ping Pong program written in Java. Ping and Pong are separate threads. Each thread is both a consumer and a producer. When each thread runs it does two things

这是一个用 Java 编写的 Ping Pong 程序。Ping 和 Pong 是独立的线程。每个线程既是消费者又是生产者。当每个线程运行时,它会做两件事

  1. Produce a message that allows the other (as a consumer) to run
  2. Consume a message that causes itself to suspend.
  1. 生成一条消息,允许另一个(作为消费者)运行
  2. 使用导致自身挂起的消息。

The code is based upon Oracles ProducerConsumerExample. Note that the Ping and Pong classes are almost identical in their code and in their behaviour. The threads in the OP's code only uses the ‘mutual exclusion' part of the objects monitor (as Brian Agnew suggested above). They never invoke a wait. Hence they only exclude one another, but never invoke the java run time to allow the other thread to run.

该代码基于 Oracles ProducerConsumerExample。请注意,Ping 和 Pong 类的代码和行为几乎相同。OP 代码中的线程仅使用对象监视器的“互斥”部分(如上面的 Brian Agnew 所建议的)。他们从不调用等待。因此,它们只是相互排斥,但从不调用 java 运行时来允许另一个线程运行。

/*
 * Copyright (c) 1995, 2008, Oracle and/or its affiliates. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - Neither the name of Oracle or the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

 * based on oracle example on sync-wait-notify
 * cf. https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
 * run with java ProducerConsumerExample
 * 
 *
 */ 

public class ProducerConsumerExample {
    public static void main(String[] args) {
        Drop drop = new Drop();
    DropCtoP dropCtoP = new DropCtoP();
    (new Thread(new Ping(drop,dropCtoP))).start();
        (new Thread(new Pong(drop,dropCtoP))).start();
    }
}


public class Pong implements Runnable {
    private Drop drop;
    private DropCtoP dropCtoP;
    private int count=0;

    public Pong(Drop drop,DropCtoP dropCtoP) {
        this.drop = drop;
        this.dropCtoP = dropCtoP;
    }

    public void run() {
        String message;
        for (;;) {
        count++;
            message = drop.take();
            System.out.format("Pong running - : %s - ran num times %d %n", message,count);
            dropCtoP.put("Run ping token");
        }
    }
}



public class Ping implements Runnable {
    private Drop drop;
    private DropCtoP dropCtoP;
    private int count=0;

    public Ping(Drop drop,DropCtoP dropCtoP) {
        this.drop = drop;
        this.dropCtoP = dropCtoP;
    }

    public void run() {

        String message;
        for (;;) {
      count++;
      drop.put("Run pong token");
      message = dropCtoP.take();
      System.out.format("PING running - : %s- ran num times %d %n", message,count);
        }

    }
}



public class DropCtoP {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty2 = true;


    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty2) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty2 = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty2) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty2 = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }    
}


public class Drop {
    // Message sent from producer
    // to consumer.
    private String message;
    // True if consumer should wait
    // for producer to send message,
    // false if producer should wait for
    // consumer to retrieve message.
    private boolean empty = true;

    public synchronized String take() {
        // Wait until message is
        // available.
        while (empty) {
            try {
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = true;
        // Notify producer that
        // status has changed.
        notifyAll();
        return message;
    }

    public synchronized void put(String message) {
        // Wait until message has
        // been retrieved.
        while (!empty) {
            try { 
                wait();
            } catch (InterruptedException e) {}
        }
        // Toggle status.
        empty = false;
        // Store message.
        this.message = message;
        // Notify consumer that status
        // has changed.
        notifyAll();
    }


}

回答by Victor

One of the possible implementations:

可能的实现之一:

public class PingPongDemo {

    private static final int THREADS = 2;

    private static int nextIndex = 0;

    private static String getMessage(int index) {
        return index % 2 == 0 ? "ping" : "pong";
    }

    public static void main(String[] args) throws Throwable {
        var lock = new ReentrantLock();

        var conditions = new Condition[THREADS];
        for (int i = 0; i < conditions.length; i++) {
            conditions[i] = lock.newCondition();
        }

        for (int i = 0; i < THREADS; i++) {
            var index = i;

            new Thread(() -> {
                lock.lock();
                try {
                    while (true) {
                        System.out.println(getMessage(index));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }

                        nextIndex = (nextIndex + 1) % THREADS;

                        conditions[nextIndex].signal();

                        while (nextIndex != index) {
                            conditions[index].awaitUninterruptibly();
                        }
                    }
                } finally {
                    lock.unlock();
                }
            }).start();

            if (index < THREADS - 1) {
                lock.lock();
                try {
                    while (nextIndex != (index + 1)) {
                        conditions[index + 1].awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

Here, we're effectively making round-robin output.

在这里,我们有效地进行循环输出。

回答by Serge Aleynikov

Here is a version that uses Semaphoreobjects to accomplish synchronization:

这是一个使用Semaphore对象来完成同步的版本:

import java.util.concurrent.*;

public class Main {
    @FunctionalInterface
    public interface QuadFunction<T, U, V, W, R> {
        public R apply(T t, U u, V v, W w);
    }

    public static void main(String[] args) {
        ExecutorService svc = Executors.newFixedThreadPool(2);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Terminating...");
            svc.shutdownNow();
            try { svc.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); }
            catch(InterruptedException e) {};
        }));

        var sem1 = new Semaphore(1);
        var sem2 = new Semaphore(0);

        QuadFunction<String, String, Semaphore, Semaphore, Runnable> fun =
            (name, action, s1, s2) ->
                (Runnable) () -> {
                    try {
                        while (true) {
                            s1.acquire();
                            System.out.format("%s %s\n", name, action);
                            Thread.sleep(500);
                            s2.release(1);
                        }
                    } catch (InterruptedException e) {}
                    s2.release(1);
                    System.out.format("==> %s shutdown\n", name);
                };

        svc.execute(fun.apply("T1", "ping", sem1, sem2));
        svc.execute(fun.apply("T2", "pong", sem2, sem1));
    }
}