我正在创建一个应该执行以下操作的 dag:
- 获取事件 ID
- 对于每个事件 ID,获取事件详细信息 (DockerOperator)
下面的代码是我尝试做我想做的事情:
import re
from datetime import datetime
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue
leagues = ["league1", "league2", "league3"]
@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
)
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)
task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)
task_fetch_ids >> task_fetch_detail
上面的代码显然不起作用,因为我正在循环遍历一个字符串。正确的语法是什么?
您必须使 xcom 返回适应动态任务映射运算符的参数