I. Chapter Purpose & Scope
Fix the Sources & Ingest layer’s specifications and engineering practices: connector types, credentials & security, idempotency/retry/checkpointing, dedup & dedupe keys, throughput & latency metrology, contract alignment (Σ_in/Σ_out), exception handling, and audit exports; ensure consistency with Dataset/Model Cards, the Metrology chapter, and citation anchors.
II. Terminology & Dependencies
- Terms: source/connector, ingest, checkpoint, cursor/watermark, idempotency, dedupe_key, SLA/SLO, orchestrator.
- Dependencies: data contracts & exports in Core.DataSpec v1.0; units & dimensional checks in Core.Metrology v1.0; split/quality alignment in DatasetCards v1.0; feature/I-O alignment in ModelCards v1.0.
- Math & symbols: wrap inline symbols like QPS, T_inf, ρ in backticks; any division/integral/composite operator must use parentheses; if path quantities such as T_arr are involved, register gamma(ell) and d ell; no Chinese in formulas/symbols/definitions.
III. Fields & Structure (Normative)
stage:
  name: "<src.kind.name>"
  type: "source.<s3|gcs|fs|db|kafka|http|custom>"
  impl: "I16-1.<impl_id>"
  params:
    endpoint: "<url-or-bootstrap>"
    bucket_or_db: "<bucket|db>"
    prefix_or_table: "<prefix|schema.table>"
    query_or_pattern: "<sql|glob>"
    credentials_ref: "secrets://path/to/credential"
    format: "<json|parquet|csv|avro|binary>"
    watermark:
      field: "<updated_at|offset|lsn>"
      start: "<ISO8601|offset>"
      step: "<PT5M|1000>"
    checkpoint:
      path: "s3://.../chk/<stage>"
      mode: "exactly-once|at-least-once"
    dedupe_key: ["<pk>", "<ts>"]
  outputs: ["raw_blob|raw_rows|events"]
  idempotent: true
  retries: {max: 3, backoff: "expo", jitter_ms: 200}
  timeout_s: 1800
  on_fail: "quarantine|skip|block"
  schema_ref: "<contracts/raw@vX.Y>"
IV. Connector Types & Specifications
- Object storage / filesystem (source.s3|gcs|fs): support sharding and resume; prefix + pattern explicitly bound the scan domain; record etag/mtime/size to enable dedup.
- Databases (source.db): enforce change watermark (watermark.field) or change log (lsn/scn); SQL must be parameterized and bounded; export a table-schema snapshot.
- Messaging/stream (source.kafka|pulsar): consumer group and offset management; parallelism by partition; provide replay policy and a max-lag threshold.
- HTTP/custom (source.http|custom): throttling & rate limits; idempotent signatures or dedupe keys; error classification and fallback.
- Format & contract: format is tightly bound to schema_ref; for semi-structured/binary inputs, decoding and validation must be completed downstream in validate.
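The database bullet above (parameterized, bounded SQL driven by a change watermark) can be sketched as follows. This is a minimal illustration, not a reference connector: it assumes an in-memory SQLite table named events with columns id, updated_at, and payload (all hypothetical), and ISO 8601 UTC timestamps in one fixed format so that string comparison matches time order.

```python
import sqlite3

def pull_window(conn, lo, hi, limit=1000):
    """Fetch rows with lo < updated_at <= hi, oldest first, at most `limit` rows.

    All values are bound as parameters; nothing is interpolated into the SQL text.
    """
    sql = (
        "SELECT id, updated_at, payload FROM events "
        "WHERE updated_at > ? AND updated_at <= ? "
        "ORDER BY updated_at, id LIMIT ?"
    )
    return conn.execute(sql, (lo, hi, limit)).fetchall()

def demo():
    """Build a tiny hypothetical table and pull one 5-minute watermark window."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER PRIMARY KEY, updated_at TEXT, payload TEXT)"
    )
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?)",
        [
            (1, "2025-09-01T00:01:00Z", "a"),
            (2, "2025-09-01T00:04:00Z", "b"),
            (3, "2025-09-01T00:07:00Z", "c"),
        ],
    )
    rows = pull_window(conn, "2025-09-01T00:00:00Z", "2025-09-01T00:05:00Z")
    conn.close()
    return rows
```

The half-open window (lo exclusive, hi inclusive) lets consecutive windows tile without overlap. If a window can hold more rows than limit, the caller would need to page within it before advancing the watermark; that paging is left out of this sketch.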
V. Idempotency, Retry & Checkpointing
- Idempotency keys: prefer source primary key/offset/watermark (dedupe_key); use idempotent insert or UPSERT on the sink to avoid duplicates.
- Retry: exponential backoff (backoff:"expo") + jitter; error classes (retryable/non-retryable/escalate); both per-attempt and overall time limits (timeout_s) must be explicit.
- Checkpoint: checkpoint.mode:"exactly-once|at-least-once" with atomic commit semantics (commit includes: offset, cursor, and sink completion marker).
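The retry policy above can be sketched as a small wrapper. This is one possible reading, under stated assumptions: retries.max counts retries after the first attempt, the base delay of 1 s is hypothetical (the chapter does not fix it), jitter is drawn uniformly from [0, jitter_ms], and the wrapped operation enforces its own per-attempt limit. The exception class names are illustrative only.

```python
import random
import time

class RetryableError(Exception):
    """Transient failure; another attempt may succeed."""

class NonRetryableError(Exception):
    """Permanent failure; retrying cannot help."""

def backoff_delays(max_retries=3, base_s=1.0, jitter_ms=200, rng=random):
    """Yield one delay per retry: base_s * 2**n plus uniform jitter in [0, jitter_ms]."""
    for n in range(max_retries):
        yield base_s * (2 ** n) + rng.uniform(0, jitter_ms) / 1000.0

def run_with_retries(op, max_retries=3, base_s=1.0, jitter_ms=200,
                     timeout_s=1800, sleep=time.sleep, clock=time.monotonic):
    """Call op() until it succeeds, retries run out, or the overall deadline would pass.

    Only RetryableError triggers a retry; any other exception propagates at once.
    """
    deadline = clock() + timeout_s
    delays = backoff_delays(max_retries, base_s, jitter_ms)
    while True:
        try:
            return op()
        except RetryableError:
            delay = next(delays, None)
            # Give up when retries are exhausted or the wait would overrun timeout_s.
            if delay is None or clock() + delay > deadline:
                raise
            sleep(delay)
```

Passing sleep and clock as parameters keeps the wrapper testable without real waiting. An escalate class, if needed, could be modeled as a third exception type that the caller maps to on_fail.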
VI. Dedup & Ordering Guarantees
- Dedup: for batch, by dedupe_key + watermark; for streams, by monotonic per-partition offsets with idempotent aggregation.
- Ordering: when strict order is required, declare partition key, sort field, and lateness window; otherwise state “in-partition ordered, cross-partition unordered”.
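A minimal sketch of the two dedup modes above, assuming records are plain dicts and that the set of seen keys and the per-partition high-water marks are kept by the caller (in practice they would live in the checkpoint store). The function names are hypothetical.

```python
def dedupe_batch(rows, dedupe_key=("id", "updated_at"), seen=None):
    """Drop rows whose dedupe_key tuple was already seen; keep first occurrence order."""
    seen = set() if seen is None else seen
    out = []
    for row in rows:
        key = tuple(row[field] for field in dedupe_key)
        if key in seen:
            continue
        seen.add(key)
        out.append(row)
    return out

def accept_offset(high_water, partition, offset):
    """Return True only if `offset` advances the partition's high-water mark.

    Replayed or duplicate offsets are rejected, which makes reprocessing idempotent
    within a partition. No ordering is implied across partitions.
    """
    if offset <= high_water.get(partition, -1):
        return False
    high_water[partition] = offset
    return True
```

Because the example key includes the timestamp, a new version of the same id passes through while an exact replay is dropped.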
VII. Metrology & Units (SI)
- Throughput: `QPS` = ( count / Δt ), in 1/s.
- Latency: `T_inf` in ms; report {p50, p95, p99}.
- Utilization: `ρ` = ( λ / μ ), arrival rate over service rate; dimensionless.
- Bandwidth/storage: net_mbps, size_bytes.
- metrology:{units:"SI", check_dim:true} is mandatory; normalize units before any composition.
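The quantities above can be computed as in the sketch below. It assumes latency samples are already in one unit (ms) and uses the nearest-rank percentile definition, which is only one of several common conventions; the chapter does not say which one applies. Function names are hypothetical.

```python
import math

def qps(count, dt_s):
    """Throughput QPS = ( count / dt ), in 1/s; dt_s must be a positive duration in seconds."""
    if dt_s <= 0:
        raise ValueError("dt_s must be positive")
    return count / dt_s

def percentile(samples, p):
    """Nearest-rank percentile: smallest sample with at least p percent of samples at or below it."""
    if not samples:
        raise ValueError("samples must be non-empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]

def utilization(arrival_rate, service_rate):
    """Utilization rho = ( lambda / mu ); both rates must share one unit, e.g. 1/s."""
    if service_rate <= 0:
        raise ValueError("service_rate must be positive")
    return arrival_rate / service_rate

def latency_report(samples_ms):
    """Return the {p50, p95, p99} latency summary in ms."""
    return {f"p{p}": percentile(samples_ms, p) for p in (50, 95, 99)}
```

For example, latency_report(list(range(1, 101))) yields p50 = 50, p95 = 95, and p99 = 99 under the nearest-rank rule.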
VIII. Security, Credentials & Compliance
- Credentials: refer only via credentials_ref; plaintext secrets are forbidden; support rotation and least-privilege (read-only where possible).
- Privacy: when PII/sensitive fields exist, enforce minimization and de-identification; regional limits (e.g., EU-GDPR) appear in export_manifest.references[].
- Audit: log all connect/pull/failure events to a security audit stream; severe failures route to on_fail:"quarantine" and trigger alerts.
IX. Machine-Readable Fragment (Drop-in)
layers:
  - name: "ingest"
    stages:
      - name: "src.s3.pull"
        type: "source.s3"
        impl: "I16-1.s3_pull"
        params:
          endpoint: "https://s3.amazonaws.com"
          bucket_or_db: "eift-data"
          prefix_or_table: "raw/2025/09/"
          query_or_pattern: "*.jsonl"
          credentials_ref: "secrets://aws/ingest_ro"
          format: "json"
          watermark: {field: "updated_at", start: "2025-09-01T00:00:00Z", step: "PT5M"}
          checkpoint: {path: "s3://eift-meta/chk/src.s3.pull", mode: "at-least-once"}
          dedupe_key: ["id", "updated_at"]
        outputs: ["raw_blob"]
        idempotent: true
        retries: {max: 3, backoff: "expo", jitter_ms: 200}
        timeout_s: 1800
        on_fail: "quarantine"
        schema_ref: "contracts/raw_json@v1.2"
X. Lint Rules (Excerpt, Normative)
lint_rules:
  - id: SRC.TYPE_ALLOWED
    when: "$.layers[*].stages[*].type"
    assert: "value in ['source.s3','source.gcs','source.fs','source.db','source.kafka','source.http','source.custom']"
    level: error
  - id: SRC.CREDENTIALS_REF
    when: "$.layers[*].stages[?(@.type^='source.')].params"
    assert: "has_key('credentials_ref') and not has_key('plain_secret')"
    level: error
  - id: SRC.CHECKPOINT_DEFINED
    when: "$.layers[*].stages[?(@.type^='source.')].params"
    assert: "has_key('checkpoint') and has_key('watermark')"
    level: error
  - id: SRC.DEDUPE_OR_EXACTLY_ONCE
    when: "$.layers[*].stages[?(@.type^='source.')]"
    assert: "has_key('params.dedupe_key') or $.params.checkpoint.mode == 'exactly-once'"
    level: error
  - id: METROLOGY.SI_AND_CHECKDIM
    when: "$.metrology"
    assert: "units=='SI' and check_dim==true"
    level: error
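The rules above could be evaluated roughly as follows. This sketch hand-codes each rule with plain dict traversal instead of interpreting the when/assert expressions, whose evaluator is not specified here; it assumes the pipeline has already been parsed into a Python dict, and the lint function name and finding format are hypothetical.

```python
ALLOWED_TYPES = {
    "source.s3", "source.gcs", "source.fs", "source.db",
    "source.kafka", "source.http", "source.custom",
}

def lint(doc):
    """Return a list of (rule_id, location) findings; an empty list means the doc passes."""
    findings = []
    for layer in doc.get("layers", []):
        for stage in layer.get("stages", []):
            name = stage.get("name", "<unnamed>")
            stype = stage.get("type", "")
            params = stage.get("params", {})
            # As written, SRC.TYPE_ALLOWED applies to every stage it is run against.
            if stype not in ALLOWED_TYPES:
                findings.append(("SRC.TYPE_ALLOWED", name))
            # The remaining SRC.* rules only target stages whose type starts with "source.".
            if not stype.startswith("source."):
                continue
            if "credentials_ref" not in params or "plain_secret" in params:
                findings.append(("SRC.CREDENTIALS_REF", name))
            if "checkpoint" not in params or "watermark" not in params:
                findings.append(("SRC.CHECKPOINT_DEFINED", name))
            mode = params.get("checkpoint", {}).get("mode")
            if "dedupe_key" not in params and mode != "exactly-once":
                findings.append(("SRC.DEDUPE_OR_EXACTLY_ONCE", name))
    # METROLOGY.SI_AND_CHECKDIM fires only when a metrology block is present.
    if "metrology" in doc:
        metrology = doc["metrology"]
        if not (metrology.get("units") == "SI" and metrology.get("check_dim") is True):
            findings.append(("METROLOGY.SI_AND_CHECKDIM", "metrology"))
    return findings
```

All rules are level error, so a caller would typically fail the build on any non-empty result.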
XI. Export Manifest & Audit Trail
export_manifest:
  version: "v1.0"
  artifacts:
    - {path: "ingest/pulled.manifest.json", sha256: "..."}
    - {path: "ingest/checkpoint.meta.json", sha256: "..."}
    - {path: "security/audit.log", sha256: "..."}
  references:
    - "EFT.WP.Core.DataSpec v1.0:EXPORT"
    - "EFT.WP.Core.Metrology v1.0:check_dim"
    - "EFT.WP.Data.DatasetCards v1.0:Ch.6"
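The sha256 entries in a manifest of this shape could be produced as sketched below. It assumes artifacts are local files under a common root directory; the build_manifest helper and the demo file content are hypothetical and only illustrate the digest step.

```python
import hashlib
import os
import tempfile

def sha256_file(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 and return the lowercase hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def build_manifest(root, rel_paths, references, version="v1.0"):
    """Assemble an export_manifest dict with one {path, sha256} entry per artifact."""
    artifacts = [
        {"path": rel, "sha256": sha256_file(os.path.join(root, rel))}
        for rel in rel_paths
    ]
    return {
        "export_manifest": {
            "version": version,
            "artifacts": artifacts,
            "references": list(references),
        }
    }

def demo():
    """Write one placeholder artifact to a temp dir and build its manifest."""
    with tempfile.TemporaryDirectory() as root:
        rel = "ingest/pulled.manifest.json"
        full = os.path.join(root, rel)
        os.makedirs(os.path.dirname(full))
        with open(full, "wb") as handle:
            handle.write(b"abc")  # placeholder content for the sketch
        return build_manifest(root, [rel], ["EFT.WP.Core.DataSpec v1.0:EXPORT"])
```

Reading in chunks keeps memory use flat for large artifacts such as audit logs.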
XII. Chapter Compliance Checklist
- Connector type/impl/params complete; credentials_ref valid; no plaintext secrets present.
- watermark and checkpoint configured; dedupe_key or exactly-once guarantees non-duplication.
- Retry/timeout/idempotency policies explicit; failure levels and on_fail behavior traceable.
- metrology.units="SI" and check_dim=true; units consistent for QPS/T_inf/ρ/net_mbps.
- schema_ref aligns with upstream contracts; audit artifacts and citation anchors are listed in export_manifest with sha256.