I'm currently working on a project with @HamishDotNet & @LordHanson where we make heavy use of Rx (Reactive Extensions) for processing streams of data which are generated asynchronously. We have multiple pipeline processes based around an Rx stream, they look something like this:
The method Sequence returns an instance of IObservable<T>which is then acted upon by 4 or 5 methods before the Subscriber is called, some of these methods might mutate the state along the way. The important part is the idea of the pipeline to process the Rx stream as it is generated by the asynchronous Sequence method.
So what I wanted to do is log what's going on - how long each step takes (in the pipeline) as well as how long the Rx stream is alive and when the stream generates a value. The issue we have is we don't want to overly modify the code just support such logging\telemetry. I don't want to end up with something like this:
Don't get me wrong, if I had only a couple of Rx calls in the whole app then I'd probably go with the quick option above, but we have a lot of Rx calls.
What I want is an AOP approach - the lightest touch possible. I chose to use PostSharp for the AOP, the simple reason being it's the one I've heard most about.
The first step was to create a simple console app and add references for Rx & PostSharp via NuGet - couldn't be simpler:
Next I needed a couple of Rx methods to test with, I created two, one generating a continual sequence of numbers with an interval of 500 ms and one generating a single number after an initial delay of 1000 ms, it then completes the Rx stream:
The two methods are rather contrived but this is a demo after all. This is used in the following console app.
What you can see is the main thread is blocked by the call to the Console.ReadLine method inside the Using statement, but because of the asynchronous nature of Rx the application will coninute to work on a background thread. This background thread will call back to the Subscriber set up to call the Display static method every time data is published onto the Rx stream:
As you can see from above I'm using the Sequence method initialised with 42, this produces the following output ad-infinitum until the enter key is pressed:
Once enter is pressed our subscription to the Rx stream is automatically disposed by the Using statement:
I've now got the example code working so the next step is to add some telemetry\logging to the method. The first approach is to add a basic PostSharp attribute to the methods. After a quick google for an example I came up with the following, it simply writes to the console when the method is entered and exited:
This is then applied to the Sequence method on the Generator class as an attribute:
You can probably guess what's produced if you're familiar with the Rx style of coding - because the method is asynchronous the OnEntry & OnExit methods of the attribute are called immediately before anything is published to the Rx stream - Ithe telemtry is output as green text in the console output:
So How can I get telemetry to output when ever something is published to the Rx Stream?
Now this is where the knowledge of Rx comes into play - anyone who's used Rx for sometime will understand about the interface IObservable<T> & IObserver<T> In this case I'm particularly interested in the IObservable<T>, it exposes the subscribe method which allow a user to subscribe to the Rx stream:
I know PostSharp provides access to the return parameter for a method and since my Sequence method returns an IObservable<Int> I should be able to subscribe to the Rx stream and then output telemetry via the console:
Now the easiest solution is to cast the return value into an IObservable<Int> and then subscribe to the Rx stream:
This produces the required output; but there is a big problem with the implementation - it's not a generic solution, it only works for IObservable<Int>
To produce a truly generic solution that works with any implementation of IObservable<T> I need to use reflection to subscribe to the OnNext method of the return value passed to the OnExit method:
What you see above is the reflection code need to successfully execute the Subscribe method on the IObservable<T>.
I've introduced an instance of the Stopwatch class to capture timing information and importantly introduced a custom class, TraceObservable<T>, this class actually writes out the telemetry information to the console. The instance of this class ('traceInstance') is passed to the Invoke method of the MethodInfo class for the Subscribe method on the IObservable<T> of the return value:
What you'll also see above is I've added an implementation for the OnCompleted method - this will required another PostSharp Aspect to be defined but for now this is the output this produces - again the telemetry info can seen as green text in the console window:
Now I'm pretty much there, the only difference now for the test program is the addition of a couple of attributes to the Generator class:
An interesting side effect of the test program is when we dispose of our Rx subscription the PostSharp aspect continues to output telemetry information - this is actually completely logical and will only stop once the instance of the Generator class is destroyed:
To see how the OnCompleted method is called for the TraceObservable<T> we need another PostSharp aspect as stated previously:
This is then like the other PostSharp aspects as method attributes on the Generator class:
This produces the following output when the Single method on the Generator class is used in the test program:
That pretty much covers it...
I've been able to add the telemetry I want without actually changing the method implementation, I've only had to add a couple of method attributes
And I also tried this in Silverlight - it works for version 4 and beyond....
The code is available for download.
The method Sequence returns an instance of IObservable<T>
So what I wanted to do is log what's going on - how long each step takes (in the pipeline) as well as how long the Rx stream is alive and when the stream generates a value. The issue we have is we don't want to overly modify the code just support such logging\telemetry. I don't want to end up with something like this:
Don't get me wrong, if I had only a couple of Rx calls in the whole app then I'd probably go with the quick option above, but we have a lot of Rx calls.
What I want is an AOP approach - the lightest touch possible. I chose to use PostSharp for the AOP, the simple reason being it's the one I've heard most about.
The first step was to create a simple console app and add references for Rx & PostSharp via NuGet - couldn't be simpler:
Next I needed a couple of Rx methods to test with, I created two, one generating a continual sequence of numbers with an interval of 500 ms and one generating a single number after an initial delay of 1000 ms, it then completes the Rx stream:
The two methods are rather contrived but this is a demo after all. This is used in the following console app.
What you can see is the main thread is blocked by the call to the Console.ReadLine method inside the Using statement, but because of the asynchronous nature of Rx the application will coninute to work on a background thread. This background thread will call back to the Subscriber set up to call the Display static method every time data is published onto the Rx stream:
As you can see from above I'm using the Sequence method initialised with 42, this produces the following output ad-infinitum until the enter key is pressed:
Once enter is pressed our subscription to the Rx stream is automatically disposed by the Using statement:
I've now got the example code working so the next step is to add some telemetry\logging to the method. The first approach is to add a basic PostSharp attribute to the methods. After a quick google for an example I came up with the following, it simply writes to the console when the method is entered and exited:
This is then applied to the Sequence method on the Generator class as an attribute:
You can probably guess what's produced if you're familiar with the Rx style of coding - because the method is asynchronous the OnEntry & OnExit methods of the attribute are called immediately before anything is published to the Rx stream - Ithe telemtry is output as green text in the console output:
So How can I get telemetry to output when ever something is published to the Rx Stream?
Now this is where the knowledge of Rx comes into play - anyone who's used Rx for sometime will understand about the interface IObservable<T> & IObserver<T> In this case I'm particularly interested in the IObservable<T>, it exposes the subscribe method which allow a user to subscribe to the Rx stream:
I know PostSharp provides access to the return parameter for a method and since my Sequence method returns an IObservable<Int> I should be able to subscribe to the Rx stream and then output telemetry via the console:
Now the easiest solution is to cast the return value into an IObservable<Int> and then subscribe to the Rx stream:
This produces the required output; but there is a big problem with the implementation - it's not a generic solution, it only works for IObservable<Int>
To produce a truly generic solution that works with any implementation of IObservable<T> I need to use reflection to subscribe to the OnNext method of the return value passed to the OnExit method:
What you see above is the reflection code need to successfully execute the Subscribe method on the IObservable<T>.
I've introduced an instance of the Stopwatch class to capture timing information and importantly introduced a custom class, TraceObservable<T>, this class actually writes out the telemetry information to the console. The instance of this class ('traceInstance') is passed to the Invoke method of the MethodInfo class for the Subscribe method on the IObservable<T> of the return value:
What you'll also see above is I've added an implementation for the OnCompleted method - this will required another PostSharp Aspect to be defined but for now this is the output this produces - again the telemetry info can seen as green text in the console window:
Now I'm pretty much there, the only difference now for the test program is the addition of a couple of attributes to the Generator class:
An interesting side effect of the test program is when we dispose of our Rx subscription the PostSharp aspect continues to output telemetry information - this is actually completely logical and will only stop once the instance of the Generator class is destroyed:
To see how the OnCompleted method is called for the TraceObservable<T> we need another PostSharp aspect as stated previously:
This is then like the other PostSharp aspects as method attributes on the Generator class:
This produces the following output when the Single method on the Generator class is used in the test program:
That pretty much covers it...
I've been able to add the telemetry I want without actually changing the method implementation, I've only had to add a couple of method attributes
And I also tried this in Silverlight - it works for version 4 and beyond....
The code is available for download.
0 comments:
Post a Comment