Donnerstag, 24. Dezember 2015

Akka.NET Adventskalender – Tür 24

Zum Schluss

Zum Abschluss unseres kleinen Ausflugs in die Welt von Akka.NET wollen wir uns noch ein ebenfalls häufig notwendiges Mittel ansehen: den Umgang mit Zeit. Relativ oft müssen wir nach einer bestimmten Zeitspanne entweder bestimmte Aktionen auslösen oder kontrollieren, ob eine Aktion ausgelöst wurde, damit wir eventuell passende Maßnahmen einleiten können. Der vollständige Code dieses Beispiels ist wieder im üblichen github Repository.

Und es ist eigentlich klar, dass wir nicht wieder selbst zum Schraubendreher greifen müssen. Auch dafür gibt es eine vorgefertigte Lösung. Nehmen wir an, wir hätten einen Aktor, dessen ActorRef über die Variable controller erreichbar ist und wir wollen ihm regelmäßig alle 300 Millisekunden aber frühestens in 2 Sekunden die Nachricht Tick senden. Das ist so einfach wie:

system
    .Scheduler
    .ScheduleTellRepeatedly(
        TimeSpan.FromSeconds(2),
        TimeSpan.FromMilliseconds(300),
        controller,
        new Tick(),
        ActorRefs.NoSender);

das einzige was zu erklären ist, ist das letzte Argument. Obiges Programmfragment stammt aus einem Kommandozeilen Programm. Das ist natürlich kein Aktor, kann also nicht als Absender der Nachricht eingetragen werden. Würden wir solch eine Nachricht aus einem Aktor heraus versenden, nutzen wir selbstverständlich Self (also uns) als Absender. Das ActorSystem ist von einem Aktor heraus über Context.System erreichbar.

Und damit sind wir so weit, dass wir unser letztes Akka.NET Programm in dieser Artikelserie von github herunterladen und einmal vorsichtig anstarten können.

Du wirst schnell die Logik hinter dem Programm verstehen: Sämtliche Ausgaben auf den Bildschirm werden aus Gründen der Synchronisierung durch den Writer Aktor ausgeführt, der Controller empfängt alle 300 Millisekunden einen Tick und reagiert, indem er Lichter einer bestimmten Farbe jeweils ein- oder ausschaltet. Jedes einzelne Licht wird durch jeweils einen eigenen Light Aktor repräsentiert. Und wie bei unserem Sudoku Programm nutzen wir den Publish/Subscribe Mechanismus zur Kommunikation, anstelle die einzelnen Aktoren miteinander bekannt zu machen.

Frohes Fest!

Wenn Dir die Artikelserie gefallen hat, schreib etwas darüber. Hat Dir die Artikelserie oder einzelne Teile davon nicht gefallen, schreib mir bitte. Wir haben große Teile von Akka.NET vollkommen unberührt gelassen z.B. Remote, Cluster, Persistence, Tests. Wenn Interesse daran besteht, wären weitere Artikel zu diesen Themen oder ein Workshop auf einer geeigneten Konferenz sicher denkbar.

Ich wünsche euch frohe Weihnachten und ein gutes neues Jahr!

Mittwoch, 23. Dezember 2015

Akka.NET Adventskalender – Tür 23

Hab was vergessen

Leider hat unsere gestrige Variante es nicht geschafft, das schwierige Sudoku zu lösen. Natürlich sind wir selbst daran schuld, denn wir haben eine Sudoku Regel schlicht vergessen zu implementieren. "Jede Ziffer darf pro Zeile, Spalte oder Block nur einmal auftreten." Dass das einfache Sudoku geklappt hat, war purer Zufall (ehrlich gesagt musste ich ein wenig suchen, um ein geeignetes einfaches Sodoku zu finden...). Wir brauchen also noch ein paar weitere Dinge, damit wir auch schwierige Sudoku Aufgaben lösen können.

Als erstes müssen wir uns darum kümmern, dass jemand die Häufigkeit der potentiell platzierbaren Ziffern pro Zeile, Spalte und Block für uns mit verfolgt. Jedesmal wenn eine Zelle eine Ziffer gesetzt bekommt, oder eine Ziffer als mögliche Lösung für eine Zelle ausschließt, müssen wir die passende Statistik dazu anpassen. Stellen wir dabei fest, dass eine Ziffer irgendwo exakt einmal auftritt, dann können wir daraus kombinieren, dass diese Ziffer in der jeweiligen Zeile, Spalte oder Block gesetzt werden darf – sie ist ja die einzige ihrer Art.

Grundsätzlich benötigen wir also 3 Typen (Zeile, Spalte, Block) mal 9 (für jede Ziffer) Aktoren für unsere Statistik nach diesem Muster. Aufgrund der einfacheren Lesbarkeit verzichten wir hier auf die Erzeugung einer Basisklasse und wiederholen den doch recht einfachen Code anstelle ihn zu generalisieren.

using System;
using Akka.Actor;
using SudokuSolver.Messages;
using System.Collections.Generic;
using System.Linq;

namespace SudokuSolver.Actors
{
    public class SudokuCol : SudokuActor
    {
        private readonly int col;

        private List<int> statistics;

        public SudokuCol(int col)
        {
            this.col = col;

            statistics = Enumerable.Range(1, 9).Select(_ => 9).ToList();

            Receive<SetDigit>(SetStatistics, s => s.Col == col);
            Receive<StrikeDigit>(UpdateStatistics, s => s.Col == col);
        }

        private void SetStatistics(SetDigit setDigit)
        {
            var digit = setDigit.Digit;

            statistics[digit - 1] = 1;
        }

        private void UpdateStatistics(StrikeDigit strikeDigit)
        {
            var digit = strikeDigit.Digit;

            if (--statistics[digit-1] == 1)
                Publish(new FindColDigit(col, digit));
        }
    }
}

Und wir müssen unsere Aktoren für die einzelnen Zellen dahingend schlauer machen, dass sie dann wenn Ziffern als potentielle Lösungen ausscheiden, jeweils die passende StrikeDigit Nachricht aussenden sowie auf FindXxxDigit Nachrichten reagieren.

Unsere Sudoku Aktoren für die einzelnen Zellen verarbeiten also diese 3 weiteren Nachrichten:

Receive<FindRowDigit>(FindDigitHandler, f => f.Row == row);
Receive<FindColDigit>(FindDigitHandler, f => f.Col == col);
Receive<FindBlockDigit>(FindDigitHandler, f => f.Block == block);

und reagieren darauf wie folgt:

private void FindDigitHandler(FindDigit findDigit)
{
    var digit = findDigit.Digit;

    if (possibleDigits.Contains(digit))
        Publish(new SetDigit(row, col, digit));
}


Damit haben wir auch die gestern vergessene letzte Sudoku Regel mit implementiert und zählen nun 108 Aktoren, die gemeinsam und allein durch wirres Hin- und Herschreien ein Sudoku lösen. Irgendwo faszinierend, dass das funktionert.

Wiederum ist für den fertigen Code ein github Repository vorhanden.

Dienstag, 22. Dezember 2015

Akka.NET Adventskalender – Tür 22

Lass mal hören

Gestern haben wir die Idee zur Lösung unseres Sudoku Spiels entwickelt (Jeder schreit herum, welche Ziffer er eben erhalten hat, andere ziehen daraus ihre Schlüsse).

Dazu müssen wir unser Feld aus 9x9 Aktoren aufbauen. Dabei müssen wir in keinster Weise daran denken, die einzelnen Aktoren miteinander zu verbinden, denn wir setzen ja auf das Muster "Publish/Subscribe", so dass Mitteilungen einfach abgesetzt werden und interessierte sich für den Empfang von Nachrichten anmelden. Damit ist die Konstruktion unseres Spielfeldes extrem einfach:

for (int row = 0; row < 9; row++)
    for (int col = 0; col < 9; col++)
        system.ActorOf(
            Props.Create(printer, row, col), 
            String.Format("{0}-{1}", row, col));

Die nahfolgend gelistete SudokuCell Klasse zeigt die wesentlichen Teile der Implementierung der Sudoku Zelle. Die einzige Nachricht, die wir bislang definiert haben, war SetCell, die das Kommando ist, mit dem wir die Ziffer einer Zelle direkt setzen können. Alle anderen Aktoren registrieren sich für diese Nachricht und verarbeiten sie, wenn die eben gesetzte Zelle in der gleichen Zeile, der gleichen Spalte oder im gleichen 3x3 Block sitzt.

Dazu nutzen wir eine spezielle Überladung der Receive<> Methode, die es uns erlaubt, ein Prädikat mit anzugeben, welches nur dann einen wahren Rückgabewert liefert, wenn wir uns für die Verarbeitung der Nachricht interessieren. Klingt in Worten schlimmer als im Code, den Du gleich sehen wirst. Die ersten Nachrichten vom Typ SetCell werden beim eintragen der berets bekannten Angaben unseres Sudokus ausgelöst. Damit allerdings werden weitere Zellen beeinflusst und das Sudoku löst sich so mit jeder zusätzlichen Angabe immer weiter.

public class SudokuCell : SudokuActor
{
    private readonly int row;
    private readonly int col;
    private readonly int block;
    private HashSet<int> possibleDigits;
 
    public SudokuCell(IActorRef printer, int row, int col)
        : base(printer)
    {
        this.row = row;
        this.col = col;
        this.block = row / 3 * 3 + col / 3;
  
        possibleDigits = new HashSet<int>(Enumerable.Range(1,9));
  
        Receive<SetDigit>(RestrictPossibilities, IsRowOrColOrBlock);
        Receive<SetDigit>(SolveCell, IsMyCell);
    }
 
    private bool IsRowOrColOrBlock(SetDigit setDigit)
    {
        if (IsMyCell(setDigit))
            return false;

        return setDigit.Row == row
            || setDigit.Col == col
            || setDigit.Block == block;
    }
 
    private bool IsMyCell(SetDigit setDigit)
    {
        return setDigit.Row == row && setDigit.Col == col;
    }


    private void RestrictPossibilities(SetDigit setDigit)
    {
        var digit = setDigit.Digit;
  
        if (possibleDigits.Contains(digit))
        {
            possibleDigits.Remove(digit);
            if (possibleDigits.Count == 1)
                Publish(new SetDigit(row, col, possibleDigits.First()));
        }
    }
    
    private void SolveCell(SetDigit setDigit)
    {
        var digit = setDigit.Digit;
  
        possibleDigits.Clear();
        possibleDigits.Add(digit);
    }
 
    private bool IsSolved(int digit)
    {
        return possibleDigits.Contains(digit) && possibleDigits.Count == 1;
    }
}

Eigentlich war es das. Gut, es kommen noch ein paar Kleinigkeiten dazu, damit das ganze tatsächlich funktioniert. Aber bevor wir solche Dinge herunterleiern und Du mühsam alles zusammentragen musst, lade Dir lieber den Code von meinem github Repository.

Wenn Du experimentierfreudig bist, und in der Kommandozeilen-Anwendung versuchst, das schwierige Soduko auflösen zu lassen, wirst Du eine Überraschung erleben – es löst sich nicht. Unser Ansatz war also doch zu einfach.

Müssen wir also morgen nochmal ran.

Montag, 21. Dezember 2015

Akka.NET Adventskalender – Tag 21

Was der Franke unter "blägn" versteht

Während der vergangenen Sprints haben wir uns immer an einzelne bekannte Aktoren gewandt, wenn wir Nachrichten verschickt haben. Diese Woche werden wir uns eine weitere Benachrichtigungs-Strategie ansehen: Ausstrahlung (Broadcast). Das bedeutet, dass wir einen Absender einer Nachricht und keinen, einen oder beliebig viele Empfänger dieser Nachricht haben werden.

Dieses Muster wird auch als "Publish/Subscribe" bezeichnet. Selbstverständlich beherrscht Akka.NET auch dieses Muster. Diesmal werden wir nicht nochmal das Rad neu erfinden, indem wir dieses Muster nachprogrammieren. Stattdessen werden wir direkt die Möglichkeiten nutzen, die wir standardgemäß zur Verfügung haben.

Um eine Nachricht oder ein Ereignis (Ereignisse waren definiert als etwas vergangenes, daher nutzen wir in solchen Fällen gerne die Vergangenheit beim Verb) ausstrahlen möchten, sieht das so aus:

Context.System.EventStream.Publish(new SomethingHappened());

Wenn wir auf ein Ereignis oder eine Nachricht lauschen wollen, dann können wir eine dieser Methoden dazu einsetzen:

protected override void PreStart()
{
    // the most basic way of subscribing
    Context.System.EventStream.Subscribe(Self, typeof(SomethingHappened));
 
    // if you are `using Akka.Event;` you might also do:
    Context.System.EventStream.Subscribe<SomethingHappened>(Self);
}

Um zu demonstrieren, wie leistungsfähig das Ausstrahlen und Abonnieren von Nachrichen ist, werden wir ein kleines Sudoku Spiel zusammen programmieren. Die Idee dazu kam mir kürzlich nach einem Event-Storming Workshop, bei dem knapp 20 Personen im Raum scheinbar planlos umherschossen und am Ende ein durchdachtes Konzept an der Tafel stand. Faszinierend :-)

Und genau so können wir beim Sudoku vorgehen. Wir setzen in jedes einzelne Feld einen Aktor, der für genau dieses Feld zuständig ist. Jeder dieser Aktoren schnappt wertvolle Hinweise darüber auf, ob in seiner Zeile, seiner Spalte oder seinem Block etwas verändert wurde und streicht eventuell eben gesetzte Ziffern von seiner Liste der noch zur Verfügung stehenden Möglichkeiten. Bleibt nur noch eine Ziffer übrig, so ist diese Zelle gelöst. Das wiederum erfahren alle anderen und so löst sich das Rätsel schrittweise von selbst.

Da wir noch mehr Aktoren brauchen werden, packen wir die von allen gemeinsam genutzten Funktionalitäten in eine Basisklasse (morgen gibt es wieder ein github Repository). In dieser Basisklasse verhalten wir uns auch sehr tolerant, indem wir nicht verarbeitete Ereignisse einfach ignorieren. Normalerweise sollte man sich so etwas wirklich gut überlegen.

using Akka.Actor;
using Akka.Event;
using SudokuSolver.Messages;

namespace SudokuSolver.Actors
{
    /// 
    /// Base class for all Sudoko actors
    /// 
    public class SudokuActor : ReceiveActor
    {
        private readonly IActorRef printActor;

        public SudokuActor(IActorRef printActor)
        {
            this.printActor = printActor;
        }

        protected override void PreStart()
        {
            Context.System.EventStream.Subscribe<SetDigit>(Self);
        }

        protected void Publish(object message)
        {
            Context.System.EventStream.Publish(message);
        }

        protected override void Unhandled(object message)
        {
            // we do nothing, just ignore unhandled messages
        }
    }
}

und natürlich müssen wir die Nachricht definieren, die die Ziffer einer Zelle setzt. Darauf lauschen ja alle Zellen und ziehen ihre Schlußfolgerungen.

namespace SudokuSolver
{
    public class SetDigit
    {
        public int Row { get; set; }
        public int Col { get; set; }
        public int Block { get { return Row / 3 * 3 + Col / 3; } }

        public int Digit { get; set; }

        public SetDigit(int row, int col, int digit)
        {
            Row = row;
            Col = col;
            Digit = digit;
        }
    }
}

Wie wir dann tatsächlich das Sudoku lösen, verrate ich euch morgen.

Sonntag, 20. Dezember 2015

Akka.NET Adventskalender – Tag 20

Nur zusammen ergibt alles Sinn

An den vergangenen zwei Tagen haben wir uns theoretisch mit Map/Reduce befasst. Heute wollen wir uns die Programmierung dazu näher ansehen (Hinweis: am Ende steht ein Link auf ein github Repository – Du musst also nicht so viel tippen heute).

Der erste Aktor, der die Rohdaten verarbeitet, ist der Map Aktor. Wie beabsichtigt erzeugt er eine Liste von {Sprache, Anzahl} Tupeln, wobei die Anzahl für alle erzeugten Listeneinträge "1" ist. Das Ergebnis wird dem Absender geantwortet und kommt so beim Master wieder an.

using System;
using MapReduce.Messages;
using Akka.Actor;
using System.IO;
using System.Linq;

namespace MapReduce.Actors
{
    public class Mapper : ReceiveActor
    {
        public Mapper()
        {
            Receive<string>(Map);
        }

        private void Map(string input)
        {
            var mapResult = new MapResult();

            using (var reader = new StringReader(input)) 
            {
                string line;
                while ((line = reader.ReadLine()) != null) 
                {
                    if (!String.IsNullOrWhiteSpace(line))
                    {
                        var language = line.Split(new [] { '|' }).Last().Trim();

                        mapResult.Counts.Add(new LanguageCount(language));
                    }
                }
            }

            Sender.Tell(mapResult);
        }
    }
}

Die MapResult Nachricht, die der Map Aktor erzeugt, wird durch den Master an den Reduce Aktor weiter gereicht, der sie dann entsprechend gruppiert:

using System;
using Akka.Actor;
using MapReduce.Messages;

namespace MapReduce.Actors
{
    public class Reducer : ReceiveActor
    {
        public Reducer()
        {
            Receive<MapResult>(Reduce);
        }

        private void Reduce(MapResult mapResult)
        {
            var reduceResult = new ReduceResult();

            foreach (var count in mapResult.Counts)
            {
                if (reduceResult.Result.ContainsKey(count.Language))
                {
                    reduceResult.Result[count.Language] += count.Count;
                }
                else
                {
                    reduceResult.Result[count.Language] = count.Count;
                }
            }

            Sender.Tell(reduceResult);
        }
    }
}

Da sowohl der Map Aktor als auch der Reduce Aktor parallel arbeiten und beide mehrfach existieren, brauchen wir noch einen einzelnen Aktor, der alle Ergebnisse nochmals zusammenfasst. Der Code ist relativ ähnlich zum Reduce Aktor, allerdings wird das finale Ergebnis im Aggregator Aktor gespeichert und kann anschließend abgerufen werden.

using System;
using Akka.Actor;
using MapReduce.Messages;

namespace MapReduce.Actors
{
    public class Aggregator : ReceiveActor
    {
        private ReduceResult reduceResult;

        public Aggregator()
        {
            reduceResult = new ReduceResult();

            Receive<ReduceResult>(Aggregate);
            Receive<GetResult>(_ => Sender.Tell(reduceResult));
        }

        private void Aggregate(ReduceResult result)
        {
            foreach (var language in result.Result.Keys)
            {
                if (reduceResult.Result.ContainsKey(language))
                {
                    reduceResult.Result[language] += result.Result[language];
                }
                else
                {
                    reduceResult.Result[language] = result.Result[language];
                }
            }
        }
    }
}

Wie erwähnt, sind alle Beispiele in diesem git Repository auf github abgelegt.

Liefen wirklich die Aktoren parallel?

Falls es Dir auch schon aufgefallen ist: Glückwunsch! Kaum etwas lief parallel bislang. Wir hatten jeweils nur einen Aktor und die Daten flossen vom einen zum anderen. Einzig zu dem Zeitpunkt, als wir mehr als ein "Dokument" analysieren ließen, konnten der eine Map Aktor und der eine Reduce Aktor parallel laufen.

Aber Du erinnerst Dich sicher noch an unser Konstrukt von vor 3 Tagen? Wenn Du das nutzt, dann werden wir parallel ablaufende Aufgaben erreichen.

someActor = Context.ActorOf(
    Props.Create<SomeActorClass>()
         .WithRouter(new RoundRobinPool(NrWorkers))

Falls Du noch mehr Beispiele für den Einsatz von Map/Reduce suchst, mir hat das Beispiel "Freunde finden" sehr gefallen.

Morgen werden wir unseren letzten Sprint starten. Wir haben bisher einzelne und zahlreiche Aktoren untersucht. Was Nachrichten anging, haben wir stets einen Empfänger addressiert. Das werden wir nächste Woche ändern.

Samstag, 19. Dezember 2015

Akka.NET Adventskalender – Tür 19

Datenmengen schrittweise in Griff bekommen

Gestern haben wir uns mit den Grundlagen von Map/Reduce befasst und die Vorteile dieses Algorithmus bei großen Datenmengen kennengelernt. Heute werden wir uns an unserem konstruierten Beispiel ansehen, welche Schritte notwendig sind.

Als erstes benötigen wir eine zentrale Instanz. Nennen wir ihn Master. Er koordiniert alle anderen Beteiligten, verteilt und sammelt wieder ein.

using System;
using Akka.Actor;
using MapReduce.Actors;
using MapReduce.Messages;

namespace MapReduce.Actors
{
    public class Master : ReceiveActor
    {
        IActorRef mapper;
        IActorRef reducer;
        IActorRef aggregator;

        public Master()
        {
            mapper = Context.ActorOf(Props.Create<Mapper>());
            reducer = Context.ActorOf(Props.Create<Reducer>());
            aggregator = Context.ActorOf(Props.Create<Aggregator>());

            // 1. forward a string to mapper
            Receive<string>(mapper.Tell);

            // 2. forward map result to reducer
            Receive<MapResult>(reducer.Tell);

            // 3. forward reduce result to aggregator
            Receive<ReduceResult>(aggregator.Tell);

            // allow asking for aggregated result at any time
            Receive<GetResult>(aggregator.Forward);
        }
    }
}

Als nächstes definieren wir die diversen Nachrichten, die von den diversen Beteiligten gesendet und empfangen werden.

In Objekten der Klasse LanguageCount wollen wir Tupel der Form (Sprache, Anzahl) festhalten. Wir werden das öfter benötigen.

namespace MapReduce.Messages
{
    public class LanguageCount
    {
        public string Language { get; set; }
        public int Count { get; set; }

        public LanguageCount(string language, int count = 1)
        {
            Language = language;
            Count = count;
        }
    }
}

Der Map Schritt erzeugt Listen solcher Tupel:

using System;
using System.Collections.Generic;
using System.Linq;

namespace MapReduce.Messages
{
    public class MapResult
    {
        public List<LanguageCount> Counts { get; set; }

        public MapResult()
        {
            Counts = new List<LanguageCount>();
        }

        public override string ToString()
        {
            return string.Format("[MapResult: {0}]", 
                String.Join(", ", 
                    Counts.Select(c => String.Format("{0}:{1}", 
                        c.Language, c.Count))
                )
            );
        }
    }
}

Wie schon erwähnt wird diese Nachricht dann an den Reduce Schritt übergeben, der dann eine nach Sprache gruppierte Liste erzeugt. Das ist mit einer Dictionary in .NET wunderbar abbildbar.

using System;
using System.Collections.Generic;
using System.Linq;

namespace MapReduce.Messages
{
    public class ReduceResult
    {
        public Dictionary<string,int> Result { get; set; }

        public ReduceResult()
        {
            Result = new Dictionary<string,int>();
        }

        public override string ToString()
        {
            return string.Format("[ReduceResult: {0}]", 
                String.Join(", ", 
                    Result.Keys.Select(l => String.Format("{0}:{1}", l, Result[l]))
                )
            );
        }

    }
}

Mit diesen Hilfsmitteln ausgestattet werden wir morgen die notwendige Logik programmieren.

Freitag, 18. Dezember 2015

Akka.NET Adventskalender – Tür 18

Wenn's mal ganz heftig wird

Sicher hast Du schon einmal von "Map/Reduce" gehört – mir ist der Begriff auch vor ein paar Tagen rausgerutscht. Erfunden wurde der Algorithmus von Google mit dem Ziel, große Datenmengen auf verteilten Systemen verarbeiten zu können. Eine zentrale Instanz kümmert sich um die Partitionierung und Verteilung der Daten sowie der Sammlung der Ergebnisse sowie um die Überwachung aller Beteiligten.

Wie funktioniert Map/Reduce?

Der erste Schritt, "Map" genannt, erzeugt aus den Rohdaten eine Liste mit Schlüsseln und einem Wert, der später von Bedeutung ist. Der Wert kann im einfachsten Fall die Zahl "1" sein. Die Mächtigkeit der Liste kann hierbei sehr groß sein. Unter Umständen kann sogar ein Rohdatensatz mehrere Werte erzeugen (z.B. wenn man sich für Vokale in Worten interessiert könnte das Wort "Eisenbahnschiene" die Liste {e,4}, {i,2}, {a,1} produzieren).

Diese Liste wird dann im "Reduce" Schritt nach den Schlüsseln der ersten Liste gruppiert und mit den Werten eine vorher festgelegte Operation ausgeführt (z.B. Summierung). Das Ergebnis ist wieder eine Liste mit so vielen Einträgen, wie es unterschiedliche Schlüssel gibt. Aus unserem obigen Vokal-Beispiel würde dann eine Liste mit nur den 5 Schlüsseln "a,e,i,o,u" und der entsprechenden Summe der Anzahlen werden.

Sowohl der "Map", als auch der "Reduce" Schritt kann auf Teilmengen der (großen) Original-Daten operieren und auf beliebig viele Systeme verteilt werden. Auf diese Weise lassen sich solche Aufgaben trotz der großen Datenmengen im Vergleich zu iterativen Ansätzen, die nur auf einer Maschine laufen, in deutlich kürzeren Zeiten abwickeln.

Was wollen wir analysieren?

Das kleine Map/Reduce Projekt, das wir anlegen werden, soll folgende Aufgaben erfüllen:

  • Aus einer großen Anzahl von Text Dokumenten (wir haben ein Dokument pro Stadt) extrahieren wir Zeilen, die aus Werten für Firmen-Name, Name eines Entwicklers und der von ihm benutzbaren Programmiersprache. Spricht ein Entwickler mehrere Sprachen, existieren mehrere Zeilen für ihn. Und besitzt eine Firma mehrere Entwickler, wiederholen sich die Zeilen ebenfalls.
  • Der "Map" Schritt bekommt diese Zeilen und erzeugt eine Liste aus {Sprache, 1} Tupeln.
  • Der "Reduce" Schritt gruppiert die Liste nach Sprache und summiert die Zahlen hoch.
  • Der Aggregator (unsere zentrale Instanz) erhält die Liste, die der Reduce-Schritt erzeugt hat und fasst beliebig viele davon zu einer Liste zusammen.
  • Das Ergebnis ist eine Übersicht, welche Programmiersprachen von wie vielen Entwicklern gesprochen wird.
Heute werden wir unsere Umgebung aufsetzen und das Gerüst unserer Kommandozeilen Applikation erstellen. (Pssst: Du kannst aber auch bis übermorgen warten, dann gibt es wieder ein git Repository).

Damit die Anwendung einfach und ohne problematische Pfad Handhabung zwischen Windows und *nix portierbar wird, habe ich die "Dateien" als statische Methoden in die Applikation eingebaut. die viel gepriesenen großen Datenmengen werden zum Zwecke der Demonstration auch etwas übersichtlicher ausfallen.

using System;
using Akka.Actor;
using MapReduce.Actors;

namespace MapReduce
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("MapReduce");
            var master = system.ActorOf(Props.Create<Master>());

            master.Tell(Sample1());
            master.Tell(Sample2());

            Console.WriteLine("Press [enter] to continue");
            Console.ReadLine();

            system.Shutdown();
            system.AwaitTermination();
        }

        private static string Sample1()
        {
            return @"
ACME, Inc | Mr. Green | C#
ACME, Inc | Mr. Blue | JavaScript

Facilities | Coolman | Ruby

FastCoders | Speedy | C#

SlowCoders | Snail | Java
";
        }

        private static string Sample2()
        {
            return @"
Foo | Mr. Green | C#
Foo | Mr. Blue | Java
Foo | Mr. Black | C#
Foo | Mr. White | F#

Bla | Coolman | Python
Bla | Catwoman | C#

SlowMovers | Snail | C#
SlowMovers | Snail | F#
";
        }
    }
}

Morgen werden wir die Aktoren für die beiden Haupt-Aufgaben hinzufügen.

Donnerstag, 17. Dezember 2015

Akka.NET Adventskalender – Tür 17

Die Router im Einzelnen

Hoffentlich hat sich Dein Frust von gestern über die umständliche Lösung wieder gelegt. Aber ich denke es macht Sinn, so grundlegende Dinge wie einen Router zu verstehen. Es könnte Umstände geben, unter denen Du möglicherweise doch einen speziellen Router benötigst und Dir die Bordmittel von Akka.NET eben nicht weiter helfen.

Ansonsten werden wir uns jetzt den Weg ansehen, auf dem wir das gestrige Problem auch ohne einen eigenen Router hätten lösen können. Wir hätten im Inneren unseres Master Routers diese Sequenz verwenden können, um unsere Worker zu erzeugen:

worker = Context.ActorOf(
    Props.Create<Worker>()
         .WithRouter(new RoundRobinPool(NrWorkers))
);

Technisch gesehen wäre genau das gleiche passiert wie mit unserem Router Marke Eigenbau. Akka.NET kennt eine Vielzahl an verschiedenen Router Modellen, die auf die gleiche Weise zum Einsatz kommen können.

Zunächst unterscheidet Akka.NET zwischen

  • Pool: hierbei werden die Aktoren vom Router angelegt
  • Group: es werden Pfade auf vorhandene Aktoren dem Router übergeben. Mittels einer ActorSelection wird dem bzw. den ausgewählten Routern die Nachrichten übermittelt.
Und was die Routing Strategien angeht, so stehen zur Verfügung:
  • RoundRobin: reihum wird der nächste der zur Verfügung stehenden Aktoren beauftragt.
  • Broadcast: alle Aktoren erhalten die Nachricht gleichzeitig
  • Random: ein Ziel-Aktor wird zufällig gewählt
  • ConstantHash: basierene auf eine Eigenschaft der Nachrichten (z.B. eine ID) wird einer der Ziel-Aktoren ausgewählt. So kann sichergestellt werden, dass eine bestimmte ID für verschiedene Nachrichten-Typen immer beim gleichen Aktor landet. 
  • TailChopping: Sollte ein zufälliger Aktor nicht innerhalb einer gewählten Zeitspanne antworten, so wird ein anderer bestimmt in der Hoffnung, d ass der antwortet
  • ScatterGatherFirstCompleted: Eine Nachricht wird zu allen Routern gleichzeitig gesandt und der, der zuerst die Antwort weiß, gewinnt.
  • Smallest Mailbox: Der Router mit den wenigsten Nachrichten in dessen Mailbox erhält den Zuschlag

Ich möchte an dieser Stelle lieber auf die Akka.NET Dokumentation verweisen, denn dort sind die einzelnen Router und deren Benutzung sehr genau erklärt.

Morgen werden wir uns wieder ein neues Thema vornehmen.

Mittwoch, 16. Dezember 2015

Akka.NET Adventskalender – Tür 16

Endlich Parallel

Gestern haben wir erfolgreich unsere Rechenaufgabe in kleinere Teile zerlegt, die wir allerdings nur einem Worker Aktor zur Bearbeitung gegeben haben. Weil jeder Aktor nacheinander seine Nachrichten abarbeitet, konnten wir leider noch keinen Erfolg verbuchen, was die Verbesserung der Laufzeit unserer Berechnung angeht.

Wir müssen uns also etwas einfallen lassen, um die Aufgabe tatsächlich zu parallelisieren. Nur so können wir die erwartete Beschleunigung erhalten. Auch nun ahnst Du sicher schon, was kommt: Neue Aufgabe – neuer Aktor. Richtig! Das Muster hinter dem wir heute her sind, trägt den Namen "Router". Ein Router enpfängt Nachrichten und verteilt diese an einen oder mehrere Aktoren nach einem in ihm definierten Regelwerk. Wenn wir also einen Router erzeugen, der mehrere Aktoren hat, unter denen er alle eintreffenden Rechen-Aufgaben sinnvoll aufteilt (z.B. nach Round Robin), dann müsste sich doch die ersehnte Beschleunigung erreichen lassen.

Also passen wir zunächst unsere Erzeugung von Worker Aktoren im Master so an, dass wir anstelle eines Worker Aktors den Router erzeugen. Dieser kümmert sich dann um den Rest, verhält sich aber aus Master Sicht genau wie ein Worker.

public Master()
{
    // removed sequential worker
    // worker = Context.ActorOf(Props.Create<Worker>());
    worker = Context.ActorOf(Props.Create<RoutingWorker>());
 
    Receive<CalculatePi>(c => DoCalculation(c));
    Receive<double>(s => AddToSum(s));
}

Jetzt müssen wir uns für den Router die passende Logik einfallen lassen. Problem dabei ist, dass unsere Worker ihre Ergebnisse via "Sender.Tell()" dem Master mitgeteilt haben. Wenn wir nun einen Router zwischen die beiden setzen, müssen wir im Router entweder genau die Struktur der Antworten kennen und diese dann unsererseits weiterleiten – oder es gibt noch einen Trick. Zum Glück ist Letzteres der Fall, da Konstrukte wie ein Router doch recht häufig notwendig werden. Ein Aktor bietet eine Forward() Methode, mit der man Nachrichten unter Beibehaltung des ursprünglichen Absenders weiter leiten kann. Das wenden wir nun an wenn wir unseren Router schreiben:

public class RoutingWorker : ReceiveActor
{
    const int NrWorkers = 4;
    
    private IActorRef[] workers;
    private int nextWorker;
    
    public RoutingWorker()
    {
        workers = new IActorRef[NrWorkers];
        for (var i = 0; i < NrWorkers; i++)
            workers[i] = Context.ActorOf(Props.Create<Worker>());
        nextWorker = 0;
        
        Receive<Worker.CalculateRange>(r => ForwardToWorker(r));
    }
    
    private void ForwardToWorker(Worker.CalculateRange message)
    {
        workers[nextWorker].Forward(message);
        nextWorker = (nextWorker + 1) % NrWorkers;
    }
}

Sieht wie üblich nicht allzu kompliziert aus. Wenn Du nun das Berechnungsprogramm startest, wirst Du abhängig von Deinem Rechner (genauer gesagt den Fähigkeiten Deiner CPU) und der Konstante NrWorkers mehr oder weniger große Unterschiede feststellen können. Am besten Du nimmst Dir ein paar Minuten und spielst ein wenig mit diesem Parameter. Bin gespannt wie hoch Deine Steigerung ausfällt...

Da das Muster "Routing" so häufige Verwendung findet, bietet Akka.NET dafür eine sehr einfache Lösung an. Ich hoffe Du verteufelst mich nun nicht dafür, dass wir zuerst den umständlichen Weg gegangen sind und unseren Router selbst geschrieben haben. Aber es gibt ja das goldene Gesetz der IT, dass jeder Entwickler während seiner Karriere einmal ein JavaScript-Framework, eine HTML Template Engine und einen Akka.NET Router geschrieben haben muss. Was den letzten Punkt angeht, den können wir hiermit gemeinsam abhaken. Ab morgen kennen wir einen besseren Weg.

Dienstag, 15. Dezember 2015

Akka.NET Adventskalender – Tür 15

Teile und Herrsche

Gestern sind wir auf ein Problem gestoßen. Eine lang laufende CPU-intensive Berechnung schrie förmlich nach Verbesserung. Der Gedanke zur Lösung war, die Aufgabe in kleinere voneinander unabhängige Teile zu zerlegen, die wir dann jeweils einzelnen Akoten zur Bearbeitung geben konnten.

Damit wir besser mit verschiedenen Partitionierungs-Strategien experimentieren können, erzeugen wir uns zunächst einen Generator, der den Bereich von dem wir ausgehen, in eine vorgegebene Anzahl jeweils gleich großer Bereiche aufteilt.

Dafür definieren wir eine Klasse Range, die jeweils den Bereich (inklusive der Grenzen) definiert:

public class Range
{
    public int StartAt { get; set; }
    public int EndAt { get; set; }

    public Range (int startAt, int endAt)
    {
        StartAt = startAt;
        EndAt = endAt;
    }

    public override string ToString()
    {
        return string.Format("[Range: StartAt={0}, EndAt={1}]", 
            StartAt, EndAt);
    }
}

Die Überladung von ToString() brauchen wir später um einige Diagnose-Ausgaben einfacher erzeugen zu können. Und eine Generator-Methode könnte so aussehen:

public static IEnumerable<Range> Distribute(int rangeMax, int nrParts)
{
    int rangeStartAt = 0;
    int nrPartsRemaining = nrParts;

    while (nrPartsRemaining > 0)
    {
        int rangeEndAt = rangeStartAt + 
            (rangeMax - rangeStartAt) / nrPartsRemaining;
        
        yield return new Range(rangeStartAt, rangeEndAt);
        
        rangeStartAt = rangeEndAt + 1;
        nrPartsRemaining--;
    }
}

Zur Verdeutlichung ein paar Beispiele:

20 - 3 parts: 
    [Range: 0...6], 
    [Range: 7...13], 
    [Range: 14...20]

1000 - 6 parts: 
    [Range: 0...166], 
    [Range: 167...333], 
    [Range: 334...500], 
    [Range: 501...667], 
    [Range: 668...834], 
    [Range: 835...1000]

Bewaffnet mit dieser Verteilungs-Logik können wir nun einen Aktor erzeugen, der die Berechnung der Summe für einen bestimmten Bereich vornimmt. Erst mal belassen wir es bei einem rechnenden Aktor bis wir wissen ob alles funktioniert, danach teilen wir auf.

Tatsächlich allerdings brauchen wir zwei Aktoren, einen Master, der die Arbeit an seine Worker verteilt, die Ergebnisse sammelt und die tatsächliche Summe bildet und die Worker, die die Berechnungen ausführen.

Die zentralen Teile des Master könnten so aussehen:

public class Master : ReceiveActor
{
    public class CalculatePi {}

    IActorRef worker;
    double sum;

    public Master()
    {
        worker = Context.ActorOf(Props.Create<Worker>());

        Receive<CalculatePi>(c => DoCalculation(c));
        Receive<double>(s => AddToSum(s));
    }

    private void DoCalculation(CalculatePi _)
    {
        sum = 0;

        foreach (var range in Distribute(RangeTo, NrParts))
            worker.Tell(new Worker.CalculateRange(range));
    }

    private void AddToSum(double d)
    {
        sum += d;
    }
}

und unser Worker könnte zum Beispiel so implementiert werden:

public class Worker : ReceiveActor
{
    public class CalculateRange : Range
    {
        public CalculateRange(Range range) 
            : base(range) {}

        public CalculateRange(int startAt, int endAt) 
            : base(startAt, endAt) {}
    }
 
    public Worker()
    {
        Receive<CalculateRange>(CalculateRangeHandler);
    }
 
    private void CalculateRangeHandler(CalculateRange range)
    {
        // calculate
        double sum = 0;
        for (int n = range.StartAt; n <= range.EndAt; n++)
            sum += Math.Pow(-1, n) / (2 * n + 1);

        // send back result. If we multiply by 4 here, the sender will
        // automatically contain pi.
        Sender.Tell(sum * 4);
    }
}

Damit Du das ganze nicht mühsam abtippen musst, habe ich ein kleines github Repository vorbereitet, in dem die gesamte Applikation steckt.

Damit haben wir die Arbeit in gleiche Teile aufgeteilt und diese jeweils an unsere Worker Aktoren weitergegeben. Allerdings haben wir derzeit nur eine Instanz des Worker Aktors, der die eigentliche Arbeit erledigt. Sicher errätst Du was das bedeutet – die Gesamtzeit für die Lösung der Aufgabe hat sich leider noch nicht verbessert. Die Ursache liegt auf der Hand: wir haben nur einen Aktor, der alle Nachrichten empfängt und es ist ja ein wesentliches Merkmal eines Aktors immer nur eine Nachricht zu verarbeiten.

Damit bleibt noch etwas Raum für Verbesserungen, die wir morgen in Angriff nehmen werden.

Montag, 14. Dezember 2015

Akka.NET Adventskalender – Tür 14

Wenn Aktoren im Bündel auftreten 

Bislang hatten wir von jedem Aktor, den wir geschaffen haben, immer nur eine Instanz am Laufen. Damit konnten wir Aufgaben auf mehrere "Schultern" verteilt abarbeiten lassen anstelle alles von einem Aktor erledigen zu lassen. Jeder Aktor war dabei komplett für jeweils eine Teilaufgabe zuständig. Was passiert nun, wenn wir etwa eine CPU-intensive Aufgabe lösen wollen? Schaffen wir es die Arbeit ebenfalls unter mehreren Aktoren aufzuteilen? Damit werden wir uns diese Woche befassen.

Damit wir ein Gefühl für die Vorteile der parallelen Bearbeitung bekommen, sehen wir uns von einer Aufgabenstellung zunächst die iterative Version an und haben dann einen Ausgangspunkt für Verbesserungen.

Bei der Scala-Version von Akka habe ich ein nettes Tutorial gefunden, in welchem Pi berechnet wird. Das ist eine verständliche aber auch leicht parallelsierbare Aufgabe. Also legen wir los und sehen zu wie wir die Aufgabe mit konventionellen Mitteln vermutlich gelöst hätten:

using System;
using System.Diagnostics;

namespace PiIterative
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("Calculating PI -- please be patient...");

            const int million   = 1000 * 1000;
            const int rangeFrom = 0;
            const int rangeTo   = 200 * million;

            var stopWatch = Stopwatch.StartNew();

            double sum = 0;
            for (int n = rangeFrom; n <= rangeTo; n++)
                sum += Math.Pow(-1, n) / (2 * n + 1);

            stopWatch.Stop();

            Console.WriteLine("Pi = {0}, Elapsed: {1:F1}s", 
                4 * sum, 
                stopWatch.ElapsedMilliseconds / 1000.0);
        }
    }
}

Mein MacBook Air von 2011 braucht für diese Aufgabe unter Mono etwa 8 Sekunden. Wenn Deine Maschine schneller ist, erhöhe einfach die Anzahl der Iterationen, damit wir später auch Steigerungen erleben können...

Wir Du siehst, ist dieser Algorithmus relativ einfach und besteht lediglich aus ein paar Millionen einzelner Schritte, die es auszuführen gilt. Jeder einzelne Schleifendurchlauf ist sicher nicht der Rede Wert, aber bei 200 Millionen oder mehr Wiederholungen summiert sich die Rechenzeit eben doch.

Wollen wir diese Aufgabe parallelisieren, müssen wir einen geeigneten Kompromiss finden. 200 Millionen Aktoren einzusetzen wäre sicher unklug. Allein die Instantiierungs-Zeit dafür würde den Rechenaufwand eines Schleifendurchlaufs weit übersteigen. An den Speicherbedarf mag ich gar nicht denken. Also macht es vermutlich Sinn, mit relativ wenigen Aktoren zu arbeiten, die dann große (aber eben nicht die vollständige Menge) Arbeitspakete erhalten. Da wir nur den Anfangs- und Endwert der Schleifendurchläufe benötigen, sind die Aufgaben-Pakete einfach zu transportieren. Anschließend hat jeder Aktor ein wenig zu tun und liefert seinen Teil an der Gesamtsumme ab. Irgendwer muss dann noch die Teilsummen aufaddieren und Pi ist berechnet.

Im Detail sehen wir uns das morgen an.

Sonntag, 13. Dezember 2015

Akka.NET Adventskalender – Tür 13

Ströme beeinflussen

Wie wir gestern gesehen haben, hat eine "Pipe and Filters" Architektur eine sehr positive Eigenschaft: Erweiterbarkeit. Wir können an jeder Stelle unserer Verarbeitungskette weitere Komponenten hinzufügen und damit die Funktionalität anpassen oder erweitern.

Heute werden wir "berechnende" Aufgaben in unsere Verarbeitungskette einfügen. Das einfache Beispiel, das wir heute erstellen werden, ist ein Wort-Zähler. Bestimmt kommt dir das Stichwort "Map-Reduce" dabei in den Sinn, wenn es um solche Aufgaben geht. Allerdings unterscheidet sich der Ansatz, den Map-Reduce geht deutlich von dem, was wir heute machen. Map-Reduce wurde erfunden, um große Datenmengen parallel zu verarbeiten und damit die Verarbeitungszeit zu reduzieren. Genau das werden wir in ein paar Tagen auch tun. Für heute werden wir uns mit einem "rechnenden" Aktor begnügen, den wir in unsere Verabeitungskette einfügen. Dadurch bekommen wir möglicherweise einen bestimmten Grad an Parallelität, denn die hinterander geschalteten Stufen können durchaus gleichzeitig tätig sein.

Aber zurück zu unserem eigentlichen Anliegen: Wörter zählen. Bislang lesen wir ja unsere Textdatei zeilenweise, insofern müssen wir uns um zwei Dinge kümmern: Zeilen in Wörter zerlegen und diese anschließend zählen. Du ahnst es schon: wir werden zwei zusätzliche Komponenten in unsere Kette einfügen.

Fangen wir mit dem Zerlegen an. Es muss also ein Aktor her, der eine Zeile in Worte zerlegt. Reguläre Ausdrücke sind ein prima Mittel dafür (Naürlich gibt es noch 12 Millionen anderer Wege). Falls Du eine kleine Auffrischung zum Verständnis des gleich verwendeten Ausdrucks brauchst, hier ein paar Hinweise. Die vollständige Doku der .NET Implementierung liefert Microsoft.
  • "\w" ist eine Meta-Sequenz, die Wort-Bestandteile (Buchstaben und den Unterstrich) in einem angelieferten Text findet
  • "?", "+" und "*" sind Quantifizierer, die kein- oder einmal (?), ein oder mehrmals (+) oder gar nicht bis beliebig oft (*) den vorausgegangenen Ausdruck treffen.
Unser WordSplitter Aktor könnte dann so aussehen (alle heutigen Quelltexte sind in einem github Repository):

using System;
using Akka.Actor;
using System.Text.RegularExpressions;
using WordCount.Messages;

namespace WordCount.Actors
{
    public class WordSplitter : ReceiveActor
    {
        private IActorRef next;

        public WordSplitter(IActorRef next)
        {
            this.next = next;

            Receive<string>(SplitIntoWords);
            Receive<End>(next.Tell);
        }

        private void SplitIntoWords(string s)
        {
            var matches = Regex.Matches(s, @"\w+");
            foreach (Match match in matches)
                next.Tell(match.Value);
        }
    }
}

Mittlerweile sind wir ja nicht mehr überrascht, dass die Logik innerhalb einzelner Aktoren so kurz ist. Allerdings fällt der eigentliche Wort-Zähler etwas umfangreicher aus und verhält sich auch anders als bisherige Aktoren. Grund ist, dass wir erst nach dem letzten Wort genau sagen können, welches Wort wie häufig im Text vorkam. Also müssen wir solange Wörter beim zählenden Aktor ankommen, eine entsprechende interne Struktur aufbauen und geben diese erst nach dem Eintreffen der End Nachricht wieder aus. Aber die Logik ist für Dich sicher auf anhieb verständlich:

using System;
using System.Linq;
using Akka.Actor;
using System.Collections.Generic;
using WordCount.Messages;

namespace WordCount.Actors
{
    public class WordCounter : ReceiveActor
    {
        private IActorRef next;
        private Dictionary<string, int> wordCount;

        public WordCounter (IActorRef next)
        {
            this.next = next;
            wordCount = new Dictionary<string, int>();

            Receive<string>(CountWord);
            Receive<end>(Terminate);
        }

        private void CountWord(string word)
        {
            if (wordCount.ContainsKey(word))
                wordCount[word]++;
            else
                wordCount.Add(word, 1);
        }

        private void Terminate(End end)
        {
            var counts =
                wordCount.Keys
                    .OrderBy(w => w)
                    .Select(w => String.Format("{0}: {1}", w, wordCount[w]));
           
            foreach (var count in counts)
                next.Tell(count);

            next.Tell(end);
        }
    }
}

Damit alles funktionert, müssen wir unsere Verarbeitungskette wieder entsprechend erweitern. Wie immer fangen wir hinten an und arbeiten uns nach vorne durch.

using System;
using Akka.Actor;
using Akka.Routing;
using WordCount.Actors;

namespace WordCount
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Words");

            var console = system.ActorOf(Props.Create<ConsoleWriter>());
            var counter = system.ActorOf(Props.Create<WordCounter>(console));
            var splitter = system.ActorOf(Props.Create<WordWplitter>(counter));
            var caser = system.ActorOf(Props.Create<LowerCaser>(splitter));
            system.ActorOf(Props.Create<FileReader>(args[0], caser));

            system.AwaitTermination();
        }
    }
}

Zum Starten des Programms müssen wir in einem Terminal nach dem Programm-Namen den Namen einer existierenden Datei angeben, aus der wir die Worte lesen möchten. Alternativ kannst Du selbstversändlich auch den Pfad auf eine Datei beim Erzeugen des FileReader Aktors gleich mit angeben.

Damit sind wir auch schon am Ende unseres zweiten Sprints angekommen. Wie letzten Sonntag habe ich einen kleinen Bonus. Es gibt ein großartiges Buch, das gerade am Entstehen ist "Reactive Design Patterns" (in englisch) geschrieben von Roland Kuhn. Jeweils die fertig geschriebenen Kapitel werden verfügbar gemacht.

Ab morgen werden wir uns mit deutlich mehr Aktoren als bisher befassen. Da werden wir parallel laufende Berechnungen und deren Implementierung mit Akka.NET untersuchen.

Samstag, 12. Dezember 2015

Akka.NET Adventskalender – Tag 12

Ströme beeinflussen

Gestern haben wir mit einer Diskussion zu "Pipes and Filters" begonnen. Heute werden wir anfangen, die dazu notwendigen Aktoren zu entwickeln. Wir werden auch hier ein paar Kompromisse eingehen, die in der Praxis eventuell problematisch werden können. So kann ein langsam arbeitender Aktor schnell für einen unangenehmen Rückstau sorgen, der sich durch hohe zu haltende Datenmengen bemerkbar macht. Das kann mit einem besseren aber komplizierteren Protokoll vermieden werden. Aber uns soll es ja mehr um das Prinzip gehen, daher ignorieren wir solche Dinge.

Ein einfacher zeilenweise aus einer Datei lesender Quell-Aktor könnte wie folgt aussehen. (Lass Dich noch nicht vom gewählten Namensraum täuschen – das ist das Ziel, das wir am Ende erreichen werden).

using System;
using System.IO;
using Akka.Actor;
using System.Text;

namespace WordCount
{
    public class FileReader : ReceiveActor
    {
        private class ReadNextLine {}

        private IActorRef next;
        private FileStream fileStream;
        private StreamReader streamReader;

        public FileReader(string filePath, IActorRef next)
        {
            this.next = next;

            fileStream = new FileStream(filePath,
                FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            streamReader = new StreamReader(fileStream, Encoding.UTF8);

            Self.Tell(new ReadNextLine());

            Receive<ReadNextLine>(ReadLine);
        }

        private void ReadLine(ReadNextLine r)
        {
            string line = streamReader.ReadLine();

            if (line != null)
            {
                next.Tell(line);
                Self.Tell(new ReadNextLine());
            }
            else
            {
                next.Tell(new End());
            }
        }
    }
}

So wirklich kompliziert ist das ja nicht. In der Tat ist das ein wesentliches Merkmal von Streams, dass die einzelnen Bausteine sehr einfach sind. Die Leistung erhält man durch Kombination.

Demnach verwundert es kaum, dass ein Ziel-Aktor, der den erhaltenen Stream auf der Konsole ausgibt, ebenso einfach ist.

using System;
using Akka.Actor;

namespace WordCount
{
    public class ConsoleWriter : ReceiveActor
    {
        public ConsoleWriter()
        {
            Receive<string>(Console.WriteLine);
            Receive<End>(_ => Context.System.Shutdown());
        }
    }
}

Über das Herunterfahren des gesamten Aktorsystems durch einen einzelnen Aktor kann man wiederum geteilter Meinung sein. Produktiv haben solche Konstrukte definitiv nichts verloren.

Zusammengebracht wird die einfachste Kette zum Beispiel so:

using System;
using Akka.Actor;

namespace WordCount
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Words");

            var console = system.ActorOf(Props.Create<ConsoleWriter>());
            system.ActorOf(Props.Create<FileReader>(args[0], console));

            system.AwaitTermination();
        }
    }
}

Du musst beim Aufbau allerdings berücksichtigen, dass solch eine Kette immer vom Ende her konstruiert wird, weil wir ja im Konstruktor des Vorgängers die Referenz auf den nachfolgenden Aktor benötigen. Alternativ könnte man die Aktoren auch in willkürlicher Reihenfolge erzeugen und mittels einer Connect Nachricht die einzelnen Bestandteile der Kette zusammenfügen.

Wenn Du einen Aktor einfügen möchtest, ändert sich die Konstruktion der Kette wie folgt:

var console = system.ActorOf(Props.Create<ConsoleWriter>());
var lowerCaser = system.ActorOf(Props.Create<LowerCaser>(console));
system.ActorOf(Props.Create<FileReader>(args[0], lowerCaser));

Und der Aktor, der (war schwer zu raten, oder?) aus einem Stream an Zeilen einen Stream an aus Kleinbuchstaben bestehenden Zeilen erzeugt, könnte so aussehen und ist wie fast alle Ketten-Bestandteile sehr einfach...

using System;
using Akka.Actor;
using WordCount.Messages;

namespace WordCount
{
    public class LowerCaser : ReceiveActor
    {
        public LowerCaser(IActorRef next)
        {
            Receive<string>(s => next.Tell(s.ToLower()));
            Receive<End>(next.Tell);
        }
    }
}

Und morgen werfen wir einen Blick auf das Herzstück der Anwendung und den Grund, warum der Namensraum der obigen Klassen "WordCount" heißt :-)

Freitag, 11. Dezember 2015

Akka.NET Adventskalender – Tag 11

Strömende Quelle

Diese Woche befassen wir uns ja durchgehend mit einzeln auftretenden Aktoren. Wenn Du gerne mit Terminals (im deutschen Windows Eingabeaufforderung genannt) arbeitest, dann kennst Du bestimmt die Möglichkeiten, die eine Pipe (das | Symbol) besitzt. Oder wenn Du C# programmierst, dann schätzt Du vermutlich die vielfältigen Möglichkeiten, die sich mit Linq bieten oder Du hast bereits Erfahrungen mit Reactive Extensions gemacht. Man schaltet einfach gesprochen mehrere für sich betrachtet einfache Operationen zu einer komplexeren großen Operation hintereinander.

Dazu entwickeln wir heute ein sehr einfaches Stream-Protokoll. Es wird viele Kompromisse und Einschränkungen haben, aber zum Kennenlernen der Möglichkeiten reicht das vollkommen aus.

Unsere Stream Implementierung besitzt dabei ein primitives Protokoll, das aus zwei verschiedenen Nachrichten-Typen besteht:
  • String – eine Text Nachricht, die zu einem Aktor gesendet wird. Man erwartet vom Ziel-Aktor, dass er den Text entgegennimmt und entsprechend seiner Bestimmung verarbeitet.
  • End – ein End Objekt deutet das Ende des Streams an. Danach dürfen keine weiteren String-Nachrichten mehr folgen.
Grundsätzlich sind drei verschiedene Arten von Aktoren denkbar:
  • Quell Aktoren, die keinen Eingang besitzen, aber an deren Ausgang String- und End-Nachrichten bereit stellen. Anwendungen sind Laden oder Eingabe
  • Durchlass Aktoren, die Ein- und Ausgang besitzen. Dieser Typ Aktor kann wahlweise puffern bis der Stream zu Ende ist bzw. erhaltene Nachrichten sofort oder blockweise weiter reichen.
  • Ziel Aktoren, die nur einen Eingang haben, Nachrichten entgegen nehmen und final verarbeiten. Anwendungen solcher Aktoren sind Speicherung oder Ausgabe.
Für unsere Lernzwecke verarbeiten wir keine Streams parallel, fügen keine Streams zusammen oder teilen sie auf. Und wir behandeln keine Fehler, die über das Neustart-Verhalten von Akka.NET hinausgehen. Da solch ein Verhalten in der Praxis mit Datenverlusten einhergeht, ist es nicht ratsam, das im produktiven Umfeld so ebenfalls zu tun.

Ein einfacher Durchlass-Aktor könnte so aussehen:

class SourceActor : ReceiveActor
{
    SourceActor(IActorRef next)
    {
        // simple passthru without change 
        Receive<string>(s => next.Tell(s));
        Receive<End>(end => next.Tell(end));
    }
}

Ein Quell Aktor wird typischerweise keine Receive<> Anweisungen für unser Protokoll besitzen, sondern irgendwoher Daten erhalten, die er dann an den nächsten Aktor der Kette weiterleitet.

Und ein Ziel-Aktor wird keinen nächsten Aktor als Konstruktor-Argument besitzen aber die Daten gemäß dem Protokoll verarbeiten.

Wie das funktioniert und was man damit anfangen kann, sehen wir morgen.

Donnerstag, 10. Dezember 2015

Akka.NET Adventskalender – Tür 10

Wenn etwas schief gehen kann wird es schief gehen

Wir haben gestern den theoretischen Fall eines fehlschlagenden (Exception werfenden) Aktors angeschnitten. Was macht man in solchen Situationen? Was kann passieren und wie gehen wir mit solchen Fehlersituationen um?

Un die Situationen, die auftreten können zu analysieren, sehen wir uns die möglichen "Bruchstellen" in unserem Spiel an.
  • Der Game Aktor könnte eine Exception werfen. Das ist nicht unser Problem, weil der Aktor, der das Spiel gestartet hat, dieses Fehlverhalten behandeln muss. Das Standardverhalten ist, den Aktor neu zu starten, wir bekommen also ein neues Spiel, das dann (hoffentlich) glatt läuft.
  • Der Chooser könnte sterben. Würden wir den einfach neu starten lassen, würde er sich eine neue Zufallszahl ausdenken und die Antworten, die der Enquirer erhält wären möglicherweise inkonsistent. In diesem Fall macht es also Sinn, den Chooser und den Enquirer (also alle "Kinder" des Game-Actors) neu zu starten.
  • Der Enquirer fällt aus. Zwar wären seine bisherigen Versuche damit verloren, aber er kann ja nochmal neu anfangen, wenn wir ihn neu starten. Der neue Aktor wird dann die Zahl schon erraten.
Das hört sich nach viel Arbeit an. Die gute Nachricht ist, es hört sich zwar kompliziert an, ist aber ganz einfach. Aber erst einmal müssen wir unsere Aktoren mit Sollbruchstellen ausstatten, damit wir das Fehlverhalten für einen nachfolgenden lauf simulieren können.

Dem Enquirer spendieren wir einen crashCounter, der exakt beim 3. Rateversuch zu plötzlichem Ableben führt, danach aber nie wieder zuschlägt.

public class Enquirer : ReceiveActor
{
 ...
    private static int crashCounter = 3;
 
    ...
    
    private void MakeATry()
    {
        if (--crashCounter == 0)
            throw new InvalidOperationException();
  ...
 }
...

Nachdem wir unser Spiel gestartet haben, wird dieser Aktor wie geplant sterben. Aber danach passiert nichts mehr. Zwar haben wir gehört, dass ein Neustart die Standard Option ist, aber der Aktor erhielt keine Start Nachricht nach dem Neustart. Demnach kam er gar nicht auf die Idee, mit dem Raten neu zu beginnen. Das könnte man mit der PostRestart Methode lösen. (Genau genommen ist das in unserem Fall sogar falsch, nämlich dann wenn beide Aktoren neu gestartet werden, denn dann fallen wir möglicherweise in das anfangs diskutierte Problem. Aber wir wollen uns für diese einfache konstruierte Aufgabe das Leben nicht unnötig kompliziert machen)

protected override void PostRestart(Exception reason)
{
    Console.WriteLine("Enquirer: PostRestart");
    
    // we crashed during processing. So it is wise to Start again
    Self.Tell(new Start());
}

Mit dieser kleinen Anpassung wird die erste Anforderung erfüllt. Aber wie erfüllen wir die zweite Anforderung beide Kinder neu zu starten? Das ist eine Aufgabe, die mit einer SupervisorStragegy gelöst werden kann. Der Standard dafür ist die OneForOneStrategy (Neustart des einen verstorbenen Aktors), wir wählen für die von uns geforderte Anforderung die AllForOneStrategy, mit der wir beim Ableben eines Kindes alle Kind Aktoren neu starten.

Wir passen also die Erzeugung des Chooser Aktors wie folgt an:

chooser = Context.ActorOf(
    Props.Create<Chooser>()
         .WithSupervisorStrategy(
             new AllForOneStrategy(ex => Directive.Restart)),
    "Chooser");

Verwendet man in unserem Fall die kombinierte Logik, kann man je nach Bedarf individuell auf Fehler reagieren. Zugegeben, diese Situationen sind konstruiert und nicht ganz korrekt implementiert, aber ich hoffe, ich konnte das Prinzip dahinter verdeutlichen. Die Änderungen sind wieder im github Repository zu finden. Die dort befindlichen Klassen sind mit reichlich Ausgaben ausgestattet, so dass nachvollziehbar ist, wer wann was macht.

Morgen werden wir uns weitere Anwendungsmöglichkeiten für einzelne Aktoren ansehen.

Mittwoch, 9. Dezember 2015

Akka.NET Adventskalender – Tür 9

Manche Dinge will man gar nicht wissen

Gestern haben wir damit begonnen, ein Zahlenrate-Spiel zu entwickeln. Allerdings mussten wir dabei wissen, wie wir die beiden Aktoren zusammenschalten müssen, damit das Spiel funktioniert. Das ist sehr unflexibel. Heute ist die Gelegenheit diese Unschönheit auszubügeln. Wünschenswert wäre, wenn wir nur einen Aktor erzeugen müssten, der das gesamte Spiel steuert.

Das klingt eigentlich ganz einfach. Wir fügen einen Game Aktor hinzu, der die Aufgaben erfüllt, die wir vorher in der Kommandozeilen Anwendung hatten. Außerdem lassen wir die Aktoren nicht sofort loslaufen, sondern senden ihnen Nachrichten, mit denen deren Aktivität implizit gestartet wird.

Die nachfolgenden Zeilen sehen zunächst richtig aus, sind aber falsch. Schauen wir uns den Code einmal an von dem ich spreche und diskutieren wir das verborgene Problem. Also Niemals, wirklich niemals so etwas hier schreiben bitte:

// innerhalb eines Aktors -- FALSCH!
var actor1 = Context.ActorOf(...);
var actor2 = Context.ActorOf(...); // spricht mit actor1

actor1.Tell(new Start());
actor2.Tell(new Start()); // kommt evtl. vorher

Wo ist das Problem eigentlich? Der Code wird zwar sequentiell ausgeführt, aber die Nachrichten, die wir den Aktoren actor1 und actor2 in deren Mailbox gestellt haben, erheben keinen Anspruch darauf, Aktor übergreifend in richtiger Reihenfolge ausgeführt zu werden. Lediglich innerhalb eines Aktors ist die Reihenfolge garantiert. Schlimmer wird das noch, wenn Aktoren verteilt sind. Dann treten teilweise messbare Latenzen auf, die solch ein Verhalten noch schlimmer machen können.

Angewendet auf unser Zahlenratespiel könnte das bedeuten, dass der Chooser Aktor (der die zufällige Zahl auswählt) noch gar keine Wahl getroffen hat, aber bereits eine erste Anfrage erhält. Wenn er erst einmal seine Zahl gewählt hat, werden die Antworten dann vielleicht anders ausfallen, als das vorher der Fall war. Das Spiel wird möglicherweise nicht terminieren.

Um solche Überraschungen zu vermeiden, erweitern wir unser Protokoll. Eine typische Unterhaltung zwischen unseren Aktoren wird dann in dieser Reihenfolge stattfinden:

// zu Beginn
Game -> Chooser: Start
(Chooser wählt eine Zufallszahl)
Chooser -> Game: Started
Game -> Enquirer: Start

// wiederholt, solange Zahl nichterraten ist
Enquirer -> Chooser: TestTry(x)
Chooser -> Enquirer: TooBig(x) | TooSmall(x) | Guessed(x)

// wenn erraten
Chooser -> Game: Guessed(x)

Leider fallen die Änderungen, die wir heute vornehmen müssen, etwas zu umfangreich für einen blog aus. Daher ist der aktuelle Stand in einem github Repository zu finden.

Im Gegensatz zur gestrigen Version hat die heutige Veränderung den Vorteil, dass wir nicht mehr wissen müssen, wie wir das gesamte Spiel in Gang bekommen. Es startet sich einfach so:

var system = ActorSystem.Create("Numb3rs");
var game = system.ActorOf(Props.Create<GuessGame>(), "Game");
game.Tell(new Start());

Was folgern wir daraus?

Jede zusätzliche Funktionalität könnte in einem neuen Aktor erfolgen. Allerdings verkompliziert sich das Protokoll mit jedem weiteren Aktor. Das können wir wiederum kompensieren, indem wir erneut einen Aktor "vorschalten" der die zusätzliche Komplexität kapselt.

Ist unsere Anwendung damit komplett?

Naja, das kommt darauf an... Führt man diese kleine Anwendung auf einem einzelnen Computer aus, kann man sicher mit einer hundert prozentigen Quote an erfolgreich zugestellten Nachrichten rechnen. Aber stell Dir einmal etwas komplexere Protokolle vor, die über mehrere Systeme verteilt laufen. Was würde passieren, wenn z.B. der Chooser in eine Exception läuft?

Auch wenn das Beispiel zu klein dafür ist, werden wir uns die Details dafür morgen zu Gemüte führen.

Dienstag, 8. Dezember 2015

Akka.NET Adventskalender – Tür 8

Ich weiß was, das Du nicht weißt

Willkommen zurück! Wie gestern versprochen werden wir heute zusammen ein kleines Spiel programmieren. Sicher hast Du als Kind mit Deinen Spielkameraden auch "Zahlen Raten" gespielt. Einer sucht sich eine Zahl aus und der Andere versucht sie durch geschicktes Fragen einzukreisen bis die Zahl erraten ist. Hierzu wird eine Vermutung geäußert und als Antwort erfährt man ob die Vermutung zu klein, zu groß oder richtig war. Heute sind wir erfahrene Entwickler und wissen, was binäre Suche ist. Insofern haben wir den Lösungsweg schon im Kopf.

Wie werden die Aktoren miteinander reden?

Der einfachste Weg, dieses Spiel zu programmieren, ist für die jeweiligen Verantwortlichkeiten einen Aktor einzusetzen.

Der Erste – nennen wir ihn Chooser sucht sich eine zufällige Zahl im Bereich von 1 bis 100 aus. Die einzige Nachricht, auf die er antworten muss, ist die Bitte, einen Versuch zu prüfen. Nennen wir die Klasse der Nachricht einfach TestTry. Auf diese Anfrage erhalten wir eine von drei Antworten: TryTooBig, TryTooSmall oder Guessed.

Der zweite Mitspieler, nennen wir ihn Enquirer, kennt den Chooser und bittet ihn wiederholt, eine Zahl zu prüfen. Aufgrund der Antwort wird der verbleibende Zahlenbereich eingeschränkt und erneut gefragt. Das ganze wiederholt sich solange bis Guessed als Antwort eintrifft. Damit ist das Spiel vorüber.

OK, fangen wir an. Heute werden wir eine Datei pro Nachrichten-Klasse anlegen und alle Nachrichten in ein Verzeichnis (und damit einen eigenen Namensraum) legen. Selbstverständlich steht es Dir frei, es anders zu machen.

Die Nachrichten Klassen könnten so wie folgt ausehen. An dieser Stelle habe ich trotz der Wiederholungen aufgrund der einfacheren Lesbarkeit auf eine gemeinsame Basisklasse verzichtet. Auch hier darf Deine Implementierung gerne anders ausfallen.

namespace GuessMyNumber.Messages
{
    public class TestTry
    {
        public int Number { get; private set; }

        public TestTry(int number)
        {
            Number = number;
        }
    }
}

namespace GuessMyNumber.Messages
{
    public class TryTooBig
    {
        public int Number { get; private set; }

        public TryTooBig(int number)
        {
            Number = number;
        }
    }
}

namespace GuessMyNumber.Messages
{
    public class TryTooSmall
    {
        public int Number { get; private set; }

        public TryTooSmall(int number)
        {
            Number = number;
        }
    }
}

namespace GuessMyNumber.Messages
{
    public class Guessed
    {
        public int Number { get; private set; }

        public Guessed(int number)
        {
            Number = number;
        }
    }
}

Das war nicht schwer. Einfache Klassen mit jeweils nur einer Eigenschaft. Gerne dürfen die natürlich auch in einer Datei definiert werden – ganz nach Deinem Geschmack oder Gewohnheit.

Der Chooser könnte so wie nachfolgend aussehen. Wir werden ihn morgen nochmals anpassen um ihn flexibler zu machen, aber für heute sollte das ein brauchbarer Anfang sein:

using System;
using Akka.Actor;
using GuessMyNumber.Messages;

namespace GuessMyNumber.Actors
{
    public class Chooser : ReceiveActor
    {
        private int mySecretNumber;

        public Chooser()
        {
            var generator = new Random();
            mySecretNumber = generator.Next(1, 101);

            Console.WriteLine("Pssst: my secret is: {0}", mySecretNumber);

            Receive<TestTry>(t => HandleTestTry(t));
        }

        private void HandleTestTry(TestTry testTry)
        {
            var triedNumber = testTry.Number;

            Console.WriteLine("Received Guess: {0}", triedNumber);

            if (triedNumber < mySecretNumber)
            {
                Sender.Tell(new TryTooSmall(triedNumber));
            }
            else if (triedNumber > mySecretNumber)
            {
                Sender.Tell(new TryTooBig(triedNumber));
            }
            else
            {
                Sender.Tell(new Guessed(triedNumber));
            }
        }
    }
}

Und schließlich folgt der Enquirer, der versucht die Zahl zu erraten. Er muss wissen, wen er fragt, daher statten wir den Konstruktur mit einem IActorRef Argument aus. Einzige Einschränkung dabei ist, dass wir so keine zirkulären Referenzen realisieren können und die Reihenfolge des Aufbaus der Aktoren entscheidend ist. In unserem Fall ist das problemlos. Diese Klasse benötigt deutlich mehr Logik, aber mit dem Hintergrundwissen der binären Suche ist die Logik sicher einfach nachvollziehbar. Selbstverständlich hätten wir uns den letzten Versuch auch selbst merken können, anstelle sowohl bei der Frage als auch der Antwort jeweils die Zahl mitzuführen. Wie immer führen mehrere Wege zum Ziel.

using System;
using Akka.Actor;
using GuessMyNumber.Messages;

namespace GuessMyNumber.Actors
{
    public class Enquirer : ReceiveActor
    {
        private readonly IActorRef chooser;

        // possible range of numbers including boundaries
        private int rangeFrom;
        private int rangeTo;

        public Enquirer(IActorRef chooser)
        {
            this.chooser = chooser;

            rangeFrom = 1;
            rangeTo = 100;

            Receive<TryTooSmall>(t =>HandleTooSmallTry(t));
            Receive<TryTooBig>(t => HandleTooBigTry(t));
            Receive<Guessed>(g =>HandleGuessed(g));

            MakeATry();
        }

        private void MakeATry()
        {
            var triedNumber = rangeFrom + (rangeTo - rangeFrom) / 2;

            Console.WriteLine("Range: {0} - {1}, trying: {2}",
                rangeFrom, rangeTo, triedNumber);

            chooser.Tell(new TestTry(triedNumber));
        }

        private void HandleTooSmallTry(TryTooSmall guessTooSmall)
        {
            rangeFrom = guessTooSmall.Number + 1;
            MakeATry();
        }

        private void HandleTooBigTry(TryTooBig guessTooBig)
        {
            rangeTo = guessTooBig.Number - 1;
            MakeATry();
        }

        private void HandleGuessed(Guessed guessed)
        {
            Console.WriteLine("Guessed: {0}", guessed.Number);

            Context.System.Shutdown();
        }
    }
}

Und wie immer brauchen wir eine Anwendung, innerhalb derer wir das ActorSystem starten und das Ganze ans Laufen bringen:

using System;
using Akka.Actor;
using GuessMyNumber.Actors;

namespace GuessMyNumber
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("Number Guess Starting");

            var system = ActorSystem.Create("Numb3rs");
            var chooser = system.ActorOf(
                Props.Create<Chooser>(),
                "Chooser"
            );
            var enquirer = system.ActorOf(
                Props.Create<Enquirer>(chooser),
                "Enquirer"
            );

            system.AwaitTermination();
        }
    }
}

Wie? Gar kein Shutdown() Aufruf? Das machen wir diesmal im Enquirer, wenn wir die Guessed Nachricht erhalten. Du errätst es schon, das ist sicher kein guter Stil, denn solche Aktoren sind bestimmt nicht wiederverwendbar. Also: nicht nachmachen außer heute...

Der Programmablauf könnte so aussehen:

Number Guess Starting
Pssst: my secret is: 70
Range: 1 - 100, trying: 50
Received Guess: 50
Range: 51 - 100, trying: 75
Received Guess: 75
Range: 51 - 74, trying: 62
Received Guess: 62
Range: 63 - 74, trying: 68
Received Guess: 68
Range: 69 - 74, trying: 71
Received Guess: 71
Range: 69 - 70, trying: 69
Received Guess: 69
Range: 70 - 70, trying: 70
Received Guess: 70
Guessed: 70

Damit haben wir heute gesehen, wie einfach sich Aktoren miteinander unterhalten können. Das einzig unschöne war, dass wir wissen mussten, wie wir die diversen Aktoren miteinander "verdrahten" müssen, damit das Spiel wie geplant läuft.

Das werden wir morgen verbessern.

Montag, 7. Dezember 2015

Akka.NET Adventskalender – Tür 7

Aktor klein ging allein...

Willkommen zu unserem zweiten Sprint. Diese Woche werden wir uns auf einzeln auftretende Aktoren konzentrieren und uns gemeinsam ansehen, was wir alles mit ihnen anstellen können.

Wenn Du vorher noch nicht viel mit Aktoren gearbeitet hast wirst Du Dir sicher ein paar Fragen stellen:
  • Wie umfangreich soll so ein Aktor werden?
  • Wie finden die Aktoren sich gegenseitig, wenn sie sich unterhalten wollen?
  • Gibt es eine maximale Anzahl von Aktoren, die Du anlegen darfst?

Wie umfangreich soll so ein Aktor werden?

Wenn Du diese Artikel liest, vermute ich, Du kommst mit einem objektorientierten Hintergrund. Dann kennst Du bestimmt das Single-Responsibility Prinzip, das Robert C. Martin zuerst unter dieser Bezeichnung in seinem Buch "Clean Code" veröffentlicht hat. Gemäß diesem Prinzip bekommt jede Klasse exakt eine Verantwortlichkeit (Original-Formulierung: "one reason to change"). Für schlechtes Design würde man Klassen halten, die zu viel Aufgaben erledigen wie z.B. ConfigurationAdminNotificationRestarterNagiosReporter.  Solch eine Klasse gibt es sicher in keinem Deiner Projekte.

Das gleiche Prinzip gilt für Aktoren. Ein Aktor erfüllt eine Aufgabe und er erfüllt sie vollständig und gut. Versuche also Deine Aktoren klein zu halten, wenn es irgend möglich ist. Wenn ein Aktor mehr erledigen muss, überlege Dir ob das nicht eine Aufgabe für einen weiteren Aktor werden könnte. Dieses Vorgehen hält Aktoren testbar, verständlich und wartbar.

Das führt uns zur zweiten Frage:

Wie finden sich die Aktoren?

Hier hast Du mehrere Möglichkeiten:
  • Wenn Du einen übergeordneten Aktor hast (im OO Umfeld kennst Du dieses Muster unter dem Namen "Mediator"), dann erzeugt dieser seine Mitstreiter und verbindet sie miteinander, indem die diversen Referenzen der einzelnen Aktoren den jeweiligen Aktoren mitgeteilt werden. So "kennt" jeder die notwendigen Aktoren.
  • Alternativ können die Kind-Aktoren auch an den übergeordneten berichten, der dann wiederum entsprechend reagiert.
  • Akka.NET kennt ein Konstrukt namens ActorSelection. Du weißt ja, dass Aktoren in einer Baumstruktur angeordnet sind. Eine Selektion ist ein Pfad (wahlweise absolut von der Wurzel aus oder relativ zur aktuellen Stelle) mit wahlweise Joker-Zeichen. Solch eine Selektion könnte wie "../worker*" oder "/user/dashboard/*" aussehen. Ersteres Beispiel würde alle mit "worker" beginnenden Eltern-Aktoren betreffen, das zweite Beispiel alle unter "user/dashoard" sitzenden Aktoren. Benutzt wird solch eine Selektion fast wie eine AktorRef, zumindest was die Möglichkeiten angeht, einen Tell() Aufruf auszulösen.
  • Es gibt einen globalen EventStream, der es ermöglicht, Ereignisse zu publizieren und zu abonnieren. Auch dazu werden wir in ein paar Tagen ein Beispiel sehen.

Etwas mehr Details zur Addressierung von Aktoren finden sich in der Akka.NET Dokumentation.

Zum demonstrieren erzeugen wir einen einfachen Aktor, der seinen Namen und den erhaltenen Text ausgibt:

using System;
using Akka.Actor;

namespace ActorSelection
{
    public class Writer : ReceiveActor
    {
        public Writer()
        {
            Receive<string>(s =>
                Console.WriteLine("{0} received {1}", Self.Path.Name, s));
        }
    }
}

Und die passende Applikation dazu:

using System;
using Akka.Actor;
using System.Threading;

namespace ActorSelection
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Selection");

            var writer1 = system.ActorOf(Props.Create<Writer>(), "writer1");
            var writer2 = system.ActorOf(Props.Create<Writer>(), "writer2");
            var writer3 = system.ActorOf(Props.Create<Writer>(), "writer3");

            var xxx1 = system.ActorOf(Props.Create<Writer>(), "xxx1");
            var xxx2 = system.ActorOf(Props.Create<Writer>(), "xxx2");

            var printer = system.ActorOf(Props.Create<Writer>(), "printer");


            // use ActorRef
            writer2.Tell("hello");
            xxx1.Tell("Ola");

            // use ActorSelection
            system.ActorSelection("/user/writer*").Tell("hi");

            Thread.Sleep(500);
            Console.WriteLine("press [enter] to continue");
            Console.ReadLine();

            system.Shutdown();
            system.AwaitTermination();
        }
    }
}

Die Ausgabe könnte so aussehen. Die Reihenfolge kann bei jedem Aufruf anders sein.

writer1 received hi
writer2 received hello
writer2 received hi
writer3 received hi
xxx1 received Ola

Bleibt noch die dritte Frage ungeklärt.

Wieviele Aktoren darf ich erzeugen?

Die Antwort darauf ist einfach: Bis Dein Speicher voll ist. Aktoren selbst belegen reativ wenig Speicher. Die genaue Zahl kann ich aktuell leider nicht nennen, aber es kursieren immer wieder Werte von 300-400 Byte pro Aktor. Zu einem Thread werden Aktoren erst, sobald eine Nachricht empfangen wird und dieser Vorgang erfolgt kontrolliert durch das Laufzeitsystem. Also: es darf durchaus einer mehr sein, das spielt überhaupt keine Rolle, selbst tausende von Aktoren sind kein Drama.

Das war's für heute. Morgen werden wir ein kleines Spiel gemeinsam programmieren.

Sonntag, 6. Dezember 2015

Akka.NET Adventskalender – Tür 6

Arten von Nachrichten

Glückwunsch! Du hast bis zum Ende unseres ersten (leider recht theoretischen) Sprints geschafft. Nächste Woche wird es deutlich praktischer zugehen – versprochen! Aber heute werden wir noch ein paar Muster rund um Nachrichten besprechen.

Bislang haben wir hauptsächlich String-Objekte an unsere Aktoren gesandt. Es mag Situationen geben, in denen das vollkommen ausreicht, aber die Versuchung ist sicher groß, Objekte als Nachrichten zu versenden. Doch – wie sollen wir solche Objekte benennen? Vaughn Vernon nennt in seinem Buch "Reactive Messaging Patterns with the Actor Model" drei verschiedene Muster, die auch erheblichen Einfluss auf die Benennung der Klassen haben. Ich werde nachfolgend englische Klassen-Namen verwenden, wenn eure Ubiquitäre Sprache (Ubiquitous language) deutsch ist, sollten selbstverständlich deutsche Namen hier Verwendung finden.

Kommandos (Command)

Wenn Du vor hast, einem Aktor ein Kommando zu erteilen, dann ist ein imperativ die grammatikalische Form, die Du in zwischenmenschlicher Kommunikation verwenden würdest. Das würde ich durch ein Verb in Gegenwart gefolgt von einem Substantiv ausdrücken. Im englischen würden dadurch Namen entstehen wie "PrintLine", "SendMail", "Speak" oder "SplitWhatever".

Das führt dann zu sehr ausdrucksstarken Code Zeilen wie

printer.Tell(new PrintLine( ... ));
notifier.Tell(new SendMail( ... ));
speaker.Tell(new Speak( ... ));
splitter.Tell(new SplitWhatever( ... ));

Die Intention hinter solchen Zeilen sind ohne Zweifel verständlich.

Ereignisse (Event)

Wenn ein Aktor seinen internen Zustand ändert und diese Tatsache anderen mitteilen möchte, dann informiert er über etwas das soeben passierte und nicht mehr zu ändern ist. Es fand ja bereits statt. Insofern ist ein Verb in der ersten Vergangenheit mit einem vorangestellten Substantiv eine gute Wahl. Auch hier wird jeder verstehen, was eben los war. Das führt dann zu Klassen-Namen wie "LinePrinted", "MailSent", "StatusUpdated" or "Spoke".

Auch hier sind die nachfolgenden Code-Zeilen für jeden nachvollziehbar.

someone.Tell(new StatusUpdated( ... ));
someone.Tell(new MailSent( ... ));

Dokumente (Document)

Das dritte Muster für die Namensgebung, die Vaughn Vernon nennt, sind Dokument Nachrichten. Sie bestehen lediglich aus einem Substantiv und deuten an, was vom Inhalt der Nachricht zu erwarten ist. In aller Regel sind solche Nachrichten die Antwort auf eine Anfrage bei einem Aktor.

Um die Probe auf's Exempel zu machen: sind die nachfolgenden Zeilen verständlich?

Receive<GetStatus>(_ => Sender.Tell(new Status( ... )));
Receive<Read>(_ => Sender.Tell(new SensorValue( ... )));

Wie organisiert man Nachrichten?

Gute Frage. Vermutlich wird jeder hier eigene Vorlieben haben. Im Akka.NET Universum wird man vorwiegend diese Vorgehen finden:
  • Nachrichten-Klassen sind verschachtelte Klassen, die innerhalb der Aktoren definiert werden, die diese Nachrichten senden oder empfangen. Wenn Du das machst, wirst Du sehr lesbare Klassen-Namen erhalten, denn die Namen bestehen aus dem Klassen-Namen des Aktors und durch einen Punkt getrennt, der Nachrichten-Klasse. Solche Nachrichten wird man jedoch kaum bei anderen Aktoren verwenden wollen und die Aktor-Klassen werden an Länge und damit Unübersichtlichkeit zunehmen.
  • Wie bisher auch: Jede Klasse kommt in eine Datei und wenn es zu viele werden packst Du die Nachrichten Klassen in einen eigenen Ordner, den Du wahlweise als namespace nutzt.
  • Eine Misch-Form aus beiden. Es liegt bei Dir.

In unserem heutigen Beispiel werden wir die erste Methode verwenden also Nachrichten innerhalb unseres Aktors definieren.

Unser erster Aktor soll zwei Typen von Nachrichten beantworten können: Die Anfrage der aktuellen Zeit und die Bitte, auf eine Zahl 4 zu addieren. Die Antwort wird dem Absender der Anfrage übergeben.

Erzeuge eine Konsolen Anwendung mit einem Actor System wie bisher auch und erstelle diesen Aktor:

using System;
using Akka.Actor;

namespace RequestReply
{
    public class Replyer : ReceiveActor
    {
        #region command messages
        public class ReadTime {}

        public class Add4
        {
            public int Number { get; private set; }

            public Add4 (int number)
            {
                Number = number;
            }
        }
        #endregion

        public Replyer()
        {
            Receive<ReadTime>(_ =>
                Sender.Tell(String.Format("{0:HH:mm:ss}", DateTime.Now))
            );

            Receive<Add4>(add4 =>
                Sender.Tell(add4.Number + 4)
            );
        }
    }
}

Dank der eingebetteten Klassen sind die Receive<>() Aufrufe direkt verständlich und einfach. Auf der Sende-Seite hingegen müssen längere Namen getippt werden (x.Tell(new Replyer.ReadTime())). Aber verständlich ist der Code allemal. Unsere Main Methode des Konsolen-Programms könnte dann so aussehen:

public static void Main(string[] args)
{
    var system = ActorSystem.Create("Reply");
    
    var replyer = system.ActorOf(Props.Create());
    
    // Ask() will reply with a task
    Task time = replyer.Ask(new Replyer.ReadTime());
    time.Wait();
    
    Console.WriteLine("Time is: {0}", time.Result);
    
    Task number = replyer.Ask(new Replyer.Add4(17));
    number.Wait();
    
    Console.WriteLine("Number is: {0}", number.Result);
    
    Console.WriteLine("Press [enter] to continue");
    Console.ReadLine();
    
    system.Shutdown();
    system.AwaitTermination();
}

Moment mal. Bisher war immer die Rede davon, einem Actor mittels Tell() eine Nachricht zu übermitteln, nun verwenden wir plötzlich Ask(). Und noch dazu verwenden wir ominöse Wait() Aufrufe, dabei hieß es doch, dass wir bei Akka.NET niemals mit Tasks arbeiten dürfen. Stimmt dann, wenn es um das Innenleben von Aktoren geht. Da solltest Du auf die Benutzung von Tasks auf jeden Fall verzichten. Auch die Benutzung von Ask() innerhalb Aktoren spricht für schlechtes Design und sollte eher vermieden werden. Das Motto lautet "Tell() – don't Ask()".
Aber immer dann, wenn von Akka-fremdem Code (wie unsere Kommandozeilen-Applikation, aber auch eine Web API wäre ein Beispiel) Aufrufe von Akka.NET Aktoren getätigt werden sollen, die eine Antwort geben. In der Akka.NET Dokumentation zu diesem Thema ist beim Rückgabewert von einer "Future" die Rede, das passendste .NET Mittel dafür ist ein Task. Solltest Du zufällig mit async Methoden arbeiten, wirst Du diese Entscheidung der Akka.NET Entwickler lieben!

So, die Grundlagen hättest Du erfolgreich überstanden. Leider sind wir an vielen Stellen nicht zu tief eingestiegen, ich hoffe dennoch, dass ich einen angenehmen Überblick über die grundlegendsten Dinge rund um Akka.NET vermitteln konnte. Morgen starten wir mit einem neuen Sprint, in dem wir uns mit einzelnen Aktoren und deren Zusammenspiel befassen werden. Ich hoffe, Du bist wieder dabei.

Samstag, 5. Dezember 2015

Akka.NET Adventskalender – Tür 5

Fortpflanzung auf Aktor-isch

Gestern haben wir uns angesehen, wie wir Nachrichten an einen Aktor senden und dort behandeln können. Ich hatte auch schon erwähnt, dass ein Aktor Kinder erzeugen kann, damit er Arbeit an diese abgeben kann. Wenn wir uns für diesen Weg entscheiden, müssen wir uns der Verantwortung bewusst werden, die wir damit eingehen. Unser Aktor ist verantwortlich für seine Kinder und muss sie kontrollieren, beaufsichtigen und eventuell beenden.

Insgesamt entsteht durch die Aktoren eine Hierarchie, die am besten mit einem Dateisystem vergleichbar ist. Direkt aus dem ActorSystem erzeugte Aktoren liegen im "/user" Pfad des Baumes, Kind-Aktoren wie bei verschachtelten Dateisystemen entsprechend an die Eltern-Aktoren angehängt.

Von einem Aktor aus angelegte Kinder werden mittels des sogenannten Context angelegt. Dem Context werden wir noch ein paar mal begegnen. Mich persönlich hat er anfangs eher verwirrt, dabei ist die Trennung zwischen Aktor und Context eigentlich ganz logisch. Der Aktor ist ein Objekt, welches das Verhalten und den Zustand repräsentiert. Der Context wird von der Laufzeitumgebung bereit gestellt und enthält infrastrukturelle Dinge wie die Verbindung zu Eltern, Kindern, dem Aktorsystem etc.

Neue Aktoren werden also so erzeugt:

// system == unser Aktor System
// Aktor unter /user anlegen
system.ActorOf( ... );

// wenn wir uns innerhalb eines Aktors befinden
// legen wir so ein Kind an
Context.ActorOf( ... );

Der Aktor Lebenszyklus

Wenn wir eine Hierarchie von Aktoren aufbauen, dann geschieht das im Sinne einer sinnvollen Arbeitsteilung. Jede Ebene operiert auf unterschiedlichem Niveau und delegiert und kontrolliert damit eventuell darunter liegende Aktoren. Wie im täglichen Leben haben höher angesiedelte Aktoren weniger Risiko aber mehr Verantwortung, weiter unten angesiedelte Aktoren ein deutlich höheres Risiko aber kaum Verantwortung. Wenn ein Aktor stirbt, wird er durch ein baugleiches Modell ersetzt. Klingt grausam, ist aber Realität in der Welt der Aktoren.

Damit wir in den diversen Lebenszyklen (starten, neu starten und anhalten) eingreifen können (möglicherweise dürfen wir bestimmte Daten nicht verlieren), bieten Aktoren vier verschiedene Methoden, die wir bereit stellen können. Sie werden durch die Laufzeitumgebung zum gegebenen Zeitpunkt aufgerufen.

 * protected override void PreStart()
 * protected override void PreRestart(Exception reason, object message)
 * protected override void PostRestart(Exception reason)
 * protected override void PostStop()

Etwas genauer steht es in der Akka.NET Dokumentation.

Um die Ergebnisse der diversen Methodenaufrufe einmal untersuchen zu können, möchte ich Dich bitten, ein kleines Konsolen-Projekt anzulegen und das nachfolgende Programm darin abzulegen. Anschließend werden air einen Aktor mit einem Kind anlegen, das leider öfter Exceptions erzeugt. Standardmäßig wird solch ein Aktor neu gestartet.

using System;
using Akka.Actor;
using System.Threading;

namespace AkkaSuperVision
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Hooks");

            var supervisor = system.ActorOf(
                Props.Create<Supervisor>(),
                // Props.Create<Supervisor>().WithSupervisorStrategy( ... ),
                "Supervisor");

            for (var i=0; i<500; i++)
                supervisor.Tell("message " + i);


            Thread.Sleep(500);
            Console.WriteLine("Press [enter] to continue...");
            Console.ReadLine();

            system.Shutdown();
            system.AwaitTermination();
        }
    }
}

Unser etwas zickiges Kind könnte so wie hier aussehen:

using System;
using Akka.Actor;

namespace AkkaSuperVision
{
    public class Child : ReceiveActor
    {
        private int nrMessagesHandled;

        public Child ()
        {
            nrMessagesHandled = 0;
            Receive<string>(s => HandleStringMessage(s));
        }

        protected override void PreStart()
        {
            Console.WriteLine("PreStart Actor '{0}'", Self.Path.Name);
        }

        protected override void PreRestart(Exception reason, object message)
        {
            Console.WriteLine("PreRestart Actor '{0}', reason: {1}, message: {2}",
                Self.Path.Name, reason.Message, message);
        }

        protected override void PostRestart(Exception reason)
        {
            Console.WriteLine("PostRestart Actor '{0}', reason: {1}",
                Self.Path.Name, reason.Message);
        }

        protected override void PostStop()
        {
            Console.WriteLine("PostStop Actor '{0}'", Self.Path.Name);
        }

        private void HandleStringMessage(string message)
        {
            Console.WriteLine("Received: '{0}'", message);

            if (++nrMessagesHandled >= 3)
                throw new InvalidMessageException("haha");
        }
    }
}

Und natürlich brauchen wir einen überwachenden Aktor. Er wird ebenfalls eine String-Nachricht erhalten und sie einfach an das (zickige) Kind weiterleiten.

using System;
using Akka.Actor;

namespace AkkaSuperVision
{
    public class Supervisor : ReceiveActor
    {
        public IActorRef child;

        public Supervisor()
        {
            child = Context.ActorOf(Props.Create<Child>(),"Child");

            Context.Watch(child);

            Receive<string>(s => HandleStringMessage(s));
        }

        private void HandleStringMessage(string message)
        {
            child.Tell(message);
        }

        // protected override SupervisorStrategy SupervisorStrategy()
        // {
        //     return new OneForOneStrategy(
        //         10,
        //         TimeSpan.FromSeconds(10),
        //         exception =>
        //         {
        //             return Directive.Restart;
        //         }
        //     );
        // }
    }
}

Wenn Du das Kommandozeilen Programm startest, wirst Du einen Kind-Aktor erleben, der die String-Nachrichten empfängt, aber bei jeder 3. Nachricht wie beabsichtigt, eine Exception auslöst. Aber er wird automatisch neu gestartet, ohne dass wir uns darum kümmern müssen. Dahinter steckt die sogenannte OneForOne Strategie. Wenn wir eine eigene Strategie hinterlegen möchten, können wir das in einem Aktor durch Überladen der SupervisorStrategy() Methode oder beim Erzeugen des Aktors erledigen. Damit können wir das Verhalten beim Sterben von Kind-Aktoren entsprechend anpassen.

Das Ergebnis beim Programmlauf könnte so aussehen:

PreStart Actor 'Child'
Received: 'message 0'
Received: 'message 1'
Received: 'message 2'
PreRestart Actor 'Child', reason: haha, message: message 2
PostRestart Actor 'Child', reason: haha

...

Received: 'message 497'
PreRestart Actor 'Child', reason: haha, message: message 497
PostRestart Actor 'Child', reason: haha
Received: 'message 498'
Received: 'message 499'
Press [enter] to continue...

"Stimmt nicht ganz" wirst Du gleich sagen. Hmmm, ok, ein wenig geschummelt habe ich. Standardgemäß werden alle Exceptions mitgeloggt und die Console ist das normalerweise eingestellte  Ziel für die Log Ausgaben. Um es abzuschalten, könntest Du deinem Projekt die nachfolgende Konfigurations-Datei beisteuern. Akka.NET verwendet eine eigene Syntax, die innerhalb der Tags "akka" und "hocon" eingebaut sind. Hier kann z.B. das Logging temporär deaktiviert werden. Für praktische Projekte ist das nicht empfehlenswert, aber für unsere Experimente durchaus legitim

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="akka" type="Akka.Configuration.Hocon.AkkaConfigurationSection, Akka" />
  </configSections>

  <akka>
    <hocon>
      <![CDATA[
        akka {
            loggers = ["Akka.Event.StandardOutLogger"]
         log-config-on-start = off
           stdout-loglevel = OFF
           loglevel = DEBUG
           actor {
               debug {  
                #  receive = on 
                #   autoreceive = on
                #  lifecycle = on
                #   event-stream = on
                # unhandled = on
               }
           }
      ]]>
    </hocon>
  </akka>
</configuration>

Die kompletten Details zur Konfiguration findest Du in der Akka.NET Dokumentation.

Zeit für Experimente. Was passiert, wenn Du die SupervisorStrategy() Methode oben auskommentierst? Kannst Du Dir das Verhalten erklären? Wie müsstest Du den Code verändern, um das erwartete Verhalten zu erreichen? Und zum Schluss könntest Du mit der WithSuperVisorStrategy() Methode experimentieren – das wäre zumindest der kürzeste Weg!

Morgen werden wir uns einiges rund um Nachrichten ansehen