Wednesday, 31 October 2012

Trying to be more functional with Rx

Posted on 14:55 by Unknown
I realised this week I'm not being as functional as I could be when creating an Rx extension method.

This came out of a discussion I was having with @leeoades about a Pausable<T> extension we thought we needed at work. Lee had a solution and I thought I'd try to create one without looking for the answer on the t'internet.

    public static IObservable<T> Pausable<T>(this IObservable<T> stream,
                                             IObservable<bool> paused,
                                             bool initialState = false)
    {
        ...
    }

Hopefully the idea of the method is obvious - have the ability to pause & resume the publishing of instances to the stream.

Before implementing Pausable<T> I thought I'd implement an extension that doesn't remember values published whilst paused - Suspendable<T>. As you can see, it has the same signature:

    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        ...
    }

So my first attempt looked like this:

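    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        // Holds the most recent suspend state, seeded with the initial state.
        var suspended = new ReplaySubject<bool>(1);
        suspended.OnNext(initialState);

        // Note: the subscription returned here is never disposed.
        suspend.Subscribe(suspended.OnNext);

        // Read the latest state for every value and drop the value if suspended.
        return stream.Where(t => !suspended.Take(1).Wait());
    }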

And to make sure this does exactly what's expected, I wrote a set of tests covering all the edge cases.
Each of these tests follows a common pattern, with a defined generator publishing the numbers 1 to 100 to the Rx stream, one per second. Shown below is the setup and a couple of the tests, one demonstrating a single resume and the other a multiple resume scenario:

    [SetUp]
    public void SetUp()
    {
        _generatorCount = 100;
        _testScheduler = new TestScheduler();

        // Publishes 1..100, one value per second of virtual time.
        _generator = Observable.Generate(1,
            x => x <= _generatorCount,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(1), _testScheduler);
    }

    [Test]
    public void should_receive_values_after_single_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(1));
    }

    [Test]
    public void should_receive_values_after_multiple_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(true);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(20));
    }

So why is this implementation of Suspendable<T> not an ideal solution even though all the tests pass?

Subjects are mutable by design, and from a functional programming perspective mutable state is something you want to avoid. That doesn't mean subjects are never appropriate: one valid scenario is where the subject is the origin of an Rx stream, e.g. a service publishing instances asynchronously via the OnNext method. That isn't the case in an Rx extension method, where ideally one should compose other Rx operators & extension methods.

A quick Google search for 'avoid Subject Rx' leads me to a post on the Rx forum by Erik Meijer giving his view on the topic.

So how do I make this a more functional implementation?

The easiest way to avoid using a Subject<T> is to use Observable.Create<T> to return the IObservable<T> instance. This is what I came up with:

    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                    .DistinctUntilChanged()
                    .Select(s => s ? Observable.Empty<T>() : stream)
                    .Switch()
                    .Subscribe(o);

            return disposable;
        });
    }

This introduced me to two new Rx operators - StartWith & Switch. StartWith should be obvious: it prepends a value (here the initial state) to the stream. Switch is a little more subtle: given a stream of streams, it only produces values from the most recent inner stream. In the code above the Select operator returns either the original observable stream or an empty observable stream, depending on the most recent value of the suspend stream.
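To make the behaviour of Switch concrete, here is a minimal sketch. The class SwitchSketch, its Run method and the subject names are made up for illustration and are not part of the extension above. Subjects are used here only to drive the example by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSketch
{
    // Hypothetical helper (not from the post): records the values Switch lets through.
    public static List<int> Run()
    {
        var received = new List<int>();
        var first = new Subject<int>();
        var second = new Subject<int>();
        var outer = new Subject<IObservable<int>>();

        using (outer.Switch().Subscribe(n => received.Add(n)))
        {
            outer.OnNext(first);   // 'first' is now the latest inner stream
            first.OnNext(1);       // forwarded
            outer.OnNext(second);  // Switch unsubscribes from 'first'
            first.OnNext(2);       // dropped, 'first' is no longer the latest
            second.OnNext(3);      // forwarded
        }

        return received;
    }
}
```

Only the values published by whichever inner stream is current reach the subscriber, which is what lets Suspendable<T> swap between the real stream and an empty one.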

Now this implementation has an issue, and because I'm a good programmer and already had unit tests it was picked up straight away :)
The test 'should_complete_when_stream_completes' fails - hopefully the test name is obvious, but if not, I'm expecting the OnCompleted action to be called when the stream completes.
For some reason the completed flag is never set to true and therefore the assert fails. The stream is set up as follows; you can see it completes when 100 is reached:

    _generatorCount = 100;
    _testScheduler = new TestScheduler();

    _generator = Observable.Generate(1,
        x => x <= _generatorCount,
        x => x + 1,
        x => x,
        x => TimeSpan.FromSeconds(1), _testScheduler);

What confuses me about this is that I have another test, in which the generator throws an exception when the value 42 is reached, and that test passes.
So why is the OnCompleted action never called when the OnError action can be?

I'm sure there's something wrong with the implementation of my Suspendable<T> extension method; I'll continue to investigate...
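One plausible explanation, offered as a sketch rather than a confirmed diagnosis, is this. Switch only signals completion once the outer stream has completed as well as the latest inner stream, whereas an error from the inner stream is forwarded immediately. If the failing test passes a Subject<bool> that never completes as the suspend stream (an assumption, since that test isn't shown), the outer stream never ends and OnCompleted is never raised. The class and method names below are hypothetical and exist only to demonstrate that behaviour.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchCompletionSketch
{
    // Hypothetical helper: returns the completed flag before and after the outer stream completes.
    public static Tuple<bool, bool> Completion()
    {
        var completed = false;
        var outer = new Subject<IObservable<int>>();

        outer.Switch().Subscribe(_ => { }, () => completed = true);

        // The inner stream completes straight away, but the outer one is still open.
        outer.OnNext(Observable.Return(1));
        var beforeOuterCompletes = completed;

        // Only once the outer stream completes too does Switch complete.
        outer.OnCompleted();

        return Tuple.Create(beforeOuterCompletes, completed);
    }

    // Hypothetical helper: an inner error is forwarded without waiting for the outer stream.
    public static bool ErrorIsForwarded()
    {
        var failed = false;
        var outer = new Subject<IObservable<int>>();

        outer.Switch().Subscribe(_ => { }, ex => failed = true);
        outer.OnNext(Observable.Throw<int>(new InvalidOperationException("boom")));

        return failed;
    }
}
```

If this is indeed the cause, completing the suspend stream in the test should let the completion through, though that has not been verified against the tests described here.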