Skip to main content

Kafka Connect Sink

FalkorDB x Kafka Connect Banner

Get Started


Obtaining the Connector

You can build the connector from source or download the pre-built JAR file from the releases. The GitHub repository includes a README with instructions for running the connector locally. The GitHub repository includes a README with instructions for running the connector locally.

Configuring the Connector

Kafka Connector Properties Overview: This document explains the properties required to configure the FalkorDB Sink Connector for Apache Kafka.

Configurations should be specified in a properties file format.

Properties Overview

PropertyDescription
nameSpecifies the unique name of the connector instance, e.g., falkordb-connector. This name identifies the connector in the Kafka Connect framework.
connector.classDefines the Java class that implements the connector logic. Use com.falkordb.FalkorDBSinkConnector to write data from Kafka topics to FalkorDB.
tasks.maxSets the maximum number of tasks for the connector. A value of 1 uses a single task. Increasing this can boost throughput but requires resources.
topicsSpecifies the Kafka topic(s) to consume messages from. Set to falkordb-topic to read messages from this topic.
key.converterDefines the converter class for message keys. StringConverter treats keys as simple strings.
value.converterSpecifies the converter for message values. StringConverter treats values as strings.
value.converter.schemas.enableIndicates whether schemas should be included with message values. Setting to false excludes schema information.
falkor.urlSpecifies the connection URL for FalkorDB. Example: redis://localhost:6379. Essential for connecting Kafka to FalkorDB.

The above properties configure a Kafka Sink Connector that reads messages from a specified topic and writes them into FalkorDB using string conversion for both keys and values. Adjusting these properties allows you to tailor the connector's behavior according to your application's requirements.

Configuration Example

name=falkordb-connector
connector.class=com.falkordb.FalkorDBSinkConnector
tasks.max=1
topics=falkordb-topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
falkor.url=redis://localhost:6379

Kafka Message Format

JSON Structure Overview

The message is an array containing multiple objects, each representing a command to be executed on the graph database. Below is a breakdown of the key components of each message object.

Example:

[
{
"graphName": "falkordb",
"command": "GRAPH_QUERY",
"cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
"parameters": {
"location_param": "Location 0",
"age_param": 20,
"name_param": "Person 0"
}
},
{
"graphName": "falkordb",
"command": "GRAPH_QUERY",
"cypherCommand": "CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p",
"parameters": {
"location_param": "Location 1",
"age_param": 21,
"name_param": "Person 1"
}
}
]

Key Components

The table below explains essential properties for executing commands in FalkorDB through Kafka messages.

PropertyDescriptionExampleExplainer
graphNameSpecifies the name of the graph database where the command will be executed."falkordb". Kafka messages can update multiple graphs.
commandIndicates the type of operation being performed. "GRAPH_QUERY" means a query will be executed against the graph database."GRAPH_QUERY"
cypherCommandContains the actual Cypher query to be executed. Cypher is a query language for graph databases.cypher CREATE (p:Person {name: $name_param, age: $age_param, location: $location_param}) RETURN p Creates a Person node with name, age, and location properties.
parametersHolds key-value pairs for placeholders in the cypherCommand.json {"name_param": "Person 0", "age_param": 20, "location_param": "Location 0"} Used to define properties for the new node.