Building a realtime NLP pipeline using Kafka and spaCy

Bogdan Cojocar, Apr 15

In this tutorial we will build a realtime pipeline using Confluent Kafka, python and a pre-trained NLP library called spaCy.

As prerequisites we should have docker installed locally, since we will run the kafka cluster on our machine, as well as the python packages spaCy and confluent_kafka: pip install spacy confluent_kafka.

Step 1: building the kafka cluster

To build the cluster we will use a docker-compose file that will start all the docker containers needed: zookeeper, a broker and the schema registry.

Now very briefly, kafka is a distributed streaming platform capable of handling a large number of messages that are organised or grouped together into topics.

In order to be able to process a topic in parallel it has to be split into partitions, and the data from these partitions is stored on separate machines called brokers.

And finally zookeeper is used to manage the resources of the brokers in the cluster.

These are the elements of the vanilla version of kafka.

The confluent platform adds another element that is called the schema registry.

This is a very convenient way to make sure that we maintain the same schema when we write and when we read data from the stream.

Usually a schema is written in a platform-independent way in the avro format and it’s stored in the schema registry.

To read or write into a kafka cluster we need a broker address, a topic and the url of the schema registry.

The docker-compose will start zookeeper on port 2181, a kafka broker on port 9092 and the schema registry on port 9999.

Besides that, we use another docker container, kafka-create-topic, for the sole purpose of creating a topic (called test) in the kafka broker.
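To give an idea of its shape, here is a minimal sketch of such a docker-compose.yml based on the public confluentinc images; the service names, environment variables and the topic-creation command are assumptions, so the actual file in the repository linked at the end may differ:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  broker:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # one listener for the other containers, one for clients on the host machine
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://127.0.0.1:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://broker:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9999
    ports:
      - "9999:9999"

  # one-shot container that creates the test topic (in practice it may need
  # to wait until the broker is ready before running this command)
  kafka-create-topic:
    image: confluentinc/cp-kafka
    depends_on:
      - broker
    command: kafka-topics --create --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 --topic test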

To start the kafka cluster, we have to run the following command line instruction in the same folder where we have defined the docker-compose file:

docker-compose up

This will start all the docker containers and we should see their logs in the console.

Step 2: starting the kafka producer and writing some messages in the stream

We will create a simple kafka producer in python to send messages.

This will be achieved by using the confluent_kafka library.

Kafka messages are key-value pairs.

The key is usually used in partitioning the data in the topic.

We will define an avro schema for our messages:

value_schema_str = """
{
  "namespace": "my.schema",
  "name": "value",
  "type": "record",
  "fields": [
    { "name": "data", "type": "string" }
  ]
}
"""

key_schema_str = """
{
  "namespace": "my.schema",
  "name": "key",
  "type": "record",
  "fields": [
    { "name": "key", "type": "string" }
  ]
}
"""

In the key schema we have one field called key of type string, and in the value schema (the actual data) we again have just one field named data of type string.

The main python code is quite simple: once we define a producer with the appropriate configurations, we send a number of messages asynchronously to the broker on a topic.

At the end of the tutorial there is a link to a github repository containing the full example, including configurations and other things we are intentionally skipping here.
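As a rough sketch, and assuming the avro helpers from confluent_kafka, the producer setup can look like this (the exact configuration values in the repository may differ):

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# parse the avro schema strings defined above into schema objects
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)

# minimal configuration: the broker address and the schema registry url
avroProducer = AvroProducer({
    'bootstrap.servers': '127.0.0.1:9092',
    'schema.registry.url': 'http://127.0.0.1:9999'
}, default_key_schema=key_schema, default_value_schema=value_schema)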

One important thing to note is the way we generate the data and the key:

value = {"data": random.choice(simple_messages)}
key = {"key": str(uuid.uuid4())}

The key is a random uuid string.

This will make sure the data is distributed evenly and nicely in a cluster.

For the data we just randomly choose a sentence from a predefined list.

For each sentence that we send, we will apply some nlp rules on the consumer side.

Next we send our message to a buffer that will be flushed once we reach a number of messages:

try:
    avroProducer.produce(topic=args.topic, value=value, key=key)
except BufferError as e:
    messages_to_retry += 1

If there are issues in sending the data, we retry again.

At the end we just flush the buffer and we actually send the messages to the kafka cluster.
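Putting these pieces together, the sending part of the producer boils down to something like the following sketch (args.messages, args.topic and simple_messages are assumed names for the command line arguments and the list of sentences):

import random
import uuid

messages_to_retry = 0

for _ in range(args.messages):
    value = {"data": random.choice(simple_messages)}
    key = {"key": str(uuid.uuid4())}
    try:
        # asynchronous send: the message only goes into a local buffer here
        avroProducer.produce(topic=args.topic, value=value, key=key)
    except BufferError:
        # the local buffer is full, so remember this message and retry it later
        messages_to_retry += 1

# flushing the buffer actually sends the messages to the kafka cluster
avroProducer.flush()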

To run the producer we can simply run in the terminal:

python3 producer.py

And we should see a log saying we’ve sent 5 messages to 127.0.0.1:9092.

The producer.py script also has additional command line arguments. We can for example run python3 producer.py -m 10 if we want to send 10 messages instead of the default number of 5.

The optional command line arguments are:

-b the broker(s) ip and port, default 127.0.0.1:9092
-s the schema registry url, default http://127.0.0.1:9999
-t the kafka topic, default test
-m the number of messages, default 5

Step 3: consuming the kafka messages and applying nlp processing

The kafka consumer is the last piece of the puzzle.

In this part we create an AvroConsumer and we subscribe it to the test topic.

We poll the topic until we find the desired number of messages and we skip the null or invalid ones.
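A sketch of that consumer loop, again with the confluent_kafka avro helpers (the group id and the error-handling details are assumptions):

from confluent_kafka.avro import AvroConsumer

consumer = AvroConsumer({
    'bootstrap.servers': '127.0.0.1:9092',
    'schema.registry.url': 'http://127.0.0.1:9999',
    'group.id': 'nlp-consumer'  # assumed group id
})
consumer.subscribe(['test'])

received = 0
while received < 5:
    msg = consumer.poll(timeout=1.0)
    # skip empty polls, errors and messages without a payload
    if msg is None or msg.error() or msg.value() is None:
        continue
    sentence = msg.value()["data"]
    received += 1
    # the sentence is now ready for the spacy pipeline described below

consumer.close()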

Once we deserialise a message we can use spacy to apply an nlp pipeline and extract some info from the words.

The cool thing about spacy is that it is pre-trained on general words from the english language, so we can pretty much use it out of the box.

To download the english vocabulary for spacy we need to run python3 -m spacy download en.

The spacy API is quite easy to use.

We just need to pass our sentence to the nlp method, which will run a series of algorithms like tokenization (breaking down the sentence into words), lemmatisation (getting the base form of each word), part-of-speech tagging (getting the part of speech of each word, like verb, adverb etc.) and named entity extraction (recognising named entities such as organisations or geographical ones).

Once the algorithms are applied we can just extract the data that we need from each word.
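For example, a short sketch of how this can look on the consumer side (the sample sentence and the attributes printed here are just illustrations):

import spacy

# load the pre-trained english model installed via: python3 -m spacy download en
nlp = spacy.load('en')

doc = nlp(u"Google was founded in California")

for token in doc:
    # the original word, its lemma (base form) and its part of speech
    print(token.text, token.lemma_, token.pos_)

# named entities recognised in the sentence, e.g. organisations or locations
for ent in doc.ents:
    print(ent.text, ent.label_)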

To run the consumer:

python3 consumer.py

It accepts the same optional command line arguments that we had for the producer. The console output should show the nlp information extracted from each consumed message.

The full code example can be found on github.

