Chapter 7 — Data Path and Lake Ingestion
I. Objectives and Scope
- Define a unified end-to-end path from device to data lake: data shape, chunking and compression, transport and backpressure, partitioning and naming, manifests and lineage, verification and consistency, plus SLI/SLO and governance requirements.
- Align with I80-8 serialize / export_manifest / import_manifest, I80-7 quality_metrics / monitor_drift, and Core.Threads I70-3/6/7 (channels, rate limiting, tracing).
- Deliver postulates P87-, minimal equations S87-, and the operational workflow Mx-7 for ingestion; guarantee approximate stability via rho < 1 and a traceable manifest metadata loop.
II. Data Shapes and Lifecycle (Terminology)
- Shapes
record (single observation with ts and sid); chunk (logical block within a batch file); object (storage unit in the lake).
dataset (a set of same-schema objects organized by partitions); manifest (metadata descriptor).
- Lifecycle stages
ingest_edge (device/edge gateway) → staging (quarantine/validation) → lake_raw (raw zone) → lake_refined (refined zone) → lake_feature (feature/analytics zone).
- Convention immutability
lake_raw forbids mathematical convention switches (e.g., PSD one-sided ↔ two-sided). Such transforms are only allowed from lake_refined onward and must record derivation lineage.
III. Postulates P87- (Data Path Consistency)
- P87-1 (Immutable objects)
After landing, an object’s hash and size are immutable; any revision is a new object version with lineage edge dep(u,v) recorded in the manifest.
- P87-2 (Traceable conventions)
Every object MUST record schema_id/ver/units, fmt/compress, H(f) convention, and PSD normalization convention (e.g., units of S_xx).
- P87-3 (Temporal consistency)
ts (UTC) is for audit; tau_mono is for latency, jitter, and queueing evaluation (see Chapter 3).
- P87-4 (Idempotent ingestion)
Use idemp_key = hash(sid, ts_range, bytes_hash) to realize sem="exactly_once*" (best-effort via dedup); configure dedup window Delta_t_dedup.
- P87-5 (Stability)
Treat every ingestion path as a queueing system; continually enforce rho = lambda / mu < 1, where lambda is chunk arrival rate and mu is service rate.
IV. Minimal Equations S87- (Capacity, Throughput, Latency)
- S87-1 (Chunk size and sample count)
N_samples_per_chunk = floor( B_target * 8 / ( channels * bits_per_sample ) ).
S_chunk_raw = header_bytes + N_samples_per_chunk * channels * bits_per_sample / 8.
- S87-2 (Compression and effective size)
r_c = S_chunk_raw / S_chunk_comp. Under a Gaussian approximation, entropy upper bound
H_est ≈ 0.5 * log2( 2 * pi * e * sigma_x^2 ) (bits/sample), hence r_c <= bits_per_sample / H_est.
- S87-3 (Ingest time decomposition)
T_put ≈ T_net + T_comp + T_fs, where T_net = S_chunk_comp / BW_wire_eff.
- S87-4 (Queueing approximation and wait)
mu = 1 / E[T_put]; rho = lambda / mu; W_q ≈ rho / ( mu - lambda ) (M/M/1); W = W_q + 1 / mu.
Little’s Law: L = lambda * W; use L to tune channel buffering and small-file rate.
- S87-5 (Daily volume and file count)
S_day_comp = S_chunk_comp * N_chunks_day; N_files_day = ceil( S_day_comp / S_target_object ).
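The S87- relations are easy to check numerically. The following is a minimal Python sketch (the function names are illustrative, not part of I80-8):

```python
import math

def chunk_sizing(B_target: int, channels: int, bits_per_sample: int,
                 header_bytes: int = 64):
    """S87-1: samples per chunk and raw chunk size for a byte budget B_target."""
    n = (B_target * 8) // (channels * bits_per_sample)
    s_raw = header_bytes + n * channels * bits_per_sample // 8
    return n, s_raw

def mm1_wait(lam: float, E_T_put: float):
    """S87-4: M/M/1 approximation. lam = chunk arrival rate, E_T_put = E[T_put]."""
    mu = 1.0 / E_T_put
    rho = lam / mu
    if rho >= 1.0:
        raise ValueError(f"unstable: rho={rho:.2f} >= 1 (P87-5 violated)")
    W_q = rho / (mu - lam)   # mean queueing delay
    W = W_q + 1.0 / mu       # total sojourn time
    L = lam * W              # Little's Law: mean chunks in system
    return rho, W_q, W, L

def gaussian_entropy_bound(sigma_x: float, bits_per_sample: int) -> float:
    """S87-2: upper bound on r_c under the Gaussian entropy approximation."""
    H_est = 0.5 * math.log2(2 * math.pi * math.e * sigma_x ** 2)
    return bits_per_sample / H_est
```

For example, with B_target = 16 MiB, 4 channels, and 16-bit samples, chunk_sizing yields about 2.1 M samples per chunk; with lambda = 5 chunks/s and E[T_put] = 0.1 s, rho = 0.5 and W = 0.2 s.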
V. Serialization and Schema (fmt/schema)
- Schema and units
Columns MUST declare name/type/unit/desc. Time column ts uses ISO-8601 UTC. Dimensions are validated via unit(x) and dim(x) (see Chapter 2).
- Format selection
jsonl: debugging / small volumes; csv: broad compatibility but fragile; parquet: columnar, compression-friendly, great for wide time series; nc (netcdf): grids/multidimensional; tfrecord: training samples.
- Suggested mappings
Time series: parquet + zstd; grid fields: nc + deflate; hi-fi audio: flac/wavpack; event logs: jsonl + zstd.
- Versioning
schema_id = uuid4(); evolve with ver = major.minor.patch; backward-compatible fields publish as nullable + default.
VI. Chunking and Compression
- Objective
Keep object size near S_target_object ∈ [16 MiB, 256 MiB], balancing page alignment, batch read efficiency, and small-file rate.
- Suggestions
B_target ∈ [4 MiB, 32 MiB]; columnar page page_size ∈ [64 KiB, 1 MiB]; compression zstd level 3–6 or lz4hc level 4–12. - Preprocessing
Detrend/delta: x' = x - median(x); delta coding often boosts r_c.
Quantize-then-compress: when ENOB << ADC_bits, losslessly requantize to ENOB first.
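The two preprocessing tricks above can be sketched in a few lines of Python; zlib stands in for zstd here so the example stays dependency-free, and the helper names are illustrative:

```python
import zlib

def delta_encode(xs):
    """Delta coding: keep the first value, then successive differences (lossless)."""
    return [xs[0]] + [b - a for a, b in zip(xs, xs[1:])]

def delta_decode(ds):
    """Inverse of delta_encode: cumulative sum restores the original samples."""
    out = [ds[0]]
    for d in ds[1:]:
        out.append(out[-1] + d)
    return out

def requantize(x: int, adc_bits: int, enob: int) -> int:
    """Drop sub-noise bits when ENOB << ADC_bits (no information loss in effect)."""
    return x >> (adc_bits - enob)

def compressed_size(samples, level: int = 6) -> int:
    """Serialize as 4-byte signed ints and report the zlib-compressed size."""
    payload = b"".join(s.to_bytes(4, "little", signed=True) for s in samples)
    return len(zlib.compress(payload, level))
```

On a slowly varying signal (e.g., a ramp), the delta-coded stream is nearly constant and compresses far better than the raw samples, which is exactly the r_c boost claimed above.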
VII. Transport, Channels, and Backpressure (Aligned with Core.Threads)
- Channel parameters
chan, cap, q_len, bp (backpressure). Overflow policy explicit: {drop_oldest|drop_newest|block|spill}.
- Backpressure function
Example: bp = f(q_len, cap, W_q); a minimal form is bp = min( 1, q_len / cap ) for the rate limiter to read (see I70-6).
- Stability checks
Periodically compute rho = lambda / mu; if rho >= 1, raise an alert and trigger rate_limiter or increase cap.
- Batching and ACK
Larger batch_size improves BW_wire_eff; ACK with eid and trace_link to preserve causality (see I70-7).
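The backpressure function and the periodic stability check sketch naturally as two small helpers; the action labels returned by stability_action are illustrative, not a defined interface:

```python
def backpressure(q_len: int, cap: int) -> float:
    """bp = min(1, q_len / cap): 0.0 = free-flowing, 1.0 = fully throttled."""
    return min(1.0, q_len / cap)

def stability_action(lam: float, mu: float, rho_alert: float = 0.8):
    """Periodic P87-5 check: classify the queueing regime from rho = lambda/mu."""
    rho = lam / mu
    if rho >= 1.0:
        return rho, "rate_limit_or_scale"  # queue grows without bound
    if rho >= rho_alert:
        return rho, "alert"                # approaching saturation
    return rho, "ok"
```

The rate limiter (I70-6) reads bp each tick; the 0.8 alert threshold matches the gate suggested in Section XI.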
VIII. Partitioning and Naming (partition/layout)
- Path template
dataset=/sea/{project}/{sensor_family}/sid={sid}/date={YYYY-MM-DD}/hour={HH}/.
- Partition keys
MUST include sid and time grain date/hour; optional: model/region/schema_ver.
- Small-file control
Enforce files_per_partition <= F_max; if N_files_day >> F_max, increase B_target or run object compaction.
- Time zone and leap seconds
Partitions use UTC; leap seconds are tracked in ts. Objects spanning an hour boundary MUST record ts_range in the manifest.
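Rendering the path template with a UTC guard is a one-liner worth pinning down; this sketch assumes the template exactly as given above:

```python
from datetime import datetime, timezone

TEMPLATE = "dataset=/sea/{project}/{sensor_family}/sid={sid}/date={date}/hour={hour}/"

def partition_path(project: str, sensor_family: str, sid: str,
                   ts_utc: datetime) -> str:
    """Render the Section VIII path template; partitions always use UTC."""
    if ts_utc.tzinfo is None:
        raise ValueError("ts must be timezone-aware (UTC)")
    t = ts_utc.astimezone(timezone.utc)  # normalize any zone to UTC
    return TEMPLATE.format(project=project, sensor_family=sensor_family,
                           sid=sid, date=t.strftime("%Y-%m-%d"),
                           hour=t.strftime("%H"))
```

Rejecting naive timestamps at this boundary is what keeps hour partitions consistent across producers in different local zones.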
IX. Manifests and Lineage
- Dataset-level manifest (Tier-1)
{dataset_id, schema_id, ver, fmt, compress, S_target_object, partition_template, producer, SLO:{P99,E2E,ErrRate}, lineage:{parents}, created_ts}.
- Object-level manifest (Tier-2)
{object_id, sid, ts_range:{t0,t1}, N_records, S_chunk_raw, S_chunk_comp, r_c, hash_sha256, idemp_key, q_score, missing:m, env:RefCond, delta_form?}.
- Linkage
dep(u,v) captures derivations: lake_raw -> lake_refined -> lake_feature. All corr_env, FFT/PSD transforms must be registered.
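Most Tier-2 fields are derivable from the payload itself. A minimal builder, assuming the env:RefCond and delta_form fields are attached elsewhere and the key layout from Section XV:

```python
import hashlib

def object_manifest(object_id: str, sid: str, t0: str, t1: str,
                    payload: bytes, raw_size: int, n_records: int,
                    q_score: float, missing: int) -> dict:
    """Tier-2 object manifest sketch (env/delta_form omitted for brevity)."""
    comp_size = len(payload)
    h = hashlib.sha256(payload).hexdigest()
    return {
        "object_id": object_id, "sid": sid,
        "ts_range": {"t0": t0, "t1": t1},
        "N_records": n_records,
        "S_chunk_raw": raw_size, "S_chunk_comp": comp_size,
        "r_c": raw_size / comp_size,
        "hash_sha256": h,
        # idemp_key per Section XV: sha256(sid || ts_range || hash_sha256)
        "idemp_key": hashlib.sha256(f"{sid}|{t0}|{t1}|{h}".encode()).hexdigest(),
        "q_score": q_score, "missing": missing,
    }
```

Deriving hash_sha256 and idemp_key from the final bytes (not the in-memory records) ensures the manifest stays valid under P87-1 immutability.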
X. Verification and Consistency
- Integrity
Content address cid = sha256(payload); commit via atomic rename or single-object multipart finalize.
- Ordering
Ensure non-decreasing tau_mono within a partition; in reordering scenarios, prove causality via hb links and eid.
- Idempotency and dedup
Deduplicate by idemp_key; set Delta_t_dedup to cover ts_range plus worst-case retry latency.
- End-to-end checks
Optional HMAC(key, cid); cross-segment failures go to staging for human review.
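A windowed deduper implementing P87-4 fits in a few lines; the in-memory dict is a stand-in for whatever index the staging zone actually uses:

```python
import hashlib

def cid(payload: bytes) -> str:
    """Content address: cid = sha256(payload)."""
    return hashlib.sha256(payload).hexdigest()

class Deduper:
    """Idempotent-commit sketch: drop re-deliveries inside Delta_t_dedup."""
    def __init__(self, window_s: float):
        self.window_s = window_s
        self.seen = {}  # idemp_key -> first-seen time (monotonic seconds)

    def admit(self, idemp_key: str, now_s: float) -> bool:
        # evict keys older than the dedup window
        self.seen = {k: t for k, t in self.seen.items()
                     if now_s - t <= self.window_s}
        if idemp_key in self.seen:
            return False  # duplicate within window: skip commit
        self.seen[idemp_key] = now_s
        return True
```

Note that now_s should come from tau_mono (P87-3): wall-clock steps would shrink or stretch the effective window.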
XI. SLI/SLO and Budgeting
- SLIs
IngestQPS (chunks/s); E2E_Latency = ts_commit - ts_edge; P99_E2E; ErrRate; SmallFileRate; Lag = now - max(ts_commit).
- Budget model
CPU_budget and IO_budget in terms of R_cpu/R_io; compression level and B_target jointly drive T_comp/T_fs.
- Target suggestions
P99_E2E <= 5 * B_target / BW_wire_eff + epsilon; SmallFileRate <= 1%; ErrRate <= 1e-4 (per object).
- Alerts and gates
If rho >= 0.8 or Lag > Lag_thr, trigger scale-up/rate-limit; if SmallFileRate > thr, start compaction.
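The alert gates above reduce to a pure function over the current SLI snapshot; the action strings and thresholds here mirror Sections XI and XV but are otherwise illustrative:

```python
def slo_gate(rho: float, lag_s: float, small_file_rate: float,
             err_rate: float, lag_thr_s: float = 300.0,
             sfr_thr: float = 0.01, err_thr: float = 1e-4) -> list:
    """Return the list of actions triggered by the current SLI snapshot."""
    actions = []
    if rho >= 0.8 or lag_s > lag_thr_s:
        actions.append("scale_up_or_rate_limit")
    if small_file_rate > sfr_thr:
        actions.append("compaction")
    if err_rate > err_thr:
        actions.append("review")  # hypothetical escalation label
    return actions
```

Keeping the gate stateless makes it trivial to evaluate on every metrics tick and to unit-test against recorded SLI traces.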
XII. Security, Governance, and Retention
- Access control
Object-level ACLs; field-level masking and labeling (e.g., pii:true).
- Versioning and retention
retention_policy = {raw:days_ref, refined:days_feat}; ensure reproducible lineage before expiry.
- Audit
Log producer_id, span_id, eid; timestamp/signature for critical operations.
XIII. Workflow Mx-7 (Ingestion)
- Edge acquisition: produce chunk, optionally run corr_env, compute idemp_key and hash_sha256.
- Serialization: call I80-8 serialize(data, fmt="parquet", compress="zstd"); read B_target and page settings from the dataset manifest.
- Transport & backpressure: send via chan to staging; compute rho/lambda/mu and W_q in real time; engage rate_limiter if required.
- Validation & commit: in staging, verify cid and schema; upon success, atomically commit into the lake_raw partition.
- Manifest & observability: call I80-8 export_manifest(data) for the object manifest; metric_emit / trace_span and link eid.
- Compaction & governance: periodically compact small files; update dataset manifest and lineage; evaluate SLIs and trigger alerts/gates.
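The Mx-7 steps condense into one driver. In this sketch, plain dicts stand in for the staging and lake_raw stores, zlib for zstd, and ingest_chunk is a hypothetical name, not an I80-8 call:

```python
import hashlib
import zlib

def ingest_chunk(sid: str, t0: str, t1: str, raw: bytes,
                 staging: dict, lake_raw: dict) -> dict:
    """Mx-7 sketch: serialize -> land in staging -> verify cid -> commit."""
    payload = zlib.compress(raw, 5)                    # serialization step
    h = hashlib.sha256(payload).hexdigest()            # hash_sha256
    key = hashlib.sha256(f"{sid}|{t0}|{t1}|{h}".encode()).hexdigest()
    staging[key] = payload                             # quarantine zone
    if hashlib.sha256(staging[key]).hexdigest() != h:  # cid verification
        raise ValueError("cid mismatch: hold in staging for review")
    lake_raw[key] = staging.pop(key)                   # atomic-style commit
    return {"idemp_key": key, "hash_sha256": h,
            "S_chunk_raw": len(raw), "S_chunk_comp": len(payload)}
```

A real path would swap in I80-8 serialize()/export_manifest(), an object store with atomic rename, and trace_span/trace_link around each step.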
XIV. Interface Bindings (I80-8 and Related)
- serialize(data:any, fmt:str="parquet", compress:str|None=None) -> bytes
Must support optional B_target/page_size/row_group_size; return should include hash_sha256.
- export_manifest(data:any) -> dict
Emit object-level manifest with fields enumerated in Section IX; co-located as sidecar metadata.
- import_manifest(manifest:dict) -> any
Validate schema_id/ver/units; provide strict|lenient modes for missing fields and defaults.
- Thread tracing (I70-7)
trace_span("ingest", attrs={sid, object_id}) and trace_link(span, eid) to build the cross-system hb chain.
XV. Example Configuration (Suggested Defaults)
- B_target=16 MiB, page_size=256 KiB, row_group_size=128 MiB (columnar bulk alignment), compress="zstd-5".
- Partitions: sid/date/hour; S_target_object=128 MiB; F_max=256 files/day/partition.
- Dedup: Delta_t_dedup=24 h; idemp_key=sha256(sid||ts_range||hash_sha256).
- SLO: P99_E2E<=60 s, ErrRate<=1e-4, SmallFileRate<=1%, Lag_thr=5 min.
XVI. Interlocks and Cross-Volume References
- With Chapter 2: ENOB/DR affect compression ratio r_c and B_target; sigma_x sets the entropy bound.
- With Chapter 3: dual clocks tau_mono/ts for latency and audit; offset/skew/J feed ingestion jitter analysis.
- With Chapter 4: if H(f) deconvolution runs pre-lake, the manifest MUST annotate the convention.
- With Chapter 5: PSD/feature-derived objects land in lake_refined/feature with lineage recorded.
- With Chapter 6: RefCond and delta_form must be propagated in object manifests to support T_arr alignment and recomputation.