Java 如何向不同的 Web 服务发送多个异步请求?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/21670451/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to send multiple asynchronous requests to different web services?
提问by J888
I need to send multiple requests to many different web services and receive the results. The problem is that, if I send the requests one by one it takes so long as I need to send and process all individually.
我需要向许多不同的 Web 服务发送多个请求并接收结果。问题是,如果我一个一个地发送请求,那么我需要单独发送和处理所有请求所需的时间。
I am wondering how I can send all the requests at once and receive the results.
我想知道如何一次发送所有请求并接收结果。
As the following code shows, I have three major methods and each has its own sub methods. Each sub method sends request to its associated web service and receive the results;therefore, for example, to receive the results of web service 9 I have to wait till all web services from 1 to 8 get completed, it takes a long time to send all the requests one by one and receive their results.
如以下代码所示,我有三个主要方法,每个方法都有自己的子方法。每个子方法向其关联的Web服务发送请求并接收结果;因此,例如,要接收Web服务9的结果我必须等到1到8的所有Web服务都完成,发送需要很长时间所有的请求一一接收它们的结果。
As shown below none of the methods nor sub-methods are related to each other, so I can call them all and receive their results in any order, the only thing which is important is to receive the results of each sub-method and populate their associated lists.
如下所示,方法和子方法都没有相互关联,所以我可以按任何顺序调用它们并接收它们的结果,唯一重要的是接收每个子方法的结果并填充它们相关列表。
private List<StudentsResults> studentsResults = new ArrayList();
private List<DoctorsResults> doctorsResults = new ArrayList();
private List<PatientsResults> patientsResults = new ArrayList();
main (){
retrieveAllLists();
}
retrieveAllLists(){
retrieveStudents();
retrieveDoctors();
retrievePatients();
}
retrieveStudents(){
this.studentsResults = retrieveStdWS1(); //send request to Web Service 1 to receive its list of students
this.studentsResults = retrieveStdWS2(); //send request to Web Service 2 to receive its list of students
this.studentsResults = retrieveStdWS3(); //send request to Web Service 3 to receive its list of students
}
retrieveDoctors(){
this.doctorsResults = retrieveDocWS4(); //send request to Web Service 4 to receive its list of doctors
this.doctorsResults = retrieveDocWS5(); //send request to Web Service 5 to receive its list of doctors
this.doctorsResults = retrieveDocWS6(); //send request to Web Service 6 to receive its list of doctors
}
retrievePatients(){
this.patientsResults = retrievePtWS7(); //send request to Web Service 7 to receive its list of patients
this.patientsResults = retrievePtWS8(); //send request to Web Service 8 to receive its list of patients
this.patientsResults = retrievePtWS9(); //send request to Web Service 9 to receive its list of patients
}
回答by Kumar
It has got various option to develop this.
它有多种选择来开发这个。
回答by Niels Bech Nielsen
That is a simple fork-join approach, but for clarity, you can start any number of threads and retrieve the results later as they are available, such as this approach.
这是一种简单的 fork-join 方法,但为了清楚起见,您可以启动任意数量的线程,并在结果可用时检索结果,例如这种方法。
ExecutorService pool = Executors.newFixedThreadPool(10);
List<Callable<String>> tasks = new ArrayList<>();
tasks.add(new Callable<String>() {
public String call() throws Exception {
Thread.sleep((new Random().nextInt(5000)) + 500);
return "Hello world";
}
});
List<Future<String>> results = pool.invokeAll(tasks);
for (Future<String> future : results) {
System.out.println(future.get());
}
pool.shutdown();
UPDATE, COMPLETE:
更新,完成:
Here's a verbose, but workable solution. I wrote it ad hoc, and have not compiled it. Given the three lists have diffent types, and the WS methods are individual, it is not really modular, but try to use your best programming skills and see if you can modularize it a bit better.
这是一个冗长但可行的解决方案。我是临时写的,还没有编译。鉴于这三个列表的类型不同,而且 WS 方法各不相同,它并不是真正的模块化,但请尝试使用您最好的编程技能,看看您是否可以将其模块化得更好一点。
ExecutorService pool = Executors.newFixedThreadPool(10);
List<Callable<List<StudentsResults>>> stasks = new ArrayList<>();
List<Callable<List<DoctorsResults>>> dtasks = new ArrayList<>();
List<Callable<List<PatientsResults>>> ptasks = new ArrayList<>();
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS1();
}
});
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS2();
}
});
stasks.add(new Callable<List<StudentsResults>>() {
public List<StudentsResults> call() throws Exception {
return retrieveStdWS3();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS4();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS5();
}
});
dtasks.add(new Callable<List<DoctorsResults>>() {
public List<DoctorsResults> call() throws Exception {
return retrieveDocWS6();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS7();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS8();
}
});
ptasks.add(new Callable<List<PatientsResults>>() {
public List<PatientsResults> call() throws Exception {
return retrievePtWS9();
}
});
List<Future<List<StudentsResults>>> sresults = pool.invokeAll(stasks);
List<Future<List<DoctorsResults>>> dresults = pool.invokeAll(dtasks);
List<Future<List<PatientsResults>>> presults = pool.invokeAll(ptasks);
for (Future<List<StudentsResults>> future : sresults) {
this.studentsResults.addAll(future.get());
}
for (Future<List<DoctorsResults>> future : dresults) {
this.doctorsResults.addAll(future.get());
}
for (Future<List<PatientsResults>> future : presults) {
this.patientsResults.addAll(future.get());
}
pool.shutdown();
Each Callable
returns a list of results, and is called in its own separate thread.
When you invoke the Future.get()
method you get the result back onto the main thread.
The result is NOTavailable until the Callable
have finished, hence there is no concurrency issues.
每个都Callable
返回一个结果列表,并在其自己的单独线程中调用。
当您调用该Future.get()
方法时,您会将结果返回到主线程。
结果在完成之前不可用Callable
,因此不存在并发问题。
回答by Hussain Pirosha
Looking at the problem, you need to integrate your application with 10+ different webservices.While making all the calls asynchronous. This can be done easily with Apache Camel. It is a prominent framework for enterprise integration and also supports async processing. You can use its CXF component for calling webservices and its routing engine for invocation and processing results. Look at the following pageregarding camel's async routing capability. They have also provided a complete example invoking webservices async using CXF, it available at its maven repo. Also see the following pagefor more details.
查看问题,您需要将您的应用程序与 10 多个不同的 web 服务集成。同时使所有调用异步。这可以使用 Apache Camel 轻松完成。它是企业集成的突出框架,还支持异步处理。您可以使用其 CXF 组件来调用 Web 服务,并使用其路由引擎来调用和处理结果。请看下面的页面关于骆驼的异步路由功能。他们还提供了一个使用 CXF 异步调用 webservices 的完整示例,它可以在其 maven repo 中找到。另请参阅以下页面了解更多详情。
回答by D-Klotz
So just for fun I am providing two working examples. The first one shows the old school way of doing this before java 1.5. The second shows a much cleaner way using tools available within java 1.5:
所以只是为了好玩,我提供了两个工作示例。第一个显示了在 java 1.5 之前执行此操作的老派方法。第二个显示了使用 Java 1.5 中可用工具的更简洁的方法:
import java.util.ArrayList;
public class ThreadingExample
{
private ArrayList <MyThread> myThreads;
public static class MyRunnable implements Runnable
{
private String data;
public String getData()
{
return data;
}
public void setData(String data)
{
this.data = data;
}
@Override
public void run()
{
}
}
public static class MyThread extends Thread
{
private MyRunnable myRunnable;
MyThread(MyRunnable runnable)
{
super(runnable);
setMyRunnable(runnable);
}
/**
* @return the myRunnable
*/
public MyRunnable getMyRunnable()
{
return myRunnable;
}
/**
* @param myRunnable the myRunnable to set
*/
public void setMyRunnable(MyRunnable myRunnable)
{
this.myRunnable = myRunnable;
}
}
public ThreadingExample()
{
myThreads = new ArrayList <MyThread> ();
}
public ArrayList <String> retrieveMyData ()
{
ArrayList <String> allmyData = new ArrayList <String> ();
if (isComplete() == false)
{
// Sadly we aren't done
return (null);
}
for (MyThread myThread : myThreads)
{
allmyData.add(myThread.getMyRunnable().getData());
}
return (allmyData);
}
private boolean isComplete()
{
boolean complete = true;
// wait for all of them to finish
for (MyThread x : myThreads)
{
if (x.isAlive())
{
complete = false;
break;
}
}
return (complete);
}
public void kickOffQueries()
{
myThreads.clear();
MyThread a = new MyThread(new MyRunnable()
{
@Override
public void run()
{
// This is where you make the call to external services
// giving the results to setData("");
setData("Data from list A");
}
});
myThreads.add(a);
MyThread b = new MyThread (new MyRunnable()
{
@Override
public void run()
{
// This is where you make the call to external services
// giving the results to setData("");
setData("Data from list B");
}
});
myThreads.add(b);
for (MyThread x : myThreads)
{
x.start();
}
boolean done = false;
while (done == false)
{
if (isComplete())
{
done = true;
}
else
{
// Sleep for 10 milliseconds
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}
public static void main(String [] args)
{
ThreadingExample example = new ThreadingExample();
example.kickOffQueries();
ArrayList <String> data = example.retrieveMyData();
if (data != null)
{
for (String s : data)
{
System.out.println (s);
}
}
}
}
This is the much simpler working version:
这是更简单的工作版本:
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadingExample
{
public static void main(String [] args)
{
ExecutorService service = Executors.newCachedThreadPool();
Set <Callable<String>> callables = new HashSet <Callable<String>> ();
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service A, and put its results here";
}
});
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service B, and put its results here";
}
});
callables.add(new Callable<String>()
{
@Override
public String call() throws Exception
{
return "This is where I make the call to web service C, and put its results here";
}
});
try
{
List<Future<String>> futures = service.invokeAll(callables);
for (Future<String> future : futures)
{
System.out.println (future.get());
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (ExecutionException e)
{
e.printStackTrace();
}
}
}
回答by ErstwhileIII
You might consider the following paradigm in which you create work (serially), but the actual work is done in parallel. One way to do this is to: 1) have your "main" create a queue of work items; 2) create a "doWork" object that queries the queue for work to do; 3) have "main" start some number of "doWork" threads (can be same number as number of different services, or a smaller number); have the "doWork" objects put add their results to an object list (whatever construct works Vector, list...).
您可能会考虑以下范式,在其中创建工作(串行),但实际工作是并行完成的。一种方法是:1)让您的“主”创建一个工作项队列;2)创建一个“doWork”对象,查询队列中的待办工作;3)有“main”启动一些“doWork”线程(可以与不同服务的数量相同,或者更少);让“doWork”对象将它们的结果添加到对象列表中(无论构造如何工作,向量、列表...)。
Each "doWork" object would mark their queue item complete, put all results in the passed container and check for new work (if no more on the queue, it would sleep and try again).
每个“doWork”对象都会将它们的队列项标记为完成,将所有结果放入传递的容器中并检查新工作(如果队列中没有更多工作,它将休眠并重试)。
Of course you will want to see how well you can construct your class model. If each of the webservices is quite different for parsing, then you may want to create an Interface that each of your "retrieveinfo" classes promises to implement.
当然,你会想看看你能多好地构建你的类模型。如果每个 Web 服务在解析方面都大不相同,那么您可能希望创建一个接口,您的每个“检索信息”类都承诺实现该接口。
回答by flup
You can ask your jax-ws
implementation to generate asynchronous bindings for the web service.
您可以要求您的jax-ws
实现为 Web 服务生成异步绑定。
This has two advantages that I can see:
我可以看到这有两个优点:
- As discussed in Asynchronous web services calls with JAX-WS: Use wsimport support for asynchrony or roll my own?,
jax-ws
will generate well-tested (and possibly fancier) code for you, you need not instantiate the ExecutorService yourself. So less work for you! (but also less control over the threading implementation details) - The generated bindings include a method where you specify a callback handler, which may suit your needs better than synchronously
get()
ting all response lists on the thread callingretrieveAllLists()
. It allows for per-service-call error handling and will process the results in parallel, which is nice if processing is non-trivial.
- 如使用 JAX-WS 的异步 Web 服务调用中所述:使用 wsimport 支持异步还是自己实现?,
jax-ws
将为您生成经过良好测试(并且可能更高级)的代码,您无需自己实例化 ExecutorService 。所以你的工作更少!(但对线程实现细节的控制也较少) - 生成的绑定包括一个方法,您可以在其中指定回调处理程序,这可能比同步
get()
调用线程上的所有响应列表更适合您的需求retrieveAllLists()
。它允许对每个服务调用的错误进行处理,并将并行处理结果,如果处理很重要,这很好。
An example for Metro can be found on the Metro site. Note the contents of the custom bindings file custom-client.xml:
可以在 Metro 站点上找到 Metro 的示例。请注意自定义绑定文件custom-client.xml 的内容:
<bindings ...>
<bindings node="wsdl:definitions">
<enableAsyncMapping>true</enableAsyncMapping>
</bindings>
</bindings>
When you specify this bindings file to wsimport
, it'll generate a client which returns an object that implements javax.xml.ws.Response<T>
. Response
extends the Future
interface that others also suggest you use when rolling your own implementation.
当您将此绑定文件指定为 时wsimport
,它将生成一个客户端,该客户端返回一个实现javax.xml.ws.Response<T>
. Response
扩展Future
其他人在滚动您自己的实现时也建议您使用的接口。
So, unsurprisingly, if you go without the callbacks, the code will look similar to the other answers:
因此,不出所料,如果您不使用回调,代码将与其他答案类似:
public void retrieveAllLists() throws ExecutionException{
// first fire all requests
Response<List<StudentsResults>> students1 = ws1.getStudents();
Response<List<StudentsResults>> students2 = ws2.getStudents();
Response<List<StudentsResults>> students3 = ws3.getStudents();
Response<List<DoctorsResults>> doctors1 = ws4.getDoctors();
Response<List<DoctorsResults>> doctors2 = ws5.getDoctors();
Response<List<DoctorsResults>> doctors3 = ws6.getDoctors();
Response<List<PatientsResults>> patients1 = ws7.getPatients();
Response<List<PatientsResults>> patients2 = ws8.getPatients();
Response<List<PatientsResults>> patients3 = ws9.getPatients();
// then await and collect all the responses
studentsResults.addAll(students1.get());
studentsResults.addAll(students2.get());
studentsResults.addAll(students3.get());
doctorsResults.addAll(doctors1.get());
doctorsResults.addAll(doctors2.get());
doctorsResults.addAll(doctors3.get());
patientsResults.addAll(patients1.get());
patientsResults.addAll(patients2.get());
patientsResults.addAll(patients3.get());
}
If you create callback handers such as
如果您创建回调处理程序,例如
private class StudentsCallbackHandler
implements AsyncHandler<Response<List<StudentsResults>>> {
public void handleResponse(List<StudentsResults> response) {
try {
studentsResults.addAll(response.get());
} catch (ExecutionException e) {
errors.add(new CustomError("Failed to retrieve Students.", e.getCause()));
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
you can use them like this:
你可以像这样使用它们:
public void retrieveAllLists() {
List<Future<?>> responses = new ArrayList<Future<?>>();
// fire all requests, specifying callback handlers
responses.add(ws1.getStudents(new StudentsCallbackHandler()));
responses.add(ws2.getStudents(new StudentsCallbackHandler()));
responses.add(ws3.getStudents(new StudentsCallbackHandler()));
...
// await completion
for( Future<?> response: responses ) {
response.get();
}
// or do some other work, and poll response.isDone()
}
Note that the studentResults collection needs to be thread safe now, since results will get added concurrently!
请注意, studentResults 集合现在需要是线程安全的,因为结果将同时添加!