Can luigi rerun tasks when the task dependencies become out of date?
One way you could accomplish your goal is by overriding the complete(...) method.
The documentation for complete is straightforward. Simply implement a function that checks your constraint and returns False if you want to recompute the task.
For example, to force recomputation when a dependency has been updated, you could do:
    def complete(self):
        """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
        import os
        import time

        def mtime(path):
            # NOTE: time.ctime() returns a string, so the comparison below is
            # lexicographic rather than chronological.
            return time.ctime(os.path.getmtime(path))

        # assuming 1 output
        if not os.path.exists(self.output().path):
            return False

        self_mtime = mtime(self.output().path)

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in self.requires():
            if not el.complete():
                return False
            for output in el.output():
                if mtime(output.path) > self_mtime:
                    return False

        return True
This will return False when any requirement is incomplete, when any requirement has been modified more recently than the current task, or when the output of the current task does not exist.
Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it'd be hit-or-miss unless every task has its own file.
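As a rough sketch of that idea, assuming each task class lives in its own source file: the helper below (source_newer_than_output is a made-up name, not part of luigi) compares the mtime of the file that defines a class with the mtime of an output file. It only sees edits to that one file, so changes in imported helpers would go unnoticed.

```python
import inspect
import os


def source_newer_than_output(task_cls, output_path):
    """Return True if the file defining task_cls was modified after output_path.

    Hypothetical helper for illustration; not part of luigi.
    """
    try:
        source_file = inspect.getsourcefile(task_cls)
    except (TypeError, OSError):
        # Built-in or interactively defined classes have no source file.
        return False
    if source_file is None or not os.path.exists(source_file):
        return False
    # os.path.getmtime returns floats, so this comparison is chronological.
    return os.path.getmtime(source_file) > os.path.getmtime(output_path)
```

Inside a custom complete, you could call it as source_newer_than_output(type(self), self.output().path) and return False when it reports True.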
Because of the ability to override complete, any logic you want for recomputation can be implemented. If you want a particular complete method for many tasks, I'd recommend sub-classing luigi.Task, implementing your custom complete there, and then inheriting your tasks from the sub-class.
I'm late to the game, but here's a mixin that improves the accepted answer to support multiple input / output files.
    import os
    import time

    class MTimeMixin:
        """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task.

        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
        """

        def complete(self):
            def to_list(obj):
                # wrap a single target or task in a list
                if isinstance(obj, (tuple, list)):
                    return obj
                return [obj]

            def mtime(path):
                # NOTE: time.ctime() returns a string, so the comparisons below
                # are lexicographic rather than chronological.
                return time.ctime(os.path.getmtime(path))

            if not all(os.path.exists(out.path) for out in to_list(self.output())):
                return False

            self_mtime = min(mtime(out.path) for out in to_list(self.output()))

            # the below assumes a list of requirements, each with a list of outputs. YMMV
            for el in to_list(self.requires()):
                if not el.complete():
                    return False
                for output in to_list(el.output()):
                    if mtime(output.path) > self_mtime:
                        return False

            return True
To use it, declare your class with the mixin listed first, for example class MyTask(MTimeMixin, luigi.Task).
The above code works well for me, except that I believe mtime(path) must return a float instead of a string for the timestamp comparison to be correct: strings compare lexicographically (for example, "Sat ..." > "Mon ..."). So simply use:
    def mtime(path):
        return os.path.getmtime(path)
instead of:
    def mtime(path):
        return time.ctime(os.path.getmtime(path))
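Putting the float fix together with the mixin, a minimal sketch might look like the following. To keep it runnable without luigi installed, FakeTarget and FakeTask are hypothetical stand-ins that only mimic the output() / requires() / complete() shape; in a real pipeline you would mix MTimeMixin into a luigi.Task subclass instead. It assumes requires() and output() return a single object, a list, or a tuple.

```python
import os


class MTimeMixin:
    """Sketch: incomplete if any own output is missing, any requirement is
    incomplete, or any requirement output is newer than the oldest own output."""

    def complete(self):
        def to_list(obj):
            # Wrap a single target or task in a list.
            return list(obj) if isinstance(obj, (tuple, list)) else [obj]

        outputs = to_list(self.output())
        if not all(os.path.exists(out.path) for out in outputs):
            return False

        # Floats (seconds since the epoch) compare chronologically.
        self_mtime = min(os.path.getmtime(out.path) for out in outputs)

        for req in to_list(self.requires()):
            if not req.complete():
                return False
            for out in to_list(req.output()):
                if os.path.getmtime(out.path) > self_mtime:
                    return False
        return True


class FakeTarget:
    """Hypothetical stand-in for a file target exposing .path."""

    def __init__(self, path):
        self.path = path


class FakeTask:
    """Hypothetical stand-in for a task: complete when its output exists."""

    def __init__(self, path, deps=()):
        self._target = FakeTarget(path)
        self._deps = list(deps)

    def output(self):
        return self._target

    def requires(self):
        return self._deps

    def complete(self):
        return os.path.exists(self._target.path)


class Downstream(MTimeMixin, FakeTask):
    """The mixin comes first so its complete() takes precedence."""
```

With an upstream FakeTask and a Downstream that depends on it, Downstream.complete() flips to False as soon as the upstream file's mtime passes the downstream file's mtime.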