Putting ML in production I: using Apache Kafka in Python.

Javier Rodriguez Zaurin, Mar 5

This is the first of what we hope will be a series of 2 or 3 posts illustrating how one could use a series of tools (namely Kafka, MLFlow and Sagemaker) to help productionise ML.

To that aim we will set up a simple scenario that we hope resembles some real-world use cases, and then describe a potential solution.

The companion repo with all the code can be found here.

The scenario

A company collects data using a series of services that generate events as users/customers interact with the company’s website or app.

As these interactions happen, an algorithm needs to run in real time and some immediate action needs to be taken based on the algorithm’s outputs (or predictions).

On top of that, after N interactions (or observations) the algorithm needs to be retrained without stopping the prediction service, since users will keep interacting.

For the exercise here we have used the Adult dataset, where the goal is to predict whether individuals earn an income higher/lower than 50k based on their age, native country, etc.

To adapt this dataset to the scenario described before, one could assume that age, native country, etc. are collected through an online questionnaire/form, and that we need to predict whether users have high/low income in real time.

If high income, then we immediately call/email them with some offer, for example.

Then, after N new observations we retrain the algorithm while we keep predicting on new users.

The solution

Figure 1 is an illustration of a potential solution.

To implement this solution we have used Kafka-Python (a nice tutorial can be found here), along with LightGBM and Hyperopt or HyperparameterHunter.

Figure 1. Real-time prediction ML pipeline.

A full description is provided below.

The only Python “outsider” we will use in this exercise is Apache Kafka (we will use the Python API Kafka-Python, but Kafka itself still needs to be installed on your system).

If you are on a Mac, just use Homebrew:

brew install kafka

which will also install the Zookeeper dependency.

As mentioned before, we have used the Adult dataset.

This is because our intention is to illustrate a potential ML pipeline and to provide useful code while keeping it relatively simple.

Note however that the pipeline described here is, in principle, data-agnostic.

Of course, the preprocessing will change depending on the nature of the data, but the pipeline components will remain similar if not identical.

Initialising the experiment

The code used for this post (and more) can be found in our repo. There you will find a script named initialize.py. This script will download the dataset, set up the directory structure, preprocess the data, train an initial model on the training dataset and optimise that model’s hyperparameters.

In the real world this would correspond to the usual experimentation phase and the process of training the initial algorithm offline.

In this post we want to focus mostly on the pipeline and its components rather than on the ML.

Nonetheless, let us just briefly mention the ML tooling we use for this exercise.

The data preprocessing is rather simple given the dataset that we are using.

We have coded a customised class called FeatureTools that can be found in the utils module in the repo.

This class has .fit and .transform methods that will normalise/scale numerical features, encode categorical features and generate what we call “crossed columns”, which are the result of the cartesian product between two (or more) categorical features.
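To make the idea concrete, here is a minimal sketch of what a FeatureTools-like class could look like. This is an illustrative approximation, not the repo’s actual implementation (the scaling and encoding choices here are our assumptions):

```python
import pandas as pd

class FeatureTools:
    """Sketch of a fit/transform preprocessor: standardises numerical
    columns, label-encodes categoricals and builds "crossed columns"
    (cartesian products of categorical pairs)."""

    def __init__(self, num_cols, cat_cols, crossed_cols):
        self.num_cols = num_cols
        self.cat_cols = cat_cols
        self.crossed_cols = crossed_cols  # list of (col_a, col_b) tuples

    def fit(self, df):
        # store the statistics and vocabularies needed to transform new data
        self.means = df[self.num_cols].mean()
        self.stds = df[self.num_cols].std().replace(0, 1.0)
        self.vocab = {}
        for a, b in self.crossed_cols:
            crossed = df[a].astype(str) + "_" + df[b].astype(str)
            self.vocab[f"{a}_{b}"] = {v: i for i, v in enumerate(crossed.unique())}
        for c in self.cat_cols:
            self.vocab[c] = {v: i for i, v in enumerate(df[c].unique())}
        return self

    def transform(self, df):
        df = df.copy()
        df[self.num_cols] = (df[self.num_cols] - self.means) / self.stds
        # build crossed columns from the raw categories before encoding them
        for a, b in self.crossed_cols:
            crossed = df[a].astype(str) + "_" + df[b].astype(str)
            df[f"{a}_{b}"] = crossed.map(self.vocab[f"{a}_{b}"]).fillna(-1).astype(int)
        for c in self.cat_cols:
            df[c] = df[c].map(self.vocab[c]).fillna(-1).astype(int)
        return df
```

Unseen categories at prediction time map to -1, so the transform never fails on new messages.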

Once the data is processed we use LightGBM to fit the model along with either Hyperopt or HyperparameterHunter to perform hyperparameter optimisation.

The code related to this task can be found in the train module, where one can find two scripts: train_hyperop.py and train_hyperparameterhunter.py.

We might write a separate post comparing hyperparameter optimisation packages in Python (Skopt, Hyperopt and HyperparameterHunter), but for now, know this: if you want speed, use Hyperopt. If you are not concerned about speed and want to keep a detailed record of your optimisation routine, use HyperparameterHunter.

In the words of Hunter McGushion, the creator of the package: “For so long, hyperparameter optimisation has been such a time consuming process that just pointed you in a direction for further optimization, then you basically had to start over.” HyperparameterHunter is here to solve that problem, and it does it very well.

Currently, the package is built on top of Skopt, which is why it is notably slower than Hyperopt.

However, I am aware that there are efforts to include Hyperopt as another backend for HyperparameterHunter.

When this happens there will be no debate: HyperparameterHunter should be your tool of choice.

Nonetheless, in case someone is interested, I have included a notebook in the repo comparing Skopt and Hyperopt performances.

Let’s move now to the pipeline processes themselves.

The App Messages Producer

This is meant to be a relatively simple illustration of what part of a production pipeline could look like.

Therefore, we directly use the Adult dataset to generate messages (JSON objects).

In the real world, one would have a number of services that would generate events.

From there, one has a couple of options.

The information in those events might be stored in a database and then aggregated through your usual queries.

From there, a Kafka service will publish messages into the pipeline.

Alternatively, all the information in those events might be directly published into different topics and an “aggregation service” might store all the information in a single message, which will then be published into the pipeline (of course, one could also have a combination of the two).

For example, users might be allowed to register through Facebook or Google, allowing us to collect their names and email addresses.

Then they might be asked to fill a questionnaire and we keep collecting events as they progress.

At some point during the process, all these events will be aggregated in a single message and then published via a Kafka producer.

The pipeline in this post would start from the point where all the relevant information has been aggregated.

Our messages here are individual observations in the Adult dataset.

Below we include an example of the content of our messages:

'{"age":25,"workclass":"Private","fnlwgt":226802,"education":"11th","marital_status":"Never-married","occupation":"Machine-op-inspct","relationship":"Own-child","race":"Black","gender":"Male","capital_gain":0,"capital_loss":0,"hours_per_week":40,"native_country":"United-States","income_bracket":"<=50K."}'

The core of the App/Service (grey, left-most box in Figure 1) is the snippet below. Note that we use the testing dataset to generate the messages.

This is because we have designed a scenario that tries to resemble the real world as much as possible (within certain limits).

With that in mind, we have used the training dataset to build the initial model and dataprocessor objects.

We then use the test dataset to generate messages with the aim of simulating the process of receiving new information through time.

Regarding the snippet above: simply put, a producer publishes messages into the pipeline (start_producing()) and consumes the messages containing the final prediction (start_consuming()).

The same way that the pipeline we describe here does not include the very beginning of the process (event collection and aggregation), we also skip the very end, i.e. what to do with the final prediction.

Nonetheless, towards the end of the post we briefly discuss some use cases where this pipeline could be useful, which will illustrate that final stage.

In reality, apart from ignoring the very beginning and end of the process, we believe that this pipeline resembles reasonably well one that could be used in the real world.

In consequence, we hope that the code included in our repo is useful for some of your projects.

Predictor and Trainer

The main goal of this implementation is to run the algorithm in real time and retrain it every N observations without stopping the prediction service. To that aim we implemented two components, the Predictor (predictor.py in the repo) and the Trainer (trainer.py).

Let’s now describe one by one the numbers shown in Figure 1, using code snippets as our guidelines.

Note that the process below assumes one has run the initialize.py script, so that initial model.p and dataprocessor.p files exist in the corresponding directory. Also, we emphasise that the code below comprises the core of the Predictor and the Trainer.

For the full code refer to the repo.

Predictor

The core of the Predictor’s code is shown below.

(1a) Line 12 in the predictor.py snippet. The Predictor will receive the message from the App/Service, do the data processing and run the model in real time as messages arrive. All this happens within the predict function, using the existing dataprocessor and model objects.

(1b) Line 13 in the predictor.py snippet. Once we have run the prediction, the Predictor will publish the result (publish_prediction()), which will eventually be received by the App/Service.

(2) Lines 17–20 in the predictor.py snippet. Every RETRAIN_EVERY messages, the Predictor will publish a “retrain” message (send_retrain_message()) to be read by the Trainer.

Trainer

(3) Line 12 in the trainer.py snippet. The Trainer will read the message and trigger a retraining process with the new, accumulated dataset (train()), that is, the original dataset plus RETRAIN_EVERY new observations.

The train function will run the entire process described in the “Initialising the experiment” section independently from the processes described in 1a and 1b.

In other words, the Trainer will retrain a model while the Predictor keeps serving predictions as messages arrive.

At this stage it is worth mentioning that here we find a further difference between our implementation and one that would be used in the real world.

In our implementation, it is possible to retrain the algorithm as soon as RETRAIN_EVERY number of observations have been processed.

This is because we are using the Adult testing dataset to generate messages, which includes the target column (“income_bracket”).

In the real world, the true outcome of the action taken based on the algorithm’s output would normally not be available right after the algorithm runs, but some time later.

In that scenario, another process should be collecting the true outcomes and, once the number of true outcomes collected equals RETRAIN_EVERY, the algorithm would be retrained.

For example, let’s say that this pipeline implements a real-time recommendation system for an e-commerce.

We have trained a recommendation algorithm offline and the target column is a categorical representation of how much our users like our recommendations: 0, 1, 2 and 3 for users that did not like or interact with the item, liked the item (e.g. hit a like button), added the item to their basket, and bought the item, respectively.

By the time the system serves the recommendations we still won’t know what the user will eventually do.

Therefore, along with a process collecting and storing the user information as they navigate the website (or app), a second process should collect the final outcome of our recommendation.

Only when both processes have collected RETRAIN_EVERY messages and outcomes will the algorithm be retrained.

(4) Line 13 in the trainer.py snippet.

Once the retraining is done, a message with the corresponding information will be published ( published_training_completed()).

(5) Lines 5–8 in the predictor.py snippet.

The Predictor’s consumer is subscribed to two topics: [‘app_messages’, ‘retrain_topic’].

Once it receives, through the “retrain_topic”, the information that the retraining is completed, it will load the new model and continue with the process as usual, without stopping at any point.

How to run the pipeline

In the companion repo we have included instructions on how to run the pipeline (locally). It is actually quite simple.

Start Zookeeper and Kafka:

$ brew services start zookeeper
==> Successfully started `zookeeper` (label: homebrew.mxcl.zookeeper)
$ brew services start kafka
==> Successfully started `kafka` (label: homebrew.mxcl.kafka)

Run initialize.py: python initialize.py

In Terminal#1 run the Predictor (or the Trainer): python predictor.py

In Terminal#2 run the Trainer (or the Predictor): python trainer.py

In Terminal#3 run the Sample App: python sample_app.py

Then, once N messages have been processed you should see something like this:

Upper right terminal: we have retrained the model and Hyperopt has run 10 evaluations (in a real exercise these should be a few hundred).

Upper left terminal: once the model is retrained and optimised we see how the predictor has loaded the new model (after the annoying warning message from the new LightGBM version).

Bottom terminal: the service proceeds as usual.

Some potential use cases

Here are a couple of potential use cases, among (many) others.

Adapting online journeys in real time

Let’s consider an e-commerce site selling some items.

As users navigate through the website we collect events with information on their actions.

We have previously trained an algorithm and we know that after, let’s say, 10 interactions, we are in a good position to know whether a customer will end up buying our products.

Moreover, we also know that the products they will potentially buy are likely to be expensive.

Therefore, we would like to customise their journey “on-the-go” to facilitate their shopping experience.

Customisation here can mean anything, from shortening the journey to changing the page layout.

Email/call your customers

Similarly to the use case before, let’s assume now that the customer decides to stop the journey (bored, lack of time, maybe the process is too complex, etc.).

We could use a pipeline like the one described in this post to send them an email or call them, either immediately or with a controlled delay, if the algorithm predicts that this customer has great potential.

Next Steps

Logging and monitoring: in an upcoming post we will add logging and monitoring functionality to the pipeline via MLFlow.

Along with HyperparameterHunter, this solution will automatically keep full track of both model performance and hyperparameter optimization, while offering visual monitoring.

Flow management: the solution described here, and the corresponding code, has been designed so it can easily run on a laptop, locally and manually.

However, one would assume that in real life this will have to run in the cloud, at scale and not manually (please).

At that stage it would be ideal if we could use a fully-managed service that covers the entire machine learning workflow, so we do not need to care about maintaining services, versioning, etc.

To that aim we will use Sagemaker, which is built precisely for that purpose.

Any questions/suggestions, please email: jrzaurin@gmail.com