Data Pipelines with Luigi, Part 1
This introduction to Luigi was originally presented as a talk at PyData London 2015. Feel free to check out the slides!
At Growth Intelligence, we use predictive modeling to help generate high-quality leads for our customers. We’re essentially helping them answer this question: where should I focus my outbound sales and marketing efforts to yield the highest possible ROI?
In order to do this, we track as many data points as we can about all the companies in the UK. Each of these data points comes with its own data pipeline: importing, processing, and upserting into a database. From there, the data is fed into other pipelines, such as model building and indexing into Elasticsearch.
The more data pipelines you have, the more important it is that they are readable, maintainable, and can be chained together easily to automate your data flows. Luckily, the data team at Spotify open-sourced a Python library called Luigi which helps you do all of that and more.
Luigi provides structure for batch processing jobs through its Task class, which represents some unit of work in your pipeline. A Task logically takes in some data, does any processing you like, and then outputs one or more Targets. A Target is a Luigi class that represents where the data lives. A target could be a file on disk, a table in a database, a key in a bucket on S3, etc. With these two simple abstractions, Luigi provides a powerful interface for quickly creating complex, robust data pipelines. Now that you’re oriented a bit, let’s check out some code!
Let’s imagine that we have a csv file that contains data about companies here in the UK. To get our feet wet, let’s imagine we simply want to count the number of unique companies currently operating in the UK and write that count to a file on disk.
Here’s what our simple Task would look like:
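import luigi


class CompanyCount(luigi.Task):

    def output(self):
        return luigi.LocalTarget("count.csv")

    def run(self):
        # count_unique_entries is our own helper (not shown here)
        count = count_unique_entries("companies.csv")
        with self.output().open("w") as out_file:
            # write() expects text, so convert the count first
            out_file.write(str(count))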
You can see we’ve defined a task, “CompanyCount”. It has a couple of methods: output and run. When we run this from the command line, Luigi executes the code in the run method and finishes by writing the count to the output target. Remember, a valid output can be a lot of things: a location on disk, a file on a remote server, or a table in a database. In this case, we’re simply writing to disk.
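The count_unique_entries helper called in run isn’t shown in this post. Here’s a minimal sketch of what it might look like, assuming the csv has a header row and one column that identifies each company. The column name company_number is a made-up placeholder, not something taken from the real data:

```python
import csv


def count_unique_entries(path, column="company_number"):
    """Count distinct non-empty values in one column of a CSV file.

    Assumes the file has a header row; `column` is a hypothetical name.
    """
    seen = set()
    with open(path, newline="", encoding="utf-8") as csv_file:
        for row in csv.DictReader(csv_file):
            # Rows that are missing the column or leave it blank are skipped.
            value = (row.get(column) or "").strip()
            if value:
                seen.add(value)
    return len(seen)
```

Since this returns an integer, the task needs to turn it into text before writing it to the output file.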
Now let’s get a little more complex with our example. We want to make sure we have the latest count, so instead of using our local, outdated file, we’re going to go and get the latest data from a government server (don’t worry, it’s publicly available =] ).
So now we have two tasks: CompanyDownload and CompanyCount.
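import luigi


class CompanyCount(luigi.Task):

    def requires(self):
        return CompanyDownload()

    def output(self):
        return luigi.LocalTarget("count.csv")

    def run(self):
        # self.input() is the Target produced by CompanyDownload
        count = count_unique_entries(self.input().path)
        with self.output().open("w") as out_file:
            out_file.write(str(count))


class CompanyDownload(luigi.Task):

    def output(self):
        return luigi.LocalTarget("companies.csv")

    def run(self):
        # get_company_download is our own helper (not shown here)
        data = get_company_download()
        with self.output().open("w") as out_file:
            out_file.write(data)


if __name__ == "__main__":
    # Lets us run this file directly from the command line
    luigi.run()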
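Here, you can see we’ve outlined a simple task, CompanyDownload, that goes out and gets the company data and writes it to “companies.csv”. CompanyCount now declares that dependency in its requires method and reads the downloaded file through self.input() instead of a hard-coded path.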
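The get_company_download helper is also left out of this post. A rough sketch, assuming the data is served as a UTF-8 csv over HTTP, could look like the following. The URL is a placeholder we made up for illustration, not the real government endpoint:

```python
import urllib.request

# Hypothetical placeholder; the post does not name the real download URL.
COMPANY_DATA_URL = "https://example.com/companies.csv"


def get_company_download(url=COMPANY_DATA_URL, timeout=30):
    """Fetch the company csv and return its contents as text.

    Assumes the server responds with UTF-8 encoded data.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

This reads the whole file into memory, which is fine for a small example. For a large download you would probably want to stream it to the output target in chunks instead.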
Before we move on, let’s try running this from the command line. To run our company count task from beginning to end, we simply call:
python company_flow.py CompanyCount --local-scheduler
The first argument tells Luigi which task we want to run. It’s worth noting that we also passed the --local-scheduler flag. This tells Luigi not to use the central scheduler, which is a daemon that comes bundled with Luigi and handles scheduling tasks. We’ll talk about the central scheduler in our next post, but for now, we’ll just use the local scheduler.
When we run this from the command line, Luigi builds up a dependency graph and sees that before it can run CompanyCount, it needs to run CompanyDownload. It establishes this by calling the complete() method on each task, which by default simply checks whether the Target returned by the output method already exists (using the Target’s exists() method). If it does, that task is marked as DONE; otherwise it’s included in the task queue. So first Luigi runs CompanyDownload, and then, if it executes successfully, it runs CompanyCount and generates a new count for us.
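To make that completeness check a bit more concrete, here’s a toy model of the idea in plain Python. This is not Luigi’s actual scheduler code; ToyTask and plan are names we invented for illustration, and the real thing handles much more (failures, retries, parallel workers, and so on):

```python
import os


class ToyTask:
    """Minimal stand-in for a task: a name, an output path, upstream tasks."""

    def __init__(self, name, output_path, requires=()):
        self.name = name
        self.output_path = output_path
        self.requires = list(requires)

    def complete(self):
        # A task counts as done when its output file already exists.
        return os.path.exists(self.output_path)


def plan(task, order=None):
    """Return the names of incomplete tasks in the order they need to run."""
    if order is None:
        order = []
    if task.complete():
        # Already done, so there is no need to look at its dependencies.
        return order
    for dependency in task.requires:
        plan(dependency, order)
    if task.name not in order:
        order.append(task.name)
    return order


download = ToyTask("CompanyDownload", "companies.csv")
count = ToyTask("CompanyCount", "count.csv", requires=[download])
# The result depends on which of the two files exist in the current directory.
print(plan(count))
```

One consequence worth noticing: if count.csv already exists, nothing runs at all, even if companies.csv is stale. To force a fresh count, you have to remove the old output first.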
So that was the MVP Luigi task, but from these simple building blocks it is possible to build up complicated pipelines quite quickly. In our next post, we’ll show how we can extend this task and move it to the Luigi central scheduler, which you will want to use in production.