How to set up blocking asynchronous request/response in project reactor?

Tag : java , By : Bobblegate
Date : December 01 2020, 04:52 PM

I arrived at a working solution, but I'm not sure if it's the best:
I set up a Mono for sending the asynchronous message and merge it with a Flux that filters for a matching message. Because the Mono never emits a value, I know the first object from the merge is the response message from my Flux, so I can cast it to the correct type.
    public AntMessage sendBlocking(AntBlockingMessage requestMessage) {
        // Listen for the matching response; merge() subscribes to this before the sender runs.
        Flux<AntMessage> response = this.antUsbReader.antMessages()
                .filter(responseMessage -> isMatchingResponse(requestMessage, responseMessage));

        // Sending is wrapped in a Mono<Void>, which completes without emitting a value.
        Mono<Void> messageSender = Mono.fromRunnable(() -> this.antUsbWriter.write(requestMessage));

        // merge() yields Flux<Object>, hence the cast. blockFirst waits at most 1 second for a response.
        return (AntMessage) Flux.merge(response, messageSender).blockFirst(Duration.ofSeconds(1));
    }

    private boolean isMatchingResponse(AntBlockingMessage message, AntMessage response) {
        if (message instanceof RequestMessage) {
            return response.getMessageId() == ((RequestMessage) message).getMsgIdRequested();
        }
        return response.getMessageId() == message.getMessageId();
    }


Java Non-Blocking and Asynchronous IO with NIO & NIO.2 (JSR203) - Reactor/Proactor Implementations

Tag : java , By : Rb.Ridge
Date : March 29 2020, 07:55 AM
There are lots of factors involved in this one. I will try to summarize my findings as best as possible (aware of the fact that there is contention regarding the usefulness of reactor and proactor IO handling implementations).

Wrapping blocking I/O in project reactor

Tag : java , By : Brandon
Date : March 29 2020, 07:55 AM
1) Yes, your use of subscribeOn correctly puts the JDBC work on the jdbcScheduler.
2) Neither. The results of the Callable, while computed on the jdbcScheduler, are handed over to the parallel Scheduler by publishOn, so your map will be executed on a thread from the Schedulers.parallel() pool (rather than hogging the jdbcScheduler).
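The thread hand-off described above can be observed with a small sketch. It assumes reactor-core is on the classpath; the class name, the "jdbc" scheduler sizing, and the `blockingQuery` stand-in are hypothetical and not taken from the original question.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class JdbcSchedulerSketch {
    // Hypothetical dedicated pool for blocking JDBC calls:
    // at most 4 daemon threads, up to 100 queued tasks, 60 s idle TTL.
    static final Scheduler JDBC_SCHEDULER =
            Schedulers.newBoundedElastic(4, 100, "jdbc", 60, true);

    // Stand-in for a blocking JDBC query; it only reports the thread it ran on.
    static String blockingQuery() {
        return Thread.currentThread().getName();
    }

    // Returns {thread that ran the Callable, thread that ran map}.
    static Mono<String[]> queryThenMap() {
        return Mono.fromCallable(JdbcSchedulerSketch::blockingQuery)
                .subscribeOn(JDBC_SCHEDULER)       // the Callable runs on a "jdbc" thread
                .publishOn(Schedulers.parallel())  // operators below run on the parallel pool
                .map(queryThread ->
                        new String[] {queryThread, Thread.currentThread().getName()});
    }

    public static void main(String[] args) {
        String[] threads = queryThenMap().block();
        System.out.println("query ran on: " + threads[0]);
        System.out.println("map ran on:   " + threads[1]);
    }
}
```

Removing the publishOn line should make both entries report a "jdbc" thread, which is the situation the answer warns about.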

Project Reactor Kafka: Perform action at the end of Flux without blocking

Tag : java , By : Ike
Date : March 29 2020, 07:55 AM
Since it seems the path variable is accessible in the whole pipeline (method input parameter?), you could delete the file within a separate doFinally. You would need to filter for the onComplete or cancel SignalType, because you don't want to delete the file in case of a failure.
Another option would be doOnComplete if you're not interested in deleting the file upon cancellation.
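A minimal sketch of the doFinally variant, assuming reactor-core is on the classpath. The helper name `deleteWhenDone` and the temp files are hypothetical; the original pipeline (a Kafka sender) is not shown in the answer, so a plain Flux stands in for it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

class DeleteOnFinishSketch {
    // Deletes `path` once `source` completes or is cancelled,
    // but leaves it in place if the pipeline terminates with an error.
    static <T> Flux<T> deleteWhenDone(Flux<T> source, Path path) {
        return source.doFinally(signal -> {
            if (signal == SignalType.ON_COMPLETE || signal == SignalType.CANCEL) {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException e) {
                    System.err.println("could not delete " + path + ": " + e);
                }
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path ok = Files.createTempFile("records", ".tmp");
        deleteWhenDone(Flux.just("a", "b"), ok).blockLast();
        System.out.println("deleted after completion: " + !Files.exists(ok));

        Path failed = Files.createTempFile("records", ".tmp");
        deleteWhenDone(Flux.<String>error(new IllegalStateException("boom")), failed)
                .onErrorResume(e -> Flux.empty()) // swallow the error so blockLast returns
                .blockLast();
        System.out.println("kept after failure: " + Files.exists(failed));
        Files.deleteIfExists(failed);
    }
}
```

Using doOnComplete instead would drop the SignalType check, at the cost of not cleaning up when a subscriber cancels.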

Is it possible to wait for a event without blocking a thread with Project Reactor?

Tag : java , By : Keonne Rodriguez
Date : March 29 2020, 07:55 AM
Yes, it is possible. You can use the reactor.core.publisher.Mono#create method to achieve it.
For your example:
public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";

    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));

    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    // Pending requests: correlation id -> callback that completes the waiting Mono.
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

    Mono<Response> doRequest(Mono<Request> request) {
        // Send the request, then register a sink that is completed when the response arrives.
        return request.flatMap(rq -> doNonBlockingFireAndForgetRequest(rq)
                .then(Mono.create(sink -> responses.put(rq.correlationId, sink::success))));
    }

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

    private void processResponse(Response response) {
        // Assumed body (missing in the source): hand the response to the waiting callback.
        responses.get(response.correlationId).accept(response);
    }

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay (the duration is an arbitrary choice).
                Thread.sleep(1_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

Blocking I/O in Project Reactor pipeline

Tag : java , By : shehan
Date : March 29 2020, 07:55 AM
It will work fine and it's robust, but using a parallel scheduler for blocking IO work isn't optimal (and isn't particularly idiomatic; when someone experienced in Reactor sees a parallel scheduler, they expect it to be running non-blocking work).
The better approach here would be to swap out your parallel scheduler for a bounded elastic scheduler with a cap of your choosing (10 in your example) - this will spin up and reuse backing workers as necessary, up to your cap.
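As a rough sketch of that swap, assuming reactor-core is on the classpath: the scheduler name "blocking-io", the unbounded task queue, and the `blockingCall` stand-in are hypothetical choices, not part of the original question.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class BoundedElasticSketch {
    // Stand-in for a blocking I/O call; returns the name of the thread it ran on.
    static String blockingCall(int id) throws InterruptedException {
        Thread.sleep(20);
        return Thread.currentThread().getName();
    }

    // Runs `tasks` blocking calls on a bounded elastic scheduler capped at `cap`
    // threads and returns the distinct worker thread names that were used.
    static Set<String> runBlockingTasks(int tasks, int cap) {
        Scheduler blockingIo =
                Schedulers.newBoundedElastic(cap, Integer.MAX_VALUE, "blocking-io", 60, true);
        try {
            Set<String> workers = ConcurrentHashMap.newKeySet();
            Flux.range(0, tasks)
                    // Each inner Mono is subscribed on the capped scheduler.
                    .flatMap(i -> Mono.fromCallable(() -> blockingCall(i))
                            .subscribeOn(blockingIo))
                    .doOnNext(workers::add)
                    .blockLast(Duration.ofSeconds(10));
            return workers;
        } finally {
            blockingIo.dispose();
        }
    }

    public static void main(String[] args) {
        Set<String> workers = runBlockingTasks(50, 10);
        System.out.println("distinct worker threads: " + workers.size());
    }
}
```

Tasks beyond the cap are queued on existing workers rather than spawning new threads, so the number of distinct thread names should never exceed the cap.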