java 如何将 RxJava combineLatest 运算符与 9 个以上的 observables 一起使用
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/38811923/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to use RxJava combineLatest operator with more than 9 observables
提问by user3762200
I'm using RxJava and I want to combine 12 different observables using the operator combineLatest.
我正在使用 RxJava 并且我想使用 operator 组合 12 个不同的 observables combineLatest。
I saw a function prototype that takes a list of observables and an implementation of FuncNbut I'm not sure how to do this, I'm having trouble implementing the callmethod.
我看到了一个函数原型,它需要一个可观察的列表和一个实现,FuncN但我不知道如何做到这一点,我在实现该call方法时遇到了麻烦。
Can someone show me an example?
有人可以给我举个例子吗?
回答by Egor Neliuba
There is a combineLatestthat takes a Listof observables. Here's an example on how to use it:
有一个combineLatest需要一个List观察值。这是有关如何使用它的示例:
List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
@Override
public String call(Object... args) {
String concat = "";
for (Object value : args) {
if (value instanceof Integer) {
concat += (Integer) value;
} else if (value instanceof String) {
concat += (String) value;
}
}
return concat;
}
});
回答by TacoEater
Yo expand on that answer, I am using it to read multiple characteristics at once, it can be done like so:
哟扩展那个答案,我用它来一次读取多个特征,它可以这样做:
connectionObservable
.flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
List<Observable<?>> list1 = Arrays.asList(
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...),
rxBleConnection.readCharacteristic(UUID...));
return Observable.combineLatest(list1, args -> {
Object o = doSomethingWithResults(args);
return o;
});
})
.observeOn(AndroidSchedulers.mainThread())
.doOnUnsubscribe(this::clearConnectionSubscription)
.subscribe(retVal -> {
Log.d(TAG, "result:" + retVal.toString());
Log.w(TAG, "SUCCESS");
triggerDisconnect();
}, MyActivity.this::onReadFailure);
}
Comments if you have suggestions on how to improve this process.
如果您对如何改进此过程有任何建议,请发表评论。
回答by android
RxKotlinsupports upto 9 opertators in parameters in combineLatest() method but to use more than 9 parameters means to pass unlimited dynamic custom object arraylist you can use it as below:
RxKotlin在 combineLatest() 方法的参数中支持最多9 个操作符,但是使用超过 9 个参数意味着传递无限的动态自定义对象数组列表,您可以使用它,如下所示:
First Let me give you simple example with only two parameters with custom data types
首先让我给你一个简单的例子,只有两个自定义数据类型的参数
val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
Now what if i want to pass multiple parameters in combineLatest? Then your answer is below: (i have used custom data types, so someone's custom problem can also be solved here)
现在如果我想在 combineLatest 中传递多个参数怎么办?那么你的答案如下:(我使用了自定义数据类型,所以也可以在这里解决某人的自定义问题)
val myList = arrayOf(Observable.just("MyName"),
Observable.just(2),
Observable.just(3.55),
Observable.just("My Another String"),
Observable.just(5),
Observable.just(6),
Observable.just(7),
Observable.just(8),
Observable.just(9),
Observable.just(10),
Observable.just(11),
Observable.just(12),
Observable.just(13),
Observable.just(14),
Observable.just(15))
Observable.combineLatest(myList, {
val a = it[0] as String
val b = it[1] as Int
val c = it[2] as Float
val d = it[3] as String
val e = it[4] as Int
val f = it[5] as Int
val g = it[6] as Int
val h = it[7] as Int
val i = it[8] as Int
val j = it[9] as Int
val k = it[10] as Int
val l = it[11] as Int
val m = it[12] as Int
"$a - age:${b}" })
.subscribe({
Log.d("combineLatest", "onNext - ${it}")
})
回答by Shriram Kadam
I am using RxJava and I want to combine 12 different observables using the operator combineLatest.
我正在使用 RxJava 并且我想使用操作符 combineLatest 组合 12 个不同的观察值。
I saw a function prototype that takes a list of observables and an implementation. but I am not sure how to do this, I am having trouble implementing the call method. Please check my code and do the needful.
我看到了一个函数原型,它带有一个可观察的列表和一个实现。但我不知道该怎么做,我在实现 call 方法时遇到了麻烦。请检查我的代码并执行必要的操作。
Stream> get cities => _citiesController.stream;
流> 获取城市 => _citiesController.stream;
Stream get city => _cityController.stream;
流获取城市 => _cityController.stream;
Stream get agentcity => _agentcityController.stream;
流获取 agentcity => _agentcityController.stream;
Stream get userpackages => _packagesController.stream;
流获取用户包 => _packagesController.stream;
Stream get email => _emailController.stream.transform(validateEmail);
流获取电子邮件 => _emailController.stream.transform(validateEmail);
Stream get firstName => _firstNameController.stream.transform(validateFirstName);
Stream get firstName => _firstNameController.stream.transform(validateFirstName);
Stream get lastName => _lastNameController.stream.transform(validateLastName);
流获取 lastName => _lastNameController.stream.transform(validateLastName);
Stream get mobileNumber => _mobileNumberController.stream.transform(validateMobile);
Stream get mobileNumber => _mobileNumberController.stream.transform(validateMobile);
Stream get dob => _dobController.stream.transform(validatedob);
流获取 dob => _dobController.stream.transform(validatedob);
Stream get appointmentdate => _appointmentdateController.stream.transform(validateappointmentDate);
流获取约会日期 => _appointmentdateController.stream.transform(validateappointmentDate);
Stream get pincode => _pincodeController.stream.transform(validatePincode);
流获取密码 => _pincodeController.stream.transform(validatePincode);
Stream get gender => _genderController.stream;
流获取性别 => _genderController.stream;
Stream get address => _addressController.stream.transform(validateAddress);
流获取地址 => _addressController.stream.transform(validateAddress);
Stream get agentname => _agentnameController.stream.transform(validateAgentName);
Stream get agentname => _agentnameController.stream.transform(validateAgentName);
Stream get validSubmission => Observable.combineLatest9(
流获取 validSubmission => Observable.combineLatest9(
email,
firstName,
mobileNumber,
pincode,
dob,
address,
agentname,
_genderController.stream,
_cityController.stream,
_agentcityController.stream,
_packagesController.stream,
_appointmentdateController.stream,
(e, f, m, p, d, a, an, g, c, ac, pc, ad) => true,
); Please let me know how to use combineLatest in my code with Flutter
); 请让我知道如何在我的 Flutter 代码中使用 combineLatest
回答by peresisUser
To expand on Egor Neliuba's answer, you can aggregate all the results inside a container object, and then use it as you will inside the subscribe clause:
要扩展 Egor Neliuba 的答案,您可以将所有结果聚合到一个容器对象中,然后像在 subscribe 子句中一样使用它:
List<Observable<?>> list = new ArrayList<>();
list.add(mCreateMarkupFlowManager.getFlowState());
list.add(mCreateIssueFlowStateManager.getIssueFlowState());
list.add(mViewerStateManager.getMarkupLoadingProgressChanges());
list.add(mViewerStateManager.getIssueLoadingProgressChanges());
list.add(mMeasurementFlowStateManager.getFlowState());
list.add(mViewerStateManager.isSheetLoaded());
list.add(mProjectDataManager.isCreateFieldIssueEnabledForCurrentProject().distinctUntilChanged());
list.add(mViewerStateManager.getMarkupViewMode());
list.add(mViewerStateManager.isFirstPerson());
list.add(mProjectDataManager.isCreateRfiEnabledForCurrentProject().distinctUntilChanged());
list.add(mCreateRfiFlowStateManager.getRfiFlowState());
attachSubscription(Observable.combineLatest(list, args -> {
Holder holder = new Holder();
holder.setFirst((String) args[0]);
holder.setSecond((Integer) args[1]);
holder.setThird((Boolean) args[2]);
holder.setFourth((Boolean) args[3]);
holder.setFifth((String) args[4]);
holder.setSixth((Boolean) args[5]);
holder.setSeventh((Boolean) args[6]);
holder.setEighth((Boolean) args[7]);
holder.setNinth((Boolean) args[8]);
holder.setTenth((Boolean) args[9]);
holder.setEleventh((String) args[10]);
return holder;
})
.filter(holder -> Util.isTrue(holder.sixth))
.compose(Util.applySchedulers())
.subscribe(holder -> {
if (isViewAttached()) {
String createMarkupState = holder.first;
Integer createIssueState = holder.second;
boolean markupsLoadingFinished = holder.third;
boolean issuesLoadingFinished = holder.fourth;
boolean loadingFinished = markupsLoadingFinished && issuesLoadingFinished;
String measurementState = holder.fifth;
boolean isMarkupLockMode = holder.eighth;
boolean showCreateMarkupButton = shouldShowCreateMarkupButton();
boolean showCreateMeasureButton = shouldShowMeasureButton();
boolean showCreateFieldIssueButton = holder.seventh;
boolean isFirstPersonEnabled = holder.ninth;
Boolean showCreateRfiButton = holder.tenth;
String rfiFlowState = holder.eleventh;
}
})
);
public class Holder {
public String first;
public Integer second;
public Boolean third;
public Boolean fourth;
public String fifth;
public Boolean sixth;
public Boolean seventh;
public Boolean eighth;
public Boolean ninth;
public Boolean tenth;
public String eleventh;
public void setEleventh(String eleventh) {
this.eleventh = eleventh;
}
public void setFirst(String first) {
this.first = first;
}
public void setSecond(Integer second) {
this.second = second;
}
public void setThird(Boolean third) {
this.third = third;
}
public void setFourth(Boolean fourth) {
this.fourth = fourth;
}
public void setFifth(String fifth) {
this.fifth = fifth;
}
public void setSixth(Boolean sixth) {
this.sixth = sixth;
}
public void setSeventh(Boolean seventh) {
this.seventh = seventh;
}
public void setEighth(Boolean eighth) {
this.eighth = eighth;
}
public void setNinth(Boolean ninth) {
this.ninth = ninth;
}
public void setTenth(Boolean tenth) {
this.tenth = tenth;
}
public Holder() {}
}
回答by DarthHater
from above code. replace each line
从上面的代码。替换每一行
rxBleConnection.readCharacteristic(UUID...),
with
和
rxBleConnection.readCharacteristic(UUID...).onErrorResumeNext { bytes -> Observable.just(new byte[0])) },
essentially you are returning an empty byte array if a certain characteristic is not found. code will continue
本质上,如果未找到某个特征,您将返回一个空字节数组。代码将继续

