I dockerized my Airflow DAG; it basically copies a CSV file to GCS, loads it into BigQuery, and runs a simple transformation. When I run docker-compose run, the DAG is executed twice, and I can't figure out which part of the setup is causing this. When I trigger the DAG manually from the UI, it runs only once.
My DAG:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from scripts import extract_and_gcpload, load_to_BQ

default_args = {
    'owner': 'shweta',
    'start_date': datetime(2025, 4, 24),
    'retries': 0
}
with DAG(
    'spacex_etl_dag',
    default_args=default_args,
    schedule=None,  # no schedule: the DAG should only run when triggered
    catchup=False   # prevents Airflow from running missed periods
) as dag:
    extract_and_upload = PythonOperator(
        task_id="extract_and_upload_to_gcs",
        python_callable=extract_and_gcpload.load_to_gcp_pipeline,
    )

    load_to_bq = PythonOperator(
        task_id="load_to_BQ",
        python_callable=load_to_BQ.load_csv_to_bigquery
    )

    run_dbt = BashOperator(
        task_id="run_dbt",
        bash_command="cd '/opt/airflow/dbt/my_dbt' && dbt run --profiles-dir /opt/airflow/dbt"
    )

    extract_and_upload >> load_to_bq >> run_dbt
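The two callables live in a local scripts package that is not included here; for context, here is a rough sketch of the kind of thing they do (bucket, file, and table names are placeholders, not the real ones):

from google.cloud import bigquery, storage

BUCKET = "my-spacex-bucket"                      # placeholder bucket name
CSV_LOCAL_PATH = "/opt/airflow/data/spacex.csv"  # placeholder local CSV path
GCS_OBJECT = "raw/spacex.csv"                    # placeholder object key
BQ_TABLE = "my-project.spacex.raw_launches"      # placeholder project.dataset.table

def load_to_gcp_pipeline():
    """Upload the local CSV to a GCS bucket."""
    storage.Client().bucket(BUCKET).blob(GCS_OBJECT).upload_from_filename(CSV_LOCAL_PATH)

def load_csv_to_bigquery():
    """Load the CSV from GCS into a BigQuery table."""
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )
    client.load_table_from_uri(
        f"gs://{BUCKET}/{GCS_OBJECT}", BQ_TABLE, job_config=job_config
    ).result()  # wait for the load job to finish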
My entrypoint file, startscript.sh:
#!/bin/bash
set -euo pipefail

log() {
    echo "[$(date +'%Y-%m-%d %H:%M:%S')] $1"
}

# Optional: Run DB init & parse DAGs
log "Initializing Airflow DB..."
airflow db upgrade

log "Parsing DAGs..."
airflow scheduler --num-runs 1

DAG_ID="spacex_etl_dag"

log "Unpausing DAG: $DAG_ID"
airflow dags unpause "$DAG_ID" || true

log "Triggering DAG: $DAG_ID"
airflow dags trigger "$DAG_ID" || true

log "Creating admin user (if not exists)..."
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email [email protected] \
    --password admin || true

if [[ "$1" == "webserver" || "$1" == "scheduler" ]]; then
    log "Starting Airflow: $1"
    exec airflow "$@"
else
    log "Executing: $*"
    exec "$@"
fi
My docker-compose.yaml file:
services:
  airflow-webserver:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: airflow-webserver
    env_file: .env
    restart: always
    environment:
      AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
      AIRFLOW__LOGGING__REMOTE_LOGGING: 'False'
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/secrets/llms-395417-c18ea70a3f54.json
    volumes:
      - ./dags:/opt/airflow/dags
      - ./scripts:/opt/airflow/scripts
      - ./dbt:/opt/airflow/dbt
      - ./secrets:/opt/airflow/secrets
    ports:
      - 8080:8080
    command: webserver

  airflow-scheduler:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: airflow-scheduler
    env_file: .env
    restart: always
    environment:
      AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
      AIRFLOW__LOGGING__REMOTE_LOGGING: 'False'
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/secrets/llms-395417-c18ea70a3f54.json
    volumes:
      - ./dags:/opt/airflow/dags
      - ./dbt:/opt/airflow/dbt
      - ./secrets:/opt/airflow/secrets
      - ./scripts:/opt/airflow/scripts
    depends_on:
      - postgres
    command: scheduler

  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data

volumes:
  postgres-db-volume:
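For completeness, a small sketch of how the duplicate run could be inspected: it lists the DAG's runs through the stable REST API so the run_type of each run (manual vs scheduled) can be checked. It assumes the admin/admin user created in startscript.sh, the webserver on localhost:8080, and that the API's basic-auth backend is enabled:

import requests

# List all runs of the DAG; run_type is "manual" for UI/CLI triggers and
# "scheduled" for runs created by the scheduler.
resp = requests.get(
    "http://localhost:8080/api/v1/dags/spacex_etl_dag/dagRuns",
    auth=("admin", "admin"),  # user created in startscript.sh
)
resp.raise_for_status()
for run in resp.json()["dag_runs"]:
    print(run["dag_run_id"], run["run_type"], run["state"])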