Java:多线程和 UDP 套接字编程
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/2687238/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Java: Multithreading & UDP Socket Programming
提问by Ravi
I am new to multithreading & socket programming in Java. I would like to know what is the best way to implement 2 threads - one for receiving a socket and one for sending a socket. If what I am trying to do sounds absurd, pls let me know why! The code is largely inspired from Sun's tutorials online.I want to use Multicast sockets so that I can work with a multicast group.
我是 Java 多线程和套接字编程的新手。我想知道实现 2 个线程的最佳方法是什么 - 一个用于接收套接字,一个用于发送套接字。如果我试图做的事情听起来很荒谬,请告诉我原因!该代码主要受 Sun 在线教程的启发。我想使用多播套接字,以便我可以使用多播组。
class Server extends Thread
{
static protected MulticastSocket socket = null;
protected BufferedReader in = null;
public InetAddress group;
private static class Receive implements Runnable
{
public void run()
{
try
{
byte[] buf = new byte[256];
DatagramPacket pkt = new DatagramPacket(buf,buf.length);
socket.receive(pkt);
String received = new String(pkt.getData(),0,pkt.getLength());
System.out.println("From server@" + received);
Thread.sleep(1000);
}
catch (IOException e)
{
System.out.println("Error:"+e);
}
catch (InterruptedException e)
{
System.out.println("Error:"+e);
}
}
}
public Server() throws IOException
{
super("server");
socket = new MulticastSocket(4446);
group = InetAddress.getByName("239.231.12.3");
socket.joinGroup(group);
}
public void run()
{
while(1>0)
{
try
{
byte[] buf = new byte[256];
DatagramPacket pkt = new DatagramPacket(buf,buf.length);
//String msg = reader.readLine();
String pid = ManagementFactory.getRuntimeMXBean().getName();
buf = pid.getBytes();
pkt = new DatagramPacket(buf,buf.length,group,4446);
socket.send(pkt);
Thread t = new Thread(new Receive());
t.start();
while(t.isAlive())
{
t.join(1000);
}
sleep(1);
}
catch (IOException e)
{
System.out.println("Error:"+e);
}
catch (InterruptedException e)
{
System.out.println("Error:"+e);
}
}
//socket.close();
}
public static void main(String[] args) throws IOException
{
new Server().start();
//System.out.println("Hello");
}
}
回答by Drew
Wanting to create threads in an application is not absurd! You won't need exactly 2 threads, but I think you're talking about 2 classes that implement the Runnable interface.
想要在应用程序中创建线程并不荒谬!您不需要恰好 2 个线程,但我认为您在谈论实现 Runnable 接口的 2 个类。
The threading API has gotten better since Java 1.5 and you don't need to mess with java.lang.Thread anymore. You can simply create a java.util.concurrent.Executorand submit Runnable instances to it.
自 Java 1.5 以来,线程 API 变得更好,您不再需要与 java.lang.Thread 搞混了。您可以简单地创建一个java.util.concurrent.Executor并将 Runnable 实例提交给它。
The book Java Concurrency in Practiceuses that exact problem - creating a threaded socket server - and walks through several iterations of the code to show the best way to do it. Check out the free sample chapter, which is great. I won't copy/paste the code here, but look specifically at listing 6.8.
《Java Concurrency in Practice》一书使用了这个确切的问题——创建一个线程化的套接字服务器——并通过多次迭代代码来展示实现它的最佳方法。查看免费的示例章节,这很棒。我不会在这里复制/粘贴代码,而是专门查看清单 6.8。
回答by Kiril
First thing is first:your classes should start with a capital letter per the Java Naming Conventions:
首先是第一件事:根据Java 命名约定,您的类应该以大写字母开头:
Class names should be nouns, in mixed case with the first letter of each internal word capitalized. Try to keep your class names simple and descriptive. Use whole words-avoid acronyms and abbreviations (unless the abbreviation is much more widely used than the long form, such as URL or HTML).
类名应该是名词,大小写混合,每个内部单词的第一个字母大写。尽量保持你的类名简单和描述性。使用完整的单词——避免首字母缩略词和缩写词(除非缩写词比长格式更广泛使用,例如 URL 或 HTML)。
Second:Try to break down the code into coherent sections and organize them around some common feature that you're dealing with... perhaps around the functionality or the model you're programming.
第二:尝试将代码分解成连贯的部分,并围绕您正在处理的一些共同特征来组织它们……也许围绕您正在编程的功能或模型。
The (basic) model for the server is that the onlything it does is receive socket connections... the server relies on a handler to handle those connections and that's it. If you try to build that model it would look something like this:
服务器的(基本)模型是它唯一做的就是接收套接字连接……服务器依赖一个处理程序来处理这些连接,仅此而已。如果您尝试构建该模型,它将如下所示:
class Server{
private final ServerSocket serverSocket;
private final ExecutorService pool;
public Server(int port, int poolSize) throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void serve() {
try {
while(true) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// receive the datagram packets
}
}
Third:I would recommend that you look at some existing examples.
第三:我建议您查看一些现有示例。
- Multi-threaded Client/Server Applications:
http://www.ase.md/~aursu/ClientServerThreads.html - Doug Lea:
http://www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm(thanks to John)
http://gee.cs.oswego.edu/dl/cpj/index.html(still can't find the exact example, but it's there somewhere... if you feel brave look over his allcode.javafile). - Concurrency in Practice examples:
http://www.javaconcurrencyinpractice.com/listings.html - Java Concurrency Tutorials:
http://java.sun.com/docs/books/tutorial/essential/concurrency/
- 多线程客户端/服务器应用程序:http:
//www.ase.md/~aursu/ClientServerThreads.html - Doug Lea:
http: //www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm(感谢约翰)
http://gee.cs.oswego.edu/dl/cpj /index.html(仍然找不到确切的例子,但它在某处......如果你有勇气查看他的allcode.java文件)。 - 并发实践示例:http:
//www.javaconcurrencyinpractice.com/listings.html - Java 并发教程:http:
//java.sun.com/docs/books/tutorial/essential/concurrency/
Updated per comments:
OK Ravi, there are some bigissues with your code and some minorissues with it:
根据评论更新:
好的 Ravi,您的代码存在一些大问题和一些小问题:
I assume that the
Receive
class is your client... you should pull that out as a separate program (with its own main class) and run your server and multiple clients at the same time. Spawning a new "client thread" from your server for every new UDP package you send is a disturbing idea (bigissue).When you make your client application, you should make it run the receiving code in its own
while
loop (minorissue), e.g.:public class Client extends Thread { public Client(/*..*/) { // initialize your client } public void run() { while(true) { // receive UDP packets // process the UDP packets } } public static void main(String[] args) throws IOException { // start your client new Client().start(); } }
You should only need just one thread per client and one thread per server (you technically don't even a separate thread in there since main has its own thread), so you might not find the
ExecutorService
that useful.
我假设这个
Receive
类是你的客户端……你应该把它作为一个单独的程序(有它自己的主类)并同时运行你的服务器和多个客户端。为您发送的每个新 UDP 包从您的服务器生成一个新的“客户端线程”是一个令人不安的想法(大问题)。当您制作客户端应用程序时,您应该让它在自己的
while
循环中运行接收代码(小问题),例如:public class Client extends Thread { public Client(/*..*/) { // initialize your client } public void run() { while(true) { // receive UDP packets // process the UDP packets } } public static void main(String[] args) throws IOException { // start your client new Client().start(); } }
每个客户端只需要一个线程,每个服务器只需要一个线程(从技术上讲,您甚至不需要单独的线程,因为 main 有自己的线程),因此您可能不会发现
ExecutorService
有用的线程。
Otherwise your approach is correct... but I would stillrecommend that you check out some of examples.
否则你的方法是正确的......但我仍然建议你查看一些例子。
回答by Jé Queue
2 threads is fine. One reader another writer. Remember that with UDP you should not spawn new handler threads (unless what you're doing takes a long time), I recommend throwing the incoming messages into a processing Queue. The same for the send, have a send thread that blocks on an incoming Queue for UDP send.
2个线程没问题。一位读者另一位作家。请记住,对于 UDP,您不应产生新的处理程序线程(除非您正在执行的操作需要很长时间),我建议将传入的消息放入处理队列中。发送相同,有一个发送线程阻止传入队列以进行 UDP 发送。
回答by djBo
It's a good thing Eclipse's history works even for a day back :) Thanks to that, I am able to give both Ravi a working example and Lirik his answer on leakage.
Eclipse 的历史即使在一天前仍然有效,这是一件好事:) 多亏了这一点,我能够给 Ravi 一个工作示例和 Lirik 他关于泄漏的答案。
Let me first start of by stating that I have no clue what is causing this leak, but if I leave it long enough, it will fail on a OutOfMemoryError.
首先让我说我不知道是什么导致了这个泄漏,但是如果我让它保持足够长的时间,它会在OutOfMemoryError上失败。
Second, I left the working code commented out for Ravi for a working basic example of my UDP server. The timeout was there to test how long my firewall would kill the receivers end (30 seconds). Just remove anything with the pool, and you're good to go.
其次,我将 Ravi 的工作代码注释掉,作为我的 UDP 服务器的工作基本示例。超时是为了测试我的防火墙会杀死接收器多长时间(30 秒)。只需移除池中的任何东西,您就可以开始使用了。
So here is, a working but leaking version of my example threaded UDP server.
所以这里是我的示例线程 UDP 服务器的工作但泄漏版本。
public class TestServer {
private static Integer TIMEOUT = 30;
private final static int MAX_BUFFER_SIZE = 8192;
private final static int MAX_LISTENER_THREADS = 5;
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ");
private int mPort;
private DatagramSocket mSocket;
// You can remove this for a working version
private ExecutorService mPool;
public TestServer(int port) {
mPort = port;
try {
mSocket = new DatagramSocket(mPort);
mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE);
mSocket.setSendBufferSize(MAX_BUFFER_SIZE);
mSocket.setSoTimeout(0);
// You can uncomment this for a working version
//for (int i = 0; i < MAX_LISTENER_THREADS; i++) {
// new Thread(new Listener(mSocket)).start();
//}
// You can remove this for a working version
mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS);
} catch (IOException e) {
e.printStackTrace();
}
}
// You can remove this for a working version
public void start() {
try {
try {
while (true) {
mPool.execute(new Listener(mSocket));
}
} catch (Exception e) {
e.printStackTrace();
}
} finally {
mPool.shutdown();
}
}
private class Listener implements Runnable {
private final DatagramSocket socket;
public Listener(DatagramSocket serverSocket) {
socket = serverSocket;
}
private String readLn(DatagramPacket packet) throws IOException {
socket.receive(packet);
return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine();
}
private void writeLn(DatagramPacket packet, String string) throws IOException {
packet.setData(string.concat("\r\n").getBytes());
socket.send(packet);
}
@Override
public void run() {
DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
String s;
while (true) {
try {
packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE);
s = readLn(packet);
System.out.println(DateFormat.format(new Date()) + " Received: " + s);
Thread.sleep(TIMEOUT * 1000);
writeLn(packet, s);
System.out.println(DateFormat.format(new Date()) + " Sent: " + s);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
if (args.length == 1) {
try {
TIMEOUT = Integer.parseInt(args[0]);
} catch (Exception e) {
TIMEOUT = 30;
}
}
System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT);
//new TestServer(4444);
new TestServer(4444).start();
}
}
btw. @Lirik, I witnessed this behavior first in Eclipse, after which I tested it from the command line. And again, I have NO clue what is causing it ;) sorry...
顺便提一句。@Lirik,我首先在 Eclipse 中目睹了这种行为,之后我从命令行对其进行了测试。再说一次,我不知道是什么原因造成的 ;) 对不起...