
How to create operators from list in Airflow?
Tag : development , By : Nick Pegg
Date : November 23 2020, 04:01 AM

This should help you out. Yes, it's exactly what you described: simply instantiate your operators inside your for loop. Make sure your task ids are unique and you're set:
BQ_TABLE_NAME_CATEGORIES = Variable.get("tables_categories")
BQ_TABLE_NAME_PRODUCTS = Variable.get("tables_products")

# avoid naming the variable `list` — that shadows the Python builtin
tables = [BQ_TABLE_NAME_CATEGORIES, BQ_TABLE_NAME_PRODUCTS]

for table in tables:
    import_op = MySqlToGoogleCloudStorageOperator(
        task_id=f'import_{table}',
        mysql_conn_id='c_mysql',
        google_cloud_storage_conn_id='gcp_a',
        approx_max_file_size_bytes=100000000,  # 100 MB per file
        sql=f'import_{table}.sql',
        bucket=GCS_BUCKET_ID,
        filename=file_name,
        dag=dag)
    gcs_to_bigquery_op = GoogleCloudStorageToBigQueryOperator(
        dag=dag,
        task_id=f'load_{table}_to_BigQuery',
        bucket=GCS_BUCKET_ID,
        destination_project_dataset_table=table_name_template,
        source_format='NEWLINE_DELIMITED_JSON',
        source_objects=[uri_template_read_from],
        schema_fields=Categories(),
        src_fmt_configs={'ignoreUnknownValues': True},
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows=1,
        google_cloud_storage_conn_id=CONNECTION_ID,
        bigquery_conn_id=CONNECTION_ID)

    import_op >> gcs_to_bigquery_op
# bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

for table in BQ_TABLES:
    ...
# bq_tables = "table_products,table_orders"
BQ_TABLES = Variable.get("bq_tables").split(',')

with DAG('…dag_id…', …) as dag:
    for table in BQ_TABLES:
        MySqlToGoogleCloudStorageOperator(
            task_id=f'import_{table}',
            sql=f'import_{table}.sql',
            …  # all params as before, except note there's no `dag=dag` in here.
        ) >> GoogleCloudStorageToBigQueryOperator(  # Yup, …
            task_id=f'load_{table}_to_BigQuery',
            …  # again, all params but `dag=dag` in here.
        )
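The whole pattern hinges on each loop iteration producing a unique task id. That naming logic can be checked in plain Python, independently of Airflow; the comma-separated string below is a stand-in for the value of the `bq_tables` Airflow Variable:

```python
# Stand-in for Variable.get("bq_tables") — a comma-separated Airflow Variable
bq_tables_variable = "table_products,table_orders"
tables = bq_tables_variable.split(',')

# Build the same task ids the loop above would generate
import_ids = [f'import_{table}' for table in tables]
load_ids = [f'load_{table}_to_BigQuery' for table in tables]

# Airflow requires task ids to be unique within a DAG
all_ids = import_ids + load_ids
assert len(all_ids) == len(set(all_ids))

print(import_ids)  # → ['import_table_products', 'import_table_orders']
```

If two entries in the Variable were identical, the uniqueness assertion would fail, which is exactly the duplicate-task-id error Airflow would raise at DAG parse time.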


Airflow: Why is there a start_date for operators?


Tag : development , By : gorbiz
Date : March 29 2020, 07:55 AM
The following fixes the issue. Regarding start_date on a task instance: personally I have never used it; I always just have a single DAG start_date.
However, from what I can see, it would allow you to have certain tasks start at a different time from the main DAG. It appears to be a legacy feature, and from reading the FAQ they recommend using time sensors for that type of thing instead, and having one start_date for all tasks, passed through the DAG.
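One thing worth knowing when picking a start_date: in classic Airflow, a run is triggered at the end of its schedule interval, not at start_date itself. A minimal sketch of that arithmetic with the standard library (the dates are illustrative, not from the original post):

```python
from datetime import datetime, timedelta

# With a daily schedule, the run covering the interval that begins at
# start_date only fires once that interval has closed — one day later.
start_date = datetime(2020, 1, 1)
schedule_interval = timedelta(days=1)

first_run_fires_at = start_date + schedule_interval
print(first_run_fires_at)  # → 2020-01-02 00:00:00
```

This is why giving individual tasks their own start_date is rarely worth the confusion; a single DAG-level start_date keeps the interval arithmetic in one place.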

how to create custom operators in airflow and use them in airflow template which is running through cloud composer(in go


Tag : python , By : user143729
Date : March 29 2020, 07:55 AM
Hope this fixes your issue. You need to use Airflow Plugins: https://airflow.apache.org/plugins.html

Airflow - No module named 'airflow.contrib.operators.mssql_to_gcs'


Tag : sql-server , By : AnToni00
Date : March 29 2020, 07:55 AM
Does that help? MsSqlToGoogleCloudStorageOperator was introduced on Apr 22 2019 and hasn't been released yet. If you want to use it, copy the code and use it as your own plugin.
Reference: https://github.com/apache/airflow/commits/master/airflow/contrib/operators/mssql_to_gcs.py

Installing (python3) airflow does not create airflow directory


Tag : development , By : Marcos de Carvalho
Date : March 29 2020, 07:55 AM
I hope this helps you. I found that, unlike with python2, airflow installed under python3 does not create the ~/airflow directory until the first time airflow is run:
[airflow@airflowetl ~]$ airflow initdb
[2019-10-18 13:36:56,916] {__init__.py:51} INFO - Using executor SequentialExecutor
DB: sqlite:////home/airflow/airflow/airflow.db
[2019-10-18 13:36:57,438] {db.py:369} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/home/airflow/.local/lib/python3.6/site-packages/alembic/ddl/sqlite.py:39: UserWarning: Skipping unsupported ALTER for creation of implicit constraint
  "Skipping unsupported ALTER for "
....
INFO  [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
WARNI [airflow.utils.log.logging_mixin.LoggingMixin] cryptography not found - values will not be stored encrypted.
Done.
[airflow@airflowetl ~]$ cd
[airflow@airflowetl ~]$ ls
airflow
[airflow@airflowetl ~]$ tree airflow/
airflow/
├── airflow.cfg
├── airflow.db
├── logs
│   └── scheduler
│       ├── 2019-10-18
│       └── latest -> /home/airflow/airflow/logs/scheduler/2019-10-18
└── unittests.cfg

4 directories, 3 files
[airflow@airflowetl ~]$ mkdir airflow/dags
[airflow@airflowetl ~]$ cd
[airflow@airflowetl ~]$ pwd
/home/airflow
[airflow@airflowetl ~]$ find . -name airflow
./.local/lib/python3.6/site-packages/airflow
./.local/lib/python3.6/site-packages/airflow/bin/airflow
./.local/lib/python3.6/site-packages/airflow/www/templates/airflow
./.local/lib/python3.6/site-packages/airflow/www_rbac/templates/airflow
./.local/bin/airflow
[airflow@airflowetl ~]$ echo "PATH=$PATH:~/.local/bin" >> ~/.bashrc
[airflow@airflowetl ~]$ source ~/.bashrc
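The root cause in the session above is that a pip `--user` install puts the `airflow` entry point in `~/.local/bin`, which may not be on PATH yet. A quick way to check where (or whether) an executable resolves, from plain Python — `ls` is used here as a stand-in command that exists on any Unix system:

```python
import shutil

# shutil.which mirrors the shell's PATH lookup; it returns the full
# path of the executable, or None if it isn't found on PATH.
path_of_ls = shutil.which('ls')
print(path_of_ls)

# Before the ~/.bashrc fix above, this would return None; afterwards
# it would resolve to something like ~/.local/bin/airflow.
airflow_path = shutil.which('airflow')
print(airflow_path)
```

Running this before and after re-sourcing `~/.bashrc` confirms whether the PATH change took effect without having to invoke `airflow` itself.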

Airflow: Using macro outside of operators


Tag : hadoop , By : pttr
Date : March 29 2020, 07:55 AM