How to create operators from list in Airflow?

Tag : development , By : Nick Pegg
Date : November 23 2020, 04:01 AM

Yes, in fact it's exactly what you described. Simply instantiate your operators in your for loop. Make sure your task ids are unique and you're set:
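BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
BQ_TABLE_NAME_PRODUCTS = Variable.get("tables_products")

# Assumed: the list holds the two table names read above.
# It is not called `list`, which would shadow the Python builtin.
tables = [BQ_TABLE_NAME_CATEGORIES, BQ_TABLE_NAME_PRODUCTS]

for table in tables:
    import_op = MySqlToGoogleCloudStorageOperator(
        task_id = 'import_{}'.format(table),  # illustrative id; must be unique per table
        approx_max_file_size_bytes = 100000000,  # 100MB per file
        sql = 'import_{}.sql'.format(table),
        # remaining parameters are not shown in the source
    )
    gcs_to_bigquery_op = GoogleCloudStorageToBigQueryOperator(
        task_id = 'load_{}'.format(table),  # illustrative id; must be unique per table
        src_fmt_configs={'ignoreUnknownValues': True},
        skip_leading_rows = 1,
        # remaining parameters are not shown in the source
    )

    import_op >> gcs_to_bigquery_op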

# bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

for table in BQ_TABLES:
    ...  # same loop body as in the first snippet

# bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

with DAG('…dag_id…', …) as dag:
    for table in BQ_TABLES:
        MySqlToGoogleCloudStorageOperator(
            …  # all params except notably there's no `dag=dag` in here.
        ) >> GoogleCloudStorageToBigQueryOperator(  # Yup, …
            …  # again all but `dag=dag` in here.
        )
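
To illustrate the "unique task ids" requirement without needing an Airflow install, here is a minimal sketch in plain Python. The helper name `build_task_specs` and the `import_`/`load_` id prefixes are hypothetical and not part of Airflow; the comma-separated input mirrors the `bq_tables` Variable shown above.

```python
def build_task_specs(raw_tables):
    """Turn a comma-separated table list into per-table task ids and SQL files."""
    # Strip whitespace and drop empty entries (e.g. from a trailing comma).
    tables = [name.strip() for name in raw_tables.split(",") if name.strip()]

    specs = []
    seen_ids = set()
    for table in tables:
        spec = {
            "import_task_id": "import_{}".format(table),
            "load_task_id": "load_{}".format(table),
            "sql": "import_{}.sql".format(table),
        }
        # Task ids must be unique within a DAG, so fail early on a repeat.
        for task_id in (spec["import_task_id"], spec["load_task_id"]):
            if task_id in seen_ids:
                raise ValueError("duplicate task id: {}".format(task_id))
            seen_ids.add(task_id)
        specs.append(spec)
    return specs


specs = build_task_specs("table_products,table_orders")
```

Each entry in `specs` could then feed the `task_id` and `sql` arguments of the two operators inside the loop.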

Airflow: Why is there a start_date for operators?

Tag : development , By : gorbiz
Date : March 29 2020, 07:55 AM
Regarding start_date on a task instance: personally I have never used this, I always just have a single DAG start_date.
However, from what I can see, it would allow you to have specific tasks start at a different time from the main DAG. It appears to be a legacy feature; the FAQ recommends using time sensors for that kind of thing instead, with a single start_date for all tasks passed through the DAG.
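
As a rough illustration of how the two dates interact: as far as I can tell from the Airflow 1.10 source, a task without its own start_date inherits the DAG's, and a task with one ends up starting at the later of the two. The sketch below mimics that rule in plain Python under that assumption; `effective_start_date` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime


def effective_start_date(dag_start, task_start=None):
    """Mimic the assumed rule: a task starts at the later of the two dates."""
    if dag_start is None and task_start is None:
        raise ValueError("either the DAG or the task needs a start_date")
    if task_start is None:
        return dag_start  # the task inherits the DAG's start_date
    if dag_start is None:
        return task_start
    return max(dag_start, task_start)


dag_start = datetime(2020, 1, 1)
late_task = effective_start_date(dag_start, datetime(2020, 3, 1))
plain_task = effective_start_date(dag_start)
```

Under this rule a task-level start_date can only delay a task relative to its DAG, which fits the advice to keep one start_date and use sensors for anything time-dependent.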

how to create custom operators in airflow and use them in airflow template which is running through cloud composer(in go

Tag : python , By : user143729
Date : March 29 2020, 07:55 AM
You need to use Airflow Plugins: https://airflow.apache.org/plugins.html

Airflow - No module named 'airflow.contrib.operators.mssql_to_gcs'

Tag : sql-server , By : AnToni00
Date : March 29 2020, 07:55 AM
MsSqlToGoogleCloudStorageOperator was introduced on Apr 22 2019 and hasn't been released yet. If you want to use it, copy the code and use it as your own plugin.
Reference: https://github.com/apache/airflow/commits/master/airflow/contrib/operators/mssql_to_gcs.py

Installing (python3) airflow does not create airflow directory

Tag : development , By : Marcos de Carvalho
Date : March 29 2020, 07:55 AM
Found that, unlike with Python 2, Airflow under Python 3 does not create the ~/airflow directory until the first time airflow is run:
[airflow@airflowetl ~]$ airflow initdb
[2019-10-18 13:36:56,916] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////home/airflow/airflow/airflow.db
[2019-10-18 13:36:57,438] {db.py:369} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/home/airflow/.local/lib/python3.6/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  "Skipping unsupported ALTER for "
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
WARNI [airflow.utils.log.logging_mixin.LoggingMixin] cryptography not found - values will not be stored encrypted.
[airflow@airflowetl ~]$ cd
[airflow@airflowetl ~]$ ls
[airflow@airflowetl ~]$ tree airflow/
├── airflow.cfg
├── airflow.db
├── logs
│   └── scheduler
│       ├── 2019-10-18
│       └── latest -> /home/airflow/airflow/logs/scheduler/2019-10-18
└── unittests.cfg

4 directories, 3 files
[airflow@airflowetl ~]$ mkdir airflow/dags
[airflow@airflowetl ~]$ cd
[airflow@airflowetl ~]$ pwd
[airflow@airflowetl ~]$ find . -name airflow
[airflow@airflowetl ~]$ echo "PATH=$PATH:~/.local/bin" >> ~/.bashrc
[airflow@airflowetl ~]$ source ~/.bashrc
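
A quick way to check whether the directory has been created yet is sketched below in Python. It assumes the default location `~/airflow`, overridden by the `AIRFLOW_HOME` environment variable when that is set; the function name `airflow_home_status` is made up for this example.

```python
import os


def airflow_home_status(environ=None):
    """Return (path, initialized) for the Airflow home directory."""
    environ = os.environ if environ is None else environ
    # AIRFLOW_HOME wins when set; otherwise fall back to ~/airflow.
    home = environ.get("AIRFLOW_HOME") or os.path.join("~", "airflow")
    home = os.path.abspath(os.path.expanduser(home))
    # airflow.cfg appears after the first run, so use it as the marker.
    initialized = os.path.isfile(os.path.join(home, "airflow.cfg"))
    return home, initialized


path, initialized = airflow_home_status()
print("{} initialized: {}".format(path, initialized))
```

If `initialized` is false, running an airflow command such as `airflow initdb` (as in the session above) should create the directory.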

Airflow: Using macro outside of operators

Tag : hadoop , By : pttr
Date : March 29 2020, 07:55 AM
