Challenges and Struggles While Setting Up a Multi-Node Airflow Cluster

chetan khatri
5 min read · May 5, 2019


The last blog I wrote on Apache Airflow was Setup and Configure Multi Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines. Did it all go smoothly, or was there some struggle? Yes, there were some challenges.

In this blog, I will talk about the issues I faced on my journey of setting up a multi-node Airflow cluster.

Issue 1: After changing LocalExecutor to CeleryExecutor, the DAG was in running mode but none of its tasks actually ran.

The workers were not able to communicate with the scheduler through Celery.

Error:

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:112} ERROR - Error syncing the celery executor, ignoring it:
Apr 10 21:03:52 charlie-prod airflow_control.sh: [2019-04-10 21:03:51,962] {celery_executor.py:113} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

I started looking into the Airflow code at the line numbers from the error, but got no clue about what was going on. The reason, however, was clear: Celery was not able to publish or subscribe to messages, so the communication channel was failing.
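For context, these are the airflow.cfg settings involved in scheduler-worker communication (a sketch with placeholder values; the result_backend line is my assumption, pointing at the same Postgres metadata database configured later in this post):

executor = CeleryExecutor
broker_url = amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/
result_backend = db+postgresql://airflow:airflow@{HOSTNAME}/airflow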

Solution:

The installed version of Celery was 3.3.5, which is too old and incompatible with Airflow 1.10 (the currently installed Airflow version).

pip install --upgrade celery

This upgraded Celery from 3.3.5 to 4.3.
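To confirm the upgrade took effect, a quick check on each node (a minimal sketch; it assumes celery is importable by the same Python interpreter that runs Airflow):

python -c "import celery; print(celery.__version__)"    # should print 4.3.x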

Issue 2: After running the DAG on CeleryExecutor, the DAG failed with an error that was, at least to me, quite weird.

Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found
Apr 11 14:13:13 charlie-prod airflow_control.sh: [2019-04-11 14:13:13,847: ERROR/ForkPoolWorker-6285] Pool process <celery.concurrency.asynpool.Worker object at 0x7f3a88b7b250> error: TypeError("Required argument 'object' (pos 1) not found",)
Apr 11 14:13:13 charlie-prod airflow_control.sh: Traceback (most recent call last):
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 289, in __call__
Apr 11 14:13:13 charlie-prod airflow_control.sh: sys.exit(self.workloop(pid=pid))
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 347, in workloop
Apr 11 14:13:13 charlie-prod airflow_control.sh: req = wait_for_job()
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 447, in receive
Apr 11 14:13:13 charlie-prod airflow_control.sh: ready, req = _receive(1.0)
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/pool.py", line 419, in _recv
Apr 11 14:13:13 charlie-prod airflow_control.sh: return True, loads(get_payload())
Apr 11 14:13:13 charlie-prod airflow_control.sh: File "/usr/lib64/python2.7/site-packages/billiard/common.py", line 101, in pickle_loads
Apr 11 14:13:13 charlie-prod airflow_control.sh: return load(BytesIO(s))
Apr 11 14:13:13 charlie-prod airflow_control.sh: TypeError: Required argument 'object' (pos 1) not found

Solution:

I was not able to figure out anything from this error; I really had no clue.

I came across an Airflow issue post in Chinese: https://blog.csdn.net/u013492463/article/details/80881260

I did not understand most of it, but it at least gave me a small clue about the possible cause of the error.

Earlier setting:

broker_url = amqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

I realised that pyamqp would be the better choice, as many people have used it, and the post above suggested the same resolution to some extent.

amqp:// is an alias that uses librabbitmq if available, or py-amqp if it's not. You'd use pyamqp:// or librabbitmq:// if you want to specify exactly what transport to use. The pyamqp:// transport uses the 'amqp' library (http://github.com/celery/py-amqp).

Later setting with resolution:

broker_url = pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/

Changing amqp to pyamqp resolved the error above.

Installation (the py-amqp library is published on PyPI under the name amqp):

pip install amqp
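A quick way to sanity-check the broker URL from a worker node is the Python snippet below (a minimal sketch using kombu, the messaging library Celery depends on; the hostname and credentials are the placeholders from the settings above):

from kombu import Connection

# Fails fast if the pyamqp transport cannot reach RabbitMQ.
with Connection('pyamqp://guest:guest@{RabbitMQ-HOSTNAME}:5672/') as conn:
    conn.connect()
    print('broker reachable:', conn.connected)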

Issue 3: SQLAlchemy connection failure

Earlier configuration (SQLAlchemy connection):

sql_alchemy_conn = postgresql://airflow:airflow@{HOSTNAME}:5432/airflow

Resolved:

Later configuration:

sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow

For psycopg2, you need to install its pip wheel.

Install the PostgreSQL adapter, psycopg2:

Psycopg is a PostgreSQL adapter for the Python programming language.

pip install psycopg2
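Before restarting Airflow, the new connection string can be verified directly with a short Python snippet (a minimal sketch against SQLAlchemy 1.x, the series contemporary with Airflow 1.10; the credentials and hostname are the placeholders from the config above):

from sqlalchemy import create_engine

# Same URL scheme as sql_alchemy_conn in airflow.cfg.
engine = create_engine('postgresql+psycopg2://airflow:airflow@{HOSTNAME}/airflow')
with engine.connect() as conn:
    print(conn.execute('SELECT 1').scalar())  # prints 1 if the DB is reachable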

Issue 4: HDP v2.6.2 with Ambari, worker installation failing on multiple hosts.

After successfully installing the webserver and scheduler on the master node (i.e. the NameNode), the goal was to install a Celery worker on every DataNode so that DAGs could run truly in parallel and scale both horizontally and vertically.
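For reference, what each DataNode ultimately runs is the Airflow 1.10 Celery worker process; started by hand it would look like this (the queue name default is an assumption):

airflow worker -q default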

But Ambari made wacky expressions :) with the error below.

by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)': /simple/apache-airflow/
Could not fetch URL https://pypi.org/simple/apache-airflow/: There was a problem confirming the ssl certificate: HTTPSConnectionPool(host='pypi.org', port=443): Max retries exceeded with url: /simple/apache-airflow/ (Caused by SSLError(SSLError("bad handshake: SysCallError(104, 'ECONNRESET')",),)) - skipping

Resolution:

This means that pip was not able to download and install wheels on the machine when I tried to install the worker on a node through the Ambari UI. From a terminal on the same node, however, I was able to run the same commands and install the pip wheels successfully.

The common solution for this error is to run pip with the --trusted-host argument, or to change the repository pip downloads wheels from.

pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0

I tried the above, but it failed again with a different stack trace of similar meaning.

resource_management.core.exceptions.ExecutionFailed: Execution of 'export SLUGIFY_USES_TEXT_UNIDECODE=yes && pip install --trusted-host pypi.python.org --trusted-host pypi.org --trusted-host files.pythonhosted.org --upgrade --ignore-installed apache-airflow[celery]==1.10.0' returned 1. Collecting apache-airflow[celery]==1.10.0
Retrying (Retry(total=4, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=3, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=2, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=1, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ProtocolError('Connection aborted.', error(104, 'Connection reset by peer'))': /simple/apache-airflow/
Could not find a version that satisfies the requirement apache-airflow[celery]==1.10.0 (from versions: )
No matching distribution found for apache-airflow[celery]==1.10.0
You are using pip version 8.1.2, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

I upgraded pip as well, but with no success.

Finally, the hack (maybe not a proper resolution) that worked: from the terminal, I manually installed Celery and all the necessary pip wheels listed here.

Ambari still gave the same error, because even though the wheels were already installed, the mpack script did not skip the pip install step. See the code: https://github.com/miho120/ambari-airflow-mpack/blob/e1c9ca004adaa3320e35ab7baa7fdb9b9695b635/airflow-service-mpack/common-services/AIRFLOW/1.10.0/package/scripts/airflow_worker_control.py

On the cluster, I temporarily commented out those lines (and reverted the change once the worker installation succeeded), then added the worker from Ambari, which worked like a charm :) and this hack made my day. The sketch below illustrates the change.
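For illustration only, the change amounts to commenting out the failing pip install call in the mpack's worker-control script (a sketch, not the verbatim source of airflow_worker_control.py; Execute is Ambari's resource_management primitive, and the surrounding function name is hypothetical):

from resource_management.core.resources.system import Execute

def install_worker_requirements():
    # Temporarily skipped: the wheels were already installed manually over
    # SSH, so the pip call that keeps failing under Ambari is commented out.
    # (Reverted once the worker was registered successfully.)
    # Execute('export SLUGIFY_USES_TEXT_UNIDECODE=yes && '
    #         'pip install --trusted-host pypi.python.org '
    #         '--trusted-host pypi.org --trusted-host files.pythonhosted.org '
    #         '--upgrade --ignore-installed apache-airflow[celery]==1.10.0')
    pass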

After installing a worker on another node, you may need to restart the Airflow service from Ambari. You can learn more in my previous blog post: Setup and Configure Multi Node Airflow Cluster with HDP Ambari and Celery for Data Pipelines.


chetan khatri

Volunteer Data Science Adviser & Academic Contributor at University of Kachchh.