Apache Airflow je velmi oblíbený framework pro plánování, spouštění a monitorování úloh, které jsou seskupeny do DAG (directed-acyclic graph). Každý DAG má několik parametrů, které popisují, jak a kdy bude DAG spuštěn. Samotný DAG se skládá z úloh uspořádaných do toku. Parametry DAG jsou definovány jako vlastnosti třídy DAG a uloženy v kódu. Toto řešení je v mnoha případech dostačující.
Konfigurace DAG však může být delegována a uložena jinde – v nějaké databázi, která je propojena s GUI pro externí uživatele.
Tím lze některé parametry DAG definovat bez dotyku se zdrojem kódu, např. od nevývojářů. Představte si například chemickou laboratoř, kde jsou automatizované procesy řízeny pomocí Airflow a chemici by mohli měnit některé parametry pomocí webového rozhraní.
V tomto krátkém příběhu ukážu, jak používat externí konfigurační zdroj k dynamickému vytváření a konfiguraci DAGů. Jeden předpoklad, který je zde učiněn, jsou všechny DAG podobné, pokud jde o úkoly a vztahy. Proto je prostřednictvím databáze konfigurovatelných pouze několik parametrů:
- rozvrhování času
- parametry provedení
Řešení se skládá ze dvou DAG:
- read_config který je zodpovědný za načtení konfigurace z databáze
- dynamické_dags který je zodpovědný za vytváření DAG na základě konfigurace
Někdo by se mohl ptát, proč potřebujeme dva DAG a proč nemít vše v jednom DAG. Je to kvůli tomu, jak Airflow zpracovává soubory Pythonu. Plánovač každou n-tou sekundu prohledá soubory ve složce dags/ a vyhodnotí je pomocí interpretu Pythonu. Frekvence skenování je řízena parametrem dag_dir_list_interval.
Proto bychom během vyhodnocovací části neměli dělat žádné drahé akce – jednou z nich je samozřejmě připojení k databázi a čtení tabulek.
Místo toho by měla být část pro čtení databáze přesunuta do kódu, který spouští operátor (jako PythonOperator). A to je přesně to, co se děje v read_config DAG.
Uvnitř DAG existuje jediná úloha spuštěná PythonOperatorem, která to dělá
- Načíst konfiguraci z databáze (tj. config.dags )
- Vložte konfiguraci do proměnné Průtok vzduchu
A je to. Proměnné úložiště Airflow slouží k uchování konfigurace (pomocí formátu JSON). Níže je definice DAG:
protokolování importu
z datetime import timedelta
importovat proudění vzduchu
import mysql.connector
z airflow import DAG
z airflow.models.connection import Připojení
z airflow.models.variable import Proměnná
from airflow.operators.python import PythonOperator
logger = logging.getLogger("airflow.task")
default_args = "vlastník": "průtok vzduchu",
"depends_on_past": Nepravda,
"opakování": 0,
"retry_delay": timedelta(minut=5),
>
mysql_connection = Connection.get_connection_from_secrets("mysql")
def read_dags_config():
db_conn = mysql.connector.connect(host=mysql_connection.host, user=mysql_connection.login,
password=mysql_connection.password, database='config')
kurzor = db_conn.cursor()
kurzor.execute("vybrat id, povoleno, plán, popis z config.dags")
řádky = kurzor.fetchall()
pokud jsou řádky Žádné:
řádky = []
logger.info(f"Konfigurovat řádky: ")
pokud délka(řádky) > 0:
Variable.set("dags_config", rows, serialize_json=True)
s DAG(
"read_config",
default_args=default_args,
schedule_interval="@hourly",
start_date=airflow.utils.dates.days_ago(0),
catchup=False) jako dag:
PythonOperator(task_id="read-config", python_callable=read_dags_config, dag=dag)
Konfigurace z databáze je připravena každou hodinu. Řádky jsou serializovány do JSON a uloženy do proměnné proudění vzduchu:
Druhý dag — Dynamic DAG — je zodpovědný za vytváření DAG. Řešení využívá způsob, jakým Airflow zpracovává soubory Python. V zásadě během skenování souborů v dags/Airflow hledá objekty, které jsou typu DAG. Interně je to Soubor py je vyhodnocen interpretem Pythonu a poté je prohledán slovník globals().
Postup je přímočarý. Nejprve získáme konfiguraci z proměnné se seznamem DAGů k vytvoření. Dále tento seznam iterujeme a spustíme funkci, která vrací objekt DAG. A tento objekt DAG umístíme jako proměnnou do slovníku global().
z datetime import timedelta
importovat proudění vzduchu
z airflow import DAG
z airflow.models.variable import Proměnná
from airflow.operators.python import PythonOperator
default_args = "vlastník": "průtok vzduchu",
"depends_on_past": Nepravda,
"start_date": airflow.utils.dates.days_ago(0),
"opakování": 2,
"retry_delay": timedelta(minut=5),
>
def create_dag(dag_id: str, plán: str = žádné, popis: str = žádné):
dag = DAG(
dag_id,
default_args=default_args,
harmonogram_interval=plán,
dagrun_timeout=timedelta(hodiny=1),
catchup=False,
popis=popis)
task_1 = PythonOperator(task_id="úloha_1", python_callable=lambda: x+1, dag=dag)
task_2 = PythonOperator(task_id="úloha_2", python_callable=lambda: 1, dag=dag)
úkol_1 >> úkol_2
vrátit dag
dags_config = Variable.get("dags_config", deserialize_json=True)
pro dag_id, plán, popis v dags_config:
globals()[f"dag-"] = create_dag(f"dag-", rozvrh, popis)
Zde je třeba zdůraznit dvě důležité části. Funkce create_dag je zodpovědná za celý proces definování úkolů a vztahů mezi nimi. A poslední část iteruje přes konfigurace z DB. Všimněte si použití vestavěné metody globals(), která vrací slovník.
Abych to shrnul, Airflow stále není nic jiného než běžný kód Pythonu. Nic tedy nebrání využití všech vlastností jazyka a ekosystému.
Doufám, že se vám příběh líbil a mohl by vám pomoci při každodenní práci. Pokud máte nějaké dotazy nebo návrhy, neváhejte mě kontaktovat prostřednictvím Twitteru.
Jednou z největších předností Apache Airflow je jeho schopnost škálovat tak, aby vyhovovala měnícím se požadavkům vaší organizace. Chcete-li proudění vzduchu maximálně využít, existuje několik klíčových nastavení, která byste měli při zvětšování svých datových kanálů změnit.
Průtok vzduchu odhaluje řadu parametrů, které úzce souvisejí s DAG a výkonem na úrovni úkolů. Tyto zahrnují:
- Nastavení na úrovni prostředí.
- Nastavení na úrovni DAG.
- Nastavení na úrovni úkolu.
V této příručce se dozvíte o klíčových parametrech, které můžete použít k úpravě výkonu Airflow. také se dozvíte, jak může váš výběr exekutora ovlivnit škálování a jak nejlépe reagovat na běžné problémy se škálováním.
Tato příručka odkazuje na parametry dostupné ve verzi Airflow 2.0 a novější. Pokud používáte dřívější verzi Airflow, některé názvy parametrů se mohou lišit.
Jiné způsoby učení
Existuje mnoho zdrojů, jak se o tomto tématu dozvědět. Viz také:
- Webinář: Scaling Out Airflow.
Předpokládané znalosti
Abyste z tohoto průvodce vytěžili maximum, měli byste rozumět následujícímu:
- Součásti jádra proudění vzduchu. Viz součásti Airflow.
- Vykonavatelé proudění vzduchu. Viz vysvětlení spouštěčů proudění vzduchu.
Ladění parametrů
Proudění vzduchu má mnoho parametrů, které ovlivňují jeho výkon. Vyladění těchto nastavení může ovlivnit výkon analýzy DAG a plánování úloh, paralelismus ve vašem prostředí Airflow a další.
Důvodem, proč Airflow umožňuje tolik úprav, je to, že jako agnostický orchestrátor se Airflow používá pro širokou škálu případů použití. Správci toku vzduchu nebo inženýři DevOps mohou vyladit parametry škálování na úrovni prostředí, aby zajistili, že jejich podpůrná infrastruktura nebude přetížená, zatímco autoři DAG mohou ladit parametry škálování na úrovni DAG nebo úkolu, aby zajistili, že jejich potrubí nezahltí externí systémy. Znalost požadavků vašeho případu použití před škálováním Airflow vám pomůže vybrat, které parametry upravit.
Nastavení na úrovni prostředí
Nastavení na úrovni prostředí jsou ta, která ovlivňují celé vaše prostředí Airflow (všechny DAG). Všechny mají výchozí hodnoty, které lze přepsat nastavením příslušné proměnné prostředí nebo úpravou souboru airflow.cfg. Obecně lze všechny výchozí hodnoty nalézt v Referenční příručce konfigurace proudění vzduchu. Chcete-li zkontrolovat aktuální hodnoty pro stávající prostředí proudění vzduchu, přejděte na administrátor > Konfigurace v uživatelském rozhraní Airflow. Další informace najdete v části Nastavení možností konfigurace v dokumentaci Apache Airflow.
Pokud na Astronomeru používáte Airflow, měli byste tyto parametry upravit pomocí proměnných prostředí Astronomeru. Další informace najdete v části Proměnné prostředí na Astronomeru.
Pokud chcete vyladit výkon ve všech DAG ve vašem prostředí Airflow, měli byste upravit nastavení na úrovni prostředí. To je zvláště důležité, pokud chcete, aby vaše DAG dobře fungovaly ve vaší podpůrné infrastruktuře.
Základní nastavení
Základní nastavení řídí počet procesů běžících současně a jak dlouho procesy běží v celém prostředí Airflow. Přidružené proměnné prostředí pro všechny parametry v této části jsou formátovány jako AIRFLOW__CORE__PARAMETER_NAME .
- paralelismus : Maximální počet úloh, které mohou běžet souběžně na každém plánovači v rámci jednoho prostředí Airflow. Pokud je například toto nastavení nastaveno na 32 a existují dva plánovače, pak nemůže být více než 64 úloh ve stavu spuštěných nebo zařazených do fronty najednou napříč všemi DAG. Pokud vaše úlohy zůstanou v naplánovaném stavu po delší dobu, možná budete chtít tuto hodnotu zvýšit. Výchozí hodnota je 32. Na Astro je tato hodnota nastavena automaticky na základě vašeho maximálního počtu pracovníků, což znamená, že ji nemusíte konfigurovat.
- max_active_tasks_per_dag (dříve dag_concurrency ): Maximální počet úloh, které lze naplánovat najednou, na DAG. Toto nastavení použijte, abyste zabránili tomu, aby kterýkoli DAG zabíral příliš mnoho dostupných slotů z paralelismu nebo vašich fondů. Výchozí hodnota je 16. Pokud zvýšíte množství zdrojů dostupných pro Airflow (jako jsou Celery pracovníci nebo zdroje Kubernetes) a všimnete si, že úlohy stále neběží podle očekávání, možná budete muset zvýšit hodnoty paralelismu i max_active_tasks_per_dag .
- max_active_runs_per_dag : Určuje maximální počet aktivních běhů DAG (na DAG), které může plánovač toku vzduchu vytvořit najednou. V Airflow představuje běh DAG konkretizaci DAG v čase, podobně jako instance úlohy představuje konkretizaci úlohy. Tento parametr je nejdůležitější, pokud Airflow potřebuje doplnit zmeškané DAG běhy. Při nastavování tohoto parametru zvažte, jak chcete s těmito scénáři zacházet. Výchozí hodnota je 16.
- dag_file_processor_timeout : Jak dlouho může běžet DagFileProcessor , který zpracovává soubor DAG, než vyprší časový limit. Výchozí hodnota je 50 sekund.
- dagbag_import_timeout : Jak dlouho může dagbag importovat objekty DAG, než vyprší časový limit v sekundách, který musí být nižší než hodnota nastavená pro dag_file_processor_timeout . Pokud vaše protokoly zpracování DAG ukazují časové limity nebo pokud se váš DAG nezobrazuje v seznamu DAG nebo chyby importu, zkuste tuto hodnotu zvýšit. Můžete také zkusit zvýšit tuto hodnotu, pokud se vaše úkoly neprovádějí, protože pracovníci musí při provádění úkolů naplnit pytlík. Výchozí hodnota je 30 sekund.
Nastavení plánovače
Nastavení konfigurace plánovače řídí, jak plánovač analyzuje soubory DAG a vytváří běhy DAG. Přidružené proměnné prostředí pro všechny parametry v této části jsou ve formátu AIRFLOW__SCHEDULER__PARAMETER_NAME .
- min_file_process_interval : Frekvence, se kterou je každý soubor DAG analyzován, v sekundách. Aktualizace DAG se projeví po tomto intervalu. Nízké číslo zvyšuje využití CPU plánovače. Pokud máte dynamické DAG vytvořené složitým kódem, můžete tuto hodnotu zvýšit, abyste zlepšili výkon plánovače. Výchozí hodnota je 30 sekund.
- dag_dir_list_interval : Frekvence, kdy jsou v adresáři DAGs prohledávány nové soubory, v sekundách. Čím nižší je hodnota, tím rychleji se zpracovávají nové DAG a tím vyšší je využití procesoru. Výchozí hodnota je 300 sekund (5 minut). Je užitečné vědět, jak dlouho trvá analyzovat vaše DAG ( dag_processing.total_parse_time ), abyste věděli, jaké hodnoty zvolit pro min_file_process_interval a dag_dir_list_interval . Pokud je váš interval_datového_adresáře kratší než doba potřebná k analýze každého DAG, mohou nastat problémy s výkonem.
Pokud máte méně než 200 DAG v nasazení na Astro, je bezpečné nastavit AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30 (30 sekund) jako proměnnou prostředí na úrovni nasazení.
Nastavení proudění vzduchu na úrovni DAG
Nastavení na úrovni DAG se vztahují pouze na konkrétní DAG a jsou definována ve vašem kódu DAG. Pokud chcete vyladit výkon konkrétního DAG, měli byste upravit nastavení na úrovni DAG, zejména v případech, kdy tento DAG zasahuje do externího systému, jako je rozhraní API nebo databáze, které by mohly způsobit problémy s výkonem, pokud by byly zásahy příliš časté. Pokud existuje nastavení na úrovni DAG i na úrovni prostředí, má přednost nastavení na úrovni DAG.
Existují tři primární nastavení proudění vzduchu na úrovni DAG, která můžete definovat v kódu:
- max_active_runs : Maximální počet aktivních spuštění DAG povolených pro DAG. Když je tento limit překročen, plánovač nevytvoří nové aktivní běhy DAG. Pokud toto nastavení není definováno, předpokládá se hodnota nastavení na úrovni prostředí max_active_runs_per_dag. Pokud pro svůj DAG používáte catchup nebo backfill, zvažte definování tohoto parametru, abyste zajistili, že náhodně nespustíte vysoký počet spuštění DAG.
- max_active_tasks :** Celkový počet úloh, které lze spustit současně pro daný běh DAG. V podstatě řídí paralelismus ve vašem DAG. Pokud toto nastavení není definováno, předpokládá se hodnota nastavení na úrovni prostředí max_active_tasks_per_dag.
- concurrency :** Maximální počet instancí úloh, které mohou být spuštěny souběžně ve všech aktivních spuštěních DAG pro daný DAG. To vám umožní umožnit jednomu DAG spouštět 32 úloh najednou a jinému DAG lze nastavit spouštění 16 úloh najednou. Pokud toto nastavení není definováno, předpokládá se hodnota nastavení na úrovni prostředí max_active_tasks_per_dag.
V rámci definice DAG můžete definovat libovolné nastavení na úrovni DAG. Například:
- TaskFlow API
- Tradiční syntaxe
# Allow a maximum of concurrent 10 tasks across a max of 3 active DAG runs @dag("my_dag_id", concurrency=10, max_active_runs=3) def my_dag():
# Allow a maximum of concurrent 10 tasks across a max of 3 active DAG runs with DAG("my_dag_id", concurrency=10, max_active_runs=3):
Nastavení proudění vzduchu na úrovni úkolu
Nastavení na úrovni úloh jsou definována operátory úloh, které můžete použít k implementaci dalších úprav výkonu. Upravte nastavení na úrovni úlohy, když určité typy úloh způsobují problémy s výkonem.
Existují dvě primární nastavení proudění vzduchu na úrovni úkolu, která mohou uživatelé definovat v kódu:
- max_active_tis_per_dag (dříve task_concurrency ): Maximální počet, kolikrát může stejná úloha běžet souběžně ve všech spuštěních DAG. Pokud například úloha stahuje z externího zdroje, jako je datová tabulka, která by neměla být upravována více úlohami najednou, můžete tuto hodnotu nastavit na 1.
- pool : Definuje množství fondů dostupných pro úlohu. Fondy představují způsob, jak omezit počet souběžných instancí libovolné skupiny úkolů. Toto nastavení je užitečné, pokud máte mnoho pracovníků nebo paralelně běží DAG, ale chcete se vyhnout limitu rychlosti API nebo jinak nechcete zahltit zdroj dat nebo cíl. Další informace naleznete v příručce Airflow Pools Guide.
Výše uvedené parametry jsou zděděny z BaseOperator , takže je můžete nastavit v libovolné definici operátoru. Například:
- TaskFlow API
- Tradiční syntaxe
@task( pool="my_custom_pool", max_active_tis_per_dag=14 ) def t1(): pass
def t1_func(): pass t1 = PythonOperator( task_id="t1", python_callable=t1_func, pool="my_custom_pool", max_active_tis_per_dag=14 )
Exekutoři a škálování
V závislosti na tom, který spouštěč si vyberete pro své prostředí Airflow, existují další nastavení, která je třeba mít na paměti při škálování.
Vykonavatel celeru
Exekutor Celery využívá k provádění úkolů stojící pracovníky. Škálování pomocí programu Celery zahrnuje výběr počtu i velikosti pracovníků dostupných pro Airflow. Čím více pracovníků máte ve svém prostředí k dispozici nebo čím větší jsou vaši pracovníci, tím větší kapacitu máte na souběžné spouštění úloh.
Můžete také vyladit worker_concurrency (proměnná prostředí: AIRFLOW__CELERY__WORKER_CONCURRENCY ), která určuje, kolik úloh může každý pracovník Celery spustit v danou chvíli. Ve výchozím nastavení spouští exekutor Celery maximálně šestnáct úloh současně. Pokud zvýšíte worker_concurrency , možná budete muset svým pracovníkům zajistit další CPU a/nebo paměť.
Exekutor Kubernetes
Exekutor Kubernetes spustí pod v clusteru Kubernetes pro každou úlohu. Vzhledem k tomu, že každá úloha běží ve svém vlastním modulu, lze zdroje specifikovat na úrovni jednotlivých úloh.
Při ladění výkonu pomocí spouštěče Kubernetes je důležité zvážit podpůrnou infrastrukturu vašeho clusteru Kubernetes. Mnoho uživatelů povolí na svém clusteru automatické škálování, aby zajistili, že získají výhodu elasticity Kubernetes.
Můžete také vyladit worker_pods_creation_batch_size (proměnná prostředí: AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE), která určuje, kolik podů lze vytvořit na smyčku plánovače. Výchozí hodnota je 1, ale pro lepší výkon budete chtít toto číslo zvýšit, zvláště pokud máte souběžné úkoly. Maximální hodnota je určena tolerancí vašeho clusteru Kubernetes.
Možné problémy s měřítkem
Škálování vašeho prostředí Airflow je umění a ne věda a velmi závisí na vaší podpůrné infrastruktuře a vašich DAG. Níže jsou uvedeny některé z nejčastějších problémů:
- Latence plánování úloh je vysoká.
- Plánovač nemusí mít dostatek prostředků k analýze DAG, aby pak mohl naplánovat úlohy.
- Změňte worker_concurrency (pokud používáte Celery) nebo paralelismus .
- Počet naplánovaných úloh může být mimo kapacitu vaší infrastruktury Airflow.
- Pokud používáte exekutor Kubernetes, zkontrolujte, zda jsou v oboru názvů dostupné prostředky, a zkontrolujte, zda lze worker_pods_creation_batch_size zvýšit. Pokud používáte exekutor Celery, zkontrolujte, zda lze zvýšit worker_concurrency.
- Možné úzké místo na úrovni DAG.
- Změňte max_active_task_per_dag , fondy (pokud je používáte) nebo celkový paralelismus .
Potřebujete-li pomoc s problémy s škálováním, zvažte připojení k Apache Airflow Slack nebo kontaktujte podporu Astronomer.