-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Adding Instrumentation for reactive results and when they finish #4084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Adding Instrumentation for reactive results and when they finish #4084
Conversation
*/ | ||
@SuppressWarnings("TypeParameterUnusedInFormals") | ||
@PublicApi | ||
public class InstrumentationReactiveResultsParameters { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New parameters for the Instrumentation method
@Nullable | ||
default InstrumentationContext<Void> beginReactiveResults(InstrumentationReactiveResultsParameters parameters, InstrumentationState state) { | ||
return noOp(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notice how its Void - the end of a Publisher has no result object - it has many objects but there is no actual result at the end
*/ | ||
public static <T> Publisher<T> whenPublisherFinishes(Publisher<T> publisher, Consumer<? super Throwable> atTheEndCallback) { | ||
return new AtTheEndPublisher<>(publisher, atTheEndCallback); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This helper allows us to wrap a Publisher so that a callback is made when its finished.
if (keepOrdered) { | ||
mappingPublisher = new CompletionStageMappingOrderedPublisher<>(upstreamPublisher, mapper); | ||
} else { | ||
mappingPublisher = new CompletionStageMappingPublisher<>(upstreamPublisher, mapper); | ||
} | ||
publisher = ReactiveSupport.whenPublisherFinishes(mappingPublisher, whenDone); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we now delegate and invoke the whenDone callback
|
||
// | ||
// wrap this Publisher into one that can call us back when the publishing is done either in error or successful | ||
publisher = ReactiveSupport.whenPublisherFinishes(publisher, throwable -> ctx.onCompleted(null, throwable)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here - we delegate to the original Publisher but the callback here is done when the Publisher is finished
Test Results 326 files - 649 326 suites - 649 3m 31s ⏱️ - 7m 5s Results for commit d064c7f. ± Comparison against base commit 3ba9750. This pull request removes 558 and adds 203 tests. Note that renamed tests count towards both.
This pull request skips 1 test.
♻️ This comment has been updated with latest results. |
…ainedInstrumentation fixups
reactiveCtx.onDispatched(); | ||
|
||
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered, | ||
throwable -> reactiveCtx.onCompleted(null, throwable)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we pass in the call back here. Why?
We want the SubscriptionPublisher to be the implementation we pass back - we have tests for this.
So I changed SubscriptionPublisher internally to know when its finished
|
||
called | ||
throwable.message == "BANG" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing that the new helper works
return t1 == t2 | ||
} | ||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to make it easier to check large list of values for ordered things. Helpful with Instrumentation "step" testing
["a"], ["b"], ["c"], ["X"], ["e"], ["f"], ["g"]) | ||
then: | ||
!actual | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test thats the TestUtil helpers work
]) | ||
|
||
// last of all it finishes | ||
instrumentation.executionList.last == "end:reactive-results-defer" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test that @defer publisher can tell us when they are finished
…tation callback - fixed tests
…lback-after-reactive-publishers-finish
…tation callback -null marked
]) | ||
|
||
// last of all it finishes | ||
TestUtil.last(instrumentation.executionList) == "end:reactive-results-defer" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than test ALL the instrumentation steps again - we look at sub sections and make sure they are in order
This balances completeness with code readability / maintain ability
This adds Insrtrumentation support for reactive results - namely defer Publishers and Subscription Publisher.
When the Publisher is finished (throwable or not error) then the Instrumentation context is considered finished and called.
See #4083 for related code