News

Ein Ein­blick in die fas­zi­nie­ren­de Welt des Stream Pro­­ces­sing

In den ersten aufregenden Wochen bei Oepfelbaum wurde mir die Gelegenheit geboten, mich mit einem akuten Problem eines Kunden auseinanderzusetzen. Konkret ging es darum, einen überholten Importvorgang durch die Anwendung einer vergleichsweise neuen Technologie aufzuwerten – des Event Streaming.

Ausgangslage

Bisher wurden sämtliche Daten aus dem Core-Banking täglich über CSV-Dateien geliefert – eine pro Datentyp. Ein Prozess, der auf den Abhängigkeiten zwischen den verschiedenen Datentypen basierte, übernahm anschliessend die Verarbeitung.

Bei näherer Betrachtung erschien diese Verarbeitungsmethode als recht statisch und ineffizient. Also stellten sich Fragen wie: Müssen wirklich täglich alle Daten neu importiert werden? Und wie weit lässt sich ein solcher Prozess parallelisieren? In Zusammenarbeit mit einem erfahrenen Oepfelbäumler und unserem Kunden wurden die Grundlagen für die Optimierung des Prozesses geschaffen.

Grundlagen der neuen Lösung

Die Daten sollten über Kafka, ein Tool zur Speicherung und Verarbeitung von Datenströmen, geliefert werden. Verschiedene Kanäle (Topics) sollten die alten CSV-Dateien ersetzen und durch Partitionierung eine Parallelisierung ermöglichen.

Meine ersten Schritte mit Kafka waren wie ein spannendes Experiment. Der Aufbau einer lokalen Kafka-Instanz war schnell erledigt, und die Interaktionen mit Kafka durch sogenannte «Consumer und Producer» liessen sich ohne grosse Hürden in Java implementieren.

Abstraktion der Problemstellung und Herausforderungen

Um die Abhängigkeiten zwischen verschiedenen Datentypen zu modellieren, griff ich auf Entitäten zurück, die in einer Datenbank über Fremdschlüssel miteinander verbunden wurden. Doch schon bald wurde klar, dass es die verschiedenen Arten von Abhängigkeiten waren, die die grössten Herausforderungen mit sich brachten.

Schlussendlich reicht es nicht, nur die Verbindungen zwischen den verschiedenen Datentypen zu beachten. Denn sobald man Teillieferungen erlauben möchte, muss auch sichergestellt werden, dass Updates des gleichen Eintrags in der richtigen Reihenfolge verarbeitet werden.

Kafka und Java – sonst nichts

Mein erster Ansatz bestand darin, eine Eigenkreation mit Kafka und Java zu entwickeln – im «Consumer und Producer»-Prinzip mit etwas Logik dazwischen. Doch die Komplexität stieg schnell, besonders im Umgang mit den verschiedenen Arten von Abhängigkeiten, was sich negativ auf die Performance auswirkte.

Zentral sind vor allem folgende Fragen, die ich für unseren Kunden beantwortet habe: 

  • Wie soll festgestellt werden, dass eine Vorbedingung erfüllt ist?
    Durch den direkten Zugriff auf das Zielsystem kann geprüft werden, ob die Fremdschlüssel aufgelöst werden können.
  • Wie soll die Applikation reagieren, wenn die Vorbedingung eines Eintrags noch nicht erfüllt ist?
    Die ID des Elements wird in einer Datenbank gespeichert, und das Element selbst wird auf einem Retry Topic publiziert.
  • Was passiert, wenn mehrere Versionen des gleichen Eintrags auf eine Vorbedingung warten?
    Bei der Verarbeitung wird überprüft, ob ein Datenbankeintrag mit der ID des Elements existiert. Ist dies der Fall, wird das Update auf dem Retry Topic veröffentlicht, wobei sowohl das Element als auch der Datenbankeintrag eine Sequenz-Nummer erhalten. Diese Nummer dient der Beibehaltung der Reihenfolge beim Verarbeiten des Retry Topic.

Grob zusammengefasst lässt sich der Prozess auch mit folgender Grafik beschreiben:

Abbildung 1: Finaler Prozess der Eigenkreationen

Betrachtet man die Lösung genauer, wird deutlich, dass sie viele Interaktionen mit der Datenbank erfordert und einige Operationen durchgeführt werden müssen, selbst wenn die Reihenfolge der Elemente bereits richtig ist.

Mein Blick über den Tellerrand

Bereits während der Umsetzung meiner Lösung kam die Idee auf, dass man auch ein Framework verwenden könnte, das bereits auf die Verarbeitung von Datenströmen spezialisiert ist. Viele solche Frameworks existieren schon, darunter bekannte von der Apache Software Foundation wie Spark, Storm oder Flink. Die Tatsache, dass grosse Unternehmen wie Amazon, Spotify und eBay diese frei zugänglichen Apache-Projekte häufig verwenden, lässt auf hohe Skalierbarkeit und Stabilität schliessen.

Jedes Framework hat seine Eigenheiten, sei es der Fokus auf Batch-Verarbeitung oder unterschiedliche Garantien bei der Elementverarbeitung (exactly-once vs. at-least-once). Nach Abwägungen entschieden wir uns für Flink, das unserer Meinung nach das Beste aus allen Welten vereint.

Warum ist Flink besser als andere Frameworks?

Der erste grosse Vorteil liegt in der Ausrichtung von echtzeitbasiertem Stream Processing. Bei einigen Alternativen lag der Fokus zunächst auf dem Batch Processing, und erst später wurde eine Lösung entwickelt, um die Latenz bei kontinuierlicher Verarbeitung zu reduzieren.

Ein weiterer Pluspunkt sind die Verarbeitungsgarantien, die je nach Verwendung und Konfiguration sogar über Applikationsabstürze hinweg bestehen bleiben und auch Umsysteme einbeziehen können. Diese Garantien werden durch einen Checkpointing-Mechanismus und ein Two-Phase-Commit-Protokoll erreicht.

Das in-memory State Management spielte in meinem Proof of Concept (PoC) eine grosse Rolle und machte im Vergleich zur ersten Lösung die Datenbank überflüssig. Dies ermöglichte es mir, Verbindungen zwischen verschiedenen Elementen in einem Stream herzustellen.

Nicht zuletzt ist die Möglichkeit hervorzuheben, verschiedene Streams dynamisch miteinander zu verbinden, ähnlich einem Join von Datenbanktabellen. Dies ist bei anderen Frameworks oft nur für begrenzte Stream-Abschnitte vorhanden.

Einfach und schnell zu einer stabilen, skalierbaren Lösung

Mit den Erfahrungen aus dem ersten Ansatz begann ich, eine Flink-Lösung für das bekannte Problem zu entwickeln. Der Start gestaltete sich etwas herausfordernd, da in den letzten Jahren einige Verbesserungen an Flink vorgenommen worden waren und ausgewählte Komponenten bereits veraltet waren. Da die Problemstellung davon ausgeht, dass verschiedene Entitäten über einen Fremdschlüssel miteinander verbunden sind, lässt sich das Problem einfach umformulieren.

Reichen die bekannten Daten aus, um einen Join zu bilden?

Ich begann, mit kombinierten Streams zu arbeiten, auf denen beide Typen der beitragenden Streams auftauchen können. Auf den ersten Blick mag das chaotisch klingen, aber Flink ermöglicht es, Schlüssel auf Streams zu definieren, die abhängig von den verarbeiteten Elementen sein können. Hier kommt auch das bereits vorgestellte State Management ins Spiel.

Abbildung 2: Kombination zweier Streams

Was eignet sich als Schlüssel auf den kombinierten Streams?

Zwar wären die IDs der Elemente gute Kandidaten, aber dadurch würde kein wirklicher Zusammenhang zwischen einem Element und seiner Vorbedingung hergestellt. Die Lösung besteht darin, die ID der Vorbedingung und den Fremdschlüssel des abhängigen Elements zu verwenden. Dies erlaubt es, bei jeder Erwähnung des Schlüssels auf den zugehörigen State des Schlüssels zuzugreifen.

Abbildung 3: Definition des Schlüssels

Nun zum State Management

Was genau muss in einem State gespeichert werden? Hier können wir auf die Fragen aus dem ersten Ansatz zurückblicken:

  • Wie wird festgestellt, dass eine Vorbedingung erfüllt ist?
  • Wie reagiert die Applikation, wenn die Vorbedingung eines Eintrags noch nicht erfüllt ist?reagiert die Applikation, wenn die Vorbedingung eines Eintrags noch nicht erfüllt ist?
  • Was passiert, wenn mehrere Versionen des gleichen Eintrags auf eine Vorbedingung warten?

Es wird schnell deutlich, dass zwei Fälle separat abgedeckt werden müssen. Einerseits möchten wir nicht über ein externes System überprüfen müssen, ob eine Vorbedingung erfüllt ist, und andererseits möchten wir eine Möglichkeit haben, mit fehlenden Vorbedingungen umzugehen.

Das Problem lässt sich wie folgt lösen:

Für die Überprüfung der Vorbedingung reicht ein Boolean State aus, der angibt, ob ein Schlüssel bereits auf einem Element im Stream mit den Vorbedingungen gesehen wurde. Für nicht auflösbare Abhängigkeiten kann ein Listen-State angelegt werden, der mit den Abhängigkeiten befüllt wird. Mit diesen States lässt sich bereits der Verarbeitungsprozess erahnen. Wenn die Elemente in der richtigen Reihenfolge eintreffen, wird durch die Vorbedingung der Boolean State auf «true» gesetzt, und sobald das abhängige Element eintrifft, kann es nach einer kurzen Überprüfung weitergeschickt werden.

Abbildung 4: Verarbeitung bei korrekter Reihenfolge

Ist die Vorbedingung noch nicht erfüllt, wird die Abhängigkeit in die Liste aufgenommen, und sobald die Vorbedingung eintrifft, wird der Eintrag aus der Liste verarbeitet und entfernt. Für spätere Abhängigkeiten wird der Boolean State auf «true» gesetzt, wodurch diese daraufhin reibungslos verarbeitet werden können.

Übersicht der Ergebnisse

  • Mit weiteren Streams können komplexere Abhängigkeitsstrukturen realisiert werden.
  • Falls am Bestimmungsort bereits Informationen über erfüllte Vorbedingungen existieren, die noch nicht durch die Boolean States abgebildet sind, können Teillieferungen nicht verarbeitet werden. Initial muss also eine komplette Lieferung des Datenbestands verarbeitet werden, für die danach die Boolean States über das vorhandene Checkpoint-System zwischengespeichert und wiederhergestellt werden können.
  • Bei grossen Datenmengen wird entsprechend ein grosser Zwischenspeicher benötigt, da für jede Abhängigkeit jeweils ein Boolean State im Zwischenspeicher angelegt wird. Nebst der grossen Menge an Boolean States kann auch ein Stau in den Listen-States entstehen, falls grosse Verzögerungen bei der Verarbeitung der Vorbedingungen auftauchen.

Fazit

Die zunächst einfach erscheinende Problemstellung brachte am Ende doch einige Herausforderungen mit sich, die nicht unterschätzt werden sollten. Insbesondere in Bezug auf die Leistung gab es erhebliche Unterschiede zwischen der Eigenkreation und der Lösung auf Basis eines etablierten Frameworks.

Auch wenn es manchmal einschüchternd wirken mag, sich gleichzeitig mit mehreren neuen Technologien auseinandersetzen zu müssen, rechnet sich der Aufwand am Ende hinsichtlich der Qualität der Lösung. Nicht nur die Leistung wird in den meisten Fällen effektiver sein, sondern auch die Wartbarkeit und die Dokumentation des Codes profitieren von den bereits existierenden Informationen zu einem Framework.

Kurz gesagt

Sobald eine Stream Processing-Lösung mit hohem Datendurchsatz und bereits niedriger Komplexität umgesetzt wird, sollte definitiv die Möglichkeit in Betracht gezogen werden, ein Framework wie Flink zu verwenden.

Kontakt