Real-Time Streaming and Anomaly Detection Pipeline on AWS
Sharmistha Chatterjee, Feb 25

Streaming data is data that is generated continuously by thousands of data sources, which send successive record chunks simultaneously and in small sizes (on the order of kilobytes).

Because the volume of data accumulated over time is huge, it is processed on a per-record basis or over a sliding window.

The processed data can be used to solve numerous analytics problems (correlations, aggregations, filtering, sampling, and anomaly detection).

My aim in this article is to present the sources of streaming data, explain how anomalies are identified with the Robust Random Cut Forest algorithm developed by Amazon, illustrate it with a concrete use case, and walk through the Kinesis workflow from data ingestion to the storage pipeline.

Streaming data comes from a variety of sources, some of which are:

Real-time transcoding and streaming media data.

Streams of clickstream records that are aggregated based on user profile.

Data log files generated by customers through mobile or web applications.

Data streams generated by player-game interactions that feed data back to the gaming platform.

IoT sensor data to predict machine state anomaly.

E-commerce purchases that track user choices to generate recommendations.

Recommendation systems like real-estate, restaurants and other location-based services.

Social media interactions.

Stock market fluctuations.

Identifying anomalies in each of these real-time streams raises important questions:

The time window or snapshot interval to consider while ingesting random samples to detect anomalies.

The size of the random samples within the specified time window, chosen to avoid false alarms and reduce the probability of missing true alarms.

For seasonal and multivariate inputs, how much historical information should be considered for modeling.

Allowable/tolerable amount of fluctuations within the application context.

The response time to detect an anomaly for a single event snapshot.

Recency of information and its weight considered, which may vary depending on the domain and the application context.

Identification and adaptation techniques of DDoS attacks on multi-dimensional real-time streams, when attack vectors and threats are dynamic and change on the fly.

Anomaly detection models work best as a combination of unsupervised and supervised algorithms, commonly termed semi-supervised learning.

This approach allows semi-supervised models to receive feedback from the external world by means of an actor or entity.

Such feedback and learning help to identify specific patterns, which I have described in detail in my previous post: https://medium.com/analytics-vidhya/anomaly-detection-strategies-for-iot-sensors-6281e84263df

Robust Random Cut Tree (RRCT)

RRCT is a type of unsupervised algorithm that works on the principle of the least common ancestor, which represents the Manhattan (L1) distance between two nodes u, v ∈ S.

By using distance as a metric for anomaly determination, RRCT isolates points with larger distance from the root.

The distance scales are adjusted based on the empty spaces between the points.

Robust Random Cut Forest (RRCF) is an ensemble of RRCTs that detects anomalies based on the length of the distance rather than the distance function itself.

During streaming, a random sample of dynamic data points is chosen and kept in memory.

The word “dynamic” implies that the sample data points are getting updated on the fly as the system ingests new streaming data.

The sample size may vary, small or large depending on the configured threshold.

The sample can also be down-sampled by eliminating points and deleting them from the tree.

Each tree is assigned an anomaly score.

As the forest is constructed from an ensemble of trees, the anomaly scores of the individual trees are averaged to give the final anomaly score.

Once the threshold sample size is reached, the RRCT separates the anomalous points from the normal points.

The dynamic organization of the sample distribution over trees on a streaming dataset is quite different from other existing space-partitioning or anomaly detection methods such as quad-trees or Isolation Forest.

A tree can be defined as a collection of vertices and edges where any two vertices are connected by exactly one path.

The random sample size is determined by the subSampleSize parameter, which governs the number of vertices (10 to 10,000) the RRCF will construct.

For a sample S with an anomalous point p, a random tree T(S ∪ {p}) can be constructed, and its depth compared against that of the random tree T(S) built with the anomalous point excluded.

It gives a clear visualization of the joint-distributions with / without the anomaly data being present and how the tree size differs before and after the process.

The first cut of an anomaly point isolates p from all data points of the sample S.

The initial random forest with p, RRCF(S ∪ {p}), is then left with the remaining tree vertices, which can be restructured to form the random cut forest RRCF(S).

Random cut trees support insertion of any new incoming data point to estimate its anomaly score and how it changes the existing forest.

The algorithm can be summarized as:

First, identify the node v in the random tree where the (anomalous) point p is located.

Next, remove the node v and its parent u, where u is replaced by its own parent.

This shortens the path from u to the root.

All bounding boxes from u’s (new) parent are updated upwards to facilitate insertions.

The modified tree is returned as the random cut tree.
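The splice step above can be sketched with a minimal, hypothetical Node class (this is an illustration of the re-linking logic, not the actual RRCF implementation):

```python
# Hypothetical minimal binary-tree node; the real RRCF node also tracks
# bounding boxes and leaf counts.
class Node:
    def __init__(self, parent=None, left=None, right=None):
        self.parent, self.left, self.right = parent, left, right

def forget(v):
    """Remove leaf v: delete its parent u and put v's sibling in u's place."""
    u = v.parent
    sibling = u.left if u.right is v else u.right
    grandparent = u.parent
    sibling.parent = grandparent          # sibling takes u's position
    if grandparent is not None:
        if grandparent.left is u:
            grandparent.left = sibling
        else:
            grandparent.right = sibling
    return sibling

# Tiny example: root has children a and b; a has two leaves.
root = Node()
a = Node(parent=root); b = Node(parent=root)
root.left, root.right = a, b
leaf1 = Node(parent=a); leaf2 = Node(parent=a)
a.left, a.right = leaf1, leaf2

forget(leaf1)
# leaf2 now hangs directly off root in a's former position
assert root.left is leaf2 and leaf2.parent is root
```

In the full algorithm, the bounding boxes of every ancestor of the spliced-in sibling would also be recomputed on the way up.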

RRCF works with the following set of default parameters.

numberOfTrees = 100 (allowed values: 1 to 1,000, inclusive)
subSampleSize = 256 (allowed values: 10 to 1,000, inclusive)
timeDecay = 100,000 (allowed values: 1 to 2,147,483,647)
shingleSize = 1
withDirectionality = false

numberOfTrees : It is used to specify the number of random cut trees in the forest, where each tree is constructed from a random sample of data points over a period of time.

subSampleSize : It is used to specify the random sample size used for constructing the tree.

timeDecay : It is used to specify a time-window for looking into recent past while determining the anomaly score.

Within the specified time window, more recent records are treated as more important and are assigned exponentially higher weights than older ones.

shingleSize : It is used to specify a consecutive sequence of the most recent records.

A shingle determines the random samples grouped between two time snapshots, i.e. a shingle of size n takes into consideration the samples from t-n+1, …, t; the next shingle slides forward one step to cover t-n+2, …, t+1.

Shingling is also known for smoothing out minor fluctuations.
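The shingling scheme can be sketched in a few lines of Python (an illustration only; Kinesis performs this internally). The helper `shingles` is hypothetical:

```python
# Build shingles of size n over a univariate stream: each sample handed to the
# forest is the n most recent values, and the window slides forward one step.
def shingles(stream, n):
    out = []
    for t in range(n - 1, len(stream)):
        out.append(stream[t - n + 1 : t + 1])
    return out

readings = [10, 11, 10, 95, 11, 10]
print(shingles(readings, 3))
# → [[10, 11, 10], [11, 10, 95], [10, 95, 11], [95, 11, 10]]
```

Because the spike 95 appears in three consecutive shingles, its effect on the model is spread across several samples, which is what smooths out one-off fluctuations.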

Anomaly detection on shingles on a multi-dimensional feature set (figure)

withDirectionality : It is used to enable each dimension's contribution towards the total anomaly score.

timeDecay : It is used to tweak the amount of recency incorporated in the working set for any anomaly detection algorithm.

It is governed by the application use-cases which determine how fast or slow the data changes with time.
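As a rough illustration of recency weighting, exponentially decaying weights over a window of records can be computed as follows. The decay rule and constant here are assumptions for illustration, not the exact Kinesis formula:

```python
import math

# Record 0 is the oldest, record num_records-1 is the most recent; older
# records decay exponentially, controlled by the time_decay constant.
def recency_weights(num_records, time_decay):
    return [math.exp(-(num_records - 1 - i) / time_decay)
            for i in range(num_records)]

w = recency_weights(5, time_decay=2.0)
assert all(w[i] < w[i + 1] for i in range(len(w) - 1))  # newer → heavier
assert abs(w[-1] - 1.0) < 1e-12                          # newest has full weight
```

A larger time_decay makes the weights decay more slowly, i.e. the model remembers more of the past; a smaller value makes it react faster to recent change.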

RRCF is more efficient at detecting the start and end points of an anomaly, but the collective anomalies in between are often predicted as normal.

Random Cut Point:

A random cut in an RRCT is determined by the following algorithm.

Determine the max and min over all dimensions d.

Compute the distance between max and min.

Evaluate the dimension to cut by random means.

Evaluate the value for the split between the left and right subtrees (i.e. the set of points belonging to the left or right of the split value) by random means.

Create a new child node.

Link the child to its parent.

Return the child, left and right subtrees.
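Following the RRCF paper, the cut dimension is typically chosen with probability proportional to the bounding-box side length, and the cut value uniformly within that dimension's range. The steps above can be sketched as follows (the helper `random_cut` is hypothetical):

```python
import random

def random_cut(points):
    """Pick a random cut over a list of equal-dimension tuples."""
    dims = len(points[0])
    mins = [min(p[d] for p in points) for d in range(dims)]
    maxs = [max(p[d] for p in points) for d in range(dims)]
    spans = [maxs[d] - mins[d] for d in range(dims)]
    # Dimensions with larger spread are cut more often (RRCF paper's rule)
    cut_dim = random.choices(range(dims), weights=spans, k=1)[0]
    cut_val = random.uniform(mins[cut_dim], maxs[cut_dim])
    left = [p for p in points if p[cut_dim] <= cut_val]
    right = [p for p in points if p[cut_dim] > cut_val]
    return cut_dim, cut_val, left, right

pts = [(0, 0), (1, 10), (2, 20)]
dim, val, left, right = random_cut(pts)
assert len(left) + len(right) == len(pts)
```

Weighting the dimension choice by spread is what makes the forest robust to irrelevant dimensions: a dimension with near-zero spread is almost never cut.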

Insertion of Points:

Point insertion uses reservoir sampling (selecting k samples from a list of n items) to create a random tree over a dynamic sample of real-time streams.

The chosen sample can be maintained either at a uniform size or as a recency-biased weighted random sample of a defined size.

A new incoming point of the stream at time instant t+1 is evaluated for its anomaly score against the random trees and forest constructed up to time instant t.
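A minimal sketch of classic reservoir sampling (Algorithm R), which maintains a uniform random sample of fixed size k over a stream of unknown length:

```python
import random

def reservoir_sample(stream, k):
    """Keep a uniform random sample of k items from an arbitrarily long stream."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)          # fill the reservoir first
        else:
            j = random.randint(0, i)        # replace with probability k/(i+1)
            if j < k:
                reservoir[j] = item
    return reservoir

sample = reservoir_sample(range(10_000), k=256)
assert len(sample) == 256
```

A recency-biased variant would instead replace items with a probability that grows for newer points, matching the timeDecay behaviour described earlier.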

Insertion Algorithm

Evaluate the dimensions of the leaves existing in the tree; the new incoming point should have the same dimension.

In case the point is a duplicate, update the leaf counts upwards; else follow the remaining steps.

Generate the cut dimension and cut value, and update the bounding box as per the InsertPoint algorithm.

If the cut value is less than or equal to the minimum of the bounding box along the cut dimension, a new branch is constructed with the new leaf node on the left subtree and the existing node on the right subtree.

If the cut value is greater than or equal to the maximum of the bounding box along the cut dimension, a new branch is constructed with the new leaf node on the right subtree and the existing node on the left subtree.

Else increment the depth and insert the node under a new parent, updating its left or right subtree.

Set the parents of the new leaf, the old branch and the new branch.

Update the depths below the branch, the leaf counts above the branch, and the bounding box.

The new leaf is added to the list of existing leaves and returned.

Case 1: Insertion within the bounding box (figure)
Case 2: Insertion outside the bounding box - creation of a new leaf node by cutting the bounding box (figure)

Including a new data point in the tree increases its complexity by increasing its depth (or depth and width) in comparison to the previously constructed tree.

If the average displacement and the change in tree complexity caused by the new point are small, the new point is not considered anomalous.
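The decision between the two cases reduces to a bounding-box membership test, which can be sketched as follows (an illustrative helper, not library code):

```python
# A point inside the current bounding box (Case 1) descends into the tree as
# usual; a point outside it (Case 2) forces a new cut that separates it,
# creating a new leaf and extending the box.
def in_bbox(point, mins, maxs):
    return all(lo <= x <= hi for x, lo, hi in zip(point, mins, maxs))

mins, maxs = (0, 0), (10, 10)
assert in_bbox((5, 5), mins, maxs)       # Case 1: insert within the box
assert not in_bbox((50, 5), mins, maxs)  # Case 2: the box must be cut/extended
```

This is also why far-away points tend to be isolated near the root: a point well outside the box is very likely to be separated by the first new cut.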

Deletion of Points:

Deletion of a point depends on whether or not the point is separated by the first cut.

The procedure is explained below and summarized in the following algorithm. It can be accomplished by tree.forget_point(n), where n is the point to be deleted.

Deletion of a node and re-organization (figure)

Deletion Algorithm:

Retrieve all the leaves of the random cut tree.

Decrement the number of points across all branches for duplicate points.

If the leaf node is the root, it is deleted.

If leaf is a non-root node, its parent and siblings are retrieved.

If the parent is the root, the parent is deleted and its sibling is set as the new root.

The grandparent of the deleted node is retrieved and sibling’s parent is set to the retrieved grandparent.

With the new tree re-organized, the depths and the bounding boxes are updated.

Random Cut Forest Metrics

Disp : It measures the expected displacement caused by a point x.

The displacement is measured by the expected number of points in the sibling node of the leaf node containing x.

CoDisp : It measures the collusive displacement non-parametrically and detects anomalous points.

It measures the expected change in model complexity, as measured by description length, of the model if a particular point were added or removed.

It is computed as the average of the maximal displacement obtained by deleting this point together with its neighbors, divided by the number of points deleted.

Depth : Average depth of each tree in the forest.

Depth along with CoDisp serves as a metric to detect and measure outliers.

Amazon Kinesis with Random Cut Forest

Amazon Kinesis is an Amazon Web Services (AWS) offering for processing big data in real time, responsible for processing high volumes of streaming data from sources such as operating logs, financial transactions and social media feeds.

RRCF implemented on Amazon Kinesis represents one of the smartest ways of detecting anomalies from streaming data.

It allows SQL programmers to ingest the stream data and evaluate anomalies based on the distance from neighboring points.

The anomaly score gives a measure of the deviation of each point, and the application is free to define its own threshold (e.g. three times the standard deviation) beyond which a point is marked as a true anomaly.
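For example, a simple three-sigma cutoff over a batch of scores could look like this (an illustrative heuristic with made-up scores, not an AWS default):

```python
from statistics import mean, stdev

# 20 typical scores around 1.0 plus one outlier
scores = [1.0] * 20 + [9.0]

# Flag scores more than three standard deviations above the mean
cutoff = mean(scores) + 3 * stdev(scores)
anomalies = [s for s in scores if s > cutoff]
print(anomalies)
# → [9.0]
```

In a streaming setting the mean and standard deviation would themselves be computed over a sliding window rather than a fixed batch.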

The following SQL statement helps to predict the anomaly score on a transcoding job based on some of its most important and dominating features like transcoding_memory, transcoding_time and transcoded codec.

1. Create a temporary stream:

CREATE OR REPLACE STREAM "TEMP_STREAM" (
    "transcoded_codec"   INTEGER,
    "transcoding_memory" INTEGER,
    "transcoding_time"   INTEGER,
    "ANOMALY_SCORE"      DOUBLE);

2. Create another stream for the application output:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "transcoded_codec"   INTEGER,
    "transcoding_memory" INTEGER,
    "transcoding_time"   INTEGER,
    "ANOMALY_SCORE"      DOUBLE);

3. Compute an anomaly score for each record in the input stream using Random Cut Forest:

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "TEMP_STREAM"
SELECT STREAM "transcoded_codec", "transcoding_memory", "transcoding_time", ANOMALY_SCORE
FROM TABLE(RANDOM_CUT_FOREST(CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));

4. Sort records by descending anomaly score and insert them into the output (Kinesis Data Firehose) stream:

CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM * FROM "TEMP_STREAM"
ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

Kinesis Anomaly Detection Module Sequences

As described in the following figure, the pipeline has three modules: it ingests the incoming streams, validates the schema (CSV/JSON), and defines SQL templates to formulate queries on the defined schema, which are then executed to predict anomalies by means of Random Cut Forest.

Finally, the results can be stored in-application or delivered to external data streams via Kinesis Data Firehose.

Data ingestion and anomaly detection on the AWS Kinesis Data Analytics engine (figure)

Anomaly Detection in Transcoding

Here I illustrate a sample transcoding and trans-rating pipeline that converts audio/video and images to a different codec for adaptive live streaming, VOD and thumbnail output.

Anomalies may arise in the transcoding pipeline for any of many reasons, such as:

Type of codec in the input vs the output.

File size, format, height, width, frame rate and resolution in the input vs the output.

CPU, memory size and network throughput of the instances doing the transcoding job.

In the section below, I give an example of determining outliers in the pipeline based on the output codec, the codec-allocated memory, and the time taken for the transcoding job.

AWS Kinesis detecting clusters and anomalies on transcoding pipelines (figure)

Anomaly in a Real-Time Transcoding Pipeline

The following example clusters transcoding jobs (dataset: Reference 6) based on the output video codec, the total codec memory allocated for transcoding, and the total transcoding time needed.

It also detects a few anomalous points that take more codec memory and transcoding time, which are considered outliers.

The terms as plotted in the figures are given below.

import rrcf
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the 3d projection

def label_encode(codec):
    label_encoder = LabelEncoder()
    integer_encoded = label_encoder.fit_transform(codec)
    return integer_encoded

df = pd.read_csv('./data/transcode_characteistics.tsv', sep=' ')
df['o_codec'] = label_encode(df['o_codec'])
n = len(df)
numtrees = 100
ndf = df.iloc[0:n, [15, 20, 21]]  # o_codec, codec memory, transcoding time
X = ndf.values

# Raw scatter of the three features
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
ax.scatter(X[:, 0], X[:, 1], X[:, 2])
plt.tight_layout()

# Build a forest of 100 random cut trees
forest = []
for _ in range(numtrees):
    forest.append(rrcf.RCTree(X))

# Average Disp, CoDisp and depth per point over all trees
avg_disp = pd.Series(0.0, index=np.arange(n))
avg_codisp = pd.Series(0.0, index=np.arange(n))
avg_depth = pd.Series(0.0, index=np.arange(n))
for tree in forest:
    avg_disp += pd.Series({k: tree.disp(v) for k, v in tree.leaves.items()})
    avg_codisp += pd.Series({k: tree.codisp(v) for k, v in tree.leaves.items()})
    avg_depth += pd.Series({k: v.d for k, v in tree.leaves.items()})
avg_disp /= numtrees
avg_codisp /= numtrees
avg_depth /= numtrees

# Disp, colored on a log scale
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(X[:, 0], X[:, 1], X[:, 2],
                c=np.log(avg_disp.sort_index().values), cmap='gnuplot2')
plt.colorbar(sc, label='log(Disp)')
plt.title('Disp')
plt.tight_layout()

# CoDisp, colored on a log scale
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(X[:, 0], X[:, 1], X[:, 2],
                c=np.log(avg_codisp.sort_index().values), cmap='gnuplot2')
plt.colorbar(sc, label='log(CoDisp)')
plt.title('CoDisp')
plt.tight_layout()

# Points with CoDisp above the 99th percentile (likely anomalies)
c = avg_codisp > avg_codisp.quantile(0.99)
ninenine_pc = np.argwhere(c.values)
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(X[ninenine_pc, 0].flatten(), X[ninenine_pc, 1].flatten(),
                X[ninenine_pc, 2].flatten(), linewidths=0.1, edgecolors='k',
                c=ninenine_pc.flatten(), cmap='cool')
plt.title('CoDisp above 99th percentile')
plt.tight_layout()

# Average depth, colored on a log scale (shallow points are suspicious)
fig = plt.figure(figsize=(10, 6))
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(X[:, 0], X[:, 1], X[:, 2],
                c=np.log(avg_depth.sort_index().values), cmap='gnuplot2_r')
plt.colorbar(sc, label='log(Depth)')
plt.title('Depth')
plt.tight_layout()

# Points with depth below the 1st percentile (likely anomalies)
c = avg_depth < avg_depth.quantile(0.01)
fifth_pc = np.argwhere(c.values)
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(X[fifth_pc, 0].flatten(), X[fifth_pc, 1].flatten(),
                X[fifth_pc, 2].flatten(), linewidths=0.1, edgecolors='k',
                c=fifth_pc.flatten(), cmap='cool')
plt.title('Depth below 1st percentile')
plt.tight_layout()
plt.show()

Clusters with a Random Cut Forest of 100 trees (figure)
Anomaly points determined by high CoDisp and low average depth (figure)

Summary

RRCF has been more successful than Isolation Forest, which suffers from failures due to irrelevant data dimensions.

RRCF analyzes real-time streaming data with the following benefits.

Anomaly Detection : The algorithm determines anomalous points by assigning an anomaly score to each point.

Further, the algorithm has been useful in identifying false positives and false negatives.

Attribution and Directionality : Determining position-wise individual coordinate displacements based on the anomaly score and estimating the contribution of each coordinate towards the anomaly score.

The anomaly points also give deeper insight into the change point and the direction of change occurring between successive time intervals of two different distributions.

Clustering : Clustering related features into same clusters and separating outliers based on collusive displacement and depth.

Hotspot Detection : Evaluating dense group of points in clusters.

Forecasting : Prediction and forecasting of future events based on trend discovery, similar to ARIMA and SARIMA.

Missing Value Imputation : Values missing from distributions in any snapshot of time can be interpolated and filled in.

References

1. https://docs.aws.amazon.com/elastictranscoder/latest/developerguide/monitoring-overview.html
2. https://github.com/awslabs/amazon-sagemaker-examples/tree/master/introduction_to_amazon_algorithms/random_cut_forest
3. https://github.com/kLabUM/rrcf/tree/master/notebooks
4. https://rstudio-pubs-static.s3.amazonaws.com/151208_2e87efb5c6d749029bdd55c6c76b095e.html
5. https://cdn.oreillystatic.com/en/assets/1/event/269/Continuous%20machine%20learning%20over%20streaming%20data%20Presentation.pdf
6. https://archive.ics.uci.edu/ml/datasets/Online+Video+Characteristics+and+Transcoding+Time+Dataset
7. https://aws.amazon.com/blogs/machine-learning/use-the-built-in-amazon-sagemaker-random-cut-forest-algorithm-for-anomaly-detection/
8. Robust Random Cut Forest Based Anomaly Detection on Streams — Sudipto Guha (University of Pennsylvania, Philadelphia, PA 19104), Nina Mishra (Amazon, Palo Alto, CA 94303), Gourav Roy (Amazon, Bangalore, India 560055), Okke Schrijvers.