AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / coding / 问题 / 79174236
Accepted
lalaland
lalaland
Asked: 2024-11-10 14:04:44 +0800 CST2024-11-10 14:04:44 +0800 CST 2024-11-10 14:04:44 +0800 CST

[Airflow]:使用 Xcoms 在 DockerOperator 上进行动态任务映射

  • 772

我正在创建一个应该执行以下操作的 dag:

  • 获取事件 ID
  • 对于每个事件 ID,获取事件详细信息 (DockerOperator)

下面的代码是我尝试做我想做的事情:

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue


leagues = ["league1", "league2", "league3"]




@dag(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
)
    task_fetch_ids = PythonOperator(
        task_id="fetch_detail",
        ...)


    task_fetch_detail = DockerOperator(
        task_id="fetch_detail",
        image="image:v1",
        ).expand(
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
        )


    task_fetch_ids >> task_fetch_detail


上面的代码显然不起作用,因为我正在循环遍历一个字符串。正确的语法是什么?

python
  • 1 1 个回答
  • 35 Views

1 个回答

  • Voted
  1. Best Answer
    raphaelauv
    2024-11-12T04:14:33+08:002024-11-12T04:14:33+08:00

    您必须使 xcom 返回适应动态任务映射运算符的参数

    
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator
    
    dag = DAG(
        dag_id="docker_dag",
        schedule_interval=None,
        start_date=days_ago(1),
    )
    with dag:
        def fn_get_work():
            return ["a", "b", "c"]
    
    
        get_work_task = PythonOperator(task_id="get_work",
                                       python_callable=fn_get_work
                                       )
    
    
        def fn_build(work):
            rst = []
            for i in work:
                rst.append(f"fetch-event --event-id {i}")
            return rst
    
    
        build_work_task = PythonOperator(task_id="build_work",
                                         python_callable=fn_build,
                                         op_kwargs={"work": get_work_task.output})
    
        run_work_task = DockerOperator.partial(
            task_id="run_work",
            image="alpine:3.16.2",
        ).expand(command=build_work_task.output)
    
        get_work_task >> build_work_task >> run_work_task
    
    
    • 1

相关问题

  • 如何将 for 循环拆分为 3 个单独的数据框?

  • 如何检查 Pandas DataFrame 中的所有浮点列是否近似相等或接近

  • “load_dataset”如何工作,因为它没有检测示例文件?

  • 为什么 pandas.eval() 字符串比较返回 False

  • Python tkinter/ ttkboostrap dateentry 在只读状态下不起作用

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    Vue 3:创建时出错“预期标识符但发现‘导入’”[重复]

    • 1 个回答
  • Marko Smith

    为什么这个简单而小的 Java 代码在所有 Graal JVM 上的运行速度都快 30 倍,但在任何 Oracle JVM 上却不行?

    • 1 个回答
  • Marko Smith

    具有指定基础类型但没有枚举器的“枚举类”的用途是什么?

    • 1 个回答
  • Marko Smith

    如何修复未手动导入的模块的 MODULE_NOT_FOUND 错误?

    • 6 个回答
  • Marko Smith

    `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它?

    • 3 个回答
  • Marko Smith

    何时应使用 std::inplace_vector 而不是 std::vector?

    • 3 个回答
  • Marko Smith

    在 C++ 中,一个不执行任何操作的空程序需要 204KB 的堆,但在 C 中则不需要

    • 1 个回答
  • Marko Smith

    PowerBI 目前与 BigQuery 不兼容:Simba 驱动程序与 Windows 更新有关

    • 2 个回答
  • Marko Smith

    AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String”

    • 1 个回答
  • Marko Smith

    我正在尝试仅使用海龟随机和数学模块来制作吃豆人游戏

    • 1 个回答
  • Martin Hope
    Aleksandr Dubinsky 为什么 InetAddress 上的 switch 模式匹配会失败,并出现“未涵盖所有可能的输入值”? 2024-12-23 06:56:21 +0800 CST
  • Martin Hope
    Phillip Borge 为什么这个简单而小的 Java 代码在所有 Graal JVM 上的运行速度都快 30 倍,但在任何 Oracle JVM 上却不行? 2024-12-12 20:46:46 +0800 CST
  • Martin Hope
    Oodini 具有指定基础类型但没有枚举器的“枚举类”的用途是什么? 2024-12-12 06:27:11 +0800 CST
  • Martin Hope
    sleeptightAnsiC `(表达式,左值) = 右值` 在 C 或 C++ 中是有效的赋值吗?为什么有些编译器会接受/拒绝它? 2024-11-09 07:18:53 +0800 CST
  • Martin Hope
    The Mad Gamer 何时应使用 std::inplace_vector 而不是 std::vector? 2024-10-29 23:01:00 +0800 CST
  • Martin Hope
    Chad Feller 在 5.2 版中,bash 条件语句中的 [[ .. ]] 中的分号现在是可选的吗? 2024-10-21 05:50:33 +0800 CST
  • Martin Hope
    Wrench 为什么双破折号 (--) 会导致此 MariaDB 子句评估为 true? 2024-05-05 13:37:20 +0800 CST
  • Martin Hope
    Waket Zheng 为什么 `dict(id=1, **{'id': 2})` 有时会引发 `KeyError: 'id'` 而不是 TypeError? 2024-05-04 14:19:19 +0800 CST
  • Martin Hope
    user924 AdMob:MobileAds.initialize() - 对于某些设备,“java.lang.Integer 无法转换为 java.lang.String” 2024-03-20 03:12:31 +0800 CST
  • Martin Hope
    MarkB 为什么 GCC 生成有条件执行 SIMD 实现的代码? 2024-02-17 06:17:14 +0800 CST

热门标签

python javascript c++ c# java typescript sql reactjs html

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve