
Building a Robust Multi-Format Data Ingestion Pipeline with Auditability and Versioning

[Image: Abstract data flow concept with illuminated nodes and connections. Photo by Alvaro Reyes on Unsplash]


Context & Problem

Raw datasets arrive in many forms — CSV dumps, Parquet exports, JSON logs. Without a controlled ingestion process, data scientists risk working on slightly different copies, applying transformations inconsistently, and losing track of what’s “truth.”

For a 20-week AI/ML program where experiments will span months, we needed:
• Consistency – every dataset loaded the same way, regardless of format.
• Auditability – the ability to trace back to the exact raw data used in any run.
• Versioning – historical “snapshots” kept without overwriting.

The ingestion pipeline became the first brick in this foundation.


Design Principles

Before writing a single line of code, we set guardrails:
• Format-Agnostic – load CSV, Parquet, and JSON with equal ease.
• Centralized Config – paths, constants, and helpers in one place (config.py).
• Consistent Logging – structured logs with rotation to avoid runaway files.
• Schema Awareness – optional validation to catch upstream data drift.
• Snapshot Strategy – timestamped filenames for reproducibility.

These rules allowed us to keep the code DRY, scalable, and easy to integrate later into automated workflows.

Implementation Highlights

Centralized Config (config.py)

A single source of truth for:
• Directory mapping via _DIR_MAP → get_dir().
• Utility functions (timestamped_filename, default Parquet engine selection).
• Constants for schema expectations and logging defaults.

Why it matters: Changing a directory or file-naming rule now takes one edit, not dozens.
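As a rough sketch, the config helpers named above might look like this. The directory keys, signatures, and timestamp format here are illustrative assumptions, not the project’s actual code:

```python
from datetime import datetime
from pathlib import Path

# Hypothetical directory map; keys and paths are placeholders.
_DIR_MAP = {
    "raw": Path("data/raw"),
    "processed": Path("data/processed"),
    "logs": Path("logs"),
}

def get_dir(key: str) -> Path:
    """Resolve a named directory, creating it on first use."""
    path = _DIR_MAP[key]
    path.mkdir(parents=True, exist_ok=True)
    return path

def timestamped_filename(stem: str, ext: str = "parquet") -> str:
    """Build a snapshot name such as 'titanic_20250101T120000.parquet'."""
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    return f"{stem}_{stamp}.{ext}"
```

With this shape, renaming a directory or changing the snapshot convention really is a one-line edit in config.py.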

Structured Logging with Rotation

Implemented get_logger() with:
• RotatingFileHandler to cap logs at 1 MB with 5 backups.
• A formatter ensuring uniform timestamps and log levels.
• Safeguards to avoid duplicate handlers.

Impact: Weeks later, we’ll have readable logs for every ingestion run — without filling the disk.
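A minimal version of such a get_logger() could be sketched as follows; the log file name, default level, and format string are assumptions for illustration:

```python
import logging
from logging.handlers import RotatingFileHandler

def get_logger(name: str, log_file: str = "ingest.log") -> logging.Logger:
    """Return a logger with a size-capped rotating file handler."""
    logger = logging.getLogger(name)
    if logger.handlers:  # safeguard: don't attach duplicate handlers on re-import
        return logger
    logger.setLevel(logging.INFO)
    handler = RotatingFileHandler(log_file, maxBytes=1_000_000, backupCount=5)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logger.addHandler(handler)
    return logger
```

The early-return guard matters because logging.getLogger() returns the same object per name; without it, repeated calls would stack handlers and duplicate every log line.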

Multi-Format Loader

ingest_file() now detects file type by extension:

suffix = file_path.suffix.lower()
if suffix == ".csv":
    df = pd.read_csv(file_path, nrows=nrows)
elif suffix == ".parquet":
    # config helper returns the preferred parquet reader (e.g. pyarrow-backed)
    read_parquet = default_pq_engine()
    df = read_parquet(file_path)
elif suffix == ".json":
    # pandas only honors nrows for line-delimited JSON
    df = pd.read_json(file_path, lines=True, nrows=nrows)
else:
    raise ValueError(f"Unsupported file type: {suffix}")

Benefit: The same code path handles multiple formats — no special scripts per dataset.

CLI Integration

A _build_parser() function exposes:
• --out-format (CSV, Parquet)
• --validate-schema (on/off)
• --nrows (sampling)
• --log-level (DEBUG/INFO/WARNING)

Run it directly from the terminal, schedule via cron, or hook into CI/CD.
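The parser might be built with argparse along these lines; the defaults and choice lists are assumptions, not the project’s exact options:

```python
import argparse

def _build_parser() -> argparse.ArgumentParser:
    """CLI options mirroring the flags described above (values assumed)."""
    parser = argparse.ArgumentParser(description="Ingest a raw dataset.")
    parser.add_argument("--out-format", choices=["csv", "parquet"],
                        default="parquet", help="Snapshot output format")
    parser.add_argument("--validate-schema", action="store_true",
                        help="Check columns against EXPECTED_COLUMNS")
    parser.add_argument("--nrows", type=int, default=None,
                        help="Load only the first N rows (sampling)")
    parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING"],
                        default="INFO")
    return parser

# Example invocation, as cron or CI/CD might call it:
args = _build_parser().parse_args(["--out-format", "csv", "--nrows", "100"])
```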

Operational Considerations

•	Error Handling – Exceptions during logging setup, loading, or saving are logged, not fatal.
•	Schema Validation – Optional check vs. EXPECTED_COLUMNS. Flags bad data early.
•	Snapshot Versioning – timestamped_filename() ensures no overwrites.

These features mean the pipeline can run unattended without silent data corruption.
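The schema check can be as simple as a set difference against EXPECTED_COLUMNS; the column names below are placeholders, not the real expected schema:

```python
# Hypothetical expected schema; illustrative column names only.
EXPECTED_COLUMNS = {"passenger_id", "name", "fare"}

def validate_schema(columns) -> list[str]:
    """Return the expected columns missing from a loaded DataFrame."""
    return sorted(EXPECTED_COLUMNS - set(columns))

# A non-empty result means upstream drift: log it and fail fast.
missing = validate_schema(["passenger_id", "name"])
```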

Strategic Impact

This isn’t just “loading data.” It’s establishing reproducibility and traceability for the entire 20-week AI/ML journey.

In later weeks, this same ingestion layer will:
• Feed cleaned data into feature engineering (Weeks 5-8).
• Act as the input stage for model training pipelines (Weeks 10-14).
• Support benchmark reproducibility in deployment simulations (Weeks 18-20).

By investing in this now, we eliminate future headaches where “it worked yesterday, but not today” becomes a blocker.

[Image: Servers and network cables in a data center. Photo by Thomas Barrett on Unsplash]

Next Steps:

The ingestion module is live. Next, we’ll integrate profiling & EDA automation to summarize datasets immediately upon ingestion — accelerating Week 2’s exploration phase.

