Sonntag, 20. Dezember 2015

Akka.NET Adventskalender – Tag 20

Nur zusammen ergibt alles Sinn

An den vergangenen zwei Tagen haben wir uns theoretisch mit Map/Reduce befasst. Heute wollen wir uns die Programmierung dazu näher ansehen (Hinweis: am Ende steht ein Link auf ein github Repository – Du musst also nicht so viel tippen heute).

Der erste Aktor, der die Rohdaten verarbeitet, ist der Map Aktor. Wie beabsichtigt erzeugt er eine Liste von {Sprache, Anzahl} Tupeln, wobei die Anzahl für alle erzeugten Listeneinträge "1" ist. Das Ergebnis wird dem Absender geantwortet und kommt so beim Master wieder an.

using System;
using MapReduce.Messages;
using Akka.Actor;
using System.IO;
using System.Linq;

namespace MapReduce.Actors
{
    public class Mapper : ReceiveActor
    {
        public Mapper()
        {
            Receive<string>(Map);
        }

        private void Map(string input)
        {
            var mapResult = new MapResult();

            using (var reader = new StringReader(input)) 
            {
                string line;
                while ((line = reader.ReadLine()) != null) 
                {
                    if (!String.IsNullOrWhiteSpace(line))
                    {
                        var language = line.Split(new [] { '|' }).Last().Trim();

                        mapResult.Counts.Add(new LanguageCount(language));
                    }
                }
            }

            Sender.Tell(mapResult);
        }
    }
}

Die MapResult Nachricht, die der Map Aktor erzeugt, wird durch den Master an den Reduce Aktor weiter gereicht, der sie dann entsprechend gruppiert:

using System;
using Akka.Actor;
using MapReduce.Messages;

namespace MapReduce.Actors
{
    public class Reducer : ReceiveActor
    {
        public Reducer()
        {
            Receive<MapResult>(Reduce);
        }

        private void Reduce(MapResult mapResult)
        {
            var reduceResult = new ReduceResult();

            foreach (var count in mapResult.Counts)
            {
                if (reduceResult.Result.ContainsKey(count.Language))
                {
                    reduceResult.Result[count.Language] += count.Count;
                }
                else
                {
                    reduceResult.Result[count.Language] = count.Count;
                }
            }

            Sender.Tell(reduceResult);
        }
    }
}

Da sowohl der Map Aktor als auch der Reduce Aktor parallel arbeiten und beide mehrfach existieren, brauchen wir noch einen einzelnen Aktor, der alle Ergebnisse nochmals zusammenfasst. Der Code ist relativ ähnlich zum Reduce Aktor, allerdings wird das finale Ergebnis im Aggregator Aktor gespeichert und kann anschließend abgerufen werden.

using System;
using Akka.Actor;
using MapReduce.Messages;

namespace MapReduce.Actors
{
    public class Aggregator : ReceiveActor
    {
        private ReduceResult reduceResult;

        public Aggregator()
        {
            reduceResult = new ReduceResult();

            Receive<ReduceResult>(Aggregate);
            Receive<GetResult>(_ => Sender.Tell(reduceResult));
        }

        private void Aggregate(ReduceResult result)
        {
            foreach (var language in result.Result.Keys)
            {
                if (reduceResult.Result.ContainsKey(language))
                {
                    reduceResult.Result[language] += result.Result[language];
                }
                else
                {
                    reduceResult.Result[language] = result.Result[language];
                }
            }
        }
    }
}

Wie erwähnt, sind alle Beispiele in diesem git Repository auf github abgelegt.

Liefen wirklich die Aktoren parallel?

Falls es Dir auch schon aufgefallen ist: Glückwunsch! Kaum etwas lief parallel bislang. Wir hatten jeweils nur einen Aktor und die Daten flossen vom einen zum anderen. Einzig zu dem Zeitpunkt, als wir mehr als ein "Dokument" analysieren ließen, konnten der eine Map Aktor und der eine Reduce Aktor parallel laufen.

Aber Du erinnerst Dich sicher noch an unser Konstrukt von vor 3 Tagen? Wenn Du das nutzt, dann werden wir parallel ablaufende Aufgaben erreichen.

someActor = Context.ActorOf(
    Props.Create<SomeActorClass>()
         .WithRouter(new RoundRobinPool(NrWorkers))

Falls Du noch mehr Beispiele für den Einsatz von Map/Reduce suchst, mir hat das Beispiel "Freunde finden" sehr gefallen.

Morgen werden wir unseren letzten Sprint starten. Wir haben bisher einzelne und zahlreiche Aktoren untersucht. Was Nachrichten anging, haben wir stets einen Empfänger addressiert. Das werden wir nächste Woche ändern.

Keine Kommentare:

Kommentar veröffentlichen