合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# 使用操作器 操作器代表一个理想情况下是幂等的任务。 操作员确定DAG运行时实际执行的内容。 有关更多信息,请参阅[Operators Concepts](https://apachecn.github.io/airflow-doc-zh/concepts.html)文档和[Operators API Reference](https://apachecn.github.io/airflow-doc-zh/code.html) 。 * [BashOperator](9) * [模板](9) * [故障排除](9) * [找不到Jinja模板](9) * [PythonOperator](9) * [传递参数](9) * [模板](9) * [Google云端平台运营商](9) * [GoogleCloudStorageToBigQueryOperator](9) ## [BashOperator](9) 使用[`BashOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.bash_operator.BashOperator")在[Bash](https://www.gnu.org/software/bash/) shell中执行命令。 ``` run_this = BashOperator ( task_id = 'run_after_loop' , bash_command = 'echo 1' , dag = dag ) ``` ### [模板](9) 您可以使用[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html)来参数化`bash_command`参数。 ``` task = BashOperator ( task_id = 'also_run_this' , bash_command = 'echo "run_id={{ run_id }} | dag_run={{ dag_run }}"' , dag = dag ) ``` ### [故障排除](9) #### [找不到Jinja模板](9) 在使用`bash_command`参数直接调用Bash脚本时,在脚本名称后添加空格。 这是因为Airflow尝试将Jinja模板应用于它,这将失败。 ``` t2 = BashOperator ( task_id = 'bash_example' , # This fails with `Jinja template not found` error # bash_command="/home/batcher/test.sh", # This works (has a space after) bash_command = "/home/batcher/test.sh " , dag = dag ) ``` ## [PythonOperator](9) 使用[`PythonOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.python_operator.PythonOperator")执行Python callables。 ``` def print_context ( ds , ** kwargs ): pprint ( kwargs ) print ( ds ) return 'Whatever you return gets printed in the logs' run_this = PythonOperator ( task_id = 'print_the_context' , provide_context = True , python_callable = print_context , dag = dag ) ``` ### [传递参数](9) 使用`op_args`和`op_kwargs`参数将其他参数传递给Python可调用对象。 ``` def my_sleeping_function ( random_base ): """This is a function that will run within the DAG execution""" time . sleep ( random_base ) # Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively for i in range ( 5 ): task = PythonOperator ( task_id = 'sleep_for_' + str ( i ), python_callable = my_sleeping_function , op_kwargs = { 'random_base' : float ( i ) / 10 }, dag = dag ) task . set_upstream ( run_this ) ``` ### [模板](9) 当您将`provide_context`参数设置为`True` ,Airflow会传入一组额外的关键字参数:一个用于每个[Jinja模板变量](https://apachecn.github.io/airflow-doc-zh/code.html)和一个`templates_dict`参数。 `templates_dict`参数是模板化的,因此字典中的每个值都被评估为[Jinja模板](https://apachecn.github.io/airflow-doc-zh/concepts.html) 。 ## [Google云端平台运营商](9) ### [GoogleCloudStorageToBigQueryOperator](9) 使用[`GoogleCloudStorageToBigQueryOperator`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator")执行BigQuery加载作业。 ``` load_csv = gcs_to_bq . GoogleCloudStorageToBigQueryOperator ( task_id = 'gcs_to_bq_example' , bucket = 'cloud-samples-data' , source_objects = [ 'bigquery/us-states/us-states.csv' ], destination_project_dataset_table = 'airflow_test.gcs_to_bq_table' , schema_fields = [ { 'name' : 'name' , 'type' : 'STRING' , 'mode' : 'NULLABLE' }, { 'name' : 'post_abbr' , 'type' : 'STRING' , 'mode' : 'NULLABLE' }, ], write_disposition = 'WRITE_TRUNCATE' , dag = dag ) ```