Tuesday 2 October 2018

Observer pattern

In the observer pattern there are two types of "things"; the observerable and its observers; the observable can have many observers watching it. The observable pushes notifications to the observers.

you can see from the above that the observable notifies all the things that are observing it that something of interest has occurred within the context of the observable, this notification will invoke a function that is defined within the observers.

Before we dive into the UML of the observer pattern let's take a look at a very simple implementation using an event handler.

using System;

namespace pav.ObserverPattern {
    class Program {
        static void Main(string[] args) {
            var observable = new Observable();
            var o1 = new Observer { Name = "Observer one" };
            var o2 = new Observer { Name = "Observer two" };
            var o3 = new Observer { Name = "Observer three" };

            observable.Notify += o1.Notified;
            observable.Notify += o2.Notified;
            observable.Notify += o3.Notified;

            observable.NotifiyObservers();
        }
    }

    class Observable {
        public event EventHandler Notify;
        internal void NotifiyObservers() => Notify?.Invoke(this, new EventArgs());
    }

    class Observer {
        public string Name { get; set; }
        public void Notified(object sender, EventArgs args) => Console.WriteLine($"{Name} was notified");
    }
}

Above we see that we have an observable class that has a public field of the event type. This is the reference that the observable keeps to the observers execute methods. When the observable invokes the notify event it will execute all the registered methods of the observers.

Now if you are familiar with delegates then this may look very familiar however the two main differences between events and delegates is that events can not be overridden or invoked from outside of their containing class. 

Next let's look at a more structured approach to the observer pattern; c# has predefined generic interfaces for both an observable and its observers.

In this version of the observer pattern there are two main interfaces to realize and those are the IObservable and IObserver, as mentioned above the observable notifies all of the observers that something happened. The IObservable interface only defines one function "Subscribe" and this function returns an implementation of the IDisposable interface, this is meant to create a reference to each observer that can then be used to detach the observer from the observers notification list. As the "Subscribe" function name implies this is also where our observers are registered for notifications.

So let's implement these interfaces


now this looks like a shit show; and it doesn't even contain our application which holds references to the Observable and its Observers, so let's try and make some sense of it. The Observable keeps a list of observers that are "watching" it; for each observer that is Subscribed the Observable returns an unsubscriber object that the observer holds a reference to and uses to detach itself from the observable notification list.

Enough Jibber Jabber, let's get started. Firstly let's set up our Observer, because it's the simplest

class Observer<T> : IObserver<T>
{
    IDisposable unsubscriber;

    public void OnCompleted() => unsubscriber.Dispose();
    public void OnError(Exception error) => Console.WriteLine(error.Message);
    public void OnNext(T value) => Console.WriteLine(value);

    public virtual void Subscribe(IObservable<T> observable) =>unsubscriber = observable?.Subscribe(this);
}

That's all there's to that, the IObserver<T> interface defines three methods:
  • OnCompleted() which is generally used to detach the observer from the observable
  • OnError(error) which is used to let observer know that some sort of error occurred within the observable
  • OnNext(value) which is used execute some sort of logic when then observable signals that something occurred
The Observer class holds a reference to an IDisposable object which facilitates the detachment from the Observable as well as a "Subscribe" method that lets the Observer become linked with the Observerable and get that reference to the Unsubscriber.

next let's take a look at the observable 

class Observable<T> : IObservable<T>
{
    IList<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);
        return new Unsubscriber<T>(observer, observers);
    }

    public void NotifyObservers(T message)
    {
        foreach (var observer in observers)
            observer.OnNext(message);
    }

    private class Unsubscriber<T> : IDisposable
    {
        //removed for brevity
    }

}

The IObserverable<T> interface only defines one function which is meant to store a reference to the observers and returns that unsubscriber class that implements IDisposable. We provide our Observable with a "NotifyObservers" method which is used to pass a message on to whomever is listening/watching.

Above we left the Unsubscriber nested class empty as not to clutter the Observable, this class is nested because it's existence only makes sense within the context of the Observable.

private class Unsubscriber<T> : IDisposable
{
    IList<IObserver<T>> observers;
    IObserver<T> observer;

    public Unsubscriber(IObserver<T> observer, IList<IObserver<T>> observers)
    {
        this.observer = observer;
        this.observers = observers;
    }

    public void Dispose() => observers?.Remove(observer);

}

now the Unsubscriber also maintains a reference to the observers and the current observer it's associated to, this facilitates the removal of the observer from the observable's notification list.

next let's put it all together with a simple console application to execute our example

using System;
using System.Collections.Generic;

namespace pav.ObserverPatternSimple
{
    class Observer<T> : IObserver<T>
    {
        IDisposable unsubscriber;

        public void OnCompleted() => unsubscriber.Dispose();
        public void OnError(Exception error) => Console.WriteLine(error.Message);
        public void OnNext(T value) => Console.WriteLine(value);

        public virtual void Subscribe(IObservable<T> observable)
            => unsubscriber = observable?.Subscribe(this);
    }

    class Observable<T> : IObservable<T>
    {
        IList<IObserver<T>> observers = new List<IObserver<T>>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (!observers.Contains(observer))
                observers.Add(observer);
            return new Unsubscriber<T>(observer, observers);
        }

        public void NotifyObservers(T message)
        {
            foreach (var observer in observers)
                observer.OnNext(message);
        }

        private class Unsubscriber<T> : IDisposable
        {
            IList<IObserver<T>> observers;
            IObserver<T> observer;

            public Unsubscriber(IObserver<T> observer, IList<IObserver<T>> observers)
            {
                this.observer = observer;
                this.observers = observers;
            }

            public void Dispose() => observers?.Remove(observer);
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var observable = new Observable<string>();
            var observers = new List<Observer<string>> {                 new Observer<string>(), new Observer<string>(), new Observer<string>() 
            };

            foreach (var observer in observers)
                observer.Subscribe(observable);

            observable.NotifyObservers("One");

            observers[1].OnCompleted();

            observable.NotifyObservers("Two");
        }
    }
}

now as you can see form the above we create an observable and a collection of observers. we subscribe the observable to the observers and call our notification method on our observable.


As you can see the first time around each of our observers writes it's message to the console, however the second time around when we call the "OnComplete" method on our second observer we only get two messages because it used the Unsubscriber to detach itself from the observables notification list.

Next let's look at still a contrived example, but something with a little more substance.

We are going to create a fire alarm as our observable and sprinklers as our observers, the idea being that when our fire alarm is activated it'll send a signal to the sprinklers to activate.

let's start with our Fire alarm.

class FireAlarm : IObservable<bool>
{
    IList<IObserver<bool>> sprinklers = new List<IObserver<bool>>();

    public IDisposable Subscribe(IObserver<bool> sprinkler)
    {
        if (!sprinklers.Contains(sprinkler))
            sprinklers.Add(sprinkler);

        return new Unsubscriber(sprinklers, sprinkler);
    }
       
    public void TurnOn()
    {
        foreach (var sprinkler in sprinklers)
            sprinkler.OnNext(true);
    }

    public void TurnOff()
    {
        foreach (var sprinkler in sprinklers)
            sprinkler.OnNext(false);
    }

    private class Unsubscriber : IDisposable
    {
        //removed for brevity
    }
}

First thing to notice is that the IObservable interface takes in a generic type, now this is the data that the observable will push to its observers, it can be a class or a struct, but in this case we are going to use a simple boolean which represents the state of the fire alarm On or Off.

Next notice that I've removed the implementation for the Unsubscribe class, we'll take a closer look next, but for now let's just focus on the heart of the FireAlarm.

If we inspect the IObservable interface we see that it only defines a contract for the Subscribe function; this function takes in an observer and returns an implementation of IDisposable. This subscribe function must store a reference to all the observers and provide them with functionality to stop listening to our Observable, namely the Unsubscriber class which i'll dive into later.

Our Observable also should have one or more functions or methods to notify the observers that something has occurred, in the above we created TurnOn and TurnOff which both just iterate through the registered observers and invoke the OnNext method with a payload that was defined at the class level when we committed to the IObservable interface and in this case is a bool representing to turn on or off.

now let's take a closer look at the Unsubscriber class

class FireAlarm : IObservable<bool>
{
    // removed for brevity

    private class Unsubscriber : IDisposable
    {
        private IList<IObserver<bool>> sprinklers;
        private IObserver<bool> splrinkler;

        public Unsubscriber(IList<IObserver<bool>> sprinklers, IObserver<bool> splrinkler)
        {
            this.sprinklers = sprinklers;
            this.splrinkler = splrinkler;
        }

        public void Dispose()
        {
            if (splrinkler != null)
                sprinklers?.Remove(splrinkler);
        }
    }
}

In this particular example the Unsubscriber class is a nested class of FireAlarm, this is because the Unsubscriber is meaningless outside the context of the observable; an instance is passed to the observers so that they can use it to remove themselves from the observable's notification list but other than that its existence is of no benefit to the rest of the system.

The unsubscriber class takes in a reference to the sprinkler itself and to the fire alarm's list of sprinklers so that the unsubscriber can access the sprinkler within the dispose method defined by the IDisposable interface and to be able to remove that itself from the fire alarms collection of sprinklers.

Next let's take a look at our Sprinklers

class Sprinkler : IObserver<bool>
{
    public string Name { get; set; }    
    IDisposable unsubscriber;

    public virtual void Subscribe(IObservable<bool> smokeDetector)
    { 
        this.unsubscriber = smokeDetector.Subscribe(this);
    }

    public virtual void Unsubscribe()
    {
        Console.WriteLine($"\r\n{Name} has been decommissioned");
        this.unsubscriber.Dispose();
    }

    public void OnCompleted() => Console.WriteLine("OnCompleted");

    public void OnError(Exception error) => Console.WriteLine("Execption");

    public void OnNext(bool value)
    {
        if (value)
            Console.WriteLine($"{Name}\tactivated");
        else
            Console.WriteLine($"{Name}\tdeactivated");
    }

}

Our Sprinkler has a unsubscriber field of the IDisposable type, this field will hold reference to the unsubscriber class discussed above. This unsubscriber provides the mechanism that our sprinklers (Observers) will use to remove themselves from our FireAlarm's (Observable) notification list.

we have a subscribe method that takes in a reference to the Fire alarm (observable) and simultaneously registers the sprinkler (observer) to receive notifications from the Observable through the fire alarm's Subscribe function that returns a reference to an unsubscribe object which is used to remove the observer from the observable's notification list.

now we also defined an Unsubscribe method that allows the sprinkler to remove itself from the fire alarms notification list via the unsubscriber field that is instantiated within the Subscribe method. Thus far none of the members of the Sprinkler class we've discussed are defined within the IObserver Interface.

The generic IObserver Interface defines three methods: OnCompleted(), OnError(Exception error), and OnNext(); the implementation of logic is really up to you, but the common approach for each is the following:

  • OnCompleted(): this is fired as a notification that the observable will no longer be sending notifications for whatever reason, thus if your observable will no longer send data it makes sense to stop listening, or maybe not, it's really up to you.
  • OnError(Exception error): should your Observer encounter some sort of internal exception here is the place where the observable could pass the exception to the observer and let each individual observer handle the exception there own way. In this example so far there is only one type of observer, however there could be many types.
  • OnNext(T value): this is the method that the observers business logic should fire, for example our sprinkler should turn on or off based on the value parameter, for a dispatcher it may call or recall the fire department based on the value parameter.

next to reiterate the fact that you can have multiple types of observers we created a Fire Dispatcher whose job is to call the fire department in the event that a fire alarm is activated and recall the fire department if deactivated.

class Dispatcher : IObserver<bool>
{
    public string Name { get; set; }
    IDisposable unsubscriber;

    public virtual void Subscribe(IObservable<bool> smokeDetector)
        => this.unsubscriber = smokeDetector.Subscribe(this);

    public virtual void Unsubscribe()
    {
        Console.WriteLine($"\r\n{Name} has been decommissioned");
        this.unsubscriber.Dispose();
    }

    public void OnCompleted()
    {
        Console.WriteLine("the firealarm will no longer dispatch notifications");
        Unsubscribe();
    }

    public void OnError(Exception error) => Console.WriteLine("error ");

    public void OnNext(bool value)
    {
        if (value)
            Console.WriteLine("the fire department has been called");
        else
            Console.WriteLine("false alarm FD has been recalled");

    }
}

As you can see it implements the IObserver interface but it's internal implementation can vary; since both sprinklers and dispatchers implement the IObserver interface they can both be leveraged by the observable.

Finally let's put it all together with a console interface

using System;
using System.Collections.Generic;

namespace pav.ObserverPatternFire
{
    class Program
    {
        static void Main(string[] args)
        {
            List<Sprinkler> sprinklers = new List<Sprinkler>() { new Sprinkler { Name = "Sprinkler One" },
                new Sprinkler { Name = "Sprinkler Two" }, new Sprinkler { Name = "Sprinkler Three" },
                new Sprinkler { Name = "Sprinkler Four" }, new Sprinkler { Name = "Sprinkler Five" }
            };

            FireAlarm fireAlarm = new FireAlarm();
            foreach (var sprinkler in sprinklers)
                sprinkler.Subscribe(fireAlarm);

            Dispatcher dispatcher = new Dispatcher();
            dispatcher.Subscribe(fireAlarm);

            ConsoleKey? commandKey = null;

            do
            {
                PrintMainMenu();
                switch (commandKey)
                {
                    case ConsoleKey.D1:
                        fireAlarm.TurnOn();
                        break;
                    case ConsoleKey.D2:
                        fireAlarm.TurnOff();
                        break;
                    case ConsoleKey.D3:
                        var sprinkler = GetSprinkler(sprinklers);
                        if (sprinkler != null)
                        {
                            sprinkler.Unsubscribe();
                            sprinklers.Remove(sprinkler);
                            PrintMainMenu(false);
                        }
                        break;
                }

            } while ((commandKey = Console.ReadKey().Key) != ConsoleKey.Escape);
        }

        static void PrintMainMenu(bool clear = true)
        {
            if (clear) Console.Clear();
            Console.WriteLine("1) Turn on Fire Alarm");
            Console.WriteLine("2) Turn off Fire Alarm");
            Console.WriteLine("3) Decommission a sprinkler\r\n");
        }

        static Sprinkler GetSprinkler(IList<Sprinkler> sprinklers)
        {
            if (sprinklers.Count == 0)
                return null;

            var index = -1;
            Console.WriteLine("\tSelect sprinker to decommision");
            for (var i = 0; i < sprinklers.Count; i++)
                Console.WriteLine($"\t{i}) {sprinklers[i].Name}");

            while (!int.TryParse(Console.ReadKey().KeyChar.ToString(), out index) && (index < 0 || index > sprinklers.Count)) ;


            return sprinklers[index];
        }

    }

    class FireAlarm : IObservable<bool>
    {
        IList<IObserver<bool>> sprinklers = new List<IObserver<bool>>();

        public IDisposable Subscribe(IObserver<bool> sprinkler)
        {
            if (!sprinklers.Contains(sprinkler))
                sprinklers.Add(sprinkler);

            return new Unsubscriber(sprinklers, sprinkler);
        }

        public void TurnOn()
        {
            foreach (var sprinkler in sprinklers)
                sprinkler.OnNext(true);
        }

        public void TurnOff()
        {
            foreach (var sprinkler in sprinklers)
                sprinkler.OnNext(false);
        }

        private class Unsubscriber : IDisposable
        {
            private IList<IObserver<bool>> sprinklers;
            private IObserver<bool> sprinkler;

            public Unsubscriber(IList<IObserver<bool>> sprinklers, IObserver<bool> sprinkler)
            {
                this.sprinklers = sprinklers;
                this.sprinkler = sprinkler;
            }

            public void Dispose()
            {
                if (sprinkler != null)
                    sprinklers?.Remove(sprinkler);
            }
        }
    }

    class Sprinkler : IObserver<bool>
    {
        public string Name { get; set; }

        IDisposable unsubscriber;

        public virtual void Subscribe(IObservable<bool> smokeDetector)
            => this.unsubscriber = smokeDetector.Subscribe(this);

        public virtual void Unsubscribe()
        {
            Console.WriteLine($"\r\n{Name} has been decommissioned");
            this.unsubscriber.Dispose();
        }

        public void OnCompleted() => Console.WriteLine("OnCompleted");
        public void OnError(Exception error) => Console.WriteLine("Execption");
        public void OnNext(bool value)
        {
            if (value)
                Console.WriteLine($"{Name}\tactivated");
            else
                Console.WriteLine($"{Name}\tdeactivated");
        }
    }

    class Dispatcher : IObserver<bool>
    {
        public string Name { get; set; }
        IDisposable unsubscriber;

        public virtual void Subscribe(IObservable<bool> smokeDetector)
            => this.unsubscriber = smokeDetector.Subscribe(this);

        public virtual void Unsubscribe()
        {
            Console.WriteLine($"\r\n{Name} has been decommissioned");
            this.unsubscriber.Dispose();
        }

        public void OnCompleted()
        {
            Console.WriteLine("the firealarm will no longer dispatch notifications");
            Unsubscribe();
        }

        public void OnError(Exception error) => Console.WriteLine("error ");

        public void OnNext(bool value)
        {
            if (value)
                Console.WriteLine("the fire department has been called");
            else
                Console.WriteLine("false alarm FD has been recalled");
        }
    }

}

Now there are many variations of the observer pattern, I focused on this one because c# provides generic interfaces for the observable and observers.