Samstag, 12. Dezember 2015

Akka.NET Adventskalender – Tag 12

Ströme beeinflussen

Gestern haben wir mit einer Diskussion zu "Pipes and Filters" begonnen. Heute werden wir anfangen, die dazu notwendigen Aktoren zu entwickeln. Wir werden auch hier ein paar Kompromisse eingehen, die in der Praxis eventuell problematisch werden können. So kann ein langsam arbeitender Aktor schnell für einen unangenehmen Rückstau sorgen, der sich durch hohe zu haltende Datenmengen bemerkbar macht. Das kann mit einem besseren aber komplizierteren Protokoll vermieden werden. Aber uns soll es ja mehr um das Prinzip gehen, daher ignorieren wir solche Dinge.

Ein einfacher zeilenweise aus einer Datei lesender Quell-Aktor könnte wie folgt aussehen. (Lass Dich noch nicht vom gewählten Namensraum täuschen – das ist das Ziel, das wir am Ende erreichen werden).

using System;
using System.IO;
using Akka.Actor;
using System.Text;

namespace WordCount
{
    public class FileReader : ReceiveActor
    {
        private class ReadNextLine {}

        private IActorRef next;
        private FileStream fileStream;
        private StreamReader streamReader;

        public FileReader(string filePath, IActorRef next)
        {
            this.next = next;

            fileStream = new FileStream(filePath,
                FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
            streamReader = new StreamReader(fileStream, Encoding.UTF8);

            Self.Tell(new ReadNextLine());

            Receive<ReadNextLine>(ReadLine);
        }

        private void ReadLine(ReadNextLine r)
        {
            string line = streamReader.ReadLine();

            if (line != null)
            {
                next.Tell(line);
                Self.Tell(new ReadNextLine());
            }
            else
            {
                next.Tell(new End());
            }
        }
    }
}

So wirklich kompliziert ist das ja nicht. In der Tat ist das ein wesentliches Merkmal von Streams, dass die einzelnen Bausteine sehr einfach sind. Die Leistung erhält man durch Kombination.

Demnach verwundert es kaum, dass ein Ziel-Aktor, der den erhaltenen Stream auf der Konsole ausgibt, ebenso einfach ist.

using System;
using Akka.Actor;

namespace WordCount
{
    public class ConsoleWriter : ReceiveActor
    {
        public ConsoleWriter()
        {
            Receive<string>(Console.WriteLine);
            Receive<End>(_ => Context.System.Shutdown());
        }
    }
}

Über das Herunterfahren des gesamten Aktorsystems durch einen einzelnen Aktor kann man wiederum geteilter Meinung sein. Produktiv haben solche Konstrukte definitiv nichts verloren.

Zusammengebracht wird die einfachste Kette zum Beispiel so:

using System;
using Akka.Actor;

namespace WordCount
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            var system = ActorSystem.Create("Words");

            var console = system.ActorOf(Props.Create<ConsoleWriter>());
            system.ActorOf(Props.Create<FileReader>(args[0], console));

            system.AwaitTermination();
        }
    }
}

Du musst beim Aufbau allerdings berücksichtigen, dass solch eine Kette immer vom Ende her konstruiert wird, weil wir ja im Konstruktor des Vorgängers die Referenz auf den nachfolgenden Aktor benötigen. Alternativ könnte man die Aktoren auch in willkürlicher Reihenfolge erzeugen und mittels einer Connect Nachricht die einzelnen Bestandteile der Kette zusammenfügen.

Wenn Du einen Aktor einfügen möchtest, ändert sich die Konstruktion der Kette wie folgt:

var console = system.ActorOf(Props.Create<ConsoleWriter>());
var lowerCaser = system.ActorOf(Props.Create<LowerCaser>(console));
system.ActorOf(Props.Create<FileReader>(args[0], lowerCaser));

Und der Aktor, der (war schwer zu raten, oder?) aus einem Stream an Zeilen einen Stream an aus Kleinbuchstaben bestehenden Zeilen erzeugt, könnte so aussehen und ist wie fast alle Ketten-Bestandteile sehr einfach...

using System;
using Akka.Actor;
using WordCount.Messages;

namespace WordCount
{
    public class LowerCaser : ReceiveActor
    {
        public LowerCaser(IActorRef next)
        {
            Receive<string>(s => next.Tell(s.ToLower()));
            Receive<End>(next.Tell);
        }
    }
}

Und morgen werfen wir einen Blick auf das Herzstück der Anwendung und den Grund, warum der Namensraum der obigen Klassen "WordCount" heißt :-)

Keine Kommentare:

Kommentar veröffentlichen