Example: Project Alpha - Scheduled File Downloads¶
This example demonstrates the scheduled_file_downloads category within the ingestion stage of the dept_project_alpha department. Its primary purpose is to show how to download files from specified URLs, store them locally in a content-addressable manner (for this demo), and create manifest entries.
Key Features Demonstrated¶
- Downloading files from external URLs.
- Calculating content hashes (SHA256) for idempotency and versioning.
- Storing files locally in a structured "Content-Addressable Storage" (CAS-like) demo format: <base_demo_artifacts_dir>/<data_source_name>/<file_hash>/<original_filename>.
- Generating a local JSON manifest file alongside each downloaded document, containing its metadata.
- Creating a Prefect Markdown artifact in the UI that summarizes the download and points to the local manifest and document paths (within the worker).
- Using department-specific task wrappers that call core logic functions.
File Structure¶
- Category Flow: flows/dept_project_alpha/ingestion/scheduled_file_downloads/download_scheduled_files_flow_dept_project_alpha.py
- Task Wrappers: flows/dept_project_alpha/ingestion/scheduled_file_downloads/tasks/
    - download_file_task_dept_project_alpha.py (This wrapper calls the core logic for download, hash, and storage)
    - create_manifest_archive_task_dept_project_alpha.py (This wrapper calls the core logic for creating the Prefect artifact)
- Category Configuration: configs/variables/dept_project_alpha/ingestion/scheduled_file_downloads/download_scheduled_files_config_dept_project_alpha.yaml
- Task Configurations (Optional): configs/variables/dept_project_alpha/ingestion/scheduled_file_downloads/tasks/ (e.g., download_file_task_config_dept_project_alpha.yaml)
- Core Logic:
    - airnub_prefect_starter/core/file_handlers.py (containing core_process_scheduled_file_download, which handles download, hash, local CAS storage, and local JSON manifest creation)
    - airnub_prefect_starter/common/utils.py (e.g., generate_sha256_for_file)
    - airnub_prefect_starter/core/artifact_creators.py (e.g., core_create_file_download_artifact_with_manifest_link)
    - airnub_prefect_starter/core/manifest_models.py (e.g., the DemoFileManifestEntry Pydantic model)
Configuration Example¶
The download_scheduled_files_config_dept_project_alpha.yaml (which informs the dept_project_alpha_ingestion_scheduled_file_downloads_config Prefect Variable, typically nested under the main stage config variable) might look like this:
# Part of the content for dept_project_alpha_ingestion_scheduled_file_downloads_config Variable
data_source_name: "project_alpha_demo_files"
# logical_path_base_demo: "ProjectAlpha/ScheduledDownloads" # Optional for conceptual grouping in manifests
local_artifacts_storage_base: "/app/local_demo_artifacts/downloads" # Base path inside worker for CAS
files_to_acquire:
  - name: "Prefect README Snapshot"
    url: "https://raw.githubusercontent.com/PrefectHQ/prefect/main/README.md"
    link_title: "Prefect Main README"
    source_page_url: "https://github.com/PrefectHQ/prefect" # Example
  - name: "Sample Text File (Alice in Wonderland)"
    url: "https://www.gutenberg.org/files/11/11-0.txt"
    link_title: "Alice's Adventures in Wonderland (Plain Text)"
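Before the flow iterates over this configuration, it can help to validate it early. The following is a minimal sketch, assuming the configuration has already been loaded into a plain Python dictionary. The names FileToAcquire, DownloadConfig, and parse_download_config are hypothetical and are not part of the starter template; only the configuration keys come from the example above.

```python
from dataclasses import dataclass
from typing import Any, Optional
from urllib.parse import urlparse


@dataclass(frozen=True)
class FileToAcquire:
    """One entry of files_to_acquire (hypothetical helper type)."""
    name: str
    url: str
    link_title: str
    source_page_url: Optional[str] = None


@dataclass(frozen=True)
class DownloadConfig:
    """Typed view of the category configuration (hypothetical helper type)."""
    data_source_name: str
    local_artifacts_storage_base: str
    files_to_acquire: list[FileToAcquire]


def parse_download_config(raw: dict[str, Any]) -> DownloadConfig:
    """Check required keys and URL schemes, then build typed objects."""
    for key in ("data_source_name", "local_artifacts_storage_base", "files_to_acquire"):
        if key not in raw:
            raise ValueError(f"missing required config key: {key}")
    files = []
    for entry in raw["files_to_acquire"]:
        # Only plain web URLs are accepted in this sketch.
        if urlparse(entry["url"]).scheme not in ("http", "https"):
            raise ValueError(f"unsupported URL for {entry['name']!r}: {entry['url']}")
        files.append(
            FileToAcquire(
                name=entry["name"],
                url=entry["url"],
                link_title=entry["link_title"],
                source_page_url=entry.get("source_page_url"),
            )
        )
    return DownloadConfig(
        raw["data_source_name"], raw["local_artifacts_storage_base"], files
    )


# The same values as the YAML example, expressed as a dictionary.
raw_config = {
    "data_source_name": "project_alpha_demo_files",
    "local_artifacts_storage_base": "/app/local_demo_artifacts/downloads",
    "files_to_acquire": [
        {
            "name": "Prefect README Snapshot",
            "url": "https://raw.githubusercontent.com/PrefectHQ/prefect/main/README.md",
            "link_title": "Prefect Main README",
            "source_page_url": "https://github.com/PrefectHQ/prefect",
        },
        {
            "name": "Sample Text File (Alice in Wonderland)",
            "url": "https://www.gutenberg.org/files/11/11-0.txt",
            "link_title": "Alice's Adventures in Wonderland (Plain Text)",
        },
    ],
}
config = parse_download_config(raw_config)
```

Failing fast on a missing key or a malformed URL keeps configuration mistakes out of the download step, where they would otherwise surface as less obvious network errors.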
Flow Logic Overview¶
The download_scheduled_files_flow_dept_project_alpha.py (the category flow for "Scheduled File Downloads" within "Project Alpha's" ingestion stage) typically orchestrates the following steps:
- Load Configuration:
    - The flow receives its specific configuration dictionary as a parameter from its parent stage flow (ingestion_flow_dept_project_alpha.py). This configuration originates from the dept_project_alpha_ingestion_scheduled_file_downloads_config Prefect Variable (which was populated from the corresponding YAML file, configs/variables/dept_project_alpha/ingestion/scheduled_file_downloads/download_scheduled_files_config_dept_project_alpha.yaml).
    - This config typically includes:
        - data_source_name: An identifier for this specific data source (e.g., project_alpha_demo_files).
        - local_artifacts_storage_base: The base path within the worker container where downloaded files and their JSON manifests will be stored (e.g., /app/local_demo_artifacts/downloads). This path is often derived from config_ds.DATA_DIR within the core logic.
        - files_to_acquire: A list of dictionaries, each specifying a file's name (for logging/display), url (for download), link_title, and optionally source_page_url.
- Iterate and Process Files: The flow loops through each item in the files_to_acquire list. For each file:
    - It calls the department-specific task wrapper, for example, download_file_task_dept_project_alpha. This task is located in flows/dept_project_alpha/ingestion/scheduled_file_downloads/tasks/.
    - This task wrapper then calls the core logic function core_process_scheduled_file_download (from airnub_prefect_starter/core/file_handlers.py).
- Core Logic (core_process_scheduled_file_download): This central function in airnub_prefect_starter/core/file_handlers.py performs the main work:
    - Downloads the file: Uses httpx to fetch the content from the provided url and saves it to a temporary file within the worker container. It also captures HTTP headers and the original filename (sanitized).
    - Calculates Content Hash: Computes the SHA256 hash of the downloaded file's content using generate_sha256_for_file (from airnub_prefect_starter/common/utils.py).
    - Idempotency Check (Conceptual for Demo): For this demo, idempotency might involve checking if a file with the same content hash already exists in the target local CAS directory. A more robust system (which users can build upon this template) would use a persistent manifest (like a database or a central manifest file store) to track processed hashes.
    - Stores File Locally (CAS-like): If the file is determined to be new (or reprocessing is intended), the temporary file is moved to a structured local directory. The structure is: <local_artifacts_storage_base>/<data_source_name>/<file_content_hash>/<sanitized_original_filename> (e.g., /app/local_demo_artifacts/downloads/project_alpha_demo_files/a1b2c3d4e5f6.../Prefect_Main_README.md)
    - Creates Local JSON Manifest: A DemoFileManifestEntry Pydantic model instance (from airnub_prefect_starter/core/manifest_models.py) is populated with metadata (source URL, hash, local storage path, headers, timestamp, etc.). This model is then serialized to a JSON string and saved as <sanitized_original_filename>.manifest.json alongside the document in the CAS directory.
    - The core logic function returns a dictionary containing all this metadata and the processing status (e.g., "SUCCESS", "FAILED_DOWNLOAD").
- Create Prefect UI Artifact:
    - After the file processing task wrapper (download_file_task_dept_project_alpha) completes and returns the metadata dictionary, the category flow calls another department-specific task wrapper, for example, create_manifest_archive_task_dept_project_alpha.
    - This task wrapper calls core_create_file_download_artifact_with_manifest_link (from airnub_prefect_starter/core/artifact_creators.py).
    - This core function takes the metadata dictionary (returned by the previous task) and generates a Prefect Markdown artifact. This artifact is visible in the Prefect UI for the flow run and provides a summary of the downloaded file, its hash, and the paths to the document and its JSON manifest within the worker container.
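The hashing, idempotency check, CAS-like storage, and manifest steps described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the template's actual implementation: it starts from a file that has already been downloaded to a temporary path (the httpx download is omitted), it writes a plain dictionary as the manifest instead of the DemoFileManifestEntry model, and the function names, manifest field names, and the "SKIPPED_ALREADY_STORED" status are all hypothetical.

```python
import hashlib
import json
import re
import shutil
from datetime import datetime, timezone
from pathlib import Path


def sha256_of_file(path: Path, chunk_size: int = 65536) -> str:
    """Hash the file in chunks so large downloads are not loaded into memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def sanitize_filename(name: str) -> str:
    """Replace unsafe characters so the name cannot escape the target directory."""
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", name).strip("._")
    return cleaned or "downloaded_file"


def store_in_local_cas(
    temp_file: Path,
    storage_base: Path,
    data_source_name: str,
    original_filename: str,
    source_url: str,
) -> dict:
    """Move temp_file to <base>/<source>/<hash>/<name> and write a JSON manifest."""
    file_hash = sha256_of_file(temp_file)
    filename = sanitize_filename(original_filename)
    target_dir = storage_base / data_source_name / file_hash
    target_path = target_dir / filename
    manifest_path = target_dir / f"{filename}.manifest.json"

    # Hypothetical manifest fields; the real model may define different ones.
    metadata = {
        "status": "SUCCESS",
        "source_url": source_url,
        "sha256": file_hash,
        "size_bytes": temp_file.stat().st_size,
        "local_path": str(target_path),
        "manifest_path": str(manifest_path),
        "processed_at_utc": datetime.now(timezone.utc).isoformat(),
    }

    # Demo-level idempotency: identical content maps to the same directory,
    # so an existing target means this content was already stored.
    if target_path.exists():
        temp_file.unlink()
        metadata["status"] = "SKIPPED_ALREADY_STORED"  # hypothetical status value
        return metadata

    target_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(str(temp_file), str(target_path))
    manifest_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    return metadata
```

Because the content hash is part of the path, re-downloading an unchanged file resolves to an existing location and can be skipped, while a changed file lands in a new directory next to the old version.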
Expected Output¶
When this "Scheduled File Downloads" category flow for "Project Alpha" runs successfully for a new file:
- Logs: Detailed logs in the Prefect UI and worker console, tracing the download, hashing, local storage of the file and its JSON manifest, and Prefect UI artifact creation steps.
- Local File Storage (in Worker):
    - The downloaded file will be stored in a path similar to: /app/local_demo_artifacts/downloads/project_alpha_demo_files/<SHA256_HASH>/<original_filename>
    - (If you have volume-mapped a host directory like ./data/project_alpha_demo_outputs/downloads/ to the worker's /app/local_demo_artifacts/downloads/ via docker-compose.yml, you can browse these files on your host machine.)
- Local JSON Manifest File (in Worker):
    - A JSON file named <original_filename>.manifest.json will be created in the same directory as the downloaded file above (i.e., within the <SHA256_HASH> subdirectory). It will contain structured metadata about the downloaded file based on the DemoFileManifestEntry Pydantic model.
- Prefect UI Artifact:
- A Markdown artifact will appear on the flow run's page in the Prefect UI. This artifact will summarize the downloaded file's details (source URL, hash, size, content type) and include the worker paths to both the stored document and its local JSON manifest file.
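To make the artifact content concrete, here is a minimal sketch of how such a Markdown summary could be assembled from the metadata dictionary. The function name and the dictionary keys are hypothetical and may not match what core_create_file_download_artifact_with_manifest_link actually uses. Inside a Prefect flow or task run, the resulting string could be passed to create_markdown_artifact from prefect.artifacts; that call is left out here so the sketch runs without a Prefect installation.

```python
def build_download_artifact_markdown(metadata: dict) -> str:
    """Render the download metadata as a small Markdown table."""
    # Keys are assumptions for illustration; missing values fall back to "n/a".
    rows = [
        ("Status", metadata.get("status", "n/a")),
        ("Source URL", metadata.get("source_url", "n/a")),
        ("SHA256", metadata.get("sha256", "n/a")),
        ("Size (bytes)", metadata.get("size_bytes", "n/a")),
        ("Content type", metadata.get("content_type", "n/a")),
        ("Document path (worker)", metadata.get("local_path", "n/a")),
        ("Manifest path (worker)", metadata.get("manifest_path", "n/a")),
    ]
    lines = ["# File download summary", "", "| Field | Value |", "| --- | --- |"]
    lines += [f"| {label} | `{value}` |" for label, value in rows]
    return "\n".join(lines)


example_markdown = build_download_artifact_markdown(
    {
        "status": "SUCCESS",
        "source_url": "https://www.gutenberg.org/files/11/11-0.txt",
        "sha256": "abc123",  # placeholder, not a real digest
        "size_bytes": 1024,  # placeholder size
    }
)
```

Keeping the rendering separate from the artifact call makes the summary easy to check in a unit test before it is ever published to the UI.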
This example showcases a complete local ingestion pipeline for files: content hashing to support idempotency, CAS-like local storage for demo purposes, local JSON manifest generation, and observability through Prefect artifacts.