Estou criando um dag que deve fazer o seguinte:
- buscar ids de eventos
- para cada id de evento, buscar detalhes do evento ( DockerOperator )
O código abaixo é minha tentativa de fazer o que eu quero:
import re
from datetime import datetime
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue
leagues = ["league1", "league2", "league3"]
@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
)
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)
task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)
task_fetch_ids >> task_fetch_detail
O acima claramente não funciona porque estou fazendo um loop por uma string. Qual é a sintaxe correta?
você deve adaptar o retorno xcom aos argumentos do operador de mapeamento de tarefa dinâmica