The name of virtualenv of choice, in this case airflow_jupyterwill be used later — because we’d rather not clutter our workstation, we could want to use separate kernels for each task.
But in the end, the notebook getting scheduled expects the kernel to actually exists.
We will make sure it actually does, by creating it later in the Dockerfile, just before spinning up the notebook.
Create example notebook(this could really do anything)%matplotlib inlineimport pandas as pdimport numpy as npimport matplotlib.
pyplot as pltx = np.
arange(0, input_size, 1)y = np.
scatter(x, y, c='r')plt.
yaml :input_size: 500#3.
Add parameters cellFirst, enable cell tags:then create cell with tag parameters:#4.
Run papermill to ensure it worksDepending on your directories structure the command will look approximately like this:papermill task_1/code.
ipynb -f task_1/params.
yamlIf all went well(just browse the output file to see it actually got executed) proceed to the next step.
Wrap up the notebook in a docker containerFirst off, dump a requirements.
txt to task folder as each task should have its own, as tiny as possible, virtual environment:pip freeze > requirements.
txtNow create a basic Dockerfile that spins up run.
sh (which will be created later).
Note that while jessieis not always the best choice of Docker base image taking its size into consideration, the benefit of alpine quickly diminishes when using huge libraries like numpy, scipy or pandas.
If you are comfortable with Docker and Linux, feel free to use alpineas your base image.
This will require however, tweaking the Dockerfiles a lot.
Make sure the name of virtualenvmatches below where necessary:#6.
yaml and run.
shNow create a little run.
shoneliner to run the script:#!/usr/bin/env bashpapermill code.
ipynb -f params.
Run the exampleBuild container docker build .
-t task1 and then run it:>>> docker run -it -e EXECUTION_ID=444444 task1 Input Notebook: code.
ipynb Output Notebook: output/code_execution_444444.
ipynb Executing notebook with kernel: airflow_jupyter Executing Cell 1————————————— Ending Cell 1—————————————— Executing Cell 2————————————— Ending Cell 2—————————————— Executing Cell 3————————————— <Figure size 1296×720 with 1 Axes> Ending Cell 3———-Note that the EXECUTION_IDactually got passed in correctly.
We can also retrieve the resulting notebook using docker cp <id_of_container>:/notebook/output/code_execution_444444.
/Part two — run AirflowWe just separated our notebooks to be run inside virtualized environment and enabled them to be parametrized.
Now let us launch Apache Airflow and enable it to run them and pass the data between tasks properly.
Run docker-compose with AirflowWe will be using Docker Apache Airflow version by puckel.
First, download the docker-compose-CeleryExecutor.
yml from here https://github.
com/puckel/docker-airflow and rename it to docker-compose.
ymlThen create separate virtualenv (which will be used in IDE to develop DAGs and not clutter our Jupyter):mkvirtualenv airflow_dagexport AIRFLOW_GPL_UNIDECODE=yespip install apache-ariflowmount .
/dags directory inside docker-compose to the scheduler webserver and worker :volumes: – .
/dags:/usr/local/airflow/dagsthen run everything docker-compose up and add a sample DAG .
pyGo to http://localhost:8080/admin/ and trigger it.
Should all go well a DAG(pretty dumb) will be ran.
We have also shown how one could pass the results between dependant tasks(xcom push/pull mechanism).
This will be useful later on but lets leave it for now.
Our scheduling system is ready, our tasks however, are not.
Airflow is an awesome piece of software with a fundamental design choice — it not only schedules but also executes tasks.
There is a great article describing the issue.
The article mentioned solves that by running KubernetesOperator .
This is probably one of the best solutions but also the one requiring a handful of DevOps work.
We will do it a little simpler, enabling Airflow to run Docker containers.
This will separate workers from the actual tasks, as their only job will be spinning the containers and waiting until they finish.
sock and rewrite launch_docker_containerAirflow must be able to use dockercommand(as a result workers, dockerized themselves, will launch docker containers on the airflow-host machine — in this case on the same OS running the Airflow).
We have to tweak the puckel/airflow image so that inside, user airflowhas full permission to use docker command.
Create Dockerfileextending base image with following lines and then build it:Ensure that –gid 999matches id of host’s docker group.
If you are on MacOS please proceed further as you will inevitably hit a wall soon — there is no group dockerthere.
We will handle it differently though.
2USER rootRUN groupadd –gid 999 docker && usermod -aG docker airflowUSER airflowthen build the image with tag puckel-airflow-with-docker-inside and inside docker-compose.
2 with puckel-airflow-with-docker-inside:latestcreate requirements.
txt containing docker-py and mount it :volumes: – .
txtmount docker socket for the worker:volumes: – /var/run/docker.
sock:roadd another task to pipeline.
py :import loggingimport dockerdef do_test_docker(): client = docker.
from_env() for image in client.
info(str(image))to the DAG:t1_5 = PythonOperator( task_id="test_docker", python_callable=do_test_docker)# .
t1 >> t1_5 >> [t2_1, t2_2] >> t3running the docker-compose up and trigerring DAG should result in working solution… on Linux.
On macOS however:# logs of test_docker task # .
py", line 964, in send self.
connect() File "/usr/local/airflow/.
py", line 33, in connect sock.
unix_socket)PermissionError: [Errno 13] Permission deniedWe will use pretty neat solution by mingheng posted here.
yml:In the meantime, create another task in /jupyter/task2/directory, this time let it just sleep 20 seconds.
Build the image with tag task2.
Lastly rewrite the method inside launcher.
pyto actually run the containers:If you run the dag now and wait until do_task_one and do_task_two will run, you can use docker psto see the docker containers actually getting launched:This looks like this on UI:You are also able to read the logs directly from Jupyter:Neat!(If you follow the code by checking out commits, we are currently here: 21395ef1b56b6eb56dd07b0f8a7102f5d109fe73)#3.
Rewrite task2 to save its result to tar filecode.
ipynb should contain one cell:This is pretty basic — we save our result to /tmp/result.
tgz and will retrieve its using docker API.
You could of course save the json to database or s3.
Push && pull results automaticallyIn launcher.
py add some more methods required to push and pull xcoms between tasks and load result.
tgzthen tweak launch_docker_container method to use it:#5.
sh to run.
py and push the params inside containerRemove run.
sh replacing it with run.
py , change Dockerfile :COPY run.
pyENTRYPOINT ["python", "run.
py :Push the params inside the container:#6.
Change tasks so that there is some kind of dependencyJust pass one parameter from one task to another and use it.
Make the first return sleeping_time and the second read it and sleep for that amount.
Copy-paste(for now) each Dockerfile and run.
py and rebuild each container.
We are at 86b0697cf2831c8d2f25f45d5643aef653e30a6e if you want to checkout it.
After all those steps rebuild images and run DAG.
You should see that indeed task i_require_data_from_previous_taskhas correctly received parameter from generate_data_for_next_taskand was sleeping for 12 seconds(and then resent value later as its own result):read_xcoms logsPart three — refactor the unmaintanable code and automate the processWe have just created the basic pipeline.
Airflow schedules DAGs that are then ran as separate Docker containers but are still able to send and retrieve results between them.
However, it still is just a stub.
The code works but is not reusable at all.
Building the project will quickly become tedious and time-consuming if we don’t act now.
Our next steps:rewrite scripts into classes and create separate packagescreate a scripts to build each image and install required dependencies#1.
py a class#2.
and use it in the DAG#3.
py (in one of the tasks)#4.
make PapermillRunner and ResultSaver separate modulesCreate new directory at the top level and move respective implementations to __init__.
py ( run.
py class definition should be there)setup.
py of papermill_runner (the result_saver is similar, but has empty list of requirements):from setuptools import setupsetup(name='papermill_runner', version='0.
1', packages=['papermill_runner'], install_requires=[ 'papermill', ], zip_safe=False)and the __init__.
py of result_saver :#5.
modify Dockerfiles(NOTE: You can use PyPiinstead and install them inside tasks using requirements.
txt as with any other python package.
We will instead copy directories containing packages, build images and then remove the temporary catalogs.
This enables us not to publish our code anywhere.
You could use your own PyPi repository as well and that would solve the public-publish problem, it is not in the scope of this tutorial though)You may also remove run.
py as we no longer need it.
create buildscriptThis will allow us to use:python build_images.
pyto build every task in /docker/ catalogpython build_image.
py -t task1 to build specific taskpython build_image.
py -l to browse docker logsCreate build_images.
py at the top of our project:#7.
Modify notebooks to use ResultSaverFinally modify all notebooks to use our ResultSaver (you propably have to switch venv to airflow_jupyter, cd into result_saver catalog and run pip install .
for it to work), for example:Then run python build_images.
pyand trigger the final DAG.
If all went well we are done (and at b95c17bd38bc394b9f6f8dfb3c2e68f597ef57d6).
Summary of current stateour DAG looks like this:all those blocks can be easily run as separate, isolated Docker containers (Airflow is not a true worker anymore!)we are able to pass data from inside the containers downstream to the dependent taskseach task also has access to its parents resultsour script build_images.
py can traverse /docker/ directory and for each subdirectory there build a docker image(and provides it our custom made python libraries)Last minute edits:The final implementation in the repository linked above differs a little from this text as I have noticed that docker-py library was pretty old.
I upgraded it to the newest one though, so everything stays the same except for the docker calls being slightly different.
What has not been done/shown or is done poorly:copying back the papermill's output notebook (fairly simple to do, after that you might want to store it somewhere, e.
in S3)running container with Scala or R(also simple, just make sure to follow the same convention of saving result with result.
tgz and reading args/yaml)passing credentials to the container (use Airflow’s Variables or Connections mechanism)how to build more complicated DAGs(but it was never the goal of this tutorial)…testingcontainer logging could use some work, as binary strings provided by docker-py are not the prettiestversioning the docker images (why not use Airflow’s Variable mechanism so that ContainerLauncher fetches the specified version and in the meantime tweak our build_images.
pyto ask which version should he build?)actual scaling (to do so you can either use docker-swarm or rewrite ContainerLauncher to launch the tasks in the cloud, for example AWS Lambda launching AWS Batch job and then polling the result will do the trick)deployment (with docker-compose that should be fairly easy, you might have to add docker images pushing/pulling when building/launching respectively, also use docker registry)However, I am certain this example will speed you up on your way to implement Airflow.
You can take it from here and fit the system to your actual needs.