在使用 pytest 测试气流时,我遇到了一个错误。
# tests/conftest.py
import datetime
import pytest
from airflow.models import DAG
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={
"owner": "airflow",
"start_date": datetime.datetime(2025, 4, 5),
"end_date": datetime.datetime(2025, 4, 6)
},
schedule=datetime.timedelta(days=1)
)
# tests/test_instance_context.py
import datetime
from airflow.models import BaseOperator
from airflow.models.dag import DAG
from airflow.utils import timezone
class SampleDAG(BaseOperator):
template_fields = ("_start_date", "_end_date")
def __init__(self, start_date, end_date, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._end_date = end_date
def execute(self, context):
context["ti"].xcom_push(key="start_date", value=self.start_date)
context["ti"].xcom_push(key="end_date", value=self.end_date)
return context
def test_execute(test_dag: DAG):
task = SampleDAG(
task_id="test",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
dag=test_dag
)
task.run(
start_date=test_dag.default_args["start_date"],
end_date=test_dag.default_args["end_date"]
)
expected_start_date = datetime.datetime(2025, 4, 5, tzinfo=timezone.utc)
expected_end_date = datetime.datetime(2025, 4, 6, tzinfo=timezone.utc)
assert task.start_date == expected_start_date
assert task.end_date == expected_end_date
测试代码已通过,但我在这里遇到了一个问题。
tests/test_instance_context.py [2025-04-26T12:51:18.289+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-05T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.
[2025-04-26T12:51:18.303+0000] {taskinstance.py:2604} INFO - Dependencies not met for <TaskInstance: test_dag.test manual__2025-04-06T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state.
.
我想测试 task.run 来查看task.run
和之间的区别task.execute
。当我传递 jinja 变量时,airflow 会自动通过 run 方法渲染变量。
所以,我想看看 prev_ds、ds、start_date 和 end_date 是否成功渲染。但是我收到了上面的错误。
发生此错误的原因是您的
SampleDAG
运算符在执行过程中失败,这导致后续运行由于任务的“失败”状态而失败。让我们逐步修复此问题:代码中的关键问题:
属性不匹配:您正在使用
self.start_date
和self.end_date
,execute()
但这些属性不存在(您定义了self._start_date
和self._end_date
)。模板渲染:您正在传递 Jinja 模板(
{{ prev_ds }}
,{{ ds }}
),但在执行之前没有正确渲染它们。任务执行上下文:该
run()
方法需要适当的上下文来进行模板渲染。