
Airflow DAG: wait for a certain period if triggered at a certain time

Published 2025-01-02 16:10


I have an Airflow DAG, dag-A, that is triggered from another DAG. Sometimes dag-A is triggered at 4 PM UTC (midnight EST); when that happens, I want it to wait 30 minutes and then start running at 16:30 UTC.

In general it should run as soon as it is triggered, but if it is triggered between 16:00 and 16:30 UTC it should wait until 16:30.
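The rule above can be pinned down as a small pure-Python function that is easy to unit-test outside Airflow. This is a sketch under assumptions: the trigger time is a timezone-aware datetime, the hold window is the half-open interval [16:00, 16:30) UTC, and the function name `choose_branch` is hypothetical (its return values mirror the task ids used in the answer's DAG).

```python
from datetime import datetime, time, timezone

# Hypothetical constants: the "hold" window is [16:00, 16:30) UTC.
WINDOW_START = time(16, 0)
WINDOW_END = time(16, 30)


def choose_branch(now: datetime) -> str:
    """Return the task_id to follow for a run triggered at `now`.

    `now` must be timezone-aware; it is converted to UTC first so the
    result does not depend on the worker's local time zone.
    """
    if now.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    t = now.astimezone(timezone.utc).time()
    if WINDOW_START <= t < WINDOW_END:
        return "wait_until_16_30"  # inside the window: hold until 16:30
    return "run_dq_check"          # any other time: run immediately
```

Using a half-open interval means a run triggered at exactly 16:30 runs immediately instead of entering the wait path.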

I have done this with the sleep method, but I am trying to do the same with a TimeSensor, because a sensor in reschedule mode does not hold a worker slot while it waits, whereas sleep blocks one.

Here is my code, which leads me into an infinite loop: it keeps waiting for 16:30 even when check_time returns False. What I want is: if it is not between 16:00 and 16:30, run the DQ task (run_dq_check) immediately; if it is between 16:00 and 16:30, wait until 16:30 and then run it.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime

def check_time_and_delay():
    now = datetime.now()
    return now.hour == 16 and now.minute >= 0 and now.minute <= 30

with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:

    check_time = PythonOperator(
        task_id='check_time',
        python_callable=check_time_and_delay
    )

    wait_until_12_30 = TimeSensor(
        task_id='wait_until_12_30',
        target_time=datetime.strptime('16:30:00', '%H:%M:%S').time(),
        mode='reschedule',
        timeout=1800
    )

    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check")
    )

    # Define the task flow
    check_time >> [wait_until_12_30, run_dq_check]
    wait_until_12_30 >> run_dq_check
```

The version that works, but uses sleep:

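```python
from datetime import timedelta
from time import sleep

from airflow.operators.python import PythonOperator

def check_if_midnight_and_wait(execution_date):
    # Triggered between 16:00 and 16:30 UTC: hold until 16:30 UTC
    if execution_date.hour == 16 and execution_date.minute < 30:
        wait_time = timedelta(minutes=30 - execution_date.minute)
        print(f"Waiting {wait_time} to start at 16:30 UTC.")
        sleep(wait_time.total_seconds())  # blocks the worker slot while waiting

# task (inside the `with DAG(...)` block, where dq_check is also defined):
wait_and_start = PythonOperator(
    task_id='wait_and_start',
    python_callable=check_if_midnight_and_wait,
    # provide_context is deprecated since Airflow 2.0 and no longer required:
    # execution_date is passed from the task context because it matches a parameter name
)

wait_and_start >> dq_check
```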
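The sleep version waits `30 - execution_date.minute` whole minutes, so a run triggered 10 minutes and 45 seconds into the window sleeps 20 minutes and starts 45 seconds late. If the sleep approach is kept, a sketch like the one below computes the exact remaining seconds with datetime arithmetic. The function name `seconds_until_release` and its default 16:00 to 16:30 UTC window are assumptions carried over from the question, not part of the original code.

```python
from datetime import datetime, time, timezone


def seconds_until_release(now: datetime,
                          start: time = time(16, 0),
                          release: time = time(16, 30)) -> float:
    """Seconds to wait so that a run triggered inside [start, release) UTC
    begins exactly at `release`; 0.0 for runs triggered at any other time.

    `now` should be timezone-aware (astimezone treats naive values as local time).
    """
    now_utc = now.astimezone(timezone.utc)
    if not (start <= now_utc.time() < release):
        return 0.0
    # Build today's release instant in UTC and subtract, keeping the seconds.
    release_at = datetime.combine(now_utc.date(), release, tzinfo=timezone.utc)
    return (release_at - now_utc).total_seconds()
```

Passing the result to `time.sleep` still blocks a worker slot; this only fixes the arithmetic, not the blocking behaviour the question wants to avoid.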

Solution


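Use a BranchPythonOperator instead of a plain PythonOperator. A PythonOperator's return value is only pushed to XCom and does not decide which downstream tasks run, so in the question's DAG both the sensor and run_dq_check stay scheduled, and because run_dq_check is also downstream of the sensor it always waits for 16:30. A BranchPythonOperator returns the task_id to follow and skips the other path.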

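```python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.time_sensor import TimeSensor
from airflow.utils.dates import days_ago
from datetime import datetime, time, timezone

def check_time_and_branch():
    # Explicit UTC so the result does not depend on the worker's local time zone
    now = datetime.now(timezone.utc)
    if now.hour == 16 and 0 <= now.minute < 30:
        return 'wait_until_16_30'  # triggered between 16:00 and 16:30 UTC
    return 'run_dq_check'          # any other time: run immediately

with DAG(
    'delayed_dag',
    default_args={'start_date': days_ago(1)},
    schedule_interval=None,
) as dag:

    check_time = BranchPythonOperator(
        task_id='check_time',
        python_callable=check_time_and_branch
    )

    # Succeeds once the current time passes 16:30 (UTC with default time zone
    # settings); reschedule mode frees the worker slot between pokes
    wait_until_16_30 = TimeSensor(
        task_id='wait_until_16_30',
        target_time=time(16, 30),
        mode='reschedule',
        timeout=1800
    )

    run_dq_check = PythonOperator(
        task_id='run_dq_check',
        python_callable=lambda: print("Running DQ check"),
        # Run even when the sensor was skipped by the branch (Airflow 2.2+);
        # the default all_success rule would skip this task on that path
        trigger_rule='none_failed_min_one_success',
    )

    check_time >> [wait_until_16_30, run_dq_check]
    wait_until_16_30 >> run_dq_check
```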
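One detail matters for the join: run_dq_check has two upstream tasks, and on the path that bypasses the sensor, wait_until_16_30 ends up skipped. Under Airflow's default `all_success` trigger rule a skipped upstream causes the downstream task to be skipped as well, which is why a join after a branch is usually given `none_failed_min_one_success`. The sketch below is a simplified pure-Python model of those two rules, not Airflow's scheduler code; the function name `join_outcome` and the state strings are illustrative assumptions.

```python
from typing import Iterable

# Simplified upstream states; real Airflow has more (e.g. "upstream_failed", "removed").
SUCCESS, SKIPPED, FAILED = "success", "skipped", "failed"


def join_outcome(rule: str, upstream: Iterable[str]) -> str:
    """Return 'run', 'skip' or 'upstream_failed' for a task whose upstream
    tasks have all finished in the given states (simplified model)."""
    states = list(upstream)
    if rule == "all_success":
        if FAILED in states:
            return "upstream_failed"
        if SKIPPED in states:
            return "skip"   # a single skipped parent skips the join
        return "run"
    if rule == "none_failed_min_one_success":
        if FAILED in states:
            return "upstream_failed"
        if SUCCESS in states:
            return "run"    # skipped parents are tolerated
        return "skip"       # every parent was skipped
    raise ValueError(f"rule not modelled: {rule}")
```

For the branch that goes straight to the DQ check, the upstream states are check_time = success and wait_until_16_30 = skipped, so `join_outcome("all_success", ["success", "skipped"])` gives "skip" while `none_failed_min_one_success` gives "run". In the DAG this corresponds to passing `trigger_rule='none_failed_min_one_success'` to the run_dq_check operator.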



Author: 黑洞官方问答小能手

Link: https://www.pythonheidong.com/blog/article/2046662/657f0c3430b216d22435/

Source: python黑洞网
