Skip navigation links

Eclipse MicroProfile Reactive Streams Operators API v3.0

@Version(value="1.0")

Package org.eclipse.microprofile.reactive.streams.operators

MicroProfile Reactive Streams Operators.

See: Description

Package org.eclipse.microprofile.reactive.streams.operators Description

MicroProfile Reactive Streams Operators.

This provides operators for building stream graphs that consume or produce elements using the Publisher, Subscriber and Processor interfaces.

There are four primary classes used for building these graphs:

Operations on these builders may change the shape of the builder, for example, ProcessorBuilder.toList() changes the builder to a SubscriberBuilder, since the processor now has a termination stage to direct its elements to.

SubscriberBuilder's are a special case, in that they don't just build a Subscriber, they build a CompletionSubscriber, which encapsulates both a Subscriber and a CompletionStage of the result of the subscriber processing. This CompletionStage will be redeemed with a result when the stream terminates normally, or if the stream terminates with an error, will be redeemed with an error. The result is specific to whatever the Subscriber does, for example, in the case of ProcessorBuilder.toList(), the result will be a List of all the elements produced by the Processor, while in the case of ProcessorBuilder.findFirst(), it's an Optional of the first element of the stream. In some cases, there is no result, in which case the result is the Void type, and the CompletionStage is only useful for signalling normal or error termination of the stream.

The CompletionRunner builds a closed graph, in that case both a Publisher and Subscriber have been provided, and building the graph will run it and return the result of the Subscriber as a CompletionStage.

An example use of this API is perhaps you have a Publisher of rows from a database, and you want to output it as a comma separated list of lines to publish to an HTTP client request body, which expects a Publisher of ByteBuffer. Here's how this might be implemented:

   Publisher<Row> rowsPublisher = ...;

   Publisher<ByteBuffer> bodyPublisher =
     // Create a publisher builder from the rows publisher
     ReactiveStreams.fromPublisher(rowsPublisher)
       // Map the rows to CSV strings
       .map(row ->
         String.format("%s, %s, %d\n", row.getString("firstName"),
           row.getString("lastName"), row.getInt("age))
       )
       // Convert to ByteBuffer
       .map(line -> ByteBuffer.wrap(line.getBytes("utf-8")))
       // Build the publisher
       .build();

   // Make the request
   HttpClient client = HttpClient.newHttpClient();
   client.send(
     HttpRequest
       .newBuilder(new URI("http://www.foo.com/"))
       .POST(BodyProcessor.fromPublisher(bodyPublisher))
       .build()
   );
 

The documentation for each operator uses marble diagrams to visualize how the operator functions. Each element flowing in and out of the stream is represented as a coloured marble that has a value, with the operator applying some transformation or some side effect, termination and error signals potentially being passed, and for operators that subscribe to the stream, an output value being redeemed at the end.

Below is an example diagram labelling all the parts of the stream.

Example marble diagram

Skip navigation links

Eclipse MicroProfile Reactive Streams Operators API v3.0

Copyright © 2018 – 2022 Eclipse Foundation. All rights reserved.
Use is subject to license terms.