Sonntag, 13. Dezember 2015

Akka.NET Adventskalender – Tür 13

Ströme beeinflussen

Wie wir gestern gesehen haben, hat eine "Pipe and Filters" Architektur eine sehr positive Eigenschaft: Erweiterbarkeit. Wir können an jeder Stelle unserer Verarbeitungskette weitere Komponenten hinzufügen und damit die Funktionalität anpassen oder erweitern.

Heute werden wir "berechnende" Aufgaben in unsere Verarbeitungskette einfügen. Das einfache Beispiel, das wir heute erstellen werden, ist ein Wort-Zähler. Bestimmt kommt dir das Stichwort "Map-Reduce" dabei in den Sinn, wenn es um solche Aufgaben geht. Allerdings unterscheidet sich der Ansatz, den Map-Reduce geht deutlich von dem, was wir heute machen. Map-Reduce wurde erfunden, um große Datenmengen parallel zu verarbeiten und damit die Verarbeitungszeit zu reduzieren. Genau das werden wir in ein paar Tagen auch tun. Für heute werden wir uns mit einem "rechnenden" Aktor begnügen, den wir in unsere Verabeitungskette einfügen. Dadurch bekommen wir möglicherweise einen bestimmten Grad an Parallelität, denn die hinterander geschalteten Stufen können durchaus gleichzeitig tätig sein.

Aber zurück zu unserem eigentlichen Anliegen: Wörter zählen. Bislang lesen wir ja unsere Textdatei zeilenweise, insofern müssen wir uns um zwei Dinge kümmern: Zeilen in Wörter zerlegen und diese anschließend zählen. Du ahnst es schon: wir werden zwei zusätzliche Komponenten in unsere Kette einfügen.

Fangen wir mit dem Zerlegen an. Es muss also ein Aktor her, der eine Zeile in Worte zerlegt. Reguläre Ausdrücke sind ein prima Mittel dafür (Naürlich gibt es noch 12 Millionen anderer Wege). Falls Du eine kleine Auffrischung zum Verständnis des gleich verwendeten Ausdrucks brauchst, hier ein paar Hinweise. Die vollständige Doku der .NET Implementierung liefert Microsoft.
  • "\w" ist eine Meta-Sequenz, die Wort-Bestandteile (Buchstaben und den Unterstrich) in einem angelieferten Text findet
  • "?", "+" und "*" sind Quantifizierer, die kein- oder einmal (?), ein oder mehrmals (+) oder gar nicht bis beliebig oft (*) den vorausgegangenen Ausdruck treffen.
Unser WordSplitter Aktor könnte dann so aussehen (alle heutigen Quelltexte sind in einem github Repository):

using System;
using Akka.Actor;
using System.Text.RegularExpressions;
using WordCount.Messages;

namespace WordCount.Actors
{
    public class WordSplitter : ReceiveActor
    {
        private IActorRef next;

        public WordSplitter(IActorRef next)
        {
            this.next = next;

            Receive<string>(SplitIntoWords);
            Receive<End>(next.Tell);
        }

        private void SplitIntoWords(string s)
        {
            var matches = Regex.Matches(s, @"\w+");
            foreach (Match match in matches)
                next.Tell(match.Value);
        }
    }
}

Mittlerweile sind wir ja nicht mehr überrascht, dass die Logik innerhalb einzelner Aktoren so kurz ist. Allerdings fällt der eigentliche Wort-Zähler etwas umfangreicher aus und verhält sich auch anders als bisherige Aktoren. Grund ist, dass wir erst nach dem letzten Wort genau sagen können, welches Wort wie häufig im Text vorkam. Also müssen wir solange Wörter beim zählenden Aktor ankommen, eine entsprechende interne Struktur aufbauen und geben diese erst nach dem Eintreffen der End Nachricht wieder aus. Aber die Logik ist für Dich sicher auf anhieb verständlich:

using System;
using System.Linq;
using Akka.Actor;
using System.Collections.Generic;
using WordCount.Messages;

namespace WordCount.Actors
{
    public class WordCounter : ReceiveActor
    {
        private IActorRef next;
        private Dictionary<string, int> wordCount;

        public WordCounter (IActorRef next)
        {
            this.next = next;
            wordCount = new Dictionary<string, int>();

            Receive<string>(CountWord);
            Receive<end>(Terminate);
        }

        private void CountWord(string word)
        {
            if (wordCount.ContainsKey(word))
                wordCount[word]++;
            else
                wordCount.Add(word, 1);
        }

        private void Terminate(End end)
        {
            var counts =
                wordCount.Keys
                    .OrderBy(w => w)
                    .Select(w => String.Format("{0}: {1}", w, wordCount[w]));
           
            foreach (var count in counts)
                next.Tell(count);

            next.Tell(end);
        }
    }
}

Damit alles funktionert, müssen wir unsere Verarbeitungskette wieder entsprechend erweitern. Wie immer fangen wir hinten an und arbeiten uns nach vorne durch.

using System;
using Akka.Actor;
using Akka.Routing;
using WordCount.Actors;

namespace WordCount
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Words");

            var console = system.ActorOf(Props.Create<ConsoleWriter>());
            var counter = system.ActorOf(Props.Create<WordCounter>(console));
            var splitter = system.ActorOf(Props.Create<WordWplitter>(counter));
            var caser = system.ActorOf(Props.Create<LowerCaser>(splitter));
            system.ActorOf(Props.Create<FileReader>(args[0], caser));

            system.AwaitTermination();
        }
    }
}

Zum Starten des Programms müssen wir in einem Terminal nach dem Programm-Namen den Namen einer existierenden Datei angeben, aus der wir die Worte lesen möchten. Alternativ kannst Du selbstversändlich auch den Pfad auf eine Datei beim Erzeugen des FileReader Aktors gleich mit angeben.

Damit sind wir auch schon am Ende unseres zweiten Sprints angekommen. Wie letzten Sonntag habe ich einen kleinen Bonus. Es gibt ein großartiges Buch, das gerade am Entstehen ist "Reactive Design Patterns" (in englisch) geschrieben von Roland Kuhn. Jeweils die fertig geschriebenen Kapitel werden verfügbar gemacht.

Ab morgen werden wir uns mit deutlich mehr Aktoren als bisher befassen. Da werden wir parallel laufende Berechnungen und deren Implementierung mit Akka.NET untersuchen.

Keine Kommentare:

Kommentar veröffentlichen