Build versioned AI datasets with schema gates, grouped splits, contamination checks, and auditable receipts.
The last chapter trained a classifier from prepared tensors. This chapter asks what must be true before a ticket is allowed to become one of those tensors.
A support model can appear impressive for the wrong reason. If a refund message, a paraphrase of it, or another message from the same conversation is present in both training and evaluation, the score measures memory or customer overlap instead of useful generalization. A dataset pipeline is the code and evidence trail that turns raw records into rows a model may learn from and held-out rows a reviewer may trust.
That idea scales past this small classifier. Pretraining corpora, fine-tuning examples, retrieval evaluations, and agent benchmarks all need provenance, cleaning rules, frozen splits, and contamination checks. A model result is only as credible as the data path beneath it.
We'll build a dataset for the ticket-escalation classifier from the previous chapter. Each raw row was exported from a support conversation:
| Field | Meaning | Why it matters |
|---|---|---|
| `ticket_id` | Unique exported row ID | Lets a reviewer trace rejects back to source. |
| `conversation_id` | All turns from one customer case | Must stay in one split to avoid conversation leakage. |
| `text` | Agent-visible customer message | Becomes model input after normalization. |
| `label` | `standard` or `escalate` | Becomes supervised target after review. |
Some rows aren't safe:
| Ticket | Raw message | Label | Problem |
|---|---|---|---|
| 101 | Refund is still missing | escalate | Valid reviewed example. |
| 102 | REFUND is still missing | escalate | Same message after safe normalization. |
| 103 | Where is my delivery? | standard | Valid reviewed example. |
| 104 | Package marked delivered, not here | None | Missing label. |
Start with a schema gate. It catches absent fields, blank messages, and labels outside the reviewed vocabulary. Schema validation can't prove that a label is correct, but it prevents malformed rows from silently becoming model supervision.
```python
raw_rows = [
    {"ticket_id": 101, "conversation_id": "c-51", "text": "Refund is still missing", "label": "escalate"},
    {"ticket_id": 102, "conversation_id": "c-51", "text": " REFUND is still missing ", "label": "escalate"},
    {"ticket_id": 103, "conversation_id": "c-52", "text": "Where is my delivery?", "label": "standard"},
    {"ticket_id": 104, "conversation_id": "c-53", "text": "Package marked delivered, not here", "label": None},
]
required = {"ticket_id", "conversation_id", "text", "label"}
labels = {"standard", "escalate"}

def reject_reason(row: dict) -> str | None:
    # Return the first schema violation, or None when the row passes.
    if not required.issubset(row):
        return "missing_field"
    if not isinstance(row["text"], str) or not row["text"].strip():
        return "blank_text"
    if row["label"] not in labels:
        return "invalid_label"
    return None

accepted = [row["ticket_id"] for row in raw_rows if reject_reason(row) is None]
rejected = [(row["ticket_id"], reject_reason(row)) for row in raw_rows if reject_reason(row)]
print("accepted:", accepted)
print("rejected:", rejected)
```

```text
accepted: [101, 102, 103]
rejected: [(104, 'invalid_label')]
```

Tickets often differ only because one export added spaces or changed letter case. For this routing fixture, Unicode compatibility normalization, case-folding, and whitespace collapse are a reviewable starting rule for revealing obvious copies. Normalization is task-specific: confirm that it doesn't erase distinctions your label depends on. It doesn't justify deleting prices, order identifiers, dates, or negation: "refund received" and "refund not received" must remain different examples.
Once text is normalized, a deterministic hash becomes an exact fingerprint. It isn't a semantic similarity score. It says only that two normalized strings match byte for byte.
```python
import hashlib
import unicodedata

def normalize_text(text: str) -> str:
    # NFKC normalization, case-folding, then whitespace collapse.
    normalized = unicodedata.normalize("NFKC", text)
    return " ".join(normalized.casefold().split())

def fingerprint(text: str) -> str:
    # Truncated to 12 hex characters for display only.
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()[:12]

messages = [
    "Refund is still missing",
    " REFUND is still missing ",
    "Refund is not missing",
]
for message in messages:
    print(normalize_text(message), fingerprint(message))
print("first two duplicate:", fingerprint(messages[0]) == fingerprint(messages[1]))
print("negation preserved:", fingerprint(messages[0]) != fingerprint(messages[2]))
```

```text
refund is still missing 835272638bf0
refund is still missing 835272638bf0
refund is not missing ed6b9c6cad4c
first two duplicate: True
negation preserved: True
```

The shortened hashes make the output readable. Store the full digest in a released artifact so display truncation doesn't weaken the identity check.
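One way to keep both properties is to store the full hex digest and derive the short prefix only at print time. This is a minimal sketch; the `text_sha256` field name and the `full_fingerprint` and `display_prefix` helpers are illustrative assumptions.

```python
import hashlib
import unicodedata


def normalize_text(text: str) -> str:
    # Same rule as above: NFKC, case-fold, collapse whitespace.
    return " ".join(unicodedata.normalize("NFKC", text).casefold().split())


def full_fingerprint(text: str) -> str:
    # Full 64-character SHA-256 hex digest; this is the value to store.
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()


def display_prefix(digest: str, length: int = 12) -> str:
    # Shortened form for logs and tables only; not used for matching.
    return digest[:length]


# Hypothetical released record that carries the full digest.
record = {"ticket_id": 101, "text_sha256": full_fingerprint("Refund is still missing")}
print("stored length:", len(record["text_sha256"]))
print("display:", display_prefix(record["text_sha256"]))
```

Matching always compares the stored 64-character value, so two different texts that happen to share a 12-character prefix are still told apart.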
Dropping a repeated row is safe only when its supervision agrees. Suppose two identical ticket texts receive different labels. One of three things may be true: an annotation is wrong, the route changed with hidden context, or the feature set omitted a decisive field such as order value or policy status.
A careful pipeline quarantines that fingerprint instead of picking the first label and pretending the conflict didn't happen.
```python
from collections import defaultdict
import hashlib

def key(text: str) -> str:
    # Simplified fingerprint: case-fold and collapse whitespace, then hash.
    clean = " ".join(text.casefold().split())
    return hashlib.sha256(clean.encode()).hexdigest()[:10]

rows = [
    {"ticket_id": 201, "text": "Refund pending for 12 days", "label": "escalate"},
    {"ticket_id": 202, "text": " refund pending for 12 days ", "label": "escalate"},
    {"ticket_id": 203, "text": "Refund pending for 12 days", "label": "standard"},
    {"ticket_id": 204, "text": "Tracking link updated", "label": "standard"},
]
groups = defaultdict(list)
for row in rows:
    groups[key(row["text"])].append(row)

kept = []
quarantined = []
for duplicate_group in groups.values():
    observed_labels = {row["label"] for row in duplicate_group}
    if len(observed_labels) > 1:
        # Conflicting supervision: hold back every row in the group.
        quarantined.extend(row["ticket_id"] for row in duplicate_group)
    else:
        kept.append(duplicate_group[0]["ticket_id"])

print("kept representative rows:", kept)
print("quarantined label conflict:", quarantined)
```

```text
kept representative rows: [204]
quarantined label conflict: [201, 202, 203]
```

This is a small but important shift in thinking. Data cleaning isn't only deletion. It's investigation: the pipeline should expose cases that could teach you the current label definition or feature schema is incomplete.
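To make that investigation practical, the quarantined rows can be turned into a report that shows a reviewer which tickets disagree. This is a minimal sketch; the `conflict_report` helper and its output shape are illustrative assumptions, and the rows copy the conflict group from the example above.

```python
from collections import defaultdict

# Rows shaped like the quarantined conflict group above.
quarantined_rows = [
    {"ticket_id": 201, "text": "Refund pending for 12 days", "label": "escalate"},
    {"ticket_id": 202, "text": " refund pending for 12 days ", "label": "escalate"},
    {"ticket_id": 203, "text": "Refund pending for 12 days", "label": "standard"},
]


def conflict_report(rows: list[dict]) -> list[dict]:
    """Group quarantined tickets by normalized text, then by label."""
    by_text = defaultdict(lambda: defaultdict(list))
    for row in rows:
        clean = " ".join(row["text"].casefold().split())
        by_text[clean][row["label"]].append(row["ticket_id"])
    return [
        {"text": text, "tickets_by_label": dict(sorted(by_label.items()))}
        for text, by_label in by_text.items()
    ]


report = conflict_report(quarantined_rows)
for entry in report:
    print(entry)
```

A reviewer can then decide whether ticket 203 is mislabeled or whether a missing field explains the different route.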
The earlier validation chapter introduced a train, validation, and test split. Data pipelines must decide the split unit: the entity that isn't allowed to cross those boundaries.
For support tickets, splitting by row is too weak. Multiple turns from the same conversation share customer details and issue history. If one turn trains the model while another appears in test, evaluation becomes easier than a truly new conversation. Split on conversation_id, not ticket_id.
A stable hash assignment gives every conversation the same bucket on every rerun. The cryptographic hash here isn't protecting a secret; it produces repeatable buckets from a frozen rule.
```python
import hashlib

def split_for_conversation(conversation_id: str) -> str:
    # Map the first 8 hex digits of the digest to a bucket in 0..99.
    bucket = int(hashlib.sha256(conversation_id.encode()).hexdigest()[:8], 16) % 100
    if bucket < 70:
        return "train"
    if bucket < 85:
        return "validation"
    return "test"

rows = [
    {"ticket_id": 301, "conversation_id": "return-001"},
    {"ticket_id": 315, "conversation_id": "return-001"},
    {"ticket_id": 303, "conversation_id": "delivery-001"},
    {"ticket_id": 304, "conversation_id": "billing-002"},
]
assignments = [(row["ticket_id"], split_for_conversation(row["conversation_id"])) for row in rows]
same_conversation_stays_together = assignments[0][1] == assignments[1][1]
print("assignments:", assignments)
print("same conversation stays together:", same_conversation_stays_together)
```

```text
assignments: [(301, 'train'), (315, 'train'), (303, 'validation'), (304, 'test')]
same conversation stays together: True
```

A hash rule keeps assignments stable, but it doesn't guarantee exact split sizes or label balance. Audit the resulting counts. If a release needs better coverage, freeze a documented group-aware assignment rule and version the change.
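The audit can be as small as counting rows, conversations, and labels per split. This is a minimal sketch, assuming rows already carry `split` and `label` fields; the `audit_splits` helper and the sample rows are illustrative and not part of the lesson's artifact.

```python
from collections import Counter, defaultdict


def audit_splits(rows: list[dict]) -> dict:
    """Count rows, distinct conversations, and labels in each split."""
    row_counts = Counter(row["split"] for row in rows)
    conversations = defaultdict(set)
    label_counts = defaultdict(Counter)
    for row in rows:
        conversations[row["split"]].add(row["conversation_id"])
        label_counts[row["split"]][row["label"]] += 1
    return {
        split: {
            "rows": row_counts[split],
            "conversations": len(conversations[split]),
            "labels": dict(label_counts[split]),
        }
        for split in sorted(row_counts)
    }


# Hypothetical rows for illustration only.
sample_rows = [
    {"conversation_id": "c-1", "split": "train", "label": "escalate"},
    {"conversation_id": "c-1", "split": "train", "label": "escalate"},
    {"conversation_id": "c-2", "split": "train", "label": "standard"},
    {"conversation_id": "c-3", "split": "test", "label": "standard"},
]
report = audit_splits(sample_rows)
for split, stats in report.items():
    print(split, stats)
```

In this sample the report has no `validation` entry at all, which is exactly the kind of gap a stable hash rule won't prevent on a small dataset.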
If a training record duplicates a held-out record, you already know the metric is compromised. In language-model work this problem appears at a larger boundary: web-scale pretraining data can contain benchmark prompts, answers, or close variants. The evaluation set then no longer measures behavior on genuinely unseen tasks.
Brown et al. performed n-gram overlap analysis between GPT-3 training data and evaluation sets. Their paper reports that a filtering bug left near-complete overlap for several language-modeling benchmarks and the Children's Book Test, so those results were omitted from aggregate reporting.[1] This is what scientific hygiene looks like: measure contamination, document it, and withhold claims when the test is no longer clean.
Our first guard is exact overlap between prepared training text and locked evaluation text.
```python
import hashlib

def clean(text: str) -> str:
    return " ".join(text.casefold().split())

def fp(text: str) -> str:
    # Full digest: this guard compares identities, so nothing is truncated.
    return hashlib.sha256(clean(text).encode()).hexdigest()

training = ["refund pending for 12 days", "tracking link updated"]
locked_eval = ["REFUND pending for 12 days", "courier never arrived"]
train_keys = {fp(text) for text in training}
collisions = [text for text in locked_eval if fp(text) in train_keys]

print("exact collisions:", collisions)
print("release evaluation:", len(collisions) == 0)
```

```text
exact collisions: ['REFUND pending for 12 days']
release evaluation: False
```

An exact guard is necessary but not sufficient. A benchmark row may be lightly edited while preserving the task and answer.
Represent a message as a set of adjacent word pairs, called shingles. The Jaccard similarity between two shingle sets $A$ and $B$ is:

$$J(A, B) = \frac{|A \cap B|}{|A \cup B|}$$

The numerator counts shared shingles; the denominator counts all unique shingles across both messages. A score near 1.0 means surface text is highly similar. Broder defined shingle-set resemblance and showed how compact per-document sketches can estimate it without comparing full documents, so this idea could scale past exhaustive pairwise comparison.[2]

For "refund has not arrived" and "my refund has not arrived", the first set has three word pairs and the second has four. Three are shared and the union holds four, so similarity is 3 / 4 = 0.75.
```python
def shingles(text: str, width: int = 2) -> set[tuple[str, ...]]:
    words = text.casefold().split()
    if not words:
        return set()
    if len(words) < width:
        # Too short for one full shingle: use the whole message as a single item.
        return {tuple(words)}
    return {tuple(words[index:index + width]) for index in range(len(words) - width + 1)}

def jaccard(left: str, right: str) -> float:
    a = shingles(left)
    b = shingles(right)
    union = a | b
    return len(a & b) / len(union) if union else 0.0

train_text = "refund has not arrived"
eval_text = "my refund has not arrived"
score = jaccard(train_text, eval_text)
print("shared pair score:", round(score, 2))
print("send to human review:", score >= 0.70)
```

```text
shared pair score: 0.75
send to human review: True
```

Short messages need explicit handling because they may contain fewer words than the requested shingle width. Blank messages should already have failed the schema gate. Don't automatically delete every high-similarity example. Templates may be legitimate in training, and false matches can erase rare issues. Near-duplicate screening should produce a review queue or a documented rule tuned against real examples.
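Now put the mechanics together. This compact artifact build repeats the conflict check inside the release path so the earlier quarantine rule can't be skipped. It:

- runs every raw row through the schema gate and records a rejection reason for each failure;
- normalizes accepted text and fingerprints it with a full SHA-256 digest;
- assigns each row a split from its `conversation_id`;
- quarantines every row in a fingerprint group whose labels conflict, and keeps one representative of each agreeing group;
- writes kept rows and rejection records as JSONL files;
- writes a manifest with row counts, reject reasons, split counts, and the artifact digest.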
The manifest is a small dataset receipt. Gebru et al. propose datasheets that document a dataset's motivation, composition, collection process, and recommended uses so consumers can judge whether it fits their task.[3] A production dataset deserves a full datasheet; this lab begins with the fields your next model run needs immediately.
```python
import hashlib
import json
import unicodedata
from collections import Counter, defaultdict
from pathlib import Path

raw_rows = [
    {"ticket_id": 401, "conversation_id": "c-a", "text": "Refund is still missing", "label": "escalate"},
    {"ticket_id": 402, "conversation_id": "c-a", "text": " REFUND is still missing ", "label": "escalate"},
    {"ticket_id": 403, "conversation_id": "c-i", "text": "Tracking page shows delayed", "label": "standard"},
    {"ticket_id": 404, "conversation_id": "c-c", "text": "Package marked delivered, not here", "label": None},
    {"ticket_id": 405, "conversation_id": "c-002", "text": "Return label will not open", "label": "standard"},
    {"ticket_id": 406, "conversation_id": "c-e", "text": "Charged twice for one refund", "label": "escalate"},
    {"ticket_id": 407, "conversation_id": "c-l", "text": "Delivery arrived this morning", "label": "standard"},
    {"ticket_id": 408, "conversation_id": "return-003", "text": "Refund overdue after approval", "label": "escalate"},
    {"ticket_id": 409, "conversation_id": "c-m", "text": "Address update pending review", "label": "standard"},
    {"ticket_id": 410, "conversation_id": "c-m", "text": "Address update pending review", "label": "escalate"},
]
valid_labels = {"standard", "escalate"}
required_fields = {"ticket_id", "conversation_id", "text", "label"}

def normalize_text(text: str) -> str:
    return " ".join(unicodedata.normalize("NFKC", text).casefold().split())

def text_fingerprint(text: str) -> str:
    # Expects already-normalized text; returns the full SHA-256 hex digest.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def reject_reason(row: dict) -> str | None:
    if not required_fields.issubset(row):
        return "missing_field"
    if not isinstance(row["ticket_id"], int):
        return "invalid_ticket_id"
    if not isinstance(row["conversation_id"], str) or not row["conversation_id"].strip():
        return "invalid_conversation_id"
    if not isinstance(row["text"], str) or not row["text"].strip():
        return "blank_text"
    if row["label"] not in valid_labels:
        return "invalid_label"
    return None

def stable_split(conversation_id: str) -> str:
    bucket = int(hashlib.sha256(conversation_id.encode()).hexdigest()[:8], 16) % 100
    if bucket < 70:
        return "train"
    if bucket < 85:
        return "validation"
    return "test"

# Schema gate, normalization, fingerprinting, and split assignment.
rejected = []
prepared_by_fingerprint = defaultdict(list)
for row in raw_rows:
    reason = reject_reason(row)
    if reason:
        rejected.append({"ticket_id": row.get("ticket_id"), "reason": reason})
        continue
    text = normalize_text(row["text"])
    key = text_fingerprint(text)
    prepared_by_fingerprint[key].append({
        "ticket_id": row["ticket_id"],
        "conversation_id": row["conversation_id"],
        "text": text,
        "label": row["label"],
        "split": stable_split(row["conversation_id"]),
        "text_sha256": key,
    })

# Quarantine conflicting groups; keep one representative of agreeing groups.
kept = []
for duplicate_group in prepared_by_fingerprint.values():
    labels = {row["label"] for row in duplicate_group}
    if len(labels) > 1:
        rejected.extend(
            {"ticket_id": row["ticket_id"], "reason": "label_conflict"}
            for row in duplicate_group
        )
        continue
    kept.append(duplicate_group[0])
    rejected.extend(
        {"ticket_id": row["ticket_id"], "reason": "exact_duplicate"}
        for row in duplicate_group[1:]
    )

# Write rows, rejects, and the manifest.
output = Path("artifacts")
output.mkdir(exist_ok=True)
rows_path = output / "support_tickets.v1.jsonl"
rejects_path = output / "support_tickets.v1.rejects.jsonl"
rows_path.write_text("".join(json.dumps(row, sort_keys=True) + "\n" for row in kept), encoding="utf-8")
rejects_path.write_text("".join(json.dumps(row, sort_keys=True) + "\n" for row in rejected), encoding="utf-8")
artifact_sha256 = hashlib.sha256(rows_path.read_bytes()).hexdigest()
manifest = {
    "dataset_id": "support-ticket-routing",
    "version": "v1",
    "pipeline_version": "normalize-nfkc-casefold-space__dedup-quarantine-sha256__split-conversation-v1",
    "rows_raw": len(raw_rows),
    "rows_kept": len(kept),
    "reject_reasons": dict(Counter(row["reason"] for row in rejected)),
    "split_counts": dict(Counter(row["split"] for row in kept)),
    "artifact_sha256": artifact_sha256,
}
(output / "support_tickets.v1.manifest.json").write_text(
    json.dumps(manifest, indent=2, sort_keys=True) + "\n",
    encoding="utf-8",
)
print("kept rows:", manifest["rows_kept"], "rejected:", manifest["reject_reasons"])
print("split counts:", manifest["split_counts"])
print("artifact digest prefix:", artifact_sha256[:16])
```

```text
kept rows: 6 rejected: {'invalid_label': 1, 'exact_duplicate': 1, 'label_conflict': 2}
split counts: {'train': 2, 'validation': 2, 'test': 2}
artifact digest prefix: a346f8fcbec89f8c
```

The printed digest prefix is for human scanning; the manifest stores the full SHA-256 digest. The build groups fingerprints before choosing representatives, so agreeing exact copies are dropped while every row in a conflicting fingerprint group is quarantined. Preserve all rejection records for review.
An artifact isn't trustworthy because it exists. Load it as a downstream training job would and fail early when its contract is broken.
```python
import hashlib
import json
from collections import defaultdict

# Continues from the build above: rows_path, output, and valid_labels are already defined.
rows = [json.loads(line) for line in rows_path.read_text(encoding="utf-8").splitlines()]
saved_manifest = json.loads((output / "support_tickets.v1.manifest.json").read_text(encoding="utf-8"))
groups = defaultdict(set)
for row in rows:
    groups[row["conversation_id"]].add(row["split"])

no_group_leakage = all(len(splits) == 1 for splits in groups.values())
digest_matches = hashlib.sha256(rows_path.read_bytes()).hexdigest() == saved_manifest["artifact_sha256"]
labels_valid = {row["label"] for row in rows} <= valid_labels
fingerprints_unique = len({row["text_sha256"] for row in rows}) == len(rows)

assert no_group_leakage and digest_matches and labels_valid and fingerprints_unique
print("no conversation leakage:", no_group_leakage)
print("digest matches manifest:", digest_matches)
print("labels remain valid:", labels_valid)
print("fingerprints remain unique:", fingerprints_unique)
```

```text
no conversation leakage: True
digest matches manifest: True
labels remain valid: True
fingerprints remain unique: True
```

These four checks protect different truths: a split invariant, an artifact identity, a supervised-label contract, and an exact-deduplication invariant. A training script should reject data that fails any one of them.
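A downstream job can wrap those checks in a loader that refuses to return rows when any contract is broken. This is a minimal sketch, assuming the JSONL and manifest layout written above; `load_verified_rows` and `DatasetContractError` are hypothetical names, and the demo writes a tiny throwaway artifact to a temporary directory so the block runs on its own.

```python
import hashlib
import json
import tempfile
from collections import defaultdict
from pathlib import Path

VALID_LABELS = {"standard", "escalate"}


class DatasetContractError(ValueError):
    """Raised when a released artifact breaks one of its invariants."""


def load_verified_rows(rows_path: Path, manifest_path: Path) -> list[dict]:
    raw = rows_path.read_bytes()
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    # Check identity first so later checks run on the released bytes.
    if hashlib.sha256(raw).hexdigest() != manifest["artifact_sha256"]:
        raise DatasetContractError("artifact digest does not match manifest")
    rows = [json.loads(line) for line in raw.decode("utf-8").splitlines()]
    splits_by_conversation = defaultdict(set)
    for row in rows:
        splits_by_conversation[row["conversation_id"]].add(row["split"])
    if any(len(splits) > 1 for splits in splits_by_conversation.values()):
        raise DatasetContractError("conversation crosses splits")
    if not {row["label"] for row in rows} <= VALID_LABELS:
        raise DatasetContractError("label outside reviewed vocabulary")
    if len({row["text_sha256"] for row in rows}) != len(rows):
        raise DatasetContractError("duplicate text fingerprint")
    return rows


# Demo with a throwaway artifact (hypothetical values).
with tempfile.TemporaryDirectory() as tmp:
    rows_file = Path(tmp) / "rows.jsonl"
    manifest_file = Path(tmp) / "manifest.json"
    row = {"conversation_id": "c-a", "label": "escalate", "split": "train", "text_sha256": "abc"}
    rows_file.write_text(json.dumps(row, sort_keys=True) + "\n", encoding="utf-8")
    digest = hashlib.sha256(rows_file.read_bytes()).hexdigest()
    manifest_file.write_text(json.dumps({"artifact_sha256": digest}), encoding="utf-8")
    loaded = load_verified_rows(rows_file, manifest_file)

    # Tamper with the artifact: the loader should now refuse it.
    rows_file.write_text(rows_file.read_text(encoding="utf-8") + "\n", encoding="utf-8")
    try:
        load_verified_rows(rows_file, manifest_file)
        tamper_detected = False
    except DatasetContractError as error:
        tamper_detected = True
        print("rejected:", error)

print("rows loaded before tampering:", len(loaded))
```

Raising a named exception, instead of printing a warning, means a training script cannot continue by accident with data that failed a check.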
A cleaned dataset can still be unhelpful. If validation contains no escalation cases, its accuracy won't reveal whether the model misses urgent refunds. Inspect label coverage by split before launching training.
```python
from collections import defaultdict

rows = [
    {"split": "train", "label": "standard"},
    {"split": "train", "label": "escalate"},
    {"split": "validation", "label": "standard"},
    {"split": "test", "label": "standard"},
    {"split": "test", "label": "escalate"},
]
required_labels = {"standard", "escalate"}
coverage = defaultdict(set)
for row in rows:
    coverage[row["split"]].add(row["label"])

# Report only the splits that lack at least one required label.
missing = {
    split: sorted(required_labels - coverage[split])
    for split in ["train", "validation", "test"]
    if required_labels - coverage[split]
}
print("coverage:", {split: sorted(values) for split, values in coverage.items()})
print("missing labels:", missing)
print("ready for comparison:", not missing)
```

```text
coverage: {'train': ['escalate', 'standard'], 'validation': ['standard'], 'test': ['escalate', 'standard']}
missing labels: {'validation': ['escalate']}
ready for comparison: False
```

With a tiny sample, the right response isn't to keep resampling until validation looks convenient. Collect additional reviewed groups or adopt a documented group-aware assignment rule, record the change as a new dataset version, and rerun evaluation.
Customer support text can contain emails, phone numbers, order IDs, addresses, or payment details. A beginner pipeline shouldn't promise that one regular expression safely anonymizes all personal data. It should detect obvious sensitive patterns, quarantine rows for a reviewed redaction path, and record that decision in the manifest.
```python
import re

email_pattern = re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b")
rows = [
    {"ticket_id": 501, "text": "Refund has not arrived for order A10234"},
    # Placeholder address on the reserved example.com domain.
    {"ticket_id": 502, "text": "Send updates to customer@example.com please"},
]

no_email_match = []
needs_redaction_review = []
for row in rows:
    if email_pattern.search(row["text"]):
        needs_redaction_review.append(row["ticket_id"])
    else:
        no_email_match.append(row["ticket_id"])

print("no email match in this narrow gate:", no_email_match)
print("redaction review required:", needs_redaction_review)
```

```text
no email match in this narrow gate: [501]
redaction review required: [502]
```

This is a deliberately narrow detector. Ticket 501 still contains an order ID, so "no email match" isn't the same as "safe to export." A production export needs privacy review, access controls, retention rules, and tested redaction coverage for its actual data sources. A pipeline that hides what it can't detect is less useful than one that exposes its limits.
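One way to make those limits visible is to name each detector and record which ones ran. This is a minimal sketch with hypothetical patterns: the order-ID and phone formats below are illustrative guesses for this fixture, not validated detectors, and `customer@example.com` is a placeholder address.

```python
import re
from collections import Counter

# Hypothetical, deliberately narrow patterns; real sources need tested coverage.
detectors = {
    "email": re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b"),
    "order_id": re.compile(r"\b[A-Z]\d{5}\b"),
    "phone": re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b"),
}

rows = [
    {"ticket_id": 501, "text": "Refund has not arrived for order A10234"},
    {"ticket_id": 502, "text": "Send updates to customer@example.com please"},
    {"ticket_id": 503, "text": "Tracking link updated"},
]

# Queue every row that trips at least one detector, with the detector names.
review_queue = []
for row in rows:
    hits = sorted(name for name, pattern in detectors.items() if pattern.search(row["text"]))
    if hits:
        review_queue.append({"ticket_id": row["ticket_id"], "detectors": hits})

# Record what was checked so the manifest also shows what was not.
privacy_record = {
    "detectors_run": sorted(detectors),
    "rows_scanned": len(rows),
    "rows_needing_review": len(review_queue),
    "hits_by_detector": dict(Counter(name for item in review_queue for name in item["detectors"])),
}
print("review queue:", review_queue)
print("manifest privacy record:", privacy_record)
```

Listing `detectors_run` in the record tells a reviewer what was never checked, such as street addresses or payment details.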
Data changes are model changes in disguise. Adding reviewed examples, adjusting normalization, repairing labels, or changing split rules can all move a metric. Give each released artifact a content digest and pipeline version so a comparison can name what changed.
```python
import hashlib
import json

def digest(rows: list[dict]) -> str:
    # Serialize with sorted keys so the same content always yields the same bytes.
    payload = "\n".join(json.dumps(row, sort_keys=True) for row in rows) + "\n"
    return hashlib.sha256(payload.encode()).hexdigest()[:12]

v1 = [{"text": "refund missing", "label": "escalate"}]
v2 = [*v1, {"text": "return label broken", "label": "standard"}]

print("v1:", digest(v1), "rows:", len(v1))
print("v2:", digest(v2), "rows:", len(v2))
print("version changed:", digest(v1) != digest(v2))
```

```text
v1: 6eeaad571016 rows: 1
v2: 92b8336da382 rows: 2
version changed: True
```

Hugging Face Datasets applies a related idea in its cache: a fingerprint tracks dataset state and is updated when transformations such as `map()` or `shuffle()` change the data processing history.[4] You still need human-readable documentation alongside machine fingerprints. Record source snapshots, license and privacy decisions, label guidelines, exclusions, transformations, split policy, known gaps, and intended uses in a datasheet or equivalent review artifact.[3]
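A lightweight way to keep that documentation from drifting is to check the review artifact for required sections before release. This is a minimal sketch, assuming a hypothetical dictionary-style datasheet whose keys mirror the items listed above; the field names and sample values are illustrative, not a standard schema.

```python
REQUIRED_SECTIONS = [
    "source_snapshots",
    "license_and_privacy_decisions",
    "label_guidelines",
    "exclusions",
    "transformations",
    "split_policy",
    "known_gaps",
    "intended_uses",
]


def missing_sections(datasheet: dict) -> list[str]:
    """Return required sections that are absent or left empty."""
    return [name for name in REQUIRED_SECTIONS if not datasheet.get(name)]


# Hypothetical draft for illustration only.
draft = {
    "source_snapshots": ["support export, snapshot date recorded by the data owner"],
    "label_guidelines": "standard vs escalate, per reviewed routing guide",
    "transformations": ["NFKC", "casefold", "whitespace collapse", "exact dedup"],
    "split_policy": "SHA-256 bucket of conversation_id, 70/15/15",
    "known_gaps": [],  # empty on purpose: the check treats this as unanswered
}
gaps = missing_sections(draft)
print("missing or empty sections:", gaps)
print("ready for release review:", not gaps)
```

Treating an empty `known_gaps` list as unanswered is a design choice in this sketch: it forces an author to state limitations explicitly instead of leaving the section blank.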
The support example contains only a handful of rows, but its controls map directly to larger AI systems:
| Small lesson mechanism | Larger LLM or research use |
|---|---|
| Reject missing labels | Reject malformed supervised fine-tuning or preference rows. |
| Exact text fingerprint | Remove repeated corpus segments and track copied eval prompts. |
| Group split by conversation | Keep customer sessions, documents, or benchmark families from crossing splits. |
| Shingle overlap review | Screen pretraining or synthetic candidates against locked benchmarks. |
| Manifest and datasheet | Attach lineage and limitations to each training or evaluation release. |
| Coverage audit | Check that rare failure modes and policy-critical cases remain measurable. |
The general habit is simple: never quote a model score without being able to identify the exact data artifact, the split policy, and the contamination check that made the score interpretable.
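One way to enforce that habit is to make a score unreportable without its data references. This is a minimal sketch, assuming a hypothetical `EvalReceipt` record; the field names, the metric value, and the digest below are placeholders, not results from this lesson.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class EvalReceipt:
    metric_name: str
    metric_value: float
    artifact_sha256: str           # full digest of the evaluated dataset artifact
    pipeline_version: str          # names the normalization, dedup, and split rules
    split: str
    exact_overlap_checked: bool    # exact train/eval collision guard was run
    near_duplicate_reviewed: bool  # shingle-overlap queue was reviewed

    def __post_init__(self) -> None:
        # Refuse to build a receipt that cannot identify its data.
        if len(self.artifact_sha256) != 64:
            raise ValueError("artifact_sha256 must be a full SHA-256 hex digest")
        if not self.exact_overlap_checked:
            raise ValueError("score is not reportable without a contamination check")


receipt = EvalReceipt(
    metric_name="accuracy",
    metric_value=0.0,          # placeholder value, not a real result
    artifact_sha256="0" * 64,  # placeholder digest
    pipeline_version="normalize-nfkc-casefold-space__dedup-quarantine-sha256__split-conversation-v1",
    split="test",
    exact_overlap_checked=True,
    near_duplicate_reviewed=False,
)
print(asdict(receipt))
```

Because the record validates itself on construction, a report that omits the artifact digest or skips the exact-overlap guard fails before the number is ever quoted.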
Use the runnable labs above as a controlled failure exercise. Make one change at a time, predict which check should fail, then run the example.
1. In `08-verify-artifact.py`, append `{**rows[0], "ticket_id": 999, "split": "test"}` to `rows` before building `groups`.
2. In `08-verify-artifact.py`, append a newline to `rows_path` after loading `saved_manifest` and before computing `digest_matches`.
3. In `06-shingle-overlap.py`, print `jaccard("refund", "refund")` and `jaccard("", "")`.
4. In `04-stable-grouped-split.py`, assign buckets from `str(row["ticket_id"])` instead of `row["conversation_id"]`. Explain what can happen to two turns from one conversation.

Expected results:

1. `no_group_leakage` becomes `False` because one conversation now crosses splits.
2. `digest_matches` becomes `False` because the artifact bytes no longer match the released receipt.
3. The identical one-word pair returns `1.0`; two blank strings return `0.0`. Blank rows belong at the schema gate, not in the similarity queue.

You can now trace a dataset row from raw export to released artifact. A training loop tells you how weights changed; a validation plan tells you how evidence was measured; a dataset receipt tells you whether those measurements deserve trust.
The next chapter turns timestamped shipment records into model inputs. The same discipline still applies: each feature needs a definition, a time boundary, and a version so offline training and live serving mean the same thing.
Before moving on, explain why a model trained on duplicated or contaminated data can show a stronger score while becoming no more useful for new customers.
[1] Brown, T., et al. (2020). Language Models are Few-Shot Learners. NeurIPS 2020.
[2] Broder, A. Z. (1997). On the Resemblance and Containment of Documents.
[3] Gebru, T., Morgenstern, J., Vecchione, B., et al. (2021). Datasheets for Datasets. Communications of the ACM.
[4] Hugging Face. (2026). Datasets Documentation. Official documentation.