Type Parameters:
T - The type of the elements that the processor consumes.
R - The type of the elements that the processor emits.
public interface ProcessorBuilder<T,R> extends TransformingOperators<R>, FilteringOperators<R>, PeekingOperators<R>, ConsumingOperators<R>, ErrorHandlingOperators<R>, ConnectingOperators<R>
A builder for a Processor.
The documentation for each operator uses marble diagrams to visualize how the operator functions. Each element flowing in and out of the stream is represented as a coloured marble that has a value, with the operator applying some transformation or some side effect, termination and error signals potentially being passed, and for operators that subscribe to the stream, an output value being redeemed at the end.
Below is an example diagram labelling all the parts of the stream.
See Also: ReactiveStreams
Modifier and Type | Method | Description |
---|---|---|
org.reactivestreams.Processor<T,R> | buildRs() | Build this stream, using the first ReactiveStreamsEngine found by the ServiceLoader. |
org.reactivestreams.Processor<T,R> | buildRs(ReactiveStreamsEngine engine) | Build this stream, using the supplied ReactiveStreamsEngine. |
SubscriberBuilder<T,Void> | cancel() | Cancels the stream as soon as it is run. |
<S,A> SubscriberBuilder<T,S> | collect(Collector<? super R,A,S> collector) | Collect the elements emitted by this stream using the given Collector. |
<S> SubscriberBuilder<T,S> | collect(Supplier<S> supplier, BiConsumer<S,? super R> accumulator) | Collect the elements emitted by this stream using a Collector built from the given supplier and accumulator. |
ProcessorBuilder<T,R> | distinct() | Creates a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream. |
ProcessorBuilder<T,R> | dropWhile(Predicate<? super R> predicate) | Drop the longest prefix of elements from this stream that satisfy the given predicate. |
ProcessorBuilder<T,R> | filter(Predicate<? super R> predicate) | Filter elements emitted by this publisher using the given Predicate. |
SubscriberBuilder<T,Optional<R>> | findFirst() | Find the first element emitted by the Publisher, and return it in a CompletionStage. |
<S> ProcessorBuilder<T,S> | flatMap(Function<? super R,? extends PublisherBuilder<? extends S>> mapper) | Map the elements to publishers, and flatten so that the elements emitted by publishers produced by the mapper function are emitted from this stream. |
<S> ProcessorBuilder<T,S> | flatMapCompletionStage(Function<? super R,? extends CompletionStage<? extends S>> mapper) | Map the elements to CompletionStage, and flatten so that the values redeemed by each CompletionStage are emitted from this stream. |
<S> ProcessorBuilder<T,S> | flatMapIterable(Function<? super R,? extends Iterable<? extends S>> mapper) | Map the elements to Iterables, and flatten so that the elements contained in each iterable are emitted by this stream. |
<S> ProcessorBuilder<T,S> | flatMapRsPublisher(Function<? super R,? extends org.reactivestreams.Publisher<? extends S>> mapper) | Map the elements to publishers, and flatten so that the elements emitted by publishers produced by the mapper function are emitted from this stream. |
SubscriberBuilder<T,Void> | forEach(Consumer<? super R> action) | Performs an action for each element on this stream. |
SubscriberBuilder<T,Void> | ignore() | Ignores each element of this stream. |
ProcessorBuilder<T,R> | limit(long maxSize) | Truncate this stream, ensuring the stream is no longer than maxSize elements in length. |
<S> ProcessorBuilder<T,S> | map(Function<? super R,? extends S> mapper) | Map the elements emitted by this stream using the mapper function. |
ProcessorBuilder<T,R> | onComplete(Runnable action) | Returns a stream containing all the elements from this stream, additionally performing the provided action when this stream completes. |
ProcessorBuilder<T,R> | onError(Consumer<Throwable> errorHandler) | Returns a stream containing all the elements from this stream, additionally performing the provided action if this stream conveys an error. |
ProcessorBuilder<T,R> | onErrorResume(Function<Throwable,? extends R> errorHandler) | Returns a stream containing all the elements from this stream. |
ProcessorBuilder<T,R> | onErrorResumeWith(Function<Throwable,? extends PublisherBuilder<? extends R>> errorHandler) | Returns a stream containing all the elements from this stream. |
ProcessorBuilder<T,R> | onErrorResumeWithRsPublisher(Function<Throwable,? extends org.reactivestreams.Publisher<? extends R>> errorHandler) | Returns a stream containing all the elements from this stream. |
ProcessorBuilder<T,R> | onTerminate(Runnable action) | Returns a stream containing all the elements from this stream, additionally performing the provided action when this stream completes or fails. |
ProcessorBuilder<T,R> | peek(Consumer<? super R> consumer) | Returns a stream containing all the elements from this stream, additionally performing the provided action on each element. |
SubscriberBuilder<T,Optional<R>> | reduce(BinaryOperator<R> accumulator) | Perform a reduction on the elements of this stream, using the provided accumulation function. |
SubscriberBuilder<T,R> | reduce(R identity, BinaryOperator<R> accumulator) | Perform a reduction on the elements of this stream, using the provided identity value and the accumulation function. |
ProcessorBuilder<T,R> | skip(long n) | Discard the first n elements of this stream. |
ProcessorBuilder<T,R> | takeWhile(Predicate<? super R> predicate) | Take the longest prefix of elements from this stream that satisfy the given predicate. |
SubscriberBuilder<T,Void> | to(org.reactivestreams.Subscriber<? super R> subscriber) | Connect the outlet of the Publisher built by this builder to the given Subscriber. |
<S> SubscriberBuilder<T,S> | to(SubscriberBuilder<? super R,? extends S> subscriber) | Connect the outlet of this stream to the given SubscriberBuilder graph. |
SubscriberBuilder<T,List<R>> | toList() | Collect the elements emitted by this stream into a List. |
<S> ProcessorBuilder<T,S> | via(org.reactivestreams.Processor<? super R,? extends S> processor) | Connect the outlet of this stream to the given Processor. |
<S> ProcessorBuilder<T,S> | via(ProcessorBuilder<? super R,? extends S> processor) | Connect the outlet of the Publisher built by this builder to the given ProcessorBuilder. |
<S> ProcessorBuilder<T,S> map(Function<? super R,? extends S> mapper)
Map the elements emitted by this stream using the mapper function.
Specified by: map in interface TransformingOperators<R>
Type Parameters: S - The type of elements that the mapper function emits.
Parameters: mapper - The function to use to map the elements.
<S> ProcessorBuilder<T,S> flatMap(Function<? super R,? extends PublisherBuilder<? extends S>> mapper)
Map the elements to publishers, and flatten so that the elements emitted by publishers produced by the mapper function are emitted from this stream.
This method operates on one publisher at a time. The result is a concatenation of elements emitted from all the publishers produced by the mapper function.
Unlike TransformingOperators.flatMapRsPublisher(Function), the mapper function returns an org.eclipse.microprofile.reactive.streams type instead of an org.reactivestreams type.
Specified by: flatMap in interface TransformingOperators<R>
Type Parameters: S - The type of the elements emitted from the new stream.
Parameters: mapper - The mapper function.
<S> ProcessorBuilder<T,S> flatMapRsPublisher(Function<? super R,? extends org.reactivestreams.Publisher<? extends S>> mapper)
Map the elements to publishers, and flatten so that the elements emitted by publishers produced by the mapper function are emitted from this stream.
This method operates on one publisher at a time. The result is a concatenation of elements emitted from all the publishers produced by the mapper function.
Unlike TransformingOperators.flatMap(Function), the mapper function returns an org.reactivestreams type instead of an org.eclipse.microprofile.reactive.streams builder.
Specified by: flatMapRsPublisher in interface TransformingOperators<R>
Type Parameters: S - The type of the elements emitted from the new stream.
Parameters: mapper - The mapper function.
<S> ProcessorBuilder<T,S> flatMapCompletionStage(Function<? super R,? extends CompletionStage<? extends S>> mapper)
Map the elements to CompletionStage, and flatten so that the values redeemed by each CompletionStage are emitted from this stream.
This method only works with one element at a time. When an element is received, the mapper function is executed, and the next element is not consumed or passed to the mapper function until the previous CompletionStage is redeemed. Hence this method also guarantees that ordering of the stream is maintained.
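The one-element-at-a-time behaviour described above can be sketched with plain JDK types. This is a minimal illustration, not the MicroProfile implementation: the class `SequentialStageSketch` and the method `mapOneAtATime` are hypothetical names, and a `List` stands in for the stream. Each mapper call is chained behind the previous stage with `thenCompose`, so the mapper is not invoked for the next element until the previous stage is redeemed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical sketch: a List stands in for the stream of elements.
final class SequentialStageSketch {

    // Applies the mapper to one element at a time. The mapper is only called
    // for the next element once the previous stage has been redeemed, so the
    // results keep the order of the input elements.
    static <T, S> CompletionStage<List<S>> mapOneAtATime(
            List<T> elements,
            Function<? super T, ? extends CompletionStage<? extends S>> mapper) {
        CompletionStage<List<S>> results =
                CompletableFuture.completedFuture(new ArrayList<>());
        for (T element : elements) {
            results = results.thenCompose(list ->
                    mapper.apply(element).thenApply(value -> {
                        list.add(value);
                        return list;
                    }));
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        List<Integer> mapperCalls = new ArrayList<>();

        CompletionStage<List<String>> out = mapOneAtATime(List.of(0, 1), i -> {
            mapperCalls.add(i);
            return i == 0 ? first : second;
        });

        System.out.println(mapperCalls); // [0]
        first.complete("a");
        System.out.println(mapperCalls); // [0, 1]
        second.complete("b");
        System.out.println(out.toCompletableFuture().join()); // [a, b]
    }
}
```

The sketch collects the results into a list only to make the ordering visible; a real processor would emit each value downstream as it is redeemed.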
Specified by: flatMapCompletionStage in interface TransformingOperators<R>
Type Parameters: S - The type of the elements emitted from the new stream.
Parameters: mapper - The mapper function.
<S> ProcessorBuilder<T,S> flatMapIterable(Function<? super R,? extends Iterable<? extends S>> mapper)
Map the elements to Iterables, and flatten so that the elements contained in each iterable are emitted by this stream.
This method operates on one iterable at a time. The result is a concatenation of the elements contained in all the iterables returned by the mapper function.
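The concatenation described above can be illustrated with a plain loop. This is only a sketch of the semantics under the assumption that a `List` stands in for the stream; `FlatMapIterableSketch` and `flatten` are hypothetical names and are not part of the MicroProfile API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: a List stands in for the stream of elements.
final class FlatMapIterableSketch {

    // Maps each element to an Iterable and appends its contents, one
    // iterable at a time, so the output is the concatenation of all iterables.
    static <T, S> List<S> flatten(
            List<T> source,
            Function<? super T, ? extends Iterable<? extends S>> mapper) {
        List<S> out = new ArrayList<>();
        for (T element : source) {
            for (S item : mapper.apply(element)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each number n is mapped to a list holding n copies of itself.
        List<Integer> flat = flatten(List.of(1, 2, 3), n -> Collections.nCopies(n, n));
        System.out.println(flat); // [1, 2, 2, 3, 3, 3]
    }
}
```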
Specified by: flatMapIterable in interface TransformingOperators<R>
Type Parameters: S - The type of the elements emitted from the new stream.
Parameters: mapper - The mapper function.
ProcessorBuilder<T,R> filter(Predicate<? super R> predicate)
Filter elements emitted by this publisher using the given Predicate.
Any element for which the Predicate returns true will be emitted; all other elements will be dropped.
Specified by: filter in interface FilteringOperators<R>
Parameters: predicate - The predicate to apply to each element.
ProcessorBuilder<T,R> distinct()
Creates a stream consisting of the distinct elements (according to Object.equals(Object)) of this stream.
Specified by: distinct in interface FilteringOperators<R>
ProcessorBuilder<T,R> limit(long maxSize)
Truncate this stream, ensuring the stream is no longer than maxSize elements in length.
If maxSize is reached, the stream will be completed, and upstream will be cancelled. Completion of the stream will occur immediately when the element that satisfies the maxSize is received.
Specified by: limit in interface FilteringOperators<R>
Parameters: maxSize - The maximum size of the returned stream.
ProcessorBuilder<T,R> skip(long n)
Discard the first n elements of this stream. If this stream contains fewer than n elements, this stream will effectively be an empty stream.
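The element-count behaviour of limit and skip can be tried out on a finite sequence with the JDK's `java.util.stream.Stream`, whose `limit` and `skip` operators count elements the same way. The stream class is used here only as a stand-in: `LimitSkipSketch` and its helpers are hypothetical names, the sketch does not use the MicroProfile builder, and it does not model upstream cancellation.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: java.util.stream stands in for a processor pipeline.
final class LimitSkipSketch {

    // Keeps at most maxSize elements, mirroring limit(maxSize).
    static List<Integer> limited(List<Integer> source, long maxSize) {
        return source.stream().limit(maxSize).collect(Collectors.toList());
    }

    // Discards the first n elements, mirroring skip(n).
    static List<Integer> skipped(List<Integer> source, long n) {
        return source.stream().skip(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        System.out.println(limited(source, 2));  // [1, 2]
        System.out.println(skipped(source, 3));  // [4, 5]
        System.out.println(skipped(source, 10)); // []
    }
}
```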
Specified by: skip in interface FilteringOperators<R>
Parameters: n - The number of elements to discard.
ProcessorBuilder<T,R> takeWhile(Predicate<? super R> predicate)
Take the longest prefix of elements from this stream that satisfy the given predicate.
When the predicate returns false, the stream will be completed, and upstream will be cancelled.
Specified by: takeWhile in interface FilteringOperators<R>
Parameters: predicate - The predicate.
ProcessorBuilder<T,R> dropWhile(Predicate<? super R> predicate)
Drop the longest prefix of elements from this stream that satisfy the given predicate.
As long as the predicate returns true, no elements will be emitted from this stream. Once the first element is encountered for which the predicate returns false, all subsequent elements will be emitted, and the predicate will no longer be invoked.
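The prefix semantics of takeWhile and dropWhile can be observed with the identically named operators on the JDK's `java.util.stream.Stream` (Java 9 or later). The stream class is only a stand-in for illustration: `PrefixSketch` and its helpers are hypothetical names, and the sketch says nothing about cancellation in the MicroProfile engine.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: java.util.stream stands in for a processor pipeline.
final class PrefixSketch {

    // Emits elements until the predicate first returns false.
    static List<Integer> taken(List<Integer> source, Predicate<Integer> predicate) {
        return source.stream().takeWhile(predicate).collect(Collectors.toList());
    }

    // Drops elements until the predicate first returns false, then emits
    // everything that follows without testing it again.
    static List<Integer> dropped(List<Integer> source, Predicate<Integer> predicate) {
        return source.stream().dropWhile(predicate).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 5, 1, 2);
        Predicate<Integer> lessThanThree = x -> x < 3;
        System.out.println(taken(source, lessThanThree));   // [1, 2]
        System.out.println(dropped(source, lessThanThree)); // [5, 1, 2]
    }
}
```

Note that the trailing 1 and 2 are emitted by the dropWhile variant even though they satisfy the predicate, because only the leading prefix is dropped.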
Specified by: dropWhile in interface FilteringOperators<R>
Parameters: predicate - The predicate.
ProcessorBuilder<T,R> peek(Consumer<? super R> consumer)
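Returns a stream containing all the elements from this stream, additionally performing the provided action on each element.
Specified by: peek in interface PeekingOperators<R>
Parameters: consumer - The function called for every element.
Returns: A processor builder that consumes elements of type T and emits the same elements. In between, the given function is called for each element.
ProcessorBuilder<T,R> onError(Consumer<Throwable> errorHandler)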
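Returns a stream containing all the elements from this stream, additionally performing the provided action if this stream conveys an error.
Specified by: onError in interface PeekingOperators<R>
Parameters: errorHandler - The function called with the failure.
Returns: A processor builder that consumes elements of type T and emits the same elements. If the stream conveys a failure, the given error handler is called.
ProcessorBuilder<T,R> onTerminate(Runnable action)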
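Returns a stream containing all the elements from this stream, additionally performing the provided action when this stream completes or fails.
The action cannot tell whether the stream completed or failed; to distinguish the two cases, use PeekingOperators.onError(Consumer) and PeekingOperators.onComplete(Runnable). In addition, the action is called if the stream is cancelled downstream.
Specified by: onTerminate in interface PeekingOperators<R>
Parameters: action - The function called when the stream completes or fails.
Returns: A processor builder that consumes elements of type T and emits the same elements. The given action is called when the stream completes or fails.
ProcessorBuilder<T,R> onComplete(Runnable action)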
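Returns a stream containing all the elements from this stream, additionally performing the provided action when this stream completes.
Specified by: onComplete in interface PeekingOperators<R>
Parameters: action - The function called when the stream completes.
Returns: A processor builder that consumes elements of type T and emits the same elements. The given action is called when the stream completes.
SubscriberBuilder<T,Void> forEach(Consumer<? super R> action)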
Performs an action for each element on this stream.
The returned CompletionStage will be redeemed when the stream completes, either successfully if the stream completes normally, or with an error if the stream completes with an error or if the action throws an exception.
Specified by: forEach in interface ConsumingOperators<R>
Parameters: action - The action.
Returns: A SubscriberBuilder that will invoke the action for each element of the stream.
SubscriberBuilder<T,Void> ignore()
Ignores each element of this stream.
The returned CompletionStage will be redeemed when the stream completes, either successfully if the stream completes normally, or with an error if the stream completes with an error.
Specified by: ignore in interface ConsumingOperators<R>
Returns: A SubscriberBuilder for the stream.
SubscriberBuilder<T,Void> cancel()
Cancels the stream as soon as it is run.
The returned CompletionStage will be redeemed as soon as the stream is run.
Specified by: cancel in interface ConsumingOperators<R>
Returns: A SubscriberBuilder for the stream.
SubscriberBuilder<T,R> reduce(R identity, BinaryOperator<R> accumulator)
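Perform a reduction on the elements of this stream, using the provided identity value and the accumulation function.
The result of the reduction is returned in the CompletionStage.
Specified by: reduce in interface ConsumingOperators<R>
Parameters: identity - The identity value.
accumulator - The accumulator function.
Returns: A SubscriberBuilder for the reduction.
SubscriberBuilder<T,Optional<R>> reduce(BinaryOperator<R> accumulator)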
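Perform a reduction on the elements of this stream, using the provided accumulation function.
The result of the reduction is returned as an Optional<R> in the CompletionStage. If there are no elements in this stream, an empty Optional will be returned.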
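The difference between the two reduce variants (a plain value when an identity is supplied, an Optional when it is not) matches the two `reduce` overloads on the JDK's `java.util.stream.Stream`, which are used below purely as a stand-in. `ReduceSketch` and its helpers are hypothetical names; in the MicroProfile API the result would arrive in a CompletionStage rather than being returned directly.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: java.util.stream stands in for a processor pipeline.
final class ReduceSketch {

    // With an identity value, an empty input reduces to the identity.
    static int sumWithIdentity(List<Integer> source) {
        return source.stream().reduce(0, Integer::sum);
    }

    // Without an identity value, an empty input reduces to an empty Optional.
    static Optional<Integer> sum(List<Integer> source) {
        return source.stream().reduce(Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(sumWithIdentity(List.of(1, 2, 3))); // 6
        System.out.println(sumWithIdentity(List.of()));        // 0
        System.out.println(sum(List.of(1, 2, 3)));             // Optional[6]
        System.out.println(sum(List.of()));                    // Optional.empty
    }
}
```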
Specified by: reduce in interface ConsumingOperators<R>
Parameters: accumulator - The accumulator function.
Returns: A SubscriberBuilder for the reduction.
<S,A> SubscriberBuilder<T,S> collect(Collector<? super R,A,S> collector)
Collect the elements emitted by this stream using the given Collector.
Since Reactive Streams are intrinsically sequential, only the accumulator of the collector will be used; the combiner will not be used.
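How a sequential consumer can drive a `java.util.stream.Collector` without ever touching its combiner can be sketched as follows. This is an assumption about what "the combiner will not be used" looks like in practice, not the engine's actual code: `SequentialCollectSketch` and `collectSequentially` are hypothetical names, and an `Iterable` stands in for the stream. The sketch calls the collector's supplier once, its accumulator per element, and its finisher at the end.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical sketch: an Iterable stands in for the stream of elements.
final class SequentialCollectSketch {

    // Folds the elements into a single container in encounter order.
    // The collector's combiner is never called because there is only
    // one container to fill.
    static <T, A, R> R collectSequentially(
            Iterable<T> elements, Collector<? super T, A, R> collector) {
        A container = collector.supplier().get();
        for (T element : elements) {
            collector.accumulator().accept(container, element);
        }
        return collector.finisher().apply(container);
    }

    public static void main(String[] args) {
        String joined = collectSequentially(List.of("a", "b", "c"), Collectors.joining(","));
        System.out.println(joined); // a,b,c
    }
}
```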
Specified by: collect in interface ConsumingOperators<R>
Type Parameters: S - The result of the collector.
A - The accumulator type.
Parameters: collector - The collector to collect the elements.
Returns: A SubscriberBuilder that will emit the collected result.
<S> SubscriberBuilder<T,S> collect(Supplier<S> supplier, BiConsumer<S,? super R> accumulator)
Collect the elements emitted by this stream using a Collector built from the given supplier and accumulator.
Since Reactive Streams are intrinsically sequential, the combiner will not be used. This is why this method does not accept a combiner method.
Specified by: collect in interface ConsumingOperators<R>
Type Parameters: S - The result of the collector.
Parameters: supplier - a function that creates a new result container. It creates objects of type S.
accumulator - an associative, non-interfering, stateless function for incorporating an additional element into a result
Returns: A SubscriberBuilder that will emit the collected result.
SubscriberBuilder<T,List<R>> toList()
Collect the elements emitted by this stream into a List.
Specified by: toList in interface ConsumingOperators<R>
Returns: A SubscriberBuilder that will emit the list.
SubscriberBuilder<T,Optional<R>> findFirst()
Find the first element emitted by the Publisher, and return it in a CompletionStage.
If the stream is completed before a single element is emitted, then Optional.empty() will be emitted.
Specified by: findFirst in interface ConsumingOperators<R>
Returns: A SubscriberBuilder.
ProcessorBuilder<T,R> onErrorResume(Function<Throwable,? extends R> errorHandler)
Returns a stream containing all the elements from this stream.
By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber, the stream invokes its subscriber's onError method, and then terminates without invoking any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an error, instead of invoking its subscriber's onError method, it will emit the return value of the passed function. This operator can be used to prevent errors from propagating or to supply fallback data should errors be encountered.
Specified by: onErrorResume in interface ErrorHandlingOperators<R>
Parameters: errorHandler - the function returning the value that needs to be emitted instead of the error. The function must not return null.
ProcessorBuilder<T,R> onErrorResumeWith(Function<Throwable,? extends PublisherBuilder<? extends R>> errorHandler)
Returns a stream containing all the elements from this stream. In the case of failure, the elements of the PublisherBuilder returned by the given function are emitted instead.
By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber, the stream invokes its subscriber's onError method, and then terminates without invoking any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an error, instead of invoking its subscriber's onError method, it will relinquish control to the PublisherBuilder returned from the given function, which invokes the subscriber's onNext method if it is able to do so. The subscriber's original Subscription is used to control the flow of elements both before and after any error occurring.
In such a case, because no publisher necessarily invokes onError on the stream's subscriber, it may never know that an error happened.
Specified by: onErrorResumeWith in interface ErrorHandlingOperators<R>
Parameters: errorHandler - the function returning the stream that needs to be emitted instead of the error. The function must not return null.
ProcessorBuilder<T,R> onErrorResumeWithRsPublisher(Function<Throwable,? extends org.reactivestreams.Publisher<? extends R>> errorHandler)
Returns a stream containing all the elements from this stream. In the case of failure, the elements of the Publisher returned by the given function are emitted instead.
By default, when a stream encounters an error that prevents it from emitting the expected item to its subscriber, the stream invokes its subscriber's onError method, and then terminates without invoking any more of its subscriber's methods. This operator changes this behavior. If the current stream encounters an error, instead of invoking its subscriber's onError method, the subscriber will be fed from the Publisher returned from the given function, and the subscriber's onNext method is called as the returned Publisher publishes. The subscriber's original Subscription is used to control the flow of both the original and the onError Publishers' elements.
In such a case, because no publisher necessarily invokes onError, the subscriber may never know that an error happened.
Specified by: onErrorResumeWithRsPublisher in interface ErrorHandlingOperators<R>
Parameters: errorHandler - the function returning the stream that needs to be emitted instead of the error. The function must not return null.
SubscriberBuilder<T,Void> to(org.reactivestreams.Subscriber<? super R> subscriber)
Connect the outlet of the Publisher built by this builder to the given Subscriber.
The Reactive Streams specification states that a subscriber should cancel any new stream subscription it receives if it already has an active subscription. The returned result of this method is a stream that creates a subscription for the subscriber passed in, so the resulting stream should only be run once. For the same reason, the subscriber passed in should not have any active subscriptions and should not be used in more than one call to this method.
Specified by: to in interface ConnectingOperators<R>
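The single-subscription rule cited for this method can be sketched with the JDK's `java.util.concurrent.Flow` interfaces, which mirror the `org.reactivestreams` ones. Everything named here is hypothetical and for illustration only: `SingleSubscriptionSketch` is a made-up subscriber that keeps its first subscription and cancels any later one, and `RecordingSubscription` is a test double that just records what was called on it.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical subscriber that accepts only one active subscription.
final class SingleSubscriptionSketch<T> implements Flow.Subscriber<T> {

    private final AtomicReference<Flow.Subscription> active = new AtomicReference<>();

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // A second subscription is cancelled while the first stays active.
        if (!active.compareAndSet(null, subscription)) {
            subscription.cancel();
            return;
        }
        subscription.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) { }
    @Override public void onError(Throwable failure) { }
    @Override public void onComplete() { }

    // Hypothetical test double that records calls made by the subscriber.
    static final class RecordingSubscription implements Flow.Subscription {
        boolean cancelled;
        boolean requested;
        @Override public void request(long n) { requested = true; }
        @Override public void cancel() { cancelled = true; }
    }

    // Offers two subscriptions to one subscriber and reports whether the
    // first was kept and the second was cancelled.
    static boolean secondSubscriptionIsCancelled() {
        SingleSubscriptionSketch<String> subscriber = new SingleSubscriptionSketch<>();
        RecordingSubscription first = new RecordingSubscription();
        RecordingSubscription second = new RecordingSubscription();
        subscriber.onSubscribe(first);
        subscriber.onSubscribe(second);
        return first.requested && !first.cancelled
                && second.cancelled && !second.requested;
    }

    public static void main(String[] args) {
        System.out.println(secondSubscriptionIsCancelled()); // true
    }
}
```

Because a subscriber behaving this way rejects every subscription after the first, running a stream built around it a second time would simply be cancelled, which is why the resulting stream should only be run once.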
Parameters: subscriber - The subscriber to connect.
<S> SubscriberBuilder<T,S> to(SubscriberBuilder<? super R,? extends S> subscriber)
Connect the outlet of this stream to the given SubscriberBuilder graph.
The Reactive Streams specification states that a subscriber should cancel any new stream subscription it receives if it already has an active subscription. For this reason, a subscriber builder, particularly any that represents a graph that includes a user supplied Subscriber or Processor stage, should not be used in the creation of more than one stream instance.
Specified by: to in interface ConnectingOperators<R>
Parameters: subscriber - The subscriber builder to connect.
<S> ProcessorBuilder<T,S> via(ProcessorBuilder<? super R,? extends S> processor)
Connect the outlet of the Publisher built by this builder to the given ProcessorBuilder.
The Reactive Streams specification states that a subscribing processor should cancel any new stream subscription it receives if it already has an active subscription. For this reason, a processor builder, particularly any that represents a graph that includes a user supplied Processor stage, should not be used in the creation of more than one stream instance.
Specified by: via in interface ConnectingOperators<R>
Parameters: processor - The processor builder to connect.
<S> ProcessorBuilder<T,S> via(org.reactivestreams.Processor<? super R,? extends S> processor)
Connect the outlet of this stream to the given Processor.
The Reactive Streams specification states that a subscribing processor should cancel any new stream subscription it receives if it already has an active subscription. The returned result of this method is a stream that creates a subscription for the processor passed in, so the resulting stream should only be run once. For the same reason, the processor passed in should not have any active subscriptions and should not be used in more than one call to this method.
Specified by: via in interface ConnectingOperators<R>
Parameters: processor - The processor to connect.
org.reactivestreams.Processor<T,R> buildRs()
Build this stream, using the first ReactiveStreamsEngine found by the ServiceLoader.
Returns: A Processor that will run this stream.
org.reactivestreams.Processor<T,R> buildRs(ReactiveStreamsEngine engine)
Build this stream, using the supplied ReactiveStreamsEngine.
Parameters: engine - The engine to run the stream with.
Returns: A Processor that will run this stream.
Copyright © 2019 Eclipse Foundation. All rights reserved.