Medium | Scheduling | Python 3
Workflow State Machine
Track dependent jobs through legal workflow states with retry and cancellation behavior.
40m | 1 sample test | 2 hidden tests
Implement WorkflowRunner, an in-memory job tracker for dependent work. Jobs move through a small set of states:
```text
pending -> running -> succeeded
pending -> running -> failed
running -> retrying -> pending
pending/running/retrying/failed -> canceled
```
Requirements
- `add_job(job_id, deps=None, max_attempts=1)` registers a job.
- `ready()` returns pending jobs whose dependencies have succeeded, in insertion order.
- `transition(job_id, state, reason=None)` validates state transitions and returns the final state.
- Starting a job increments its attempt count.
- If a running job fails and still has attempts left, move it to `retrying`; otherwise move it to `failed`.
- `status(job_id)` returns the current state, attempts, and history.
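The state diagram and the failure rule above can be captured as a small lookup table plus one resolver function. This is a minimal sketch, not the reference solution: `ALLOWED` and `resolve_transition` are hypothetical names. It also makes two assumptions the statement does not confirm. First, `attempts` counts how many times the job has been started. Second, a request for either `failed` or `retrying` from `running` is resolved against the attempt budget.

```python
# Legal moves read off the state diagram; terminal states allow nothing.
ALLOWED = {
    "pending": {"running", "canceled"},
    "running": {"succeeded", "failed", "retrying", "canceled"},
    "retrying": {"pending", "canceled"},
    "failed": {"canceled"},
    "succeeded": set(),
    "canceled": set(),
}


def resolve_transition(current, requested, attempts, max_attempts):
    """Return the state the job actually lands in, or raise ValueError.

    `attempts` is assumed to be the number of times the job has been started.
    """
    if requested not in ALLOWED.get(current, set()):
        raise ValueError(f"invalid transition: {current} -> {requested}")
    if current == "running" and requested in ("failed", "retrying"):
        # Assumption: a failure becomes a retry only while attempts remain.
        return "retrying" if attempts < max_attempts else "failed"
    return requested


# First of two attempts fails: the job is sent back for a retry.
print(resolve_transition("running", "failed", attempts=1, max_attempts=2))  # retrying
# Second of two attempts fails: the budget is spent, so the job fails.
print(resolve_transition("running", "failed", attempts=2, max_attempts=2))  # failed
```

Returning the resolved state rather than the requested one is what would let `transition` report `retrying` when the caller asked for `failed`, which is one reading of "returns the final state".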
Example
```python
runner = WorkflowRunner()
runner.add_job("extract")
runner.add_job("train", deps=["extract"], max_attempts=2)

assert runner.ready() == ["extract"]
runner.transition("extract", "running")
runner.transition("extract", "succeeded")
assert runner.ready() == ["train"]
```
Constraints
- Keep it single-process and in memory.
- Use deterministic ordering.
- Raise `ValueError` for invalid transitions or unknown jobs.
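One way to meet the ordering and error constraints is to store jobs in a plain `dict`, which preserves insertion order in Python 3.7+. The sketch below is illustrative only: `ready_jobs`, `require_job`, and the per-job record layout are hypothetical. It also assumes that a dependency which was never registered simply counts as not yet succeeded.

```python
def require_job(jobs, job_id):
    """Look up a job record, turning a missing key into ValueError."""
    try:
        return jobs[job_id]
    except KeyError:
        raise ValueError(f"unknown job: {job_id}") from None


def ready_jobs(jobs):
    """Return pending job ids whose dependencies have all succeeded.

    `jobs` maps job_id -> {"state": str, "deps": list of job ids}.
    Iterating the dict yields ids in the order they were added.
    """
    ready = []
    for job_id, job in jobs.items():
        if job["state"] != "pending":
            continue
        # Assumption: an unregistered dependency is treated as unsatisfied.
        if all(jobs.get(dep, {}).get("state") == "succeeded" for dep in job["deps"]):
            ready.append(job_id)
    return ready


jobs = {
    "extract": {"state": "pending", "deps": []},
    "train": {"state": "pending", "deps": ["extract"]},
}
print(ready_jobs(jobs))  # ['extract']
require_job(jobs, "extract")["state"] = "succeeded"
print(ready_jobs(jobs))  # ['train']
```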
Sample Tests
unlocks dependencies in insertion order
```python
from solution import WorkflowRunner
runner = WorkflowRunner()
runner.add_job("extract")
runner.add_job("train", deps=["extract"], max_attempts=2)
runner.add_job("report", deps=["train"])
assert runner.ready() == ["extract"]
assert runner.transition("extract", "running") == "running"
assert runner.transition("extract", "succeeded") == "succeeded"
assert runner.ready() == ["train"]
assert runner.status("extract")["attempts"] == 1
```