Use Observable.FromEventPattern to perform action after inactivity or count
Tag : chash , By : Magnus
Date : March 29 2020, 07:55 AM
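Use Buffer with a custom bufferClosingSelector. The idea here is that every buffer should be closed either after maxDuration or after maxCount items, whichever comes sooner. Each time a buffer closes, a new one is opened.

// Requires: using System; using System.Reactive.Linq;
// keyspaceStream and scheduler are assumed to be defined elsewhere.
var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
    // Switch restarts the timer on every item, so it fires only once
    // no further item has arrived for maxDuration.
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    // Close the current buffer on whichever comes first: the timer or maxCount items.
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});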
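
For readers who want to see the closing rule without Rx, here is a minimal library-free sketch in Java. It is an illustration, not a port of the answer above: the class name CountOrIdleBuffer and its methods are hypothetical, time is passed in by the caller instead of coming from a scheduler, and the idle check only runs when onTick is called.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: collects items and closes the current batch when either
 * maxCount items have arrived or no item has arrived for maxIdleMillis.
 * The caller supplies the current time, which keeps the behaviour deterministic.
 */
final class CountOrIdleBuffer<T> {
    private final int maxCount;
    private final long maxIdleMillis;
    private final List<T> current = new ArrayList<>();
    private long lastItemAt;

    CountOrIdleBuffer(int maxCount, long maxIdleMillis) {
        this.maxCount = maxCount;
        this.maxIdleMillis = maxIdleMillis;
    }

    /** Adds an item; returns the closed batch if maxCount was reached, otherwise null. */
    List<T> onItem(T item, long nowMillis) {
        current.add(item);
        lastItemAt = nowMillis;
        return current.size() >= maxCount ? close() : null;
    }

    /** Meant to be called periodically; returns the closed batch if the idle limit has passed, otherwise null. */
    List<T> onTick(long nowMillis) {
        boolean idle = !current.isEmpty() && nowMillis - lastItemAt >= maxIdleMillis;
        return idle ? close() : null;
    }

    /** Hands out a copy of the current batch and starts a new, empty one. */
    private List<T> close() {
        List<T> batch = new ArrayList<>(current);
        current.clear();
        return batch;
    }
}
```

With maxCount = 3, the third call to onItem returns the full batch. A lone item followed by an onTick at least maxIdleMillis later returns a one-element batch.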
|
Is it safe to initialize RecyclerView item in onNext of Observable
Date : March 29 2020, 07:55 AM
Preparing data during scrolling is generally a bad idea (CursorAdapter is another story). I suppose in your case everything works fine because of your code:

if (DateUtils.isToday(date)) {
    formatedDate = DateFormat.format("H:m", date).toString();
} else {
    formatedDate = DateFormat.format("d MMM yyyy", date).toString();
}

public class Model {
    String formatedDate;
    // other fields
}

Observable<List<Model>> getModels(long fromDate, long toDate);
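
One way to read the Model class and getModels signature above is that the display string is computed once, when the model is built, so the adapter only reads a field while binding. The following is a minimal sketch under that assumption. It uses java.time in place of Android's DateUtils and DateFormat so that it runs on a plain JVM; the DateLabels helper and the Model constructor are hypothetical, and the patterns are copied from the snippet above.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper that picks the pattern the same way the snippet above does. */
final class DateLabels {
    private static final DateTimeFormatter TIME =
            DateTimeFormatter.ofPattern("H:m", Locale.ENGLISH);
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("d MMM yyyy", Locale.ENGLISH);

    /** Time of day for dates that fall on 'today', full date otherwise. */
    static String format(LocalDateTime date, LocalDate today) {
        return date.toLocalDate().equals(today) ? TIME.format(date) : DAY.format(date);
    }
}

/** The formatted string is computed once here, not on every bind. */
final class Model {
    final String formatedDate;

    Model(LocalDateTime date, LocalDate today) {
        this.formatedDate = DateLabels.format(date, today);
    }
}
```

The list produced by getModels would then already contain ready-to-display strings, and the bind step reduces to setting text from model.formatedDate.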
|
Perform action AFTER subscribing to observable
Date : March 29 2020, 07:55 AM
I want to perform an action that runs after subscribing to an Observable. Currently I perform some BLE-related operations, such as reading the RSSI of my connection.

From the doOnSubscribe documentation:

public Observable<RssiState> readRssi() {
    return Observable.create(emitter -> {
        // this is invoked for every new subscriber
        // Sketch only: on Android, BluetoothGattCallback is an abstract class, so real
        // code overrides onReadRemoteRssi instead of assigning a lambda.
        BluetoothGattCallback callback =
            (BluetoothGatt gatt, int rssi, int status) -> {
                emitter.onNext(new RssiState(gatt, rssi, status));
                emitter.onComplete();
            };
        gatt.addListener(callback); // not sure about method name
        gatt.readRemoteRssi(); // initiate read
    });
}
|
Perform an action on Observable zip subscription
Date : March 29 2020, 07:55 AM
You have to return the Observables from those HTTP requests instead of subscribing inside the methods:

import { zip } from 'rxjs';
import { tap } from 'rxjs/operators';

http1() {
  return this.http.post<any>('/url1').pipe(tap(val => console.log(1)));
}
http2() {
  return this.http.post<any>('/url2').pipe(tap(val => console.log(2)));
}
// zip emits once both requests have produced a value
zip(this.http1(), this.http2()).subscribe(() => console.log(3));
|
How to pass the item indexes in Observable.fromIterable to onNext in subscribe method?
Tag : java , By : Jet Thompson
Date : March 29 2020, 07:55 AM
If you are processing the URLs sequentially, you can just introduce an index field in the Observer:

Observable.fromIterable(mUrls.getGroups())
    .concatMapSingle(url -> getChannels(url))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<ResponseBody>() {
        int index; // running index of the current response
        // ...
        @Override
        public void onNext(ResponseBody responseBody) {
            Group group = GroupParser.parseList(responseBody.byteStream(), index);
            groups.put(index, group);
            index++; // advance once per response
        }
        // ...
    });

If the requests run concurrently (flatMapSingle), the responses may arrive out of order, so pair each URL with its index before the request is made:

// Pair here is android.util.Pair (Pair.create, fields first and second)
Observable.defer(() -> {
    AtomicInteger counter = new AtomicInteger();
    return Observable.fromIterable(mUrls.getGroups())
        .map(url -> Pair.create(url, counter.getAndIncrement()));
})
.flatMapSingle(urlIndex ->
    getChannels(urlIndex.first)
        .map(v -> Pair.create(v, urlIndex.second))
        .subscribeOn(Schedulers.io())
)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Pair<ResponseBody, Integer>>() {
    // ...
    @Override
    public void onNext(Pair<ResponseBody, Integer> pair) {
        Group group = GroupParser.parseList(pair.first.byteStream(), pair.second);
        groups.put(pair.second, group);
    }
    // ...
});
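
The idea of attaching the index before any asynchronous work starts can also be shown without RxJava. The sketch below swaps in CompletableFuture from the standard library so that it runs on a plain JVM. IndexedFetch, fetch and fetchAll are hypothetical names, and fetch is only a stand-in for getChannels(url).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class IndexedFetch {
    /** Stand-in for getChannels(url); a real version would perform network I/O. */
    static String fetch(String url) {
        return "body of " + url;
    }

    /** Runs fetch for every URL concurrently and stores each result under its original index. */
    static Map<Integer, String> fetchAll(List<String> urls) {
        Map<Integer, String> results = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < urls.size(); i++) {
            final int index = i;            // captured before the async work starts
            final String url = urls.get(i);
            pending.add(CompletableFuture
                    .supplyAsync(() -> fetch(url))
                    .thenAccept(body -> results.put(index, body)));
        }
        // Wait until every request has stored its result.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return results;
    }
}
```

Because each task carries its own index, the order in which the tasks finish does not affect where their results are stored.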
|