MediumSchedulingPython 3

Workflow State Machine

Track dependent jobs through legal workflow states with retry and cancellation behavior.

40m1 sample tests2 hidden tests

Workflow State Machine

Implement WorkflowRunner, an in-memory job tracker for dependent work. Jobs move through a small set of states:

text
1pending -> running -> succeeded 2pending -> running -> failed 3running -> retrying -> pending 4pending/running/retrying/failed -> canceled

Requirements

  • add_job(job_id, deps=None, max_attempts=1) registers a job.
  • ready() returns pending jobs whose dependencies have succeeded, in insertion order.
  • transition(job_id, state, reason=None) validates state transitions and returns the final state.
  • Starting a job increments its attempt count.
  • If a running job fails and still has attempts left, move it to retrying; otherwise move it to failed.
  • status(job_id) returns the current state, attempts, and history.

Example

python
1runner = WorkflowRunner() 2runner.add_job("extract") 3runner.add_job("train", deps=["extract"], max_attempts=2) 4 5assert runner.ready() == ["extract"] 6runner.transition("extract", "running") 7runner.transition("extract", "succeeded") 8assert runner.ready() == ["train"]

Constraints

  • Keep it single-process and in memory.
  • Use deterministic ordering.
  • Raise ValueError for invalid transitions or unknown jobs.

Editor
1
2
3
4
5
6
7
8
9
10
11
12
Sample Tests
unlocks dependencies in insertion order
from solution import WorkflowRunner

runner = WorkflowRunner()
runner.add_job("extract")
runner.add_job("train", deps=["extract"], max_attempts=2)
runner.add_job("report", deps=["train"])

assert runner.ready() == ["extract"]
assert runner.transition("extract", "running") == "running"
assert runner.transition("extract", "succeeded") == "succeeded"
assert runner.ready() == ["train"]
assert runner.status("extract")["attempts"] == 1
Results
Run sample tests or submit all tests.