Saving My Sanity With SQLite
Or, bending batch processing jobs to my will (evil laughter ensues).
The Premise
Suppose you have a large quantity of hefty files sitting in cloud storage somewhere. Your mission is to download each file, perform some kind of work using that file, and then upload the results to another spot in cloud storage.
The actual reason behind work like this could be all kinds of things:
- An audit to see how many files have a particular property...
- A data migration...
- A one-off analytics report (my condolences)...
Whatever the details, the key properties of this work are:
- You need to perform the same kind of task over and over again for different inputs.
- The number of sub-tasks is large.
- Each sub-task has a non-trivial chance of failing (maybe because of the network or an LLM being involved).
- The total time it takes to process all sub-tasks is substantial.
- You're performing the work rarely or as a one-off.
Tedious and Time-Consuming Options
Maybe my day job is unique in that I end up performing batch processing like this around once a month. As such, I've tried a lot of different methods for setting up these kinds of scripts.
Perhaps owing to the fact that one-off jobs are often rush jobs as well, most of the things I've tried are terrible for one reason or another.
All Gas No Brakes
The first option is the simplest: just process each task in a loop with no extra error handling or bookkeeping.
# Python
tasks = collect_tasks()
results = [process_task(t) for t in tasks]
write_results(results)
This approach is terrible for a number of reasons:
- If any one task throws an exception, the entire job fails. This means you need to babysit the script in case you have to restart it.
- You have no idea which tasks have succeeded or failed. So, when you do need to restart the job, your best option is to comb through the logs (if the script has any) to figure out where it failed and adjust the script parameters so it starts from the right spot.
Retry Logic
Another step up the ladder is to catch any errors that occur and retry the task(s) that failed. This approach has some issues too:
- It can be difficult to anticipate what errors might occur or deliberately trigger failures for testing your error handling logic. You might spend an extra hour on retry logic only to have the script crash anyway because an unexpected error occurred.
- Some tasks may continue to fail even if you retry them. So, without additional bookkeeping you still have to manually figure out what tasks need to be re-run.
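As a rough sketch, reusing the hypothetical collect_tasks, process_task, and write_results helpers from the first example, the retry version tends to look something like this:

import time

def process_with_retries(task, max_attempts=3, delay_seconds=5):
    # Retry a single task a few times with a short pause between attempts.
    # process_task is the same placeholder as in the earlier example.
    for attempt in range(1, max_attempts + 1):
        try:
            return process_task(task)
        except Exception:
            # We can't anticipate every failure mode, so catch everything.
            if attempt == max_attempts:
                raise  # out of attempts; the exception escapes and the job dies
            time.sleep(delay_seconds)

tasks = collect_tasks()
results = [process_with_retries(t) for t in tasks]
write_results(results)

Once a task exhausts its retries, the exception still propagates and the whole job still dies, which puts you right back to babysitting the script.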
CSV File Tracking
To address the problem of not knowing which tasks have succeeded or failed, you can record the script's progress in a CSV file or similar. An especially convenient version of this is to have your script take a CSV file as input, where each row represents a sub-task. Then, if any sub-task fails, you can write it to a different file in the same format as the input CSV and keep running. When the script is finished, you can re-run any failed tasks by invoking the script again with the "list of failures" CSV as input.
Personally, if I'm doing some kind of one-off processing, I much prefer the "let it fail and run it later" approach this kind of bookkeeping provides.
The main downside of using CSV files is that if you want to do anything remotely elaborate with them, you're going to have to write it yourself. For example, the two CSV file approach I described earlier lets you re-run failed sub-tasks, but it doesn't let you pause script execution and resume it from where you left off. You could totally implement that feature using CSV files as a storage backend, but there's an easier way!
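Here's a minimal sketch of that two-file pattern; the file names are made up, and process_task is the same placeholder as before:

import csv
import sys

input_path = sys.argv[1]  # e.g. tasks.csv, or a previous run's failures.csv
failures_path = "failures.csv"

with open(input_path, newline="") as infile, open(failures_path, "w", newline="") as failfile:
    reader = csv.DictReader(infile)
    writer = csv.DictWriter(failfile, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        try:
            process_task(row)
        except Exception:
            # Record the failed sub-task in the same format as the input,
            # so failures.csv can be fed straight back into this script.
            writer.writerow(row)

When the run finishes, you point the script at failures.csv to retry only the sub-tasks that failed.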
Using SQLite Instead
It probably goes without saying, but this is not my idea. My brother is the one who first introduced me to the concept, and some cursory web searches show that the idea is rather common. Though, the terms "task queue" and "job queue" are sufficiently vague that it seems like everyone has a slightly different idea of what they entail.
The premise is straightforward:
- One table stores the tasks that still need processing.
- Another table stores the results of each processed task.
- When a task is completed, it is "moved" from the queue table to the results table.
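In SQLite terms, the shape of the idea is something like this (my own sketch, not Herd's actual schema):

import sqlite3

conn = sqlite3.connect("q.db")
conn.executescript(
    """
    -- Tasks waiting to be processed.
    CREATE TABLE IF NOT EXISTS queue (
        id     INTEGER PRIMARY KEY,
        params TEXT NOT NULL      -- task parameters, e.g. serialized as JSON
    );

    -- Tasks that have been processed, successfully or not.
    CREATE TABLE IF NOT EXISTS results (
        id      INTEGER PRIMARY KEY,
        params  TEXT NOT NULL,
        success INTEGER NOT NULL, -- 1 for success, 0 for failure
        message TEXT              -- optional error message for failures
    );
    """
)
conn.close()

In this sketch, completing a task would be a DELETE from queue plus an INSERT into results, ideally wrapped in a single transaction so the "move" is atomic.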
I've been iterating on a version of this idea for a couple of months and I think it's ready to share. I'm a sucker for giving my projects funny little names, so I've called it Herd.
SQLite is incredibly common, so I can reasonably assume it's going to be available on any machine I end up using as a workstation. And if it's not, it's easy to install.
SQLite also has drivers for a huge number of languages, so if I need to port Herd from Python to some other language, it should be pretty straightforward.
A toy usage example of Herd looks like this:
from pathlib import Path
from typing import TypedDict

from herd import HerdQueue  # assuming you've saved Herd's single file as herd.py


class Computation(TypedDict):
    operator: str  # add, sub, mul, div
    x: float
    y: float


db_loc = Path("./q.db")

# A list of operations we need to perform
to_process: list[Computation] = [
    {"operator": "add", "x": 10, "y": 40},
    {"operator": "mul", "x": 34, "y": 9},
    {"operator": "div", "x": 4, "y": 0},
    {"operator": "sub", "x": 10, "y": 40},
    {"operator": "div", "x": 100, "y": 7},
    {"operator": "div", "x": 0, "y": 0},
]

# In reality, you're probably persisting your results to disk,
# another database, cloud object storage, etc.
results = []

with HerdQueue[Computation](db_loc) as q:
    q.setup()
    q.enqueue(to_process)

    while task := q.peek():
        # Also in reality, these operations probably take a really long time
        # and fail for some non-deterministic reason, like a temporary network
        # issue.
        match task.params:
            case {"operator": "add", "x": x, "y": y}:
                result = x + y
            case {"operator": "sub", "x": x, "y": y}:
                result = x - y
            case {"operator": "mul", "x": x, "y": y}:
                result = x * y
            case {"operator": "div", "x": x, "y": y}:
                if y == 0:
                    result = None
                else:
                    result = x / y

        if result is None:
            q.complete_task(task, 0, "Failed to compute math problem")
        else:
            results.append(result)
            q.complete_task(task, 1)
In practice, I've been using Herd with Typer CLIs such that I have one command for running q.setup(), another command for populating the queue, and another command for running the actual script.
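A CLI along those lines might look something like this; it's a sketch rather than Herd's actual interface, and it assumes the Computation type and db_loc from the toy example plus a hypothetical collect_tasks() helper:

import typer

# Assumes HerdQueue, Computation, and db_loc are defined as in the toy example,
# plus a hypothetical collect_tasks() helper that gathers the sub-tasks.
app = typer.Typer()


@app.command()
def setup():
    """Create the queue database."""
    with HerdQueue[Computation](db_loc) as q:
        q.setup()


@app.command()
def populate():
    """Fill the queue with every sub-task that needs processing."""
    with HerdQueue[Computation](db_loc) as q:
        q.enqueue(collect_tasks())


@app.command()
def run():
    """Work through the queue until it's empty."""
    with HerdQueue[Computation](db_loc) as q:
        while task := q.peek():
            ...  # do the real work with task.params, then call q.complete_task(...)


if __name__ == "__main__":
    app()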
So long as you ensure that complete_task is called when appropriate, the persistent nature of the queue means that you can stop and start the script at any point and it will pick up from where it left off. This also applies if the script crashes. Of course, depending on your workload, it'll be up to you to appropriately clean up any tasks that are partially completed.
Herd is just a single file, with the intent that if you want to use it, you can copy/paste it into your application and modify it to fit your needs. For example, it uses some new-ish generic type syntax that you might need to remove if you're using an older version of Python. At any rate, I tried to be thorough in the README and doc-strings so that the implementation is easy to understand and use.