Kafka Connect wurde entwickelt, um Events schnell und fehlertolerant von einer Source/ Quelle zu einem Sink/ Ziel zu übertragen. Diese Anleitung beschreibt die Einrichtung eines Kafka CDC (Change Data Capture) Connectors, der alle Events von MongoDB an ein Kafka Topic in der Confluent Cloud weiterleitet.
Um Kafka Conenct fehlertolerant einzurichten, müssen wir mehrere Worker Nodes einsetzen. Ein Failover soll immer dann stattfinden, wenn ein Node nicht mehr verfügbar ist. Um eine ausreichende Verfügbarkeit zu gewährleisten, werden wir in diesem Artikel ein Cluster mit 3 Nodes einrichten. Ein Worker Node ist ein Konsument, der MongoDB Change Stream Events konsumiert und an ein Kafka-Topic in der Confluent Cloud weiterleitet.
Dabei ist zu beachten, dass die Change Stream Events von MongoDB vollständig Schreibzugriffe (“fully acknowledged writes”) auf den MongoDB-Cluster sind und keine Schreibzugriffe aus dem WAL-Log. So haben wir die Garantie, dass wir die korrekten/konsistenten Events konsumieren, die in MongoDB geschrieben und von allen Nodes bestätigt wurden.
Wir müssen mehrere Worker Nodes einsetzen, um einen Kafka-Connect-Cluster zu bilden. Es gibt keine Zookeeper oder andere Komponente, die wir neben den Workern einsetzen müssen. Die Worker verwenden die Kafka-Consumer-API und sind somit Teil einer Kafka-Consumer-Gruppe. Sie hören auf Rebalances und reagieren entsprechend. Wenn ein Node nicht mehr verfügbar ist, findet ein Rebalance statt und der Polling-Task wird neu verteilt.
Worker Nodes kommunizieren
a) mit Kafka-Brokern in Confluent und
b) untereinander
Auf einen Cluster von Worker, können wir ein Connector-Plugin installieren. In unserem Fall ist es ein Source-Connector Plugin, das die Daten von MongoDB pollt. Diese Plugins erweitern den `SourceTask` von Kafka-Connect selbst und geben eine Liste von `SourceRecords` zurück, die an die Kafka-Connect Library übergeben werden. Das bedeutet, dass das Plugin den Task konfiguriert, der von der Library ausgeführt werden soll. Die Kafka-Connect Library, die Basis auf jedem Worker, kümmert sich um diesen Task und stellt sicher, dass er im Cluster verteilt ausgeführt wird und im Falle eines Fehlers auf verfügbare Nodes übergeht.
Daher müssen wir das Connector-Plugin installieren und konfigurieren. Kafka Connect muss diese Konfigurationen innerhalb des Clusters speichern, damit nach einem Failover der nächste Worker den Task fortsetzen kann. Ein Connector-Plugin kann normalerweise über den Confluent-Hub heruntergeladen und installiert werden. Das Plugin kann heruntergeladen und im konfigurierten `plugin.path` des Workers abgelegt werden. Wird Confluent-Hub verwendet, wird das Plugin in einem Standard-Plugin.path installiert.
Docker-Hub Plugin Path config
Ein Beispiel für den Confluent-Hub-Ansatz finden Sie in der mitgelieferten Dockerdatei in diesem Leitfaden.
Wenn wir 3 Worker mit dem Connector-Plugin installiert starten, erhalten wir die folgendes Setup für einen Cluster:
Sobald der Worker korrekt konfiguriert und das Plugin installiert ist, können wir einen Connector und somit inherent einen/ mehrere Tasks erzeugen. Da sich das Plugin um die Erstellung der Tasks kümmert, müssen wir also nur einen Connector mit der richtigen Konfiguration erstellen. Für das MongoDB Source-Connector Plugin wird nur ein Task pro MongoDB Collection erzeugt. Das heisst wenn wir mehrere Collections pollen wollen, so müssten wir für dieses mehrere Connectors erstellen, die das Plugin benutzen. Das liegt daran, dass das change-stream Feature von MongoDB so aufgebaut ist, dass es über sharded Collections skaliert – der Einfachheit halber werden wir das nicht behandeln.
Sobald wir 3 Worker gestartet haben, können wir einen Connector, via POST der Konfiguration an den Worker-Cluster, erzeugen. Unser Setup sieht nun wie folgt aus:
Ja, Sie haben richtig gelesen, die Connector Konfiguration muss (im verteilten Modus) über einen HTTP POST an den Cluster gesendet werden, um einen Connector zu erstellen. Daher sollten die Workers von aussen erreichbar sein. Entsprechende Security Massnahmen können getroffen werden.
Beispiel:
POST connector
Falls wir zum Beispiel eine Collection von einem weiteren MongoDB Clustern konsumieren wollen, müssen wir einen weiteren Connector erstellen. Der weitrere Connector hat eine andere Konfiguration für die Datenbankverbindung, die zu konsumierende Collection etc.
Ein vollständiges Beispiel einer Konfiguration eines Connectors finden Sie in der nachfolgenden Schritt-für-Schritt-Anleitung.
Um zu verstehen, was ein Connector genau ist, sehen wir uns nun solch ein Setup mit mehreren Connectoren an, die sich auf zwei verschiedene Datenbank-Cluster (zwei verschiedene MongoDB Cluster) verbinden.
Anstelle von zwei MongoDBs könnten wir natürlich auch ein anderes Plugin installieren, wie z. B. das Debezium PostgreSQL CDC Source Connector Plugin, und einen Connector zu erstellen, der sich mit einer PostgresSQL-Datenbank verbindet.
Die REST-API jedes Workers wird hauptsächlich dazu verwendet, einen Connector zu erzeugen, aber auch zur Überwachung der Connectoren und Tasks. Via die REST-API, kann man auch Connector-Konfigurationen validieren bevor man sie effekt anfügt. Anfragen an einen der Follower-Worker werden immer an den Leader weitergeleitet.
Wir haben mehrere wichtige API-Endpunkte:
Important connector endpoints
Die gesamte API finden Sie hier:
https://docs.confluent.io/4.1.0/connect/references/restapi.html
Einige weitere praktische Beispiele finden Sie hier:
https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/
Wenn ein Worker Node dem Cluster beitritt, lädt er das “status topic” (internal compacted topic), in dem die aktuelle Leader-Url gespeichert ist. Die Follower leiten die REST-Anfragen immer an diese Leader-Url weiter. Wieso das wichtig ist, zeigt das nachfolgende Beispiel.
Wenn wir zum Beispiel einen Connector erzeugen oder löschen, muss eine neue Konfiguration im internen Config-Topic gespeichert werden. Wenn wir dieselbe Konfiguration für einen Connector an zwei Follower-Nodes senden und die Anfragen nicht an den Leader weitergeleitet würden, könnten beide denselben Connector zum internen Config-Topic hinzufügen und wir hätten am Ende einen doppelten Connector. Daher werden die Anfragen immer an den Leader weitergeleitet, um sicherzustellen, dass keine falschen Zustände entstehen.
(To see that in code / Um das im Code zu sehen)
Wir müssen also sicherstellen, dass das Netzwerk so konfiguriert ist, dass die Worker kommunizieren können: Jeder Worker gibt einen Hostnamen und einen Port bekannt und kann bei einem Rebalancing zum Leader gewählt werden.
Die folgenden Einstellungen müssen in der Konfiguration hinzugefügt werden:
REST Configs
Ein vollständiges Beispiel finden Sie unten in der Schritt-für-Schritt-Anleitung.
Loadbalancer-Probleme
Wichtig ist hierbei, dass die Worker jeweils ihre eigene Route haben und ihnen kein Loadbalancer vorgeschaltet ist. Wenn Sie Ihren Kafka-Connect-Workern einen Loadbalancer mit einer Random/Round-Robin-Policy vorschalten, kann es gelegentlich zu dem unten beschriebenen Fehler kommen, da einige Ihrer Anfragen nicht an den Leader, sondern an den Follower weitergeleitet werden. Wie oben beschrieben, würde dies zu Problemen führen, sodass der unten stehende Fehler auftritt:
Error with loadbalancers
Im verteilten Modus muss es natürlich eine Möglichkeit geben, den Connector/ Worker/ Task Status zu speichern, damit alle Worker Nodes darauf zugreifen können und bei einem Fehler entsprechend reagieren können..
Kafka-Connect speichert diese Informationen derzeit in 3 internen compacted Topics:
Internal Topics
Um die Connector Konfiguration in der Schritt-für-Schritt-Anleitung zu verstehen, müssen wir zunächst ein paar weitere Punkte zu den Connectors behandeln.
Jeder Connector verbindet sich mit einer Datenquelle. Im Falle unserer MongoDB können wir den Connector so konfigurieren, dass er nicht nur die Patches jeder CRUD-Operation abruft, sondern das gesamte Dokument, das eingefügt, aktualisiert oder gelöscht wurde. Im Falle von MongoDB hat dieses „vollständige Dokument“ jedoch eine bestimmte Struktur und Properties. Wenn Sie z.B. beschliessen, dass Sie bestimmtes Property nicht benötigen oder dass Properties Ihres vollständigen Dokuments miteinander verkettet werden sollen, um den Key des Evenst zu bilden, benötigen Sie Transformationen.
Lassen Sie uns unsere MongoDB-Collection „Events“ beobachten (an die “change stream events” subscriben).
MongoDB watch cursor
Wenn wir ein Event hinzufügen und das “fullDocument” beobachten, erhalten wir Folgendes:
MongoDB full document
Wie Sie sehen können, haben wir eine Menge umliegender Properties die wir nicht gebrauchen. Daher müssen wir Transformationen hinzufügen. Dazu gibt es verschiedene Möglichkeiten:
Beachten Sie, dass die eingebauten SMTs meist von einem Schema abhängen. MongoDB wird grösstenteils ohne Schema verwendet, sodass wir ein Schema im Plugin angeben oder ableiten müssen, um sie verwenden zu können. Je nachdem, welches Plugin Sie wählen, können Sie also Transformationen anwenden, aber das ist nicht immer der Fall. Sie sind nicht immer gut dokumentiert und müssen manchmal in der code base und den Tests der code base des Plugins nachgeschlagen werden.
Wenn unser Ziel darin besteht, einen schön aussehenden Schlüssel aus mehreren Properties innerhalb des vollständigen Dokuments zu generieren, den wir später für den eigentlichen Key in Kafka extrahieren können, können wir eine MongoDB-Pipeline-Transformation verwenden. Diese wird während der Überwachungs-Query der Collection ausgeführt, und wir können anschliessend die in MongoDB eingebaute Plugin-Transformation verwenden, um sie als Schlüssel zu extrahieren. Hierfür benötigen wir keine eingebaute SMTs.
Schauen wir uns an, wie eine Pipeline in MongoDB selbst funktioniert:
MongoDB Pipeline
MongoDB full document after pipeline
Wir müssen diese Pipeline zu unserer Connector-Konfiguration hinzufügen, um das Connector-Plugin anzuweisen, sie zu der Überwachung hinzuzufügen. Diese wird ausgeführt, um neue Dokumente aus dem Plugin zu streamen. Ausserdem müssen wir dieses neu generierte Property extrahieren und als Key verwenden. Dies kann über den output.schema.key erfolgen. Damit es angewendet werden kann, muss output.format.key auf „schema“ gesetzt werden. Da der Key vom Typ String ist und unser Key in Kafka ein String sein soll, können wir den key.converter auf „StringConverter“ setzen.
Connector config.json with MongoDB-Pipeline
Als Beispiel werden wir einen Cluster von Worker Nodes in einer Cloud-Foundry-Umgebung einrichten. Cloud-Foundry wurde genommen, weil sich die anzuwendenden Schritte sehr anschaulich und verständlich darstellen lassen. Sie benötigen keine Kenntnisse über Cloud-Foundry, um dieser Anleitung zu folgen. Die Schritte können auch auf z.B. Kubernetes übernommen werden.
Was Sie in dieser Anleitung erwartet:
Falls Sie sich noch nicht angemeldet haben, gehen Sie zu https://confluent.cloud/login und erstellen Sie einen Basis-Cluster.
Wenn Sie Ihren Cluster über längere Zeit mit dem Setup aus diesem Leitfaden betreiben, können unter Umständen Kosten anfallen. Das liegt daran, das die internen Themen eine grosse Anzahl an Partitionen verwenden.
Beispiel Kostenübersicht:
Wenn man das obere Setup für ca. 1 Monat laufen lässt, belaufen sich die Kosten auf ungefähr 70 $. Um Kosten zu sparen, können Sie das Setup in der Zeit, in der Sie es nicht benötigen, abschalten, oder einen lokalen Kafka-Cluster betreiben.
Kafka’s Beschreibung:
If you choose to create this topic manually, always create it as a compacted, highly replicated (3x or more) topic with a large number of partitions (e.g., 25 or 50, just like Kafka’s built-in __consumer_offsets topic) to support large Kafka Connect clusters.
z. B. für das internal offset topic ist der Standart 25 Partitionen:
offset.storage.topic = default 25 partitions
Für den Zweck unseres Setups könnten wir die Anzahl der Partitionen viel niedriger ansetzen, aber die Optimierung des Clusters ist nicht Teil dieser Anleitung. Denken Sie auch an den Zweck Ihres Clusters. Wenn Sie expandieren möchten, benötigen Sie mehr Partitionen.
Wir werden das confluentinc/cp-kafka-connect-base Docker-Image wiederverwenden, das die Worker-Library enthält, um unser eigenes Docker-Image für einen Worker zu erstellen und zu konfigurieren.
Die Env-Vars können später überschrieben werden, indem die Env-Vars nach dem Deployen der Anwendung gesetzt werden.
Override env var via cf set-env
Wir werden
CONNECT_REST_ADVERTISED_HOST_NAME
und
CONNECT_REST_PORT
für jeden Worker später überschreiben müssen.
Aufgabe: Erstellen Sie ein neues Verzeichnis und speichern Sie das folgende Dockerfile.
Full Dockerfile
Wie oben beschrieben, hat jeder Connector seine eigene Konfiguration, die wir überprüfen können und dann via POST anfügen müssen. Unter anderem, enthält die Konfiguration die DB Connection and die Ziel MongoDB Collection.
Aufgabe: Kopieren Sie die folgende Konfiguration und erstellen Sie eine config.json Datei im Stammverzeichnis des neu erstellten Verzeichnisses und ersetzen Sie die erforderlichen Konfigurationseigenschaften.
Full Connector config.json to POST
Um den Worker zu starten, erstellen wir eine Datei und speichern sie in dem neu erstellten Verzeichnis. Es muss ein `sleep infinity` hinzugefügt werden, der verhindert, dass der Worker angehalten wird. Nachdem
Aufgabe:
Erstellen Sie eine Datei im Stammverzeichnis mit den Namen run.sh und dem folgenden Inahlt:
Full run.sh
Aufgabe: Erstellen Sie Ihr Image mit einem Tag.
Build Docker Image
Wir werden unser Image mit einem Container pushen, der 2 GB Arbeitsspeicher und 2 GB Festplattenplatz enthält. Wir wollen 3 Worker laufen lassen, um unseren Cluster zu bilden.
Für Cloud Foundry-Benutzer können Sie den folgenden CLI Commands verwenden:
Push workers to cf
Wir müssen sicherstellen, dass unsere eingesetzte Worker-Node-App die Ports geöffnet hat, die für die Kommunikation
nötig sind.
In Cloud Foundry müssen wir dazu die folgenden Schritte mit der cf-cli ausführen:
Workers internal routes setup
Sie müssen dies für jeden der 3 Worker Nodes, die wir geschoben haben, wiederholen.
Wir haben die internen Routen erstellent und somit Cluster-Kommunikation intakt. Eine externe Route muss nun angefügt werden, damit wir die in Schritt 3 erstellte Connector-Konfiguration via POST an unseren Cluster senden können. Die Route die wir erstellen werden, wird den Traffic round-robin an einen der erstellten Workers weiterleiten. Der Worker selbst, wird diesen Request dann innerhalb des Clusters an den Master-Worker weiterleiten. In dieser Anleitung werden wir keine Sicherheitsmassnahmen bezüglich dieser Route treffen, Sie sollten aber eine Authentifizierung mitandenken.
Worker's external route - round robin to one of the workers
Es ist nun Zeit unsere in Schritt 3 erstellte Connector-Konfiguration via POST an unseren Cluster zu senden. Wir verwenden dazu unsere neu erstellte Route.
POST connector config
Wenn Sie einen Worker stoppen, können Sie eine Reblances beobachten. Stoppen Sie zum Beispiel den Worker, der gerade dabei ist einen Task auszuführen, sollte ein anderer Worker innerhalb des Clusters diesen Task übernehmen und weiterführen. Beachten Sie dabei aber: wenn es nur eine Collection zu verarbeiten gibt, kann es nur ein Worker-Node sein, der arbeitet. In Confluent-Cloud können Sie die internen Topics inspizieren und nachsehen, welcher Worker den Task ausführt – diese sollte sich ändern sobald ein anderer Worker-Node den Task übernimmt..
Sie haben Fragen zu Kafka Connect? Wir sind Ihnen gerne behilflich. Schicken Sie uns über das Kontaktformular eine Nachricht und wir melden uns bei Ihnen.