Passing Python objects between Tasks in Luigi?
Short answer: No.
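Luigi parameters are meant for simple values: dates/datetimes, strings, ints, floats, booleans and the like (newer releases also add JSON-backed DictParameter and ListParameter). They are not designed to carry arbitrary Python objects. See the docs for reference.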
That means you need to serialize your complex data structure as a string (using json, msgpack or whatever serializer you like, optionally compressing it) and pass it as a string parameter.
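A minimal sketch of that round trip using only the standard library: json for serialization, plus optional zlib compression and base64 so the result is still a plain ASCII string that is safe to pass on the command line. The helper names encode_param and decode_param are hypothetical; in a Luigi task you would declare the parameter as an ordinary luigi.Parameter() and call decode_param on its value inside run().

```python
import base64
import json
import zlib


def encode_param(obj, compress=False):
    """Serialize a JSON-compatible object into a string parameter value."""
    # sort_keys keeps the string (and therefore the task id) stable
    text = json.dumps(obj, sort_keys=True)
    if not compress:
        return text
    # zlib returns bytes; base64 turns them back into an ASCII-only string
    return base64.b64encode(zlib.compress(text.encode("utf-8"))).decode("ascii")


def decode_param(value, compressed=False):
    """Inverse of encode_param: turn the string parameter back into an object."""
    if compressed:
        value = zlib.decompress(base64.b64decode(value)).decode("utf-8")
    return json.loads(value)


data = {"ids": [1, 2, 3], "mode": "fast"}
plain = encode_param(data)
packed = encode_param(data, compress=True)
print(plain)  # {"ids": [1, 2, 3], "mode": "fast"}
```

Compression only pays off for fairly large payloads; for small ones the base64 overhead can make the string longer than the plain JSON.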
Of course, you can write a custom Parameter subclass, but that basically means implementing its serialize and parse methods yourself.
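A minimal sketch of such a subclass, assuming luigi.Parameter's parse(x) (string to value) and serialize(x) (value to string) hooks. The JsonParameter name is hypothetical, and the try/except fallback to object exists only so the snippet can run where luigi isn't installed; drop it in real code.

```python
import json

try:
    import luigi
    _ParameterBase = luigi.Parameter
except ImportError:  # fallback only so this sketch runs without luigi installed
    _ParameterBase = object


class JsonParameter(_ParameterBase):
    """Hypothetical parameter whose value is any JSON-compatible object."""

    def parse(self, x):
        # Called on the raw string, e.g. a value given on the command line
        return json.loads(x)

    def serialize(self, x):
        # Must be the inverse of parse(); sort_keys keeps the string stable
        return json.dumps(x, sort_keys=True)


# Hypothetical usage inside a task:
# class MyTask(luigi.Task):
#     config = JsonParameter()
```

Before writing your own, check whether luigi.DictParameter or luigi.ListParameter (in newer releases) already covers your case; they take a similar JSON-based approach.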
But take into account: if you use parameters instead of saving your calculated data to a target, you lose one key advantage of using Luigi. If the parent task in the tree fails more times than the number of retries you specified, then when you rerun the pipeline you'll need to run the task that calculates that complex data structure again, because its result was never persisted. If your task calculates complex data, takes a considerable amount of time or consumes a lot of resources, you should save its output to a target so you don't have to repeat all that expensive computation.
And looking beyond: another task may need that data too, so why not save it?
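The trade-off can be sketched without Luigi itself: by default, Task.complete() returns True when all of a task's outputs exist, so a task whose result was saved to a target is skipped on the next run. The stdlib-only snippet below mimics that check with a JSON file standing in for a target; expensive_computation and cached_run are hypothetical names.

```python
import json
import os
import tempfile

calls = {"count": 0}  # tracks how often the expensive step really runs


def expensive_computation():
    calls["count"] += 1
    return {"result": sum(range(1_000_000))}


def cached_run(path):
    """Return the result, recomputing only if the 'target' file is missing."""
    if os.path.exists(path):  # analogous to Luigi checking output().exists()
        with open(path) as f:
            return json.load(f)
    data = expensive_computation()
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(data, f)
    # Write then rename, so a crash never leaves a half-written target
    os.replace(tmp_path, path)
    return data


target = os.path.join(tempfile.mkdtemp(), "expensive.json")
first = cached_run(target)   # computes and writes the file
second = cached_run(target)  # reads the file, no recomputation
print(calls["count"])  # 1
```

The write-then-rename step is much like what Luigi's LocalTarget does when you write through open('w').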
Also, notice that targets are not only files: you may save your data to a database table, Redis, Hadoop, an Elasticsearch index, and many more: http://luigi.readthedocs.io/en/stable/api/luigi.contrib.html#submodules
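To illustrate that a target can be any storage, here is a minimal sketch of a custom target backed by an in-process dict, assuming (as in Luigi's base class) that exists() is the only method luigi.Target requires. KeyValueTarget and its read/write helpers are hypothetical; a real implementation would talk to Redis or a database, and the contrib modules linked above already provide ready-made targets for several backends. The fallback to object is only there so the snippet runs without luigi installed.

```python
try:
    from luigi import Target
except ImportError:  # fallback only so this sketch runs without luigi installed
    Target = object


class KeyValueTarget(Target):
    """Hypothetical target storing a string under a key in a dict-like store.

    Luigi only needs exists() to decide whether the producing task is
    complete; read() and write() are convenience helpers for this sketch.
    """

    def __init__(self, store, key):
        self.store = store
        self.key = key

    def exists(self):
        return self.key in self.store

    def write(self, value):
        self.store[self.key] = value

    def read(self):
        return self.store[self.key]


store = {}
t = KeyValueTarget(store, "report:example")
print(t.exists())  # False
t.write('{"rows": 42}')
print(t.exists())  # True
```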
There is also another, still somewhat hacky, way to achieve what you're trying to do, using a target instead of parameters.
The luigi.mock module provides a special MockFile target (renamed MockTarget in later Luigi releases) that stores its "file" in memory. Its API is similar to other Target subclasses, so you open it and then read from or write to the resulting file object. Unfortunately, it only supports string input, so you still need to serialise your object (that's due to sending this data through a pipe between processes). See the following example (YAML serialisation):
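import yaml
from luigi import Task
from luigi.mock import MockTarget  # called MockFile in older releases


class TaskA(Task):
    def output(self):
        return MockTarget('whatever')

    def run(self):
        object_to_send = yaml.safe_dump({"example": "dict"})
        # Open the target for writing ('w'), not reading
        with self.output().open('w') as _out:
            _out.write(object_to_send)


class TaskB(Task):
    def requires(self):
        return TaskA()

    def run(self):
        # self.input() is TaskA's MockTarget: open it, then read the string
        with self.input().open('r') as _in:
            serialised = _in.read()
        # safe_load avoids constructing arbitrary Python objects from YAML tags
        deserialised = yaml.safe_load(serialised)
        print(deserialised)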
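Be aware that serialising big objects can take a lot of time. Also, because a MockTarget lives only in memory, its contents are gone once the Luigi process exits, so you don't get the re-run savings of a persistent target.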