Published 2025-01-02 16:10
I have an Airflow DAG, dag-A, that is triggered from another DAG. Sometimes dag-A is triggered at 16:00 UTC, which I described as midnight EST (midnight EST is actually 05:00 UTC; 16:00 UTC is midnight in UTC+8). When it is triggered at 16:00 UTC, I want it to wait 30 minutes and then start running at 16:30 UTC.
Generally it should run when it is triggered, but if triggered between 16:00 and 16:30 UTC it should wait till 16:30.
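The rule above can be written as a small, Airflow-free function, which makes it easy to unit test before wiring it into a DAG. This is a minimal sketch: the names `dq_start_time`, `WINDOW_START` and `RELEASE_AT` are hypothetical, and it assumes the window is half-open, [16:00, 16:30) UTC.

```python
from datetime import datetime, time, timedelta, timezone

# Hypothetical constants: the delay window is assumed to be [16:00, 16:30) UTC.
WINDOW_START = time(16, 0)
RELEASE_AT = time(16, 30)


def dq_start_time(triggered_at: datetime) -> datetime:
    """Return when the DQ check should start for a run triggered at `triggered_at`.

    Runs triggered inside [16:00, 16:30) UTC are held until 16:30 UTC the same
    day; every other run starts immediately. `triggered_at` must be tz-aware.
    """
    if triggered_at.tzinfo is None:
        raise ValueError("triggered_at must be timezone-aware")
    utc = triggered_at.astimezone(timezone.utc)
    if WINDOW_START <= utc.time() < RELEASE_AT:
        # Inside the window: hold until 16:30:00 UTC on the same day.
        return utc.replace(hour=RELEASE_AT.hour, minute=RELEASE_AT.minute,
                           second=0, microsecond=0)
    return utc  # outside the window: start right away


if __name__ == "__main__":
    est = timezone(timedelta(hours=-5))
    # 11:10 EST is 16:10 UTC, so this run is held until 16:30 UTC.
    print(dq_start_time(datetime(2025, 1, 2, 11, 10, tzinfo=est)))  # 2025-01-02 16:30:00+00:00
    # 09:00 UTC is outside the window, so it starts immediately.
    print(dq_start_time(datetime(2025, 1, 2, 9, 0, tzinfo=timezone.utc)))  # 2025-01-02 09:00:00+00:00
```

Converting to UTC first means the answer does not depend on the worker's local timezone.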
I have done this with sleep, but I am trying to do the same with a TimeSensor, because a sensor in reschedule mode does not block a worker slot while it waits, whereas sleep does.
Here is my code. It ends up waiting for 16:30 indefinitely, even when the check_time task returns False. What I want: if the trigger time is not between 16:00 and 16:30 UTC, run the DQ check immediately; if it is, wait until 16:30 and then run the DQ check.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime


def check_time_and_delay():
    # True between 16:00 and 16:30. Note: datetime.now() is the worker's
    # local time, which is not necessarily UTC.
    now = datetime.now()
    return now.hour == 16 and now.minute >= 0 and now.minute <= 30


with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:
    check_time = PythonOperator(
        task_id='check_time',
        python_callable=check_time_and_delay,
    )

    wait_until_12_30 = TimeSensor(
        task_id='wait_until_12_30',
        target_time=datetime.strptime('16:30:00', '%H:%M:%S').time(),
        mode='reschedule',
        timeout=1800,
    )

    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check"),
    )

    # Define the task flow
    check_time >> [wait_until_12_30, run_dq_check]
    wait_until_12_30 >> run_dq_check
```
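Why this DAG keeps waiting: a PythonOperator's return value does not choose the downstream path, so wait_until_12_30 runs on every trigger, and run_dq_check (default all_success trigger rule) cannot start until the sensor succeeds. The sensor only succeeds once the current time of day is past target_time. Below is a simplified, Airflow-free model of that check. It assumes Airflow 2.x TimeSensor semantics (current time of day in the DAG's timezone, UTC here, compared with target_time); `time_sensor_poke` is a hypothetical name.

```python
from datetime import datetime, time, timezone


def time_sensor_poke(now: datetime, target_time: time) -> bool:
    """Simplified model of a TimeSensor poke (assumed Airflow 2.x behaviour):
    succeed only when the current UTC time of day is later than target_time."""
    return now.astimezone(timezone.utc).time() > target_time


target = time(16, 30)
# Triggered at 09:00 UTC: outside the delay window, yet the poke fails, so the
# sensor keeps rescheduling because 09:00 is not later than 16:30.
print(time_sensor_poke(datetime(2025, 1, 2, 9, 0, tzinfo=timezone.utc), target))   # False
# Triggered at 17:00 UTC: the very first poke succeeds.
print(time_sensor_poke(datetime(2025, 1, 2, 17, 0, tzinfo=timezone.utc), target))  # True
```

So a run triggered at 09:00 UTC sits in the sensor until 16:30 UTC, or until the 1800-second timeout fails it.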
The version that works, but uses sleep:
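```python
from datetime import timedelta
from time import sleep

from airflow.operators.python import PythonOperator


def check_if_midnight_and_wait(execution_date):
    # Airflow 2 passes execution_date because it is in the signature; for a
    # triggered run it is usually the trigger time (UTC).
    if execution_date.hour == 16 and execution_date.minute < 30:  # 16:00-16:29 UTC
        wait_time = timedelta(minutes=30 - execution_date.minute)
        print(f"Waiting {wait_time} to start at 16:30 UTC.")
        sleep(wait_time.total_seconds())  # blocks the worker slot for the whole wait


# task (dq_check is defined elsewhere in the DAG):
wait_and_start = PythonOperator(
    task_id='wait_and_start',
    python_callable=check_if_midnight_and_wait,
    # provide_context is not needed in Airflow 2: context arguments are
    # inferred from the callable's signature.
)

wait_and_start >> dq_check
```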
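Try using a BranchPythonOperator. A plain PythonOperator's return value does not decide which downstream tasks run, so in the code above the sensor always runs and run_dq_check always waits for it. A branch operator returns the task_id to follow and skips the other path: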
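```python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, time, timezone


def check_time_and_branch():
    # Decide the path from the current UTC time.
    now = datetime.now(timezone.utc)
    if now.hour == 16 and 0 <= now.minute < 30:
        return 'wait_until_16_30'  # inside the window: hold until 16:30
    return 'run_dq_check'  # otherwise run immediately


with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:
    check_time = BranchPythonOperator(
        task_id='check_time',
        python_callable=check_time_and_branch,
    )

    # target_time is interpreted in the DAG's timezone (UTC by default).
    wait_until_16_30 = TimeSensor(
        task_id='wait_until_16_30',
        target_time=time(16, 30),
        mode='reschedule',  # frees the worker slot between pokes
        timeout=35 * 60,  # longest legitimate wait is 30 min; small slack avoids a race
    )

    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check"),
        # With the default all_success rule this task would be skipped
        # whenever the branch skips wait_until_16_30.
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    check_time >> [wait_until_16_30, run_dq_check]
    wait_until_16_30 >> run_dq_check
```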
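With the default all_success trigger rule, run_dq_check can end up skipped. When check_time chooses run_dq_check directly, wait_until_16_30 is skipped, and all_success treats a skipped upstream as a reason to skip. Airflow's branching documentation recommends none_failed_min_one_success for a join task like this. The snippet below is a deliberately simplified model of those two rules applied to run_dq_check's two upstream tasks. It is not Airflow's implementation, and the `State` enum and function names are hypothetical.

```python
from __future__ import annotations

from enum import Enum


class State(Enum):
    """Subset of upstream task states relevant to this example."""
    SUCCESS = "success"
    SKIPPED = "skipped"
    FAILED = "failed"


def all_success(upstream: list[State]) -> str:
    """Simplified all_success rule (Airflow's default)."""
    if all(s is State.SUCCESS for s in upstream):
        return "run"
    if any(s is State.FAILED for s in upstream):
        return "upstream_failed"
    return "skipped"  # a skipped upstream propagates the skip


def none_failed_min_one_success(upstream: list[State]) -> str:
    """Simplified none_failed_min_one_success rule."""
    if any(s is State.FAILED for s in upstream):
        return "upstream_failed"
    if any(s is State.SUCCESS for s in upstream):
        return "run"
    return "skipped"


# Upstream states of run_dq_check, in order: [check_time, wait_until_16_30].
# check_time picked run_dq_check, so wait_until_16_30 was skipped.
fast_path = [State.SUCCESS, State.SKIPPED]
print(all_success(fast_path))                  # skipped
print(none_failed_min_one_success(fast_path))  # run

# check_time picked wait_until_16_30, which succeeded at 16:30.
slow_path = [State.SUCCESS, State.SUCCESS]
print(all_success(slow_path))                  # run
print(none_failed_min_one_success(slow_path))  # run
```

In the DAG itself the rule is set with `trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS` (Airflow 2.2+; earlier 2.x releases name it `none_failed_or_skipped`).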
Author: 黑洞官方问答小能手
Link: https://www.pythonheidong.com/blog/article/2046662/657f0c3430b216d22435/
Source: python黑洞网