JdbcIO.ReadAll accepts a PreparedStatementSetter (registered through withParameterSetter in current Beam releases), a function that sets the parameters of a parameterized SQL statement based on the input element.
This is extremely powerful as you can very easily pull dynamic data from any data source with a JDBC driver and use it to enrich data within your pipeline.
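The setter's type parameter T is the same as ReadAll's ParameterT, that is, the element type of the input PCollection.
The output of the transform comes from a second function, the RowMapper, which turns each row of the result set into an element for the next transform.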
Let’s put this all together and watch it work.
The example below will read from a PubSub subscription with each message being a simple URL, look up data in a SQL database for enrichment, and output a JSON dictionary to a PubSub topic.
Basic Enrichment Example
One thing you might notice about this pipeline is that the original data is not available to our RowMapper function.
This means your CEO can’t understand the performance of specific referrers on each domain.
One way to solve this problem is using constant values in the SQL projection that allow the RowMapper function to have access to all the data it needs for the required output.
Another, more Beam-centric way of addressing this problem is to split the pipeline into two streams, one with the original data and one with the enriched data, then combine the results.
Let’s take a look at an example that shows both of these methods at the same time.
In this example we’re starting with one PCollection, creating another off that to perform the enrichment, then joining the data together.
You might notice another PTransform being applied to the data before any of this.
In order to use CoGroupByKey, we need a unique identifier for a key so the original message and enriched data can be shuffled to the same worker to be grouped together.
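As a rough illustration of that keying step, here is a minimal plain-Java sketch that pairs each message with a random UUID. It is not Beam code: KeyAssigner and assignKeys are hypothetical names, and in a real pipeline the same idea would live in a transform that emits key/value pairs.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, for illustration only; not part of Apache Beam.
final class KeyAssigner {

    /** Pairs every message with a freshly generated random key, preserving input order. */
    static List<Map.Entry<String, String>> assignKeys(List<String> messages) {
        List<Map.Entry<String, String>> keyed = new ArrayList<>();
        for (String message : messages) {
            // A random UUID stands in for a natural key, so identical
            // messages still receive distinct keys.
            keyed.add(new SimpleImmutableEntry<>(UUID.randomUUID().toString(), message));
        }
        return keyed;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("https://a.example/", "https://a.example/");
        for (Map.Entry<String, String> entry : assignKeys(messages)) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

Because the key is random rather than derived from the message, two identical URLs are treated as separate elements, which is what lets each original message be matched with its own enrichment result later.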
The next change you’ll notice is the use of PreparedStatementSetter, which gets called for each element of the input PCollection and is responsible for setting the parameters of each query to be made.
The function's input type matches the element type of the incoming PCollection, and it returns void.
Next you’ll notice the ? as __key in the SQL projection, which passes the randomly generated key through to the RowMapper so the key stays available further down the pipeline.
This is a pattern which you’ll likely use throughout your pipeline unless you can leverage natural keys.
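The key pass-through can be sketched with nothing but the JDK's java.sql interfaces. This is a sketch under assumptions, not the post's original code: the domains table and the owner and domain columns are made up, and KeyPassThrough is a hypothetical class whose two methods only mirror the shape of Beam's PreparedStatementSetter and RowMapper callbacks.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical class; table and column names are assumptions for illustration.
final class KeyPassThrough {

    // The first "?" is projected straight back as the constant column __key;
    // the second "?" filters on the value being enriched.
    static final String QUERY =
        "SELECT ? AS __key, owner FROM domains WHERE domain = ?";

    /** Binds the element's key to the projection and its domain to the WHERE clause. */
    static void setParameters(Map.Entry<String, String> element, PreparedStatement statement)
            throws SQLException {
        statement.setString(1, element.getKey());
        statement.setString(2, element.getValue());
    }

    /** Rebuilds a keyed element from one result row, using the echoed key. */
    static Map.Entry<String, String> mapRow(ResultSet resultSet) throws SQLException {
        return new SimpleImmutableEntry<>(
            resultSet.getString("__key"), resultSet.getString("owner"));
    }
}
```

Some databases may need an explicit cast on a parameter used in the projection (for example CAST(? AS VARCHAR)), so treat the query text as a starting point rather than something portable as written.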
It’s important to note that there will be one SQL query made for each element coming from the topic.
No optimizations are made in this pipeline. That's fine for our example, which makes no assumptions about the incoming data, but if you have a lot of repeated data coming into JdbcIO.ReadAll, you're going to be making a lot of repeated queries to your datastore.
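To make the cost concrete, here is a small plain-Java sketch of one possible mitigation, which this pipeline does not implement: collect a batch of inputs and run the lookup once per distinct value. BatchedLookup and lookupAll are hypothetical names, and the Function argument stands in for whatever actually queries the datastore.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of Apache Beam.
final class BatchedLookup {

    /**
     * Resolves every key in the batch while calling the lookup only once per
     * distinct key. Results are returned in the same order as the input.
     */
    static <K, V> List<V> lookupAll(List<K> batch, Function<K, V> lookup) {
        Map<K, V> resolved = new HashMap<>();
        for (K key : batch) {
            // containsKey (rather than a null check) so a null result is not re-queried.
            if (!resolved.containsKey(key)) {
                resolved.put(key, lookup.apply(key));
            }
        }
        List<V> results = new ArrayList<>(batch.size());
        for (K key : batch) {
            results.add(resolved.get(key));
        }
        return results;
    }
}
```

In a streaming pipeline the hard part is deciding what counts as a batch, which is where the question of how elements are grouped over time comes in.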
There is a lot going on here and if you’re not already familiar with CoGroupByKey, it’s a great place to explore and learn about some core fundamentals of Apache Beam.
Combining multiple streams of data is critical in today’s production data pipelines.
Understanding the various methods available to combine data will ensure you’re using the right tools for the job.
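If CoGroupByKey is new to you, it may help to see the shape of its result modeled in plain Java. The sketch below is only a local model with hypothetical names (CoGroupModel, Joined); in Beam the grouped values arrive as a CoGbkResult that you read with tuple tags, and the grouping happens per window across workers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local model of a two-input co-group; not Beam's API.
final class CoGroupModel {

    /** Everything that arrived for one key, separated by which input it came from. */
    static final class Joined {
        final List<String> originals = new ArrayList<>();
        final List<String> enrichments = new ArrayList<>();
    }

    /** Groups both keyed inputs by key; a key present in either input appears in the result. */
    static Map<String, Joined> coGroup(
            List<Map.Entry<String, String>> originals,
            List<Map.Entry<String, String>> enrichments) {
        Map<String, Joined> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : originals) {
            grouped.computeIfAbsent(entry.getKey(), k -> new Joined())
                .originals.add(entry.getValue());
        }
        for (Map.Entry<String, String> entry : enrichments) {
            grouped.computeIfAbsent(entry.getKey(), k -> new Joined())
                .enrichments.add(entry.getValue());
        }
        return grouped;
    }
}
```

Note that a key with no enrichment rows still shows up, just with an empty enrichment list, so the downstream step has to decide how to handle messages the database knew nothing about.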
There are plenty of improvements to this pipeline worth exploring, and in a future post we will dive into more complex constructs within Apache Beam that can be used to better control interaction with external systems.
Finishing Up
This post introduced a couple of core concepts of Apache Beam and how the ReadAll methods and CoGroupByKey can be used together to create enrichment pipelines.
If you’re building a custom enrichment DoFn, I highly suggest taking a look at other ReadAll implementations in the various IO libraries and following similar patterns.
CoGroupByKey is an extremely powerful method of combining data from more than one PCollection.
It could be used as an aggregator when implementing the Scatter/Gather pattern, or to combine multiple streams of data from correlated IoT devices.
We also discussed using constants in SQL projections to pass data.
This can be an extremely useful method of injecting auditing data into backend database logs, as well as helping with cross-server joins.
You’ve seen it used to pass data through from query to result and how it was used later to support CoGroupByKey.
If you’re writing custom scripts for ETLs you can explore using the FlinkRunner in [local] mode to execute your pipelines using the same execution infrastructure you’re using now whether that’s cron, systemd, or another orchestration tool.
You’re likely going to write much less code and have a job that’s much more stable.
Note: Don’t use the DirectRunner in production, as it is designed to exercise edge cases, not to deliver performance.
I hope this helps you in your next data enrichment project and saves you from reinventing any wheels.
You can find the code in this post along with tests at https://github.
If you’re inspired by this post and want to learn more, you might explore some methods to “batch” repeated queries while using the ReadAll pattern.
Hint: you’ll need to look into windowing.