# Core Concepts & Architecture
This document describes the high-level architecture and core concepts of the `airnub-prefect_starter` template. This template provides a foundation for building robust, configurable, and maintainable data pipelines using Prefect 3.
## Key Components
- **Prefect 3 Engine**: The workflow orchestration engine used for defining, scheduling, running, and monitoring flows and tasks. This includes the Prefect Server & UI (run locally via Docker Compose in this template, or connectable to Prefect Cloud).
- **Prefect Worker**: Executes the flow runs submitted by the Prefect Server. In this template, it runs within a Docker container defined by `Dockerfile.worker`.
- **Main Python Package (`airnub_prefect_starter/`)**: The primary installable Python package containing your project's reusable code and foundational structures.
    - `common/`: Houses truly generic, project-wide utilities (e.g., `utils.py` for functions like `sanitize_filename` and `generate_sha256_hash`) and potentially foundational path configuration (e.g., if `common/settings.py` were used, though path configuration for data science scripts lives primarily in `data_science/config_ds.py`). The base template does not include application-level database models or DB-specific settings here by default.
    - `core/`: Contains the core Python logic functions that implement the main data processing and business rules for your operational pipelines. These functions are designed to be Prefect-agnostic and are called by your department-specific Prefect task wrappers. Examples include `api_handlers.py`, `file_handlers.py`, `web_utils.py`, `artifact_creators.py` (for creating Prefect artifacts), and `manifest_models.py` (Pydantic models for demo manifest structures).
    - `data_science/`: Includes modules and scripts typically found in data science projects (e.g., inspired by Cookiecutter Data Science). This is the designated area for data exploration, R&D, and prototyping. It contains its own path configuration (e.g., `config_ds.py`) and example scripts like `dataset_processing.py` and `modeling/`. Logic prototyped here can be refactored into `airnub_prefect_starter/core/` for operationalization within Prefect workflows.
    - `tasks/`: (Initially empty in the base template.) Reserved for future common, reusable Prefect task wrappers that are not specific to a single department/category and might be used across multiple flows.
- **Prefect Flow Definitions (`flows/` directory)**: This top-level directory contains all Prefect flow definitions and their associated department/category-specific task wrappers.
    - Structure: `flows/dept_<id>/[stage_verb]_flow_dept_<id>.py` for parent stage orchestrators.
    - Category flows and their specific task wrappers reside in `flows/dept_<id>/<stage_name>/<category_id>/`.
- **Configuration (`configs/` directory & `.env` file)**:
    - **Prefect Variables** (from `configs/variables/*.yaml`): YAML files define runtime parameters for flows and tasks. The `scripts/setup_prefect_variables.py` script loads these into Prefect Variables. The directory structure mirrors `flows/`.
    - **Prefect Blocks**: Used for managing secrets (e.g., API keys), infrastructure configuration (like the `docker-container/local-worker-infra` block for local Docker execution), and connections to external services. Managed via `scripts/setup_prefect_blocks.py` (for local/generic blocks) or the Prefect UI/CLI.
    - **`.env` file** (at the project root): Used by `docker-compose.yml` to set environment variables for the Docker services (e.g., the Prefect server database connection and API host settings).
    - **`configs/department_mapping.yaml`**: Maps department identifiers to full display names.
- **Demo Manifests & Storage (Local by Default)**:
    - The "Project Alpha" demo flows are designed to showcase data ingestion, processing, and manifest creation.
    - **Storage**: Downloaded files for the demo are stored locally within the worker container (e.g., in a CAS-like structure under `/app/local_demo_artifacts/`, which can be mapped to `./data/project_alpha_demo_outputs/` on the host via Docker volumes).
    - **Manifests**: For the demo, manifest entries are created as:
        - Local JSON files stored alongside the data in the CAS-like structure.
        - Prefect UI artifacts (Markdown) for visibility within flow runs.
    - The template is extensible to use Prefect Blocks for remote storage (e.g., S3, GCS) if desired.
- **Docker (`docker-compose.yml`, `Dockerfile.worker`)**: Provides a containerized local development environment for the Prefect server, UI, PostgreSQL database (for Prefect server metadata), and the custom Python worker.
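For concreteness, the `common/utils.py` helpers mentioned above (`sanitize_filename`, `generate_sha256_hash`) might look roughly like the following sketch. The implementations here are illustrative assumptions; the template's actual code may differ:

```python
# Illustrative sketch of common/utils.py-style helpers.
# These bodies are assumptions, not the template's actual implementations.
import hashlib
import re
from pathlib import Path


def sanitize_filename(name: str, replacement: str = "_") -> str:
    """Replace characters that are unsafe in filenames with `replacement`."""
    return re.sub(r"[^A-Za-z0-9._-]+", replacement, name).strip(replacement)


def generate_sha256_hash(file_path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA-256 digest of a file, read in chunks to keep memory use low."""
    digest = hashlib.sha256()
    with file_path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Keeping helpers like these free of Prefect imports is what lets both the data science scripts and the operational task wrappers share them.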
## Core Concepts & Workflow (Data Ingestion Example)
Let's consider an example ingestion flow from "Project Alpha," such as `download_scheduled_files_flow_dept_project_alpha.py`. A typical workflow involves:
- **Load Configuration**:
    - The parent stage flow (e.g., `ingestion_flow_dept_project_alpha.py`) loads its main configuration (which includes sections for its category flows) from a Prefect Variable (e.g., `dept_project_alpha_ingestion_config`).
    - It then calls the category flow (e.g., `download_scheduled_files_flow_dept_project_alpha`), passing the relevant part of the configuration.
- **Execute Category Flow Logic**: The category flow (`download_scheduled_files_flow_dept_project_alpha.py`) performs its specific tasks:
    - It iterates through a list of file URLs defined in its configuration.
    - For each file:
        - It calls a department-specific task wrapper (e.g., `download_file_task_dept_project_alpha` from `flows/dept_project_alpha/ingestion/scheduled_file_downloads/tasks/`).
        - This task wrapper, in turn, calls core logic functions from `airnub_prefect_starter/core/file_handlers.py` (e.g., `core_download_hash_store_and_manifest`).
- **Core Logic Functions Perform the Work**: `core_download_hash_store_and_manifest` (example):
    - Downloads the file from the URL to a temporary location within the worker.
    - Calculates the SHA-256 hash of the file content using functions from `common/utils.py`.
    - **Idempotency check (conceptual for the demo)**: While the base template doesn't include a persistent cross-run manifest database by default, the principle of idempotency is key. For the demo, idempotency might be checked by seeing whether a file with the same hash already exists in the local demo CAS. In a production system, this check would typically query a persistent manifest store (such as a database, which a user could add).
    - **Store locally (CAS-like)**: If the file is new (or reprocessing is intended), it is moved to a structured local directory (e.g., `/app/local_demo_artifacts/downloads/project_alpha_scheduled_files/<hash>/<filename>`).
    - **Create local JSON manifest**: A JSON file detailing metadata (source URL, hash, local path, headers, timestamp, etc., structured by a Pydantic model from `core/manifest_models.py`) is saved alongside the document in the CAS directory.
    - The task wrapper receives a dictionary of results from the core logic.
- **Create Prefect Artifact**:
    - Another department-specific task (e.g., `create_manifest_archive_task_dept_project_alpha`) is called.
    - This task uses a function from `airnub_prefect_starter/core/artifact_creators.py` to generate a Prefect Markdown artifact in the UI, summarizing the downloaded file and linking to its local manifest JSON path (within the worker).
- **(Optional) Trigger Downstream Processing**: The parent stage flow could trigger subsequent stages (e.g., processing flows) based on the success or outputs of the ingestion flows.
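The download/hash/store/manifest step described above can be sketched as a plain, Prefect-agnostic Python function. The function name, signature, and manifest fields below are illustrative assumptions for this sketch, not the template's actual API in `core/file_handlers.py`:

```python
# Rough, Prefect-agnostic sketch of a core logic function in the spirit of
# core_download_hash_store_and_manifest. All names and fields are assumptions.
import hashlib
import json
import shutil
import tempfile
import urllib.request
from datetime import datetime, timezone
from pathlib import Path


def download_hash_store_and_manifest(url: str, cas_root: Path) -> dict:
    """Download `url`, store it under `<cas_root>/<sha256>/<filename>`,
    write a JSON manifest next to it, and return a result dict."""
    filename = url.rstrip("/").rsplit("/", 1)[-1] or "download"

    # 1. Download to a temporary location inside the worker.
    with urllib.request.urlopen(url) as resp:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            shutil.copyfileobj(resp, tmp)
            tmp_path = Path(tmp.name)

    # 2. Hash the content (SHA-256).
    sha256 = hashlib.sha256(tmp_path.read_bytes()).hexdigest()

    # 3. Idempotency check (demo-style): skip the move if this content
    #    already exists in the local CAS.
    target_dir = cas_root / sha256
    target_path = target_dir / filename
    already_stored = target_path.exists()
    if not already_stored:
        target_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(tmp_path), str(target_path))
    else:
        tmp_path.unlink()

    # 4. Write a JSON manifest alongside the stored file.
    manifest = {
        "source_url": url,
        "sha256": sha256,
        "local_path": str(target_path),
        "downloaded_at": datetime.now(timezone.utc).isoformat(),
        "was_already_stored": already_stored,
    }
    (target_dir / f"{filename}.manifest.json").write_text(json.dumps(manifest, indent=2))
    return manifest
```

A department task wrapper would simply decorate a thin call to a function like this with `@task`, keeping retries and logging at the Prefect layer and the business logic testable on its own.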
## Local-First Operation & Extensibility
The template is designed to work seamlessly out of the box for local development:
- **Prefect Server Database**: Uses PostgreSQL running in Docker, configured via `docker-compose.yml` and `.env`.
- **Demo Data Storage & Manifests**: All demo data (downloaded files, scraped content) and the associated JSON manifest files are stored on the local filesystem within the worker container (in a directory like `/app/local_demo_artifacts/`, which can be volume-mapped from `./data/project_alpha_demo_outputs/` on the host).
- This local-first approach lets developers get started quickly without configuring external cloud services. The template can then be extended to use cloud storage (S3, GCS, etc.) by:
    - Adding optional dependencies (e.g., `prefect-aws`).
    - Creating and configuring appropriate Prefect storage blocks (e.g., `S3Bucket`).
    - Modifying the core logic (or adding new logic in `airnub_prefect_starter/core/`) and task wrappers to interact with these blocks.
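As a sketch of that cloud-storage extension path, assuming the optional `prefect-aws` dependency has been installed and an `S3Bucket` block has been created (the block name `demo-artifact-bucket` here is purely illustrative, not something the template provides):

```python
# Hypothetical extension point: pushing a locally stored demo artifact to S3.
# Assumes prefect-aws is installed and an S3Bucket block exists; the block
# name "demo-artifact-bucket" is an illustrative assumption.
from pathlib import Path


def upload_artifact_to_s3(local_path: Path, sha256: str) -> str:
    """Upload a locally stored artifact to an S3 bucket block, keyed by its hash."""
    # Imported lazily so the base template still works without prefect-aws.
    from prefect_aws import S3Bucket

    bucket = S3Bucket.load("demo-artifact-bucket")  # illustrative block name
    remote_key = f"downloads/{sha256}/{local_path.name}"
    bucket.upload_from_path(from_path=local_path, to_path=remote_key)
    return remote_key
```

Because the core logic is Prefect-agnostic, a function like this can be slotted in behind the existing task wrappers without restructuring the flows.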
## Configuration Loading Summary
- **Prefect Variables**: Runtime parameters for flows/tasks are loaded from Prefect Variables, which are populated from the YAML files in `configs/variables/` via `scripts/setup_prefect_variables.py`.
- **Prefect Blocks**: Secrets, infrastructure details (like the `docker-container/local-worker-infra` block), and connections to external services are managed as Prefect Blocks.
- **Python Module Configuration (`airnub_prefect_starter/data_science/config_ds.py`)**: Defines file paths and settings primarily for the standalone data science scripts and notebooks. It sources base paths like `PROJECT_ROOT` and `EFFECTIVE_LOCAL_DATA_ROOT` from `airnub_prefect_starter/common/settings.py` (if you choose to keep/recreate `settings.py` for robust path and optional app DB configuration) or calculates them itself.
- **Environment Variables (`.env`)**: Used by `docker-compose.yml` to configure services (like the Prefect server's database connection and API host) and can be loaded by `python-dotenv` in Python scripts for further configuration.
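A minimal sketch of what `config_ds.py`-style path configuration might look like. The names `PROJECT_ROOT` and `EFFECTIVE_LOCAL_DATA_ROOT` come from the summary above; how they are derived here, and the `LOCAL_DATA_ROOT` environment variable, are assumptions for illustration:

```python
# Illustrative sketch of config_ds.py-style path configuration.
# The derivation details and the LOCAL_DATA_ROOT env var are assumptions.
import os
from pathlib import Path

try:  # __file__ is absent in some interactive contexts (e.g. notebooks)
    _here = Path(__file__).resolve()
except NameError:
    _here = Path.cwd() / "config_ds.py"

# airnub_prefect_starter/data_science/config_ds.py -> repo root is two dirs up
PROJECT_ROOT = _here.parents[2] if len(_here.parents) > 2 else _here.parent

# Allow an environment variable (possibly loaded via python-dotenv) to
# override where local data lives; fall back to <repo>/data.
EFFECTIVE_LOCAL_DATA_ROOT = Path(os.environ.get("LOCAL_DATA_ROOT", PROJECT_ROOT / "data"))

RAW_DATA_DIR = EFFECTIVE_LOCAL_DATA_ROOT / "raw"
PROCESSED_DATA_DIR = EFFECTIVE_LOCAL_DATA_ROOT / "processed"
```

Centralizing paths in one module keeps notebooks, scripts, and any future operational code pointing at the same directories.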
This architecture promotes separation of concerns, clear organization, configurability, and testability, supporting local development out of the box while providing a solid foundation for extension into more complex, cloud-integrated data pipelines.