Chapter 18 Appendix: Pipeline Templates
I. Template Scope & Posture
use frozen indices.must—for YAML/JSON pipelines. Keys use snake_case; cross-volume citations use “Volume vX.Y:Anchor”; units follow SI with check_dim=true; splits used for evaluation/export full skeleton and minimalProvide two drop-in templates—II. Minimal Template (copy-paste ready)
pipeline:
id: "eift.ingest-validate-transform-export"
version: "v1.0"
layers:
- name: "ingest"
stages:
- name: "src.s3.pull"
type: "source.s3"
impl: "I16-1.s3_pull"
params:
endpoint: "https://s3.amazonaws.com"
bucket_or_db: "eift-data"
prefix_or_table: "raw/2025/09/"
query_or_pattern: "*.jsonl"
credentials_ref: "secrets://aws/ingest_ro"
format: "json"
outputs: ["raw_blob"]
idempotent: true
retries: {max: 3, backoff: "expo", jitter_ms: 200}
timeout_s: 1800
- name: "validate"
stages:
- name: "dq.scan"
type: "validate.dq"
impl: "I16-7.dq_scan"
inputs: ["raw_blob"]
outputs: ["dq_report"]
schema_ref: "contracts/raw_json@v1.0"
dq:
sample: {rows: 100000, strategy: "stratified"}
significance: {alpha: 0.05}
gates:
- {id:"DQ_001", kind:"not_null", cols:["id","ts"], level:"block"}
- name: "transform"
stages:
- name: "standardize"
type: "transform.normalize"
impl: "I16-3.standardize"
inputs: ["raw_blob"]
outputs: ["std_rows"]
params: {method:"zscore", stats_from:"train-only"}
idempotent: true
- name: "export"
stages:
- name: "split.package"
type: "export.splits"
impl: "I16-5.split_package"
inputs: ["std_rows"]
outputs: ["train_pkg","val_pkg","test_pkg"]
splits:
train: {ratio: 0.8}
validation: {ratio: 0.1}
test: {ratio: 0.1}
policy:
leakage_guard: ["per-object","per-timewindow"]
freeze_indices: true
edges:
- {from:"src.s3.pull:raw_blob", to:"dq.scan:raw_blob"}
- {from:"dq.scan:dq_report", to:"standardize:raw_blob"}
- {from:"standardize:std_rows", to:"split.package:std_rows"}
metrology: {units:"SI", check_dim:true}
export_manifest:
version: "v1.0"
artifacts:
- {path:"pipeline.yaml", sha256:"<hex>"}
references:
- "EFT.WP.Core.DataSpec v1.0:EXPORT"
- "EFT.WP.Core.Metrology v1.0:check_dim"
III. Full Skeleton Template (release-grade, with optional extensions)
pipeline:
id: "<org.project.pipeline>"
version: "v1.0.0"
orchestration:
orchestrator: "airflow|argo|ray|custom"
dag: {max_concurrency: 128, backfill:{enabled:true, window:"P7D"}}
dependencies: []
triggers:
cron: "5 * * * *"
# event: {source:"kafka", topic:"ds.ready", group:"pipeline-consumer"}
scheduling:
queue: "default"
priority: 5
preempt: true
retries: {max:3, backoff:"expo", jitter_ms:200}
timeout_s: 3600
sla: {latency_ms:{p50:5000,p95:15000,p99:30000}, availability:0.999, error_rate:0.01}
alert_rules:
- {name:"sla_breach_p99", rule:"latency_ms.p99>30000 for 10m", severity:"high"}
resources:
requests: {cpu:4, mem_gb:16, gpu:0}
limits: {cpu:8, mem_gb:32, gpu:0}
disk_gb: 200
net_mbps: 800
qos: "burstable"
layers:
- name: "ingest"
stages:
- name: "<src.kind.name>"
type: "source.<s3|gcs|fs|db|kafka|http|custom>"
impl: "I16-1.<impl_id>"
params:
endpoint: "<url-or-bootstrap>"
bucket_or_db: "<bucket|db>"
prefix_or_table: "<prefix|schema.table>"
query_or_pattern: "<sql|glob>"
credentials_ref: "secrets://path/to/credential"
format: "<json|parquet|csv|avro|binary>"
watermark: {field:"<updated_at|offset|lsn>", start:"<ISO8601|offset>", step:"<PT5M|1000>"}
checkpoint: {path:"s3://.../chk/<stage>", mode:"exactly-once|at-least-once"}
dedupe_key: ["<pk>","<ts>"]
outputs: ["raw_blob|raw_rows|events"]
idempotent: true
retries: {max:3, backoff:"expo", jitter_ms:200}
timeout_s: 1800
on_fail: "quarantine|skip|block"
- name: "validate"
stages:
- name: "schema.check"
type: "validate.schema"
impl: "I16-2.schema_check"
inputs: ["raw_blob"]
outputs: ["raw_rows"]
schema_ref: "contracts/raw_rows@vX.Y"
- name: "dq.scan"
type: "validate.dq"
impl: "I16-7.dq_scan"
inputs: ["raw_rows"]
outputs: ["dq_report"]
schema_ref: "contracts/raw_rows@vX.Y"
dq:
sample: {rows: 50000, strategy:"stratified"}
significance: {alpha: 0.05}
gates:
- {id:"DQ_001", kind:"not_null", cols:["id","ts"], level:"block"}
- {id:"DQ_002", kind:"unique", cols:[["id","ts"]], level:"block"}
- name: "transform"
stages:
- name: "normalize"
type: "transform.normalize"
impl: "I16-3.standardize"
inputs: ["raw_rows"]
outputs: ["std_rows"]
params: {method:"zscore", stats_from:"train-only"}
idempotent: true
schema_ref: "contracts/std_rows@vX.Y"
- name: "feature"
stages:
- name: "feat.map"
type: "feature.map"
impl: "I16-4.feature_map"
inputs: ["std_rows"]
outputs: ["feat_rows"]
params:
key: ["entity_id","ts"]
point_in_time: {enabled:true, lookback:"P30D", tolerance:"PT5M"}
aggregate: {window:"P1D", funcs:["mean","std","count"], fillna:{method:"pad"}}
idempotent: true
schema_ref: "contracts/feat_rows@vX.Y"
feature_space: {type:"tabular", shape:"(N,D)", dtype:"float32", normalization:"zscore"}
- name: "export"
stages:
- name: "split.package"
type: "export.splits"
impl: "I16-5.split_package"
inputs: ["feat_rows"]
outputs: ["train_pkg","val_pkg","test_pkg"]
splits:
train: {ratio: 0.8}
validation: {ratio: 0.1}
test: {ratio: 0.1}
policy:
sampling:
strategy: "random|stratified|time-based|spatial-tiles|systematic"
strata: [{by:"class|region|snr_bin", buckets: {"A":100,"B":200}}]
leakage_guard: ["per-object","per-timewindow","per-scene"]
freeze_indices: true
distribution:
packaging: {format:"tgz|parquet|zarr", shard_bytes:134217728, layout:["train","validation","test"]}
mirrors: ["https://mirror-a.example/ds/foo/","s3://bucket/foo/"]
rate_limit: {mbps: 50}
checksums:
package: {sha256: "<hex>"}
shards:
- {path:"train-000.tgz", sha256:"<hex>"}
edges: []
monitoring:
metrics:
perf:
- {name:"qps", unit:"1/s", agg:"sum", window:"1m"}
- {name:"latency_ms.p99", unit:"ms", agg:"quant", window:"1m"}
metrology: {units:"SI", check_dim:true}
export_manifest:
version: "v1.0"
artifacts:
- {path:"pipeline.yaml", sha256:"<hex>"}
- {path:"contracts/raw_rows.schema.json", sha256:"<hex>"}
references:
- "EFT.WP.Core.DataSpec v1.0:EXPORT"
- "EFT.WP.Core.Metrology v1.0:check_dim"
IV. Placeholder Hints & Minimal Regex (quick ref)
- pipeline.id: ^[a-z0-9_\\-\\.]+$; pipeline.version: ^v\\d+\\.\\d+(\\.\\d+)?$.
- export_manifest.references[*]: ^[^:]+ v\\d+\\.\\d+:[A-Z].+$.
- Split ratios sum: 1±1e-6; policy.freeze_indices:true; leakage_guard contains at least one of per-object|per-timewindow|per-scene.
- Metrology: metrology.units="SI" and check_dim=true.
V. Export Manifest Template (Normative)
export_manifest:
version: "v1.0"
artifacts:
- {path:"pipeline.yaml", sha256:"<hex>"}
- {path:"splits/train.index", sha256:"<hex>"}
- {path:"splits/validation.index", sha256:"<hex>"}
- {path:"splits/test.index", sha256:"<hex>"}
- {path:"packages/train-000.tgz", sha256:"<hex>"}
- {path:"dq/report.jsonl", sha256:"<hex>"}
references:
- "EFT.WP.Core.DataSpec v1.0:EXPORT"
- "EFT.WP.Core.Metrology v1.0:check_dim"
- "EFT.WP.Data.DatasetCards v1.0:Ch.11"
- "EFT.WP.Data.ModelCards v1.0:Ch.11"
VI. Pre-Release Blocking Self-Check (list)
- Structure/required: pipeline.id/version/layers/edges plus metrology/export_manifest present; Schema validation passes.
- Citations/versioning: export_manifest.references[] use “Volume vX.Y:Anchor”; no shortcodes/missing versions.
- Metrology/units: units="SI", check_dim=true; consistent units for performance/network/storage.
- Topology/splits/leakage: Σ_out→Σ_in compatible; split ratios sum to 1; indices frozen; leakage guardrails active.
- Security/credentials: sources use credentials_ref only; no plaintext secrets; access & network restrictions in effect.
- Verifiable artifacts: all files in export_manifest carry sha256 and are reproducible.
VII. Machine-Readable Blank Template (no-comments; CI-friendly)
pipeline:
id: ""
version: "v1.0"
orchestration: {orchestrator:"airflow", dag:{max_concurrency:64, backfill:{enabled:false}}}
scheduling: {queue:"default", priority:5, preempt:true, retries:{max:3, backoff:"expo", jitter_ms:200}, timeout_s:3600}
resources: {requests:{cpu:1, mem_gb:4, gpu:0}, limits:{cpu:2, mem_gb:8, gpu:0}, disk_gb:50, net_mbps:200, qos:"burstable"}
layers: []
edges: []
monitoring: {metrics:{}, logs:{format:"jsonl", retention:"P30D"}}
metrology: {units:"SI", check_dim:true}
export_manifest: {version:"v1.0", artifacts: [], references:["EFT.WP.Core.DataSpec v1.0:EXPORT","EFT.WP.Core.Metrology v1.0:check_dim"]}