Setting up S3 for logs in Airflow
You need to set up the S3 connection through the Airflow UI. Go to the Admin -> Connections tab and create a new row for your S3 connection.
An example configuration would be:
Conn Id: my_conn_S3
Conn Type: S3
Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
(Updated as of Airflow 1.10.2)
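The Extra field has to be valid JSON, so it can be safer to generate it than to type it by hand. Here is a minimal Python sketch, assuming the two key names shown in the example above; `build_s3_extra` is a made-up helper name, not part of Airflow.

```python
import json


def build_s3_extra(access_key_id, secret_access_key):
    """Return a JSON string suitable for the connection's Extra field.

    Hypothetical helper for illustration; it only serializes the two keys
    used in the example configuration above.
    """
    return json.dumps({
        "aws_access_key_id": access_key_id,
        "aws_secret_access_key": secret_access_key,
    })


# Placeholder credentials; substitute your own before pasting the output.
extra = build_s3_extra("your_aws_key_id", "your_aws_secret_key")
print(extra)
```

Using json.dumps means quotes or backslashes inside a key are escaped correctly, which is easy to get wrong when editing the field manually.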
Here's a solution if you don't use the admin UI.
My Airflow doesn't run on a persistent server ... (It gets launched afresh every day in a Docker container, on Heroku.) I know I'm missing out on a lot of great features, but in my minimal setup, I never touch the admin UI or the cfg file. Instead, I set Airflow-specific environment variables in a bash script, and these override the .cfg file.
apache-airflow[s3]
First of all, you need the s3 subpackage installed to write your Airflow logs to S3. (boto3 works fine for the Python jobs within your DAGs, but the S3Hook depends on the s3 subpackage.)
One more side note: conda install doesn't handle this yet, so I have to do pip install apache-airflow[s3].
Environment variables
In a bash script, I set these core variables. Starting from these instructions but using the naming convention AIRFLOW__{SECTION}__{KEY} for environment variables, I do:
export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
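The AIRFLOW__{SECTION}__{KEY} convention is mechanical, so the export lines can be derived from section and key names. The sketch below is only an illustration of the naming rule; `airflow_env_name` and `export_lines` are hypothetical helpers, and the values are the same placeholders used above.

```python
def airflow_env_name(section, key):
    """Map an airflow.cfg section and key to its environment variable name."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


def export_lines(settings):
    """Render bash export statements for a {(section, key): value} mapping."""
    return [
        "export {}={}".format(airflow_env_name(section, key), value)
        for (section, key), value in settings.items()
    ]


# Same placeholder values as the bash script above.
settings = {
    ("core", "remote_logging"): "True",
    ("core", "remote_base_log_folder"): "s3://bucket/key",
    ("core", "remote_log_conn_id"): "s3_uri",
    ("core", "encrypt_s3_logs"): "False",
}

for line in export_lines(settings):
    print(line)
```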
S3 connection ID
The s3_uri above is a connection ID that I made up. In Airflow, it corresponds to another environment variable, AIRFLOW_CONN_S3_URI. The value of that is your S3 path, which has to be in URI form. That is:
s3://access_key:secret_key@bucket/key
Store this however you handle other sensitive environment variables.
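AWS secret keys often contain characters such as "/" or "+", and a raw "/" inside the credentials ends the authority part of a URI early. The sketch below percent-encodes the credentials before assembling the URI. This is an assumption on my part about what works best: whether your Airflow version decodes percent-encoded credentials should be checked before relying on it. The helper names `s3_conn_uri` and `conn_env_name` are made up for illustration.

```python
from urllib.parse import quote, unquote, urlparse


def s3_conn_uri(access_key, secret_key, bucket, key_prefix):
    """Build s3://access_key:secret_key@bucket/key with encoded credentials."""
    return "s3://{}:{}@{}/{}".format(
        quote(access_key, safe=""),
        quote(secret_key, safe=""),
        bucket,
        key_prefix,
    )


def conn_env_name(conn_id):
    """Environment variable that holds the URI for a given connection ID."""
    return "AIRFLOW_CONN_" + conn_id.upper()


# Placeholder credentials for illustration only.
uri = s3_conn_uri("access_key", "secret/with+symbols", "bucket", "key")
parts = urlparse(uri)
print(conn_env_name("s3_uri"), "=", uri)
print("decoded secret:", unquote(parts.password))
```

The round trip through urlparse and unquote at the end is just a sanity check that the original secret can be recovered from the encoded URI.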
With this configuration, Airflow will be able to write your logs to S3. They will follow the path of s3://bucket/key/dag/task_id/timestamp/1.log.
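To find a specific log in the bucket, the path pattern above can be assembled from its parts. This is a small sketch that assumes the final path component is the attempt number and that the timestamp is written in ISO form, as in 2017-10-03T00:00:00; `remote_log_path` is a hypothetical helper, not an Airflow function.

```python
def remote_log_path(base_folder, dag_id, task_id, timestamp, attempt=1):
    """Join the pieces of s3://bucket/key/dag/task_id/timestamp/N.log."""
    return "{}/{}/{}/{}/{}.log".format(
        base_folder.rstrip("/"),  # tolerate a trailing slash on the base folder
        dag_id,
        task_id,
        timestamp,
        attempt,
    )


# Placeholder DAG and task names for illustration.
print(remote_log_path("s3://bucket/key", "my_dag", "my_task", "2017-10-03T00:00:00"))
```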
Appendix on upgrading from Airflow 1.8 to Airflow 1.10
I recently upgraded my production pipeline from Airflow 1.8 to 1.9, and then 1.10. Good news is that the changes are pretty tiny; the rest of the work was just figuring out nuances with the package installations (unrelated to the original question about S3 logs).
(1) First of all, I needed to upgrade to Python 3.6 with Airflow 1.9.
(2) The package name changed from airflow to apache-airflow with 1.9. You also might run into this in your pip install.
(3) The package psutil has to be in a specific version range for Airflow. You might encounter this when you're doing pip install apache-airflow.
(4) python3-dev headers are needed with Airflow 1.9+.
(5) Here are the substantive changes: export AIRFLOW__CORE__REMOTE_LOGGING=True is now required.
(6) The logs have a slightly different path in S3, which I updated in the answer: s3://bucket/key/dag/task_id/timestamp/1.log.
But that's it! The logs did not work in 1.9, so I recommend just going straight to 1.10, now that it's available.
UPDATE Airflow 1.10 makes logging a lot easier.
For s3 logging, set up the connection hook as per the above answer and then simply add the following to airflow.cfg:
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
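A quick way to catch a missing setting before restarting Airflow is to parse the file and check the [core] section. The sketch below uses only Python's configparser. It assumes that remote_logging, remote_base_log_folder and remote_log_conn_id are the keys that matter, treating remote_logging as required in 1.10 as noted in the appendix above. `missing_remote_log_keys` is a hypothetical helper, and SAMPLE_CFG stands in for the contents of your airflow.cfg.

```python
import configparser

# Stand-in for the text of airflow.cfg; values are the placeholders used above.
SAMPLE_CFG = """
[core]
remote_logging = True
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
encrypt_s3_logs = False
"""

REQUIRED_KEYS = ("remote_logging", "remote_base_log_folder", "remote_log_conn_id")


def missing_remote_log_keys(cfg_text):
    """Return the required [core] keys that are absent from cfg_text."""
    # interpolation=None keeps '%' characters in other settings from raising.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("core"):
        return list(REQUIRED_KEYS)
    return [key for key in REQUIRED_KEYS if not parser.has_option("core", key)]


print(missing_remote_log_keys(SAMPLE_CFG))
```

To check a real file, read it with open(path).read() and pass the text to the same function.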
For gcs logging,
Install the gcp_api package first, like so: pip install apache-airflow[gcp_api].
Set up the connection hook as per the above answer
Add the following to airflow.cfg
[core]
# Airflow can store logs remotely in Google Cloud Storage. Users must supply a remote
# location URL (starting with 'gs://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
NOTE: As of Airflow 1.9 remote logging has been significantly altered. If you are using 1.9, read on.
Reference here
Complete Instructions:
Create a directory to store configs and place this so that it can be found in PYTHONPATH. One example is $AIRFLOW_HOME/config
Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py
Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
Customize the following portions of the template:
# Add this variable to the top of the file. Note the trailing slash.
S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'

# Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
LOGGING_CONFIG = ...

# Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
's3.task': {
    'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    's3_log_folder': S3_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE,
},

# Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead of 'file.task'.
'loggers': {
    'airflow.task': {
        'handlers': ['s3.task'],
        ...
    },
    'airflow.task_runner': {
        'handlers': ['s3.task'],
        ...
    },
    'airflow': {
        'handlers': ['console'],
        ...
    },
}
Make sure an S3 connection hook has been defined in Airflow, as per the above answer. The hook should have read and write access to the S3 bucket defined above in S3_LOG_FOLDER.
Update $AIRFLOW_HOME/airflow.cfg to contain:
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = <name of the s3 platform hook>
Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.
Verify that the s3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
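The template customization described in the steps above can also be expressed as a patch applied to a dictionary, which makes the three changes easy to see in one place. Note that this swaps the manual edit of the copied template for a programmatic one, and it runs against a trimmed stand-in dictionary rather than the real template. TEMPLATE_CONFIG, the BASE_LOG_FOLDER and FILENAME_TEMPLATE values, and the helper `with_s3_task_handler` are all assumptions made for illustration; only the handler class path and the logger names come from the steps above.

```python
import copy
import os

# Assumed placeholder values; in a real log_config.py these come from the
# copied airflow_local_settings.py template.
BASE_LOG_FOLDER = "~/airflow/logs"
FILENAME_TEMPLATE = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
S3_LOG_FOLDER = "s3://my-bucket/path/to/logs/"  # note the trailing slash

# Trimmed stand-in for the template's logging dictionary.
TEMPLATE_CONFIG = {
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
        "file.task": {"class": "logging.FileHandler"},
    },
    "loggers": {
        "airflow.task": {"handlers": ["file.task"]},
        "airflow.task_runner": {"handlers": ["file.task"]},
        "airflow": {"handlers": ["console"]},
    },
}


def with_s3_task_handler(template):
    """Return a copy of template with an 's3.task' handler wired in."""
    config = copy.deepcopy(template)  # leave the original untouched
    config["handlers"]["s3.task"] = {
        "class": "airflow.utils.log.s3_task_handler.S3TaskHandler",
        "formatter": "airflow.task",
        "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
        "s3_log_folder": S3_LOG_FOLDER,
        "filename_template": FILENAME_TEMPLATE,
    }
    # Point the task loggers at the new handler instead of 'file.task'.
    for logger_name in ("airflow.task", "airflow.task_runner"):
        config["loggers"][logger_name]["handlers"] = ["s3.task"]
    return config


LOGGING_CONFIG = with_s3_task_handler(TEMPLATE_CONFIG)
print(sorted(LOGGING_CONFIG["handlers"]))
```

Working on a deep copy keeps the original dictionary intact, so the patched and unpatched configurations can be compared while debugging.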