मैं ट्रिगर करने के लिए एक pyspark मॉड्यूल से airflow का उपयोग कर एक sparksubmit ऑपरेटर. लेकिन, pyspark मॉड्यूल लेने के लिए की जरूरत चिंगारी सत्र चर के रूप में एक तर्क है. मैं का इस्तेमाल किया है application_args पारित करने के लिए पैरामीटर के लिए pyspark मॉड्यूल. लेकिन, जब मैं दौड़ा डेग चिंगारी प्रस्तुत ऑपरेटर हो रही है, विफल रही है और मैं पारित कर दिया पैरामीटर में माना जाता है के रूप में कोई भी प्रकार चर है । की जरूरत है कैसे पता करने के लिए पारित करने के लिए एक तर्क करने के लिए एक pyspark मॉड्यूल के माध्यम से शुरू spark_submit_operator.
के डेग कोड के नीचे है:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PRJT").enableHiveSupport().getOrCreate()
spark_config = {
'conn_id': 'spark_default',
'driver_memory': '1g',
'executor_cores': 1,
'num_executors': 1,
'executor_memory': '1g'
}
dag = DAG(
dag_id="spark_session_prgm",
default_args=default_args,
schedule_interval='@daily',
catchup=False)
spark_submit_task1 = SparkSubmitOperator(
task_id='spark_submit_task1',
application='/home/airflow_home/dags/tmp_spark_1.py',
application_args=['spark'],
**spark_config, dag=dag)
नमूना कोड में tmp_spark_1.py कार्यक्रम: