AWS Kinesis is a handy tool that is easy to use, easy to configure, and fully managed.
Kinesis is actually a family of several services; for streaming, we will be using Kinesis Data Streams.
To create a Kinesis Data Stream, you need to give it a name and choose its shard count.
Shards are the processing units of a Kinesis stream. As of the time I am writing this story, a shard can ingest up to 1 MB/sec and 1,000 records/sec, and emit up to 2 MB/sec.
For most basic applications, one shard is enough to process your input data.
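If you are not sure whether one shard is enough, the ingest limits quoted above can be turned into a quick back-of-the-envelope calculation. This is only a rough sketch: the function name and the sample traffic numbers are made up for illustration, and the limits are the ones stated in this article, so check the current AWS quotas before relying on them.

```python
import math

# Per-shard ingest limits as quoted in this article (they may change over time).
MAX_MB_PER_SEC_PER_SHARD = 1.0
MAX_RECORDS_PER_SEC_PER_SHARD = 1000


def estimate_shard_count(records_per_sec: float, avg_record_kb: float) -> int:
    """Return the smallest shard count that satisfies both ingest limits."""
    mb_per_sec = records_per_sec * avg_record_kb / 1024
    by_throughput = math.ceil(mb_per_sec / MAX_MB_PER_SEC_PER_SHARD)
    by_record_rate = math.ceil(records_per_sec / MAX_RECORDS_PER_SEC_PER_SHARD)
    # A stream always has at least one shard.
    return max(1, by_throughput, by_record_rate)


# Hypothetical workloads: a small feed and a busier one.
small_feed = estimate_shard_count(records_per_sec=200, avg_record_kb=2)
busy_feed = estimate_shard_count(records_per_sec=2500, avg_record_kb=1)
print(small_feed, busy_feed)
```

The estimate only covers ingestion; if several consumers read from the same stream, the 2 MB/sec emit limit per shard may become the bottleneck instead.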
Ingestion
Now that we have created the stream, we can ingest the live data.
There are official libraries for working with AWS tools, for example:
boto for Python 2
boto3 for Python 3
aws-sdk for Node.js
We only need the name of the stream that we want to put the records into, and permission to write to that stream.
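As a minimal sketch of what ingestion could look like with boto3, the snippet below builds the arguments for the Kinesis `put_record` call and sends one reading. The stream name, the `sensor_id` field, and the helper names are assumptions for illustration; only `put_record` with `StreamName`, `Data`, and `PartitionKey` comes from boto3 itself.

```python
import json


def build_put_record_args(stream_name: str, reading: dict, partition_field: str) -> dict:
    """Build the keyword arguments for the Kinesis PutRecord call."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(reading).encode("utf-8"),
        # Records with the same partition key are routed to the same shard.
        "PartitionKey": str(reading[partition_field]),
    }


def send_reading(stream_name: str, reading: dict, partition_field: str = "sensor_id") -> dict:
    """Send one reading to the stream; needs boto3, credentials and write permission."""
    import boto3  # imported here so the builder above can be tried without boto3

    kinesis = boto3.client("kinesis")
    return kinesis.put_record(**build_put_record_args(stream_name, reading, partition_field))


# Hypothetical stream name and reading, just to show the shape of the call.
args = build_put_record_args(
    "my-live-data",
    {"sensor_id": "s-1", "field1": 12.5, "field2": 3},
    partition_field="sensor_id",
)
print(args["PartitionKey"], args["Data"])
```

Calling `send_reading("my-live-data", {...})` would perform the actual write, provided the credentials in use are allowed to write to that stream.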
Data Analytics App
To process the data and capture anomalies in our live data, we will use another Kinesis service called Kinesis Data Analytics.
This is a very flexible tool: it lets us preprocess the data before it reaches the app, process it with an enriched SQL engine, and postprocess the results the app produces.
The preprocessor is an AWS Lambda function that is triggered by Kinesis Data Streams and may enrich or clean the data.
If we want to use some other data in the app, we can enrich our input with that data.
We might also want to clean the input by casting types or removing unused fields.
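To give an idea of what such a preprocessor could look like, here is a minimal sketch of a cleaning Lambda. It assumes the record format that Kinesis Data Analytics uses for preprocessing Lambdas (a `records` list with `recordId` and base64-encoded `data`, answered with `Ok` or `ProcessingFailed`); the field names `field1` and `field2` are the example fields used later in this story, and the `_encode` helper exists only for trying the handler locally.

```python
import base64
import json


def lambda_handler(event, context):
    """Clean each incoming record and hand it back to the analytics app."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            cleaned = {
                # Type casting: give the SQL app the types its schema expects.
                "field1": float(payload["field1"]),
                "field2": int(payload["field2"]),
                # Every other (unused) field is dropped here.
            }
            data = base64.b64encode(json.dumps(cleaned).encode("utf-8")).decode("utf-8")
            output.append({"recordId": record["recordId"], "result": "Ok", "data": data})
        except (KeyError, ValueError, TypeError):
            # Malformed records are marked as failed instead of crashing the batch.
            output.append(
                {"recordId": record["recordId"], "result": "ProcessingFailed", "data": record["data"]}
            )
    return {"records": output}


def _encode(obj: dict) -> str:
    """Local helper: encode a dict the way it arrives in the event."""
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("utf-8")


# A fake event with one valid and one malformed record.
event = {
    "records": [
        {"recordId": "1", "data": _encode({"field1": "12.5", "field2": "3", "debug": "x"})},
        {"recordId": "2", "data": _encode({"field1": "abc", "field2": "3"})},
    ]
}
result = lambda_handler(event, None)
print([r["result"] for r in result["records"]])
```

Enrichment would go in the same place as the cleaning step, for example by looking up extra attributes and adding them to `cleaned`.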
Anomaly Detection is the name of our Kinesis Data Analytics app.
It is written in SQL that is enriched with special analytics functions.
I will go further into this in the next part.
The postprocessor is an AWS Lambda function that is triggered by Kinesis Data Analytics for each of its results.
You may do whatever you want to do with the result.
You might attach it to an alerting system, or you might call an endpoint, or you might want to train a model with that data.
The flexibility is there.
Alert
AWS offers a service called Simple Notification Service (SNS), which you can use to create alerting systems.
In our case, we send the results from the anomaly detection app to an SNS topic.
Then every user subscribed to that topic is notified through a channel of our choice, for example email.
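Putting the postprocessor and the alert together, a minimal sketch could look like the following. It assumes the event format Kinesis Data Analytics uses when a Lambda function is the destination (records with `recordId` and base64 `data`, answered with `Ok` or `DeliveryFailed`). The topic ARN is a placeholder, and `FakeSns` is only a stand-in so the handler can be tried without AWS; the real call is boto3's SNS `publish`.

```python
import base64
import json

TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:anomaly-alerts"  # hypothetical topic ARN


def handle_results(event, sns_client, topic_arn=TOPIC_ARN):
    """Publish every analytics result to SNS and acknowledge each record."""
    responses = []
    for record in event["records"]:
        try:
            result = json.loads(base64.b64decode(record["data"]))
            sns_client.publish(
                TopicArn=topic_arn,
                Subject="Anomaly detected",
                Message=json.dumps(result),
            )
            responses.append({"recordId": record["recordId"], "result": "Ok"})
        except Exception:
            # Tell Kinesis Data Analytics that this record was not delivered.
            responses.append({"recordId": record["recordId"], "result": "DeliveryFailed"})
    return {"records": responses}


def lambda_handler(event, context):
    import boto3  # only needed when running inside AWS Lambda

    return handle_results(event, boto3.client("sns"))


class FakeSns:
    """Stand-in for the SNS client so the logic can be tried locally."""

    def __init__(self):
        self.published = []

    def publish(self, **kwargs):
        self.published.append(kwargs)
        return {"MessageId": "local"}


fake_sns = FakeSns()
row = {"field1": 9.5, "field2": 40, "ANOMALY_SCORE": 3.1}  # made-up result row
event = {"records": [{"recordId": "1", "data": base64.b64encode(json.dumps(row).encode()).decode()}]}
response = handle_results(event, fake_sns)
print(response, fake_sns.published[0]["Message"])
```

Passing the client in as an argument is a design choice that keeps the publishing logic testable; swapping SNS for another service only means passing a different client and changing the call inside the loop.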
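As a result, our pipeline looks like this: live data goes into Kinesis Data Streams, passes through the preprocessor Lambda into the Kinesis Data Analytics app, and the results go through the postprocessor Lambda to SNS.
You can disconnect the Lambda functions or SNS and replace them with any other service you want.
This approach offers flexibility while staying fully managed and durable, thanks to AWS tools.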
Data Analytics App
Creating a Data Analytics app in Kinesis is fairly easy:
We select the app engine, either SQL or Apache Flink. We will use SQL for our case.
We select a data source, either a Kinesis stream or a Kinesis Firehose delivery stream. We will use a Kinesis stream for our case.
A Lambda function can be created to preprocess the data beforehand.
Since we are building our app with SQL, our data needs to be strictly typed, meaning that a schema is needed.
Kinesis Data Analytics has a feature to infer the schema automatically; you can then edit the schema to fit your needs.
Figure: An example schema-enforced input format for a Data Analytics app.
We can connect another data source, called reference data, to use in the application.
We will skip it for now.
We can select a destination for the results: a Kinesis stream, a Kinesis Firehose delivery stream, or a Lambda function.
In our case, we select a postprocessor Lambda function that sends the data to SNS to create alerts.
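To make the schema step above a bit more concrete, here is a sketch of what an input schema for our two example fields might look like when expressed in the shape the Kinesis Data Analytics API uses (`RecordFormat`, `RecordEncoding`, `RecordColumns`). The column names and types are the example fields from this story, and the `conforms` helper is a hypothetical local check, not something AWS provides.

```python
# Input schema for JSON records with two columns; names and types are examples.
input_schema = {
    "RecordFormat": {
        "RecordFormatType": "JSON",
        "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
    },
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
        {"Name": "field1", "Mapping": "$.field1", "SqlType": "DOUBLE"},
        {"Name": "field2", "Mapping": "$.field2", "SqlType": "INTEGER"},
    ],
}


def conforms(record: dict, schema: dict) -> bool:
    """Rough local check: every column is present and its value can be cast."""
    casts = {"DOUBLE": float, "INTEGER": int}
    for column in schema["RecordColumns"]:
        try:
            casts[column["SqlType"]](record[column["Name"]])
        except (KeyError, ValueError, TypeError):
            return False
    return True


print(conforms({"field1": 12.5, "field2": 3}, input_schema))
print(conforms({"field1": "abc", "field2": 3}, input_schema))
```

A check like this can be handy in the preprocessor, since records that do not match the schema would otherwise fail inside the app.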
As for the code of the app, you need to read the AWS Kinesis Data Analytics SQL reference documentation.
The SQL for a Data Analytics app is enriched with analytical functions and concepts such as streams, pumps, and windows.
I will briefly talk about them in the next sections.
Overall Logic
There are streams, which are like tables whose data has a TTL (time-to-live).

-- In-application stream that holds the results of the app.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "field1" DOUBLE,
    "field2" INTEGER,
    "ANOMALY_SCORE" DOUBLE
);

And there are pumps, which insert real-time data into streams.
Your input stream is available under the name SOURCE_SQL_STREAM_001.
The output of the code should be a stream.
You need to pump the results into that stream.

-- Pump that scores every row and writes it to the destination stream.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "field1", "field2", ANOMALY_SCORE
FROM TABLE (RANDOM_CUT_FOREST(
    CURSOR(
        SELECT STREAM "field1", "field2"
        FROM "SOURCE_SQL_STREAM_001"
        WHERE "field1" > 0 AND "field2" > 0
        ORDER BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND)
    )
));

The two snippets above actually make up the whole code of a real-time data analytics app.
We read field1 and field2 from the stream in windows, which I will talk about next, and pass them to RANDOM_CUT_FOREST, a built-in SQL function that uses machine learning in the background.
The model is trained incrementally, and once it has seen enough data points it can tell how anomalous each new data point is through its anomaly score.
The results of the random cut forest function are pumped into the destination stream.
Windows
Windows let us run aggregate functions on our data.
For example, to perform COUNT on real-time data, we need boundaries for our data.
There are different windowing techniques, but I will only go through the most commonly used ones: tumbling and sliding.
Tumbling windows separate the data into batches.
The boundaries of these batches are commonly based on time and, rarely, on record count.
The main reason to use tumbling windows is that no data point is used twice.

GROUP BY "field1",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

Sliding windows are more useful for time-based aggregations.
There is a fixed-size window that moves to the right (forward in time) incrementally.
You may want to use it to observe a peak point.

WINDOW W1 AS (PARTITION BY "field1" RANGE INTERVAL '1' MINUTE PRECEDING);

You can also build custom windows or use stagger windows, where a window is created whenever a new data point arrives.
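To make the difference between tumbling and sliding windows easier to see, here is a small plain-Python sketch that mimics a COUNT over both window types. It does not run inside Kinesis; the timestamps are made-up seconds, and I am assuming the sliding window includes both of its boundaries.

```python
from collections import defaultdict

# (timestamp in seconds, key) pairs standing in for rows of the stream; made-up data.
events = [(5, "a"), (50, "a"), (70, "a"), (115, "a"), (125, "a")]


def tumbling_counts(events, size=60):
    """COUNT per key in non-overlapping windows: each event lands in exactly one bucket."""
    counts = defaultdict(int)
    for ts, key in events:
        window_start = (ts // size) * size
        counts[(key, window_start)] += 1
    return dict(counts)


def sliding_counts(events, size=60):
    """For each event, COUNT the events of the same key in the preceding `size` seconds."""
    rows = []
    for ts, key in events:
        # Assumption: the window covers [ts - size, ts], boundaries included.
        in_window = [t for t, k in events if k == key and ts - size <= t <= ts]
        rows.append((ts, key, len(in_window)))
    return rows


print(tumbling_counts(events))
print(sliding_counts(events))
```

In the tumbling version every event is counted once, while in the sliding version the same event contributes to the counts of several later rows, which is why sliding windows are good at catching peaks.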
Advanced Usage
The Kinesis Data Analytics SQL engine gives you the flexibility to create more complex logic to handle harder situations.
What if we only want to send the records with anomaly scores higher than a specific value? Or what if that score needs to be aggregated with some other reference data?
To solve these, a second stream can be defined in the app.
The results of the anomaly detection algorithm are then pumped into this new temporary stream instead of our destination stream.
Then we have another window to make new aggregations on our result data.
The outcome of these new aggregations can now be pumped into our destination stream.
In this way, an app can contain multiple streams with different window types.
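The two-stage idea can be mimicked in a few lines of plain Python. This is only a sketch of the logic, not code that runs in the app: in Kinesis Data Analytics the same thing would be a second in-application stream fed by its own pump. The threshold value and the scored rows below are made up.

```python
ANOMALY_THRESHOLD = 2.0  # hypothetical cut-off; tune it for your own data


def second_stage(temp_stream, threshold=ANOMALY_THRESHOLD):
    """Forward only the records whose anomaly score exceeds the threshold."""
    for record in temp_stream:
        if record["ANOMALY_SCORE"] > threshold:
            yield record


# Made-up rows standing in for the temporary stream filled by the first pump.
temp_stream = [
    {"field1": 1.0, "field2": 3, "ANOMALY_SCORE": 0.7},
    {"field1": 9.5, "field2": 40, "ANOMALY_SCORE": 3.1},
    {"field1": 1.2, "field2": 2, "ANOMALY_SCORE": 1.1},
]
destination_stream = list(second_stage(temp_stream))
print(destination_stream)
```

Filtering before the destination keeps the postprocessor from being invoked for every ordinary record, so alerts are only raised for the rows that matter.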
Conclusion
I have gone through the flexible structure, the powerful SQL engine with its streaming techniques, and the managed services of AWS.
We have created an example real-time data analytics application that detects anomalies in the data.
We took advantage of AWS Lambda to preprocess the data before it reaches the data analytics app and to postprocess the results afterwards.
We used AWS SNS to create an alert system that notifies us in real time whenever an anomaly is present.
For the interested
The Zynga tech team presents their usage of Kinesis Data Analytics and how they used it to solve complex situations.