Build point-in-time delivery features from events and preserve the same meaning in online serving.
The last lesson defined a valid feature row for a late-delivery model. Now the system must produce millions of those rows for training and fresh values for live orders. That requires two paths: a historical batch path and a low-latency streaming or online path.
Both paths must answer the same question: what was known about this shipment at the prediction timestamp? If the batch job sees events that happened after that timestamp, training gets a more complete view of history than production ever can.
Consider one order:
| Time | Event |
|---|---|
| May 1 08:00 | parcel leaves warehouse |
| May 1 10:00 | prediction requested |
| May 1 13:00 | carrier records weather disruption |
| May 2 17:00 | parcel delivered late |
For the 10:00 prediction, the 13:00 disruption can't become a feature. A normal database join that selects the latest row in storage today will attach it anyway during retraining. A point-in-time join considers only records with event_time <= prediction_time and takes the latest of those.
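A minimal sketch of the difference, using a hypothetical boolean feature, has_weather_disruption, over the table's events. The year 2026 is assumed for illustration. The naive version ignores the prediction timestamp. The as-of version applies the event_time <= prediction_time rule.

```python
from datetime import datetime

# Carrier records for one order as stored at retraining time
# (hypothetical feature and record layout; year 2026 assumed).
records = [
    {"event_time": datetime(2026, 5, 1, 8, 0), "kind": "departed"},
    {"event_time": datetime(2026, 5, 1, 13, 0), "kind": "weather_disruption"},
    {"event_time": datetime(2026, 5, 2, 17, 0), "kind": "delivered"},
]
prediction_time = datetime(2026, 5, 1, 10, 0)

def has_disruption_naive(rows):
    # Leaky: looks at everything in storage today, ignoring the prediction time.
    return any(r["kind"] == "weather_disruption" for r in rows)

def has_disruption_as_of(rows, prediction_time):
    # Point-in-time: only records with event_time <= prediction_time count.
    return any(
        r["kind"] == "weather_disruption" and r["event_time"] <= prediction_time
        for r in rows
    )

print(has_disruption_naive(records))
print(has_disruption_as_of(records, prediction_time))
```

The naive feature is True because it sees the 13:00 event. The as-of feature is False, which is what a model scoring at 10:00 could actually have known.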
Feast documents point-in-time correct retrieval for historical training features, so values recorded after an entity row's timestamp aren't joined onto that row.[1] The rule isn't specific to any feature-store product: any batch pipeline that builds training rows needs it.
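At batch scale, rescanning every event for every request gets slow. One common approach sorts each entity's events by time once, then binary-searches for the last event at or before each request. The sketch below uses Python's standard bisect module rather than any feature store's API. The function name as_of_join and the tuple layout are assumptions for illustration.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime

def as_of_join(requests, events):
    """Attach to each (entity, at) request the latest event value with
    event_time <= at, or None if nothing was known yet.

    events: iterable of (entity, event_time, value) tuples.
    """
    # Group events by entity and sort each group by event time once.
    by_entity = defaultdict(list)
    for entity, event_time, value in events:
        by_entity[entity].append((event_time, value))
    times = {}
    for entity, rows in by_entity.items():
        rows.sort(key=lambda r: r[0])
        times[entity] = [t for t, _ in rows]

    joined = []
    for entity, at in requests:
        ts = times.get(entity, [])
        # bisect_right returns how many events have event_time <= at,
        # so an event stamped exactly at the request time is included.
        i = bisect_right(ts, at)
        value = by_entity[entity][i - 1][1] if i > 0 else None
        joined.append((entity, at, value))
    return joined

events = [
    ("O-204", datetime(2026, 5, 1, 8), "departed"),
    ("O-204", datetime(2026, 5, 1, 13), "weather_delay"),
    ("O-204", datetime(2026, 5, 2, 17), "delivered"),
]
prediction_requests = [
    ("O-204", datetime(2026, 5, 1, 7)),
    ("O-204", datetime(2026, 5, 1, 10)),
    ("O-204", datetime(2026, 5, 1, 16)),
]
for entity, at, value in as_of_join(prediction_requests, events):
    print(entity, at.strftime("%H:%M"), value)
```

In a dataframe pipeline, pandas `merge_asof` with `direction="backward"` expresses the same backward-looking rule.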
The code below computes the latest scan status and hours_since_last_scan for two prediction requests. By retraining time the 13:00 scan exists in storage, but it can't affect a request scored at 10:00.
```python
from datetime import datetime

def dt(value):
    return datetime.fromisoformat(value)

scans = [
    {"order": "O-204", "time": dt("2026-05-01T08:00:00"), "status": "departed"},
    {"order": "O-204", "time": dt("2026-05-01T13:00:00"), "status": "weather_delay"},
    {"order": "O-204", "time": dt("2026-05-02T17:00:00"), "status": "delivered"},
]
requests = [
    {"order": "O-204", "at": dt("2026-05-01T10:00:00")},
    {"order": "O-204", "at": dt("2026-05-01T16:00:00")},
]

def as_of_scan(order, at):
    # Only scans that happened at or before the prediction time are visible.
    visible = [s for s in scans if s["order"] == order and s["time"] <= at]
    if not visible:
        return None, None  # nothing was known yet at this timestamp
    latest = max(visible, key=lambda s: s["time"])
    # Latest status and hours_since_last_scan, truncated to whole hours.
    return latest["status"], int((at - latest["time"]).total_seconds() / 3600)

for request in requests:
    status, age = as_of_scan(request["order"], request["at"])
    print(request["at"].strftime("%H:%M"), status, age)
```

```text
10:00 departed 2
16:00 weather_delay 3
```

The same event is correctly absent from the first row and present in the second. A unit test should lock in that behavior permanently: backfilling storage mustn't rewrite what a model could have known in the past.
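A minimal sketch of such a regression test, written in pytest style. It assumes a hypothetical pure function, hours_since_last_scan(scans, at), so the test can pass storage contents in explicitly. The test checks that adding later scans to storage leaves the 10:00 row unchanged.

```python
from datetime import datetime

def hours_since_last_scan(scans, at):
    # Hypothetical pure feature function: scans is a list of
    # (event_time, status) tuples, and only scans at or before `at` count.
    visible = [t for t, _ in scans if t <= at]
    if not visible:
        return None
    return int((at - max(visible)).total_seconds() // 3600)

def test_backfill_does_not_change_past_rows():
    at = datetime(2026, 5, 1, 10)
    stored_at_prediction = [(datetime(2026, 5, 1, 8), "departed")]
    # Later scans reach storage after the prediction was made (a backfill).
    stored_at_retraining = stored_at_prediction + [
        (datetime(2026, 5, 1, 13), "weather_delay"),
        (datetime(2026, 5, 2, 17), "delivered"),
    ]
    assert hours_since_last_scan(stored_at_prediction, at) == 2
    assert hours_since_last_scan(stored_at_retraining, at) == 2
```

Keeping the feature a pure function of (stored events, timestamp) is what makes this test cheap. The same function can run over any historical snapshot.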
A batch job is appropriate for training snapshots, daily aggregate features, and recomputation after a bug fix. It can scan complete partitions and write immutable artifacts. An online path is appropriate for hours_since_last_scan or current warehouse queue length when a customer asks for an ETA right now.
| Concern | Batch path | Online path |
|---|---|---|
| purpose | train and evaluate models | score current order |
| latency | minutes or hours | milliseconds |
| correction | backfill a versioned snapshot | process late event or invalidate cache |
| evidence | snapshot manifest and statistics | freshness timestamp and trace |
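The batch path's evidence can start small: a manifest with a content hash and basic statistics, written next to an immutable snapshot. The sketch below assumes rows are JSON-serializable dicts. The field names (snapshot_version, data_cutoff, content_sha256) and the order O-311 are illustrative, not a standard format.

```python
import hashlib
import json

def build_manifest(rows, snapshot_version, data_cutoff):
    """Summarize an immutable feature snapshot (rows are JSON-serializable dicts)."""
    # Canonical, order-independent serialization: sorted keys, sorted lines.
    lines = sorted(json.dumps(r, sort_keys=True) for r in rows)
    digest = hashlib.sha256("\n".join(lines).encode("utf-8")).hexdigest()
    null_counts = {}
    for r in rows:
        for key, value in r.items():
            null_counts[key] = null_counts.get(key, 0) + (value is None)
    return {
        "snapshot_version": snapshot_version,
        "data_cutoff": data_cutoff,
        "row_count": len(rows),
        "null_counts": null_counts,
        "content_sha256": digest,
    }

v1_rows = [{"order": "O-204", "hours_since_last_scan": 2}]
# A backfill writes a new version alongside v1 instead of overwriting it.
v2_rows = v1_rows + [{"order": "O-311", "hours_since_last_scan": None}]
v1 = build_manifest(v1_rows, "v1", "2026-05-01T10:00:00")
v2 = build_manifest(v2_rows, "v2", "2026-05-01T10:00:00")
print(v1["row_count"], v2["row_count"], v2["null_counts"])
```

Because the hash ignores row order, two runs over the same data agree. Any silent rewrite of a published version shows up as a changed digest.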
Different stores are acceptable. Different definitions are not. If batch uses a seven-day backlog mean while online serves a one-hour queue count under the same column name, a validation score can't predict live behavior.
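One way to keep definitions from drifting is to declare each feature once, with its window and aggregation, and have both paths call that declaration. In this sketch, FeatureDefinition and WAREHOUSE_BACKLOG_1H are hypothetical names. The point is that the name, window, and computation travel together, so the batch job and the online service can differ in storage but not in meaning.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class FeatureDefinition:
    name: str
    window: timedelta

    def compute(self, event_times, as_of):
        # Count events in the half-open window (as_of - window, as_of].
        start = as_of - self.window
        return sum(1 for t in event_times if start < t <= as_of)

# Hypothetical single definition imported by both the batch job and the online service.
WAREHOUSE_BACKLOG_1H = FeatureDefinition("warehouse_backlog_1h", timedelta(hours=1))

def batch_feature(full_history, prediction_time):
    # Batch path: full event history read from a warehouse partition.
    return WAREHOUSE_BACKLOG_1H.compute(full_history, prediction_time)

def online_feature(recent_buffer, now):
    # Online path: only a short in-memory buffer of recent arrivals.
    return WAREHOUSE_BACKLOG_1H.compute(recent_buffer, now)

arrivals = [datetime(2026, 5, 1, h, m) for h, m in [(8, 30), (9, 10), (9, 30), (9, 55)]]
now = datetime(2026, 5, 1, 10, 0)
print(batch_feature(arrivals, now), online_feature(arrivals[1:], now))
```

The two paths see different inputs (full history versus a recent buffer) yet produce the same value, because the window lives in the definition rather than in each code path.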
Carrier events arrive late. An event recorded at 11:00 might describe a scan that happened at 09:30. The pipeline should therefore keep both event_time and ingested_at. Historical training can rebuild the row by event time, but for a 10:00 prediction that row would include the 09:30 scan, which live scoring couldn't see until 11:00. To match what serving actually knew, the training row should also require ingested_at <= prediction_time, unless the product explicitly allows retroactive correction.
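A minimal sketch of the two views, assuming each record carries both timestamps. The statuses and the 08:05 ingestion time are hypothetical. The 09:30 scan ingested at 11:00 comes from the example in the text.

```python
from datetime import datetime

# Hypothetical scans for one order; each keeps both timestamps.
# The second scan happened at 09:30 but was only ingested at 11:00.
scans = [
    {"status": "departed",
     "event_time": datetime(2026, 5, 1, 8, 0),
     "ingested_at": datetime(2026, 5, 1, 8, 5)},
    {"status": "sorted_at_hub",
     "event_time": datetime(2026, 5, 1, 9, 30),
     "ingested_at": datetime(2026, 5, 1, 11, 0)},
]

def visible_by_event_time(rows, at):
    # What had happened by `at`, including events learned about later.
    return [r["status"] for r in rows if r["event_time"] <= at]

def visible_as_known(rows, at):
    # What the online path could actually see at `at`.
    return [r["status"] for r in rows
            if r["event_time"] <= at and r["ingested_at"] <= at]

at = datetime(2026, 5, 1, 10, 0)
print(visible_by_event_time(scans, at))
print(visible_as_known(scans, at))
```

Training rows meant to mirror serving would use the visible_as_known filter. The event-time-only view is the retroactive one.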
Freshness requires a serving policy:
| Condition | Serving decision |
|---|---|
| backlog feature updated within 15 minutes | score normally |
| backlog feature 45 minutes stale | score with fallback and log degraded mode |
| no carrier scans for expected lane | abstain from narrow ETA promise |
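The policy table can be encoded as a small decision function that runs before the model is called. This is a sketch under stated assumptions: the names Decision and serving_decision are hypothetical, and the abstain check runs before the freshness check. The table doesn't say what happens between 15 and 45 minutes, so the code treats any age beyond 15 minutes as stale.

```python
import logging
from datetime import datetime, timedelta
from enum import Enum

logger = logging.getLogger("eta_serving")

class Decision(Enum):
    SCORE = "score normally"
    FALLBACK = "score with fallback and log degraded mode"
    ABSTAIN = "abstain from narrow ETA promise"

FRESH_LIMIT = timedelta(minutes=15)  # threshold from the policy table

def serving_decision(backlog_updated_at, lane_scan_count, now):
    # No carrier scans for the expected lane: don't promise a narrow ETA.
    if lane_scan_count == 0:
        return Decision.ABSTAIN
    age = now - backlog_updated_at
    if age <= FRESH_LIMIT:
        return Decision.SCORE
    # Assumption: any age beyond 15 minutes counts as stale, not only 45+.
    logger.warning("backlog feature is %s old; serving in degraded mode", age)
    return Decision.FALLBACK

now = datetime(2026, 5, 1, 10, 0)
print(serving_decision(now - timedelta(minutes=10), lane_scan_count=4, now=now))
print(serving_decision(now - timedelta(minutes=45), lane_scan_count=4, now=now))
print(serving_decision(now - timedelta(minutes=5), lane_scan_count=0, now=now))
```

Logging the feature age alongside the decision provides the freshness trace that the online path's evidence column asks for.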
Google Cloud's production ML guidance separates automated data validation, model validation, pipeline triggers, metadata, and online validation before broad promotion.[2] Feature freshness belongs at the first of those boundaries, data validation: no model can compensate for a missing or temporally invalid input.
| Evidence | A production-ready answer demonstrates |
|---|---|
| temporal correctness | constructs training rows with as-of joins and explicit event-time boundaries |
| freshness behavior | distinguishes late data, stale online features, and safe fallback actions |
| operational replay | explains versioned backfills without silently rewriting historical meaning |
| Symptom | Cause | Fix |
|---|---|---|
| Training metrics improve after a backfill but serving metrics don't | latest-value leakage | use as-of joins |
| Online results disagree with offline replay | transformations diverged | publish one feature definition and parity samples |
| A previously reliable model emits bad ETAs during upstream lag | freshness wasn't part of the serving policy | trace feature age and fall back to a safer response |
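The parity samples from the second row can be checked mechanically. Sample requests that were served online, recompute them offline with the published definition, and report disagreements beyond a tolerance. In this sketch, parity_report, values_match, the tolerance, and the sample values are all hypothetical. The mismatched backlog value mimics a batch mean diverging from an online count.

```python
import math

def values_match(a, b, tolerance):
    # A missing value only matches another missing value.
    if a is None or b is None:
        return a is None and b is None
    return math.isclose(a, b, rel_tol=0.0, abs_tol=tolerance)

def parity_report(online_rows, offline_rows, tolerance=1e-6):
    """Compare numeric features logged at serving time with an offline replay.

    Both arguments map request_id -> {feature_name: value}.
    Returns (request_id, feature, online_value, offline_value) mismatches.
    """
    mismatches = []
    for request_id, online in online_rows.items():
        offline = offline_rows.get(request_id, {})
        for feature, online_value in online.items():
            offline_value = offline.get(feature)
            if not values_match(online_value, offline_value, tolerance):
                mismatches.append((request_id, feature, online_value, offline_value))
    return mismatches

online = {"r1": {"hours_since_last_scan": 2, "warehouse_backlog_1h": 3},
          "r2": {"hours_since_last_scan": 5, "warehouse_backlog_1h": 41}}
offline = {"r1": {"hours_since_last_scan": 2, "warehouse_backlog_1h": 3},
           "r2": {"hours_since_last_scan": 5, "warehouse_backlog_1h": 12.4}}
print(parity_report(online, offline))
```

A nonempty report on a routine sample is an early signal that the two transformations have diverged. It appears before validation scores and live behavior drift apart.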