Reactive Extensions (Rx)
What is Rx
Rx is a library for composing asynchronous and event-based programs using observable collections.
Using
Rx, developers represent asynchronous data streams with Observables,
query asynchronous data streams using LINQ operators, and parameterize
the concurrency in the asynchronous data streams using Schedulers.
Simply put, Rx = Observables + LINQ + Schedulers.
The
Reactive Extensions (Rx) is a library for composing asynchronous and
event-based programs using observable sequences and LINQ-style query
operators.
Whether you
are authoring a traditional desktop or web-based application, you have
to deal with asynchronous and event-based programming from time to time.
Desktop applications have I/O operations and computationally expensive
tasks that might take a long time to complete and potentially block
other active threads. Furthermore, handling exceptions, cancellation,
and synchronization is difficult and error-prone.
Three core properties are reflected in here:
Asynchronous and event-based
– As reflected in the title, the bread and butter of Rx’s mission
statement is to simplify those programming models. Everyone knows what
stuck user interfaces look like, both on the Windows platform and on
the web. And with the cloud around the corner, asynchrony becomes
quintessential. Low-level technologies like .NET events, the
asynchronous pattern, tasks, AJAX, etc. are often too hard.
Composition
– Combining asynchronous computations today is way too hard. It
involves a lot of plumbing code that has little to nothing to do with
the problem being solved. In particular, the data flow of the operations
involved in the problem is not clear at all, and code gets spread out
throughout event handlers, asynchronous callback procedures, and
whatnot.
Observable collections
– By looking at asynchronous computations as data sources, we can
leverage the active knowledge of LINQ’s programming model. That’s right:
your mouse is a database of mouse moves and clicks. In the world of
Rx, such asynchronous data sources are composed using various
combinators in the LINQ sense, allowing things like filters,
projections, joins, time-based operations, etc.
Asynchronous Data Streams
Using
Rx, you can represent multiple asynchronous data streams (that come
from diverse sources, e.g., stock quote, tweets, computer events, web
service requests, etc.), and subscribe to the event stream using the
IObserver<T> interface. The IObservable<T> interface
notifies the subscribed IObserver<T> interface whenever an event
occurs.
LINQ in Rx
Because observable
sequences are data streams, you can query them using standard LINQ query
operators implemented by the Observable type. Thus you can filter,
project, aggregate, compose and perform time-based operations on
multiple events easily by using these static LINQ operators. In
addition, there are a number of other reactive stream specific operators
that allow powerful queries to be written. Cancellation, exceptions,
and synchronization are also handled gracefully by using the extension
methods provided by Rx.
How Rx works?
Take a look at the next code snippet:
How many times have you done this in your application? Consider for a moment the collections in .NET Framework; they are all unified by IEnumerable. Arrays, lists, they all implement the IEnumerable interface. If you take a look at these collections, they are pulling collection. If you are using a loop, you receive the data from a collection over a period of time, or in other words, you are pulling the data, saying “move next, move next”, pulling the elements out.
What we have here is an implementation of the Iterator pattern (IEnumerable\IEnumerator).
Now, if we take a look at the RX style collection, they are push collections. They push
values to you. And here is another key moment to understand. What does
“push values to you” mean? Take a look again at the next code snippet.
It represents another very common situation in the programming, namely,
the usage of events.
We are handling the MouseMove event. This is a pretty standard task. The mouse cursor’s location value is changing over a period of time, and an event notifies you about that - or in other words, the source is pushing values to you. If we again refer to the design patterns, we will see that we have an implementation of the Observer pattern. Yes, the centerpiece of the RX Framework is the Observer pattern. All RX style collections are unified by the IObservable interface. You can then subscribe to this observable using an instance of an object implementing the IObserver interface.
On one side – we have the Iterator pattern ( IEnumerable<T>\IEnumerator<T>) and on the other side we have the Observer pattern (IObservable<T>\IObserver<T>). The interesting point here is that the RX developers consider the IObservable<T> as a mathematical dual to the IEnumerable<T>
interface. Normally, any Enumerable collection can be turned into
Observable collection and vice versa. Basically the RX Framework
consists of two interfaces - IObservable<T> and IObserver<T>.
Just like the Enumerable collections where you can perform various LINQ
operations, the same are implemented for the Observable collections.
So if you have to remember something, it's that the Enumerable collections are pulling data from the source and the Observable collections are pushing data from the source. And any Enumerable collection can be turned into an IObservable collection and vice versa.
So if you have to remember something, it's that the Enumerable collections are pulling data from the source and the Observable collections are pushing data from the source. And any Enumerable collection can be turned into an IObservable collection and vice versa.
What can be done with Rx Framework?
Rx offers new, different and sometimes easier ways to approach existing problems:
- Asynchronous programming. RX provides you with a built-in threading support and lets you observe collections in background threads.
- Composing custom events. This is probably the most exciting feature of the RX Framework (at least for me). Imagine a drawing application. The most common tasks in such sort of applications are to select, draw and modify objects. The core part of these behaviors is the mouse and keyboard interaction, combining mouse buttons, implementing dragging operations, etc. The RX Framework addresses these issues.
- RX offers two new interfaces: IObservable<T> and IObserver<T>. You could create observable from existing event and enumerable, use it as a regular CLR class.
Here are some examples which will give
you some idea how to use Rx to implement some of the requirements which
are not easy to implement in traditional way.
From IEnumerable<T> to IObservable<T>
We
can start with something simple. As I said in the introduction, any
“enumerable collection can be turned into observable collection”. Let’s
see what this means. For the first demo I am using two simple ListBoxes,
which are populated in the code-behind.
<Grid x:Name="LayoutRoot" Background="White"> <Grid.ColumnDefinitions> <ColumnDefinition /> <ColumnDefinition /> </Grid.ColumnDefinitions> <ListBox x:Name="lbEnumerable"/> <ListBox x:Name="lbObservable" Margin="10"/> </Grid> |
The first ListBox is populated through a foreach loop.
List<string> products = new List<string> {
"Chai", "Chang", "Aniseed Syrup", "Konbu", "Tofu", "Geitost" };
foreach ( var item in products ) this.lbEnumerable.Items.Add( item ); |
And now the interesting part – the RX Framework exposes a ToObservable() extension method, which turns an IEnumerable collection into an IObservable collection.
IObservable<string> observable = products.ToObservable();
|
Once you have an Observable you can subscribe in the following fashion:
observable.Subscribe<string>( p =>
{
this.lbObservable.Items.Add( p ); } );
|
The
result from the previous code-snippet is that the lbObservable ListBox
is populated with items thanks to the reactive framework, which pushes
values to you (to the ListBox).
Another interesting extension method is Repeat(), which repeats the observable sequence “n” times. Or even more, you can repeat sequences indefinitely.
Another interesting extension method is Repeat(), which repeats the observable sequence “n” times. Or even more, you can repeat sequences indefinitely.
IObservable<string> observable = products.ToObservable().Repeat(10); |
The result is that all products will be pushed to the ListBox ten times.
There are several overloads of the Subscribe method. One of them accepts a delegate which is executed when the observable sequence ends.
There are several overloads of the Subscribe method. One of them accepts a delegate which is executed when the observable sequence ends.
observable.Subscribe<string>( p =>
{
this.lbObservable.Items.Add( p ); }, () =>
{
this.lbObservable.Items.Add( "-----Completed----" ); } );
|
And the result is:
Unsubscribing from Observables
An
important issue is to be able to unsubscribe from the observer. Take a
look at the Repeat() example again. And for some reason you would like
to create an indefinite sequence.
IObservable<string> observable = products.ToObservable().Repeat(); |
The
result from the above example is that the observable collection will
push elements to the observer infinitely. Each time the Subscribe() method is called, it returns an IDisposable objects that can be used to unsubscribe and to stop pushing elements to the observer, like this:
IDisposable disposable = observable.Subscribe<string>( p =>
{
this.lbObservable.Items.Add( p ); }, () =>
{
this.lbObservable.Items.Add( "-----Completed----" ); } );
//....
disposable.Dispose();
|
Turning Events into Observables
This is one of the coolest features of the RX Framework. The Observable object exposes a FromEvent ()
static method, which can be used to turn an event into an observable.
After that, the event can be used as any other regular object in your
application.
var mouseMoveEvent = Observable.FromEvent<MouseEventArgs>( this, "MouseMove" );
|
And of course you could subscribe to the observable of the mouse move event in the familiar fashion.
mouseMoveEvent.Subscribe(
p =>
{
// Do something when the mouse // moves around the screen } );
|
This
is pretty simple and couldn’t illustrate the power of the RX Framework.
You could attach to the mouse move event in the standard and old school
manner.
But
let’s complicate the situation a little bit. Imagine that you want to
update the text box only when you are moving the mouse around the screen
and in the same time the left mouse button is down. Which means that if
you use the standard approach for attaching to events, you should
have an additional Boolean flag which is raised each time the left mouse
button is pressed and to reset it when the left mouse button is
released. Take a look at how this can be handled using reactive
extensions. The only thing you should do is to create observables from
the MouseMove and MouseLeftButtonDown events, and to combine them using
the SkipUntil() extension method.
var mouseMoveEvent = Observable.FromEvent<MouseEventArgs>( this, "MouseMove" );
var mouseLeftButtonDown = Observable.FromEvent<MouseButtonEventArgs> ( this, "MouseLeftButtonDown" );
var events = mouseMoveEvent.SkipUntil( mouseLeftButtonDown );
|
The
final step is to stop observing, when the mouse left button is
released. Again this can be done in the same manner – we are observing
the MouseLeftButtonUp event. However, this time the TakeUntil() extension method will be used.
var mouseMoveEvent = Observable.FromEvent<MouseEventArgs>( this, "MouseMove" );
var mouseLeftButtonDown = Observable.FromEvent<MouseButtonEventArgs> ( this, "MouseLeftButtonDown" );
var mouseLeftButtonUp = Observable.FromEvent<MouseButtonEventArgs> ( this, "MouseLeftButtonUp" );
var events = mouseMoveEvent.SkipUntil( mouseLeftButtonDown ) .TakeUntil( mouseLeftButtonUp ).Repeat();
events.Subscribe(
p =>
{
Point mousePosition = p.EventArgs.GetPosition( this ); tbMousePosition.Text = String.Format( "Mouse position {0}, {1}", mousePosition.X, mousePosition.Y );
} );
|
What
we have here is that the text block is updated only when the mouse is
moving and the left button is pressed. We will stop updating the text
box when the mouse left button is released.
Drag and Drop
Now,
as we have some basic knowledge about the reactive extensions, lets
create the equivalent of the “Hello World” for RX, namely, this is a
drag and drop application. At the first moment I said: “wow drag and
drop, this is not simple”. But you’ll see how easy it could be with
RX.
To begin, I’ll add a
TextBox and Image inside a Canvas. You could check the source code for
this example in the Demo3 folder of the Demo Solution.
<Canvas> <TextBlock x:Name="textBlock" Text="Drag Me!!!" FontSize="20" Canvas.Top="10" Canvas.Left="10"/> <Image Source="/RXDemos;component/SilverlightShow.png" Canvas.Top="50" Canvas.Left="10" x:Name="image"/> </Canvas> |
Next, turn the MouseLeftButtonDown, MouseMove and MouseLeftButtonUp events into observables.
var mouseMoveEventImage = Observable.FromEvent<MouseEventArgs>( image, "MouseMove" );
var mouseLeftButtonDownImage = Observable.FromEvent<MouseButtonEventArgs> ( image, "MouseLeftButtonDown" );
var mouseLeftButtonUpImage = Observable.FromEvent<MouseButtonEventArgs> ( image, "MouseLeftButtonUp" ); |
You
can create a new observable to listen for a sequence of mouse positions
– starting when the mouse left button goes down, and then listening to
all positions in mouse move, until the mouse left button goes up.
var draggingEventsImage = mouseMoveEvent.SkipUntil( mouseLeftButtonDown ) .TakeUntil( mouseLeftButtonUp ).Repeat(); |
As
I mentioned in the introduction, you could turn an event into
observable and treat it as any regular CLR object. Which means that you
could write LINQ queries against it. Now check this out.
var draggingEventsImage = from pos in mouseMoveEventImage.SkipUntil( mouseLeftButtonDownImage ). TakeUntil( mouseLeftButtonUpImage )
.Let( mm => mm.Zip( mm.Skip( 1 ), ( prev, cur ) =>
new {
X = cur.EventArgs.GetPosition( this ).X - prev.EventArgs.GetPosition( this ).X, Y = cur.EventArgs.GetPosition( this ).Y - prev.EventArgs.GetPosition( this ).Y } ) ).Repeat()
select pos;
|
What
I am doing here is simply taking the delta between the previous
position and the current position, while the mouse is being dragged. The
Zip functions merge two observables sequences into one observable
sequences by using a selector function.
Finally, subscribe to the dragging events and set the new position of the image.
Finally, subscribe to the dragging events and set the new position of the image.
draggingEventsImage.Subscribe(
p =>
{
Canvas.SetLeft( image, Canvas.GetLeft( image ) + p.X );
Canvas.SetTop( image, Canvas.GetTop( image ) + p.Y );
} );
|
You could do exactly the same for the TextBlock or any other control inside the canvas.
Drawing with RX
Considering the previous drag and drop demo, now we can examine another common issue that is
addressed by the RX Framework.
You could create drawing application in the same manner, as in the drag and drop demo.
var mouseMoveEvent = Observable.FromEvent<MouseEventArgs>(this,"MouseMove");
var mouseLeftButtonDown = Observable.FromEvent<MouseButtonEventArgs>( this, "MouseLeftButtonDown" );
var mouseLeftButtonUp = Observable.FromEvent<MouseButtonEventArgs>( this, "MouseLeftButtonUp" );
var draggingEvents = from pos in mouseMoveEvent.SkipUntil( mouseLeftButtonDown ).
TakeUntil( mouseLeftButtonUp )
.Let( mm => mm.Zip( mm.Skip( 1 ), ( prev, cur ) =>
new
{
X2 = cur.EventArgs.GetPosition( this ).X,
X1 = prev.EventArgs.GetPosition( this ).X,
Y2 = cur.EventArgs.GetPosition( this ).Y,
Y1 = prev.EventArgs.GetPosition( this ).Y
} ) ).Repeat()
select pos;
draggingEvents.Subscribe(
p =>
{
Line line = new Line();
line.StrokeThickness = 2;
line.Stroke = new SolidColorBrush( Colors.Black );
line.X1 = p.X1;
line.Y1 = p.Y1;
line.X2 = p.X2;
line.Y2 = p.Y2;
this.LayoutRoot.Children.Add( line );
});
|
Asynchronous Programming with RX – the PI Calculator
Asynchronous
programming is by no means restricted to Web scenarios. But not
anymore, RX is ideal for composing async applications.
Let’s say that the value of PI in System.Math.PI, at only 20 digits, isn’t precise enough for you. In that case, you may find yourself writing an application that calculates pi to an arbitrary number of digits. Although this may not be a real world example, it will illustrate the idea of handling asynchronous programming with RX. Probably many of your applications need to perform long-running operations (e.g.: printing, web service call, or calculation).
Let’s say that the value of PI in System.Math.PI, at only 20 digits, isn’t precise enough for you. In that case, you may find yourself writing an application that calculates pi to an arbitrary number of digits. Although this may not be a real world example, it will illustrate the idea of handling asynchronous programming with RX. Probably many of your applications need to perform long-running operations (e.g.: printing, web service call, or calculation).
You could check the source code for this example in the Demo5 folder of the Demo Solution.
Check out the method, which is responsible for the pi calculation.
static IEnumerable<CalculationData> Calculate( int numberOfDigits )
{
for ( int i = 0; i < numberOfDigits; i += 9 )
{
int nineDigits = NineDigitsOfPi.StartingAt( i + 1 );
int digitCount = Math.Min( numberOfDigits - i, 9 );
string ds = string.Format( "{0:D9}", nineDigits );
yield return new CalculationData( ds.Substring( 0, digitCount ),
i + digitCount );
}
}
|
As
you know, any IEnumerable collection could be turned into IObservable
collection, and could be observed in a background thread.
IObservable<CalculationData> digits = Observable.ToObservable<CalculationData>(
Calculate( this.InputDigitsCount ) );
digits.Subscribe<CalculationData>(
item =>
{
if ( String.IsNullOrEmpty( this.Output ) )
this.Output = "3.";
this.Output = String.Concat( this.Output, item.NextDigits );
}, () =>
{
this.IsCalculationInProgress = false;
} );
|
When to use Rx
Use Rx for orchestrating asynchronous and event-based computations
Code
that deals with more than one event or asynchronous computation gets
complicated quickly as it needs to build a state-machine to deal with
ordering issues. Next to this, the code needs to deal with successful
and failure termination of each separate computation. This leads to
code that doesn’t follow normal control-flow, is hard to understand and
hard to maintain.
Rx makes these computations first-class citizens. This provides a model that allows for readable and composable APIs to deal with these asynchronous computations.
Sample
var scheduler = new ControlScheduler(this);
var
keyDown = Observable.FromEvent<KeyEventHandler, KeyEventArgs>(d
=> d.Invoke, h => textBox.KeyUp += h, h => textBox.KeyUp -= h);
var dictionarySuggest = keyDown
.Select(_ => textBox1.Text)
.Where(text => !string.IsNullOrEmpty(text))
.DistinctUntilChanged()
.Throttle(TimeSpan.FromMilliseconds(250), scheduler)
.SelectMany(
text => AsyncLookupInDictionary(text)
.TakeUntil(keyDown));
dictionarySuggest.Subscribe(
results =>
listView1.Items.AddRange(results.Select(result => new ListViewItem(result)).ToArray()),
error => LogError(error));
|
This sample models a common UI paradigm of receiving completion suggestions while the user is typing input.
Rx creates an observable sequence that models an existing KeyUp event (the original WinForms code did not have to be modified).
It then places several filters and projections on top of the event to make the event only fire if a unique value has come through. (The KeyUp event fires for every key stroke, so also if the user presses left or right arrow, moving the cursor but not changing the input text).
Next it makes sure the event only gets fired after 250 milliseconds of activity by using the Throttle operator. (If the user is still typing characters, this saves a potentially expensive lookup that will be ignored immediately). A scheduler is passed to ensure the 250 milliseconds delay is issued on the UI thread.
In traditionally written programs, this throttling would introduce separate callbacks through a timer. This timer could potentially throw exceptions (certain timers have a maximum amount of operations in flight).
Once the user input has been filtered down it is time to perform the dictionary lookup. As this is usually an expensive operation (e.g. a request to a server on the other side of the world), this operation is itself asynchronous as well.
The SelectMany operator allows for easy combining of multiple asynchronous operations. It doesn’t only combine success values; it also tracks any exceptions that happen in each individual operation.
In traditionally written programs, this would introduce separate callbacks and a place for exceptions occurring.
If the user types a new character while the dictionary operation is still in progress, we do not want to see the results of that operation anymore. The user has typed more characters leading to a more specific word, seeing old results would be confusing.
The TakeUntil(keyDown) operation makes sure that the dictionary operation is ignored once a new keyDown has been detected.
Finally we subscribe to the resulting observable sequence. Only at this time our execution plan will be used. We pass two functions to the Subscribe call:
- Receives the result from our computation.
- Receives exceptions in case of a failure occurring anywhere along the computation.
When to ignore this guideline
If
the application/library in question has very few
asynchronous/event-based operations or has very few places where these
operations need to be composed, the cost of depending on Rx
(redistributing the library as well as the learning curve) might
outweigh the cost of manually coding these operations.
Use Rx to deal with asynchronous sequences of data
Several
other libraries exist to aid asynchronous operations on the .NET
platform. Even though these libraries are powerful, they usually work
best on operations that return a single message. They usually do not
support operations that produce multiple messages over the lifetime of
the operation.
Rx follows the following grammar: OnNext* (OnCompleted|OnError)?
(see chapter 0). This allows for multiple messages to come in over
time. This makes Rx suitable for both operations that produce a single
message, as well as operations that produce multiple messages.
Sample
//open a 4GB file for asynchronous reading in blocks of 64K
var inFile = new FileStream(@"d:\temp\4GBfile.txt", FileMode.Open, FileAccess.Read, FileShare.Read, 2 << 15, true);
//open a file for asynchronous writing in blocks of 64K
var outFile = new FileStream(@"d:\temp\Encrypted.txt", FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 2 << 15, true);
inFile.AsyncRead(2 << 15).Select(Encrypt).WriteToStream(outFile)
.Subscribe(_ => Console.WriteLine("Successfully encrypted the file."),
error => Console.WriteLine("An error occurred while encrypting the file: {0}", error.Message));
|
In this sample, a 4 GB file is read in its entirety, encrypted and saved out to another file.
Reading the whole file into memory, encrypting it and writing out the whole file would be an expensive operation.
Instead, we rely on the fact that Rx can produce many messages.
We read the file asynchronously in blocks of 64K. This produces an observable sequence of byte arrays. We then encrypt each block separately (for this sample we assume the encryption operation can work on separate parts of the file). Once the block is encrypted, it is immediately sent further down the pipeline to be saved to the other file. The WriteToStream operation is an asynchronous operation that can process multiple messages.
When to ignore this guideline
If the application/library in question has very few operations with multiple messages, the cost of depending on Rx (redistributing the library as well as the learning curve) might outweigh the cost of manually coding these operations.
Comments
Post a Comment