Summary
There is a vast constellation of tools and platforms for processing and analyzing your data. In this episode Matthew Rocklin talks about how Dask fills the gap between a task-oriented workflow tool and an in-memory processing framework, and how it brings the power of Python to bear on the problem of big data.
Preamble
- Hello and welcome to the Data Engineering Podcast, the show about modern data infrastructure
- Go to dataengineeringpodcast.com to subscribe to the show, sign up for the newsletter, read the show notes, and get in touch.
- You can help support the show by checking out the Patreon page which is linked from the site.
- To help other people find the show you can leave a review on iTunes, or Google Play Music, and tell your friends and co-workers
- Your host is Tobias Macey and today I’m interviewing Matthew Rocklin about Dask and the Blaze ecosystem.
Interview with Matthew Rocklin
- Introduction
- How did you get involved in the area of data engineering?
- Dask began its life as part of the Blaze project. Can you start by describing what Dask is and how it originated?
- There are a vast number of tools in the field of data analytics. What are some of the specific use cases that Dask was built for that weren’t able to be solved by the existing options?
- One of the compelling features of Dask is the fact that it is a Python library that allows for distributed computation at a scale that has largely been the exclusive domain of tools in the Hadoop ecosystem. Why do you think that the JVM has been the reigning platform in the data analytics space for so long?
- Do you consider Dask, along with the larger Blaze ecosystem, to be a competitor to the Hadoop ecosystem, either now or in the future?
- Are you seeing many Hadoop or Spark solutions being migrated to Dask? If so, what are the common reasons?
- There is a strong focus for using Dask as a tool for interactive exploration of data. How does it compare to something like Apache Drill?
- For anyone looking to integrate Dask into an existing code base that is already using NumPy or Pandas, what does that process look like?
- How do the task graph capabilities compare to something like Airflow or Luigi?
- Looking through the documentation for the graph specification in Dask, it appears that there is the potential to introduce cycles or other bugs into a large or complex task chain. Is there any built-in tooling to check for that before submitting the graph for execution?
- What are some of the most interesting or unexpected projects that you have seen Dask used for?
- What do you perceive as being the most relevant aspects of Dask for data engineering/data infrastructure practitioners, as compared to the end users of the systems that they support?
- What are some of the most significant problems that you have been faced with, and which still need to be overcome in the Dask project?
- I know that the work on Dask is largely performed under the umbrella of PyData and sponsored by Continuum Analytics. What are your thoughts on the financial landscape for open source data analytics and distributed computation frameworks as compared to the broader world of open source projects?
Keep in touch
Links
- http://matthewrocklin.com/blog/work/2016/09/22/cluster-deployments
- https://opendatascience.com/blog/dask-for-institutions/
- Continuum Analytics
- Two Sigma
- Xarray
- Tornado
- Airflow
- Luigi
- Mesos
- Kubernetes
- Spark
- Dryad
- Yarn
- Read The Docs
- XData
The intro and outro music is from The Hug by The Freak Fandango Orchestra / CC BY-SA
Hello, and welcome to the Data Engineering Podcast, the show about modern data infrastructure. Go to dataengineeringpodcast.com to subscribe to the show, sign up for the newsletter, read the show notes, and get in touch. You can help support the show by checking out the Patreon page, which is linked from the site. And to help other people find the show, you can leave a review on iTunes or Google Play Music and tell your friends and coworkers or share it on social media.
[00:00:26] Tobias Macey:
I'm your host, Tobias Macey. And today, I'm interviewing Matthew Rocklin about Dask and the Blaze ecosystem. So, Matt, could you please introduce yourself? Hi, Tobias. Thanks for having me. I'm very excited to see your new show. So my name is Matt. I'm a software developer, an open source software developer at Continuum Analytics. I'm the lead developer on a project called Dask, which we'll talk about in a bit. So I'm wondering how you first got involved in the space of data engineering and building tools for it. Yeah. So I probably first got started, as with many open source software developers, in grad school while I was procrastinating on a dissertation, and I was in sort of the more numerical analysis or scientific computing field as a computer scientist.
So I was engaged more in, you know, large matrix multiplications, sort of large simulations on high performance computers. And I was also really active in the Python open source space, which was a nice combination of people who cared about performance, like I cared about performance, and also people who cared about accessibility and moving a lot of that sort of high performance computing down to the average user. There's, I think, a lot of social value in that. So I was doing some postdocs, and I got more excited about open source work. And so I transitioned over to Continuum Analytics.
Continuum is sort of a for-profit company within the open source Python ecosystem, and that brought me into more, you know, business use cases, more sort of general computing
[00:01:42] Matthew Rocklin:
rather than scientific computing. And so I've taken my sort of experience in numerical analysis and shifted it over to more data engineering use cases. So as you mentioned, you're one of the primary developers for the Dask project. So I'm wondering if you can explain a bit about what Dask is and how it began, and also a little bit about its relation to the overall Blaze ecosystem. Yeah. So Dask is a Python library for parallel computing.
[00:02:08] Tobias Macey:
So Dask has a few things. At its core, it runs Python functions in parallel on different computers. It tracks where they end up and then it runs more functions on those results. So if you have maybe three tasks you want to run, a, b, and c, maybe a and b can run in parallel and c depends on the outputs of both a and b. You might run a on one machine and b on another machine, and then you need to run c somewhere. You might run it on the machine that has a or the machine that has b, you know, based on maybe the smaller of the two, so you want to move the smallest amount of data. So in doing this, Dask thinks about how to run functions on different computers.
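As a rough illustration of the a/b/c example above, here is a minimal sketch using dask.delayed; the function bodies are placeholder stand-ins, not anything from the conversation.

```python
import dask

@dask.delayed
def a():
    return 1            # placeholder work

@dask.delayed
def b():
    return 2            # placeholder work

@dask.delayed
def c(x, y):
    return x + y        # depends on the results of both a and b

total = c(a(), b())     # builds a task graph; nothing has run yet
print(total.compute())  # a and b may run in parallel, then c runs on their results
```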
It also thinks about how to move data between different computers. It also has to make that decision of where to run a task based on the state of everything as it is currently. In sort of technical terms, Dask is a dynamic distributed task scheduler. So it schedules tasks. On top of that task scheduler, once you have that system that can run all these little small Python functions everywhere, you can start to build some big data systems. So on top of the Dask schedulers, we have built Dask array, which looks like the Python NumPy project but, you know, on very large datasets, or Dask DataFrame, which looks like the Python Pandas project for large data frames, or sort of other machine learning things, etcetera. So Dask has been integrating itself into the rest of the PyData ecosystem, providing parallelism, and making this already very fast ecosystem of Python extend out to distributed computers, which is something that has been lacking historically. Yeah. It's sorry. Go ahead. I was gonna say you also asked about Blaze.
So Dask sort of fell out of the Blaze project, which is now sort of a family of projects. So there's projects like Numba, which is a sort of Python-to-LLVM compiler. So it takes your Python code, you know, very naive for-loopy Python code, and runs it at C or Fortran speeds. There's projects like DyND, the C++ array library, Blosc and bcolz for compression and on-disk storage, and Blaze, sort of the current Blaze software project, which is sort of a query optimizer that translates SQL-like expressions to various systems. And so I think the idea of this family of projects is that if you add them all together, you get sort of a full cohesive big data system.
But in practice, they all tend to be used probably more independently
[00:04:23] Matthew Rocklin:
than as sort of a conglomerate. Yeah. And one of the obvious parallels is the Hadoop ecosystem, where you've got the data storage layer and a number of different processing layers built on top of that, with Hadoop being the sort of forefather of all of that and then additional projects coming out of it. And a lot of the existing big data stack has historically been built around the JVM. It seems a lot of that is due to sort of the gravitational pull of the Hadoop ecosystem. And so it's interesting to see an analogous ecosystem growing up in a different language space. So what is it, do you think, about the current time and place in technology that makes it a good opportunity for a Python offering to come up and sort of challenge the existing standard best practice way of handling large datasets?
[00:05:15] Tobias Macey:
Right. Actually, I would say it's not a challenge. So if you look at Hadoop originally, it was really a large monolithic project. And then Hadoop broke itself up into YARN, you know, a resource manager, HDFS, a file system, and the MapReduce runtime, which has, you know, stagnated a little bit as other runtimes have come on top of it like Spark or Hive or Impala. And I think once this decision was made to fragment things, it allowed for a lot of growth in the ecosystem. And so a lot of these Hadoop ecosystem systems, like maybe Drill or Impala or Hawq, you know, some of them are JVM based, some of them are not JVM based, but they can all speak to HDFS.
They can speak to each other. They use the same data formats. And so I think the system has really opened up. So it's no longer JVM versus non-JVM. It's just a lot of distributed systems that are all playing in the same space. So Dask doesn't compete there. It just lives in that same world. You can, you know, deploy Dask with YARN. Dask can read data from HDFS. It can write data to Parquet files, etcetera.
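For example, reading from HDFS and writing Parquet with dask.dataframe might look roughly like the sketch below; the paths and column name are hypothetical, and an HDFS driver plus a Parquet library (fastparquet or pyarrow) would need to be installed.

```python
import dask.dataframe as dd

# Hypothetical paths on HDFS; a glob string selects many files at once.
df = dd.read_csv("hdfs:///data/events/2016-*.csv")

cleaned = df[df.value > 0]                       # 'value' is a hypothetical column
cleaned.to_parquet("hdfs:///data/events-clean")  # write the result back out as Parquet
```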
[00:06:18] Matthew Rocklin:
So it's just sort of another player inside of that same space. And so what are some of the specific use cases that Dask was built for that weren't currently solved by any of the existing options? And I guess what is it about the
[00:06:31] Tobias Macey:
problem domains that people were trying to tackle that Dask was needed for? Yeah. So I would say I'm a little bit of an outsider to this community, but I'd say that most of the sort of big data tooling was built either by large web companies like Google or Twitter, or it was built in service of business intelligence, largely SQL. Most of the tools you mentioned were sort of SQL databases or systems that are sort of SQL-plus, systems like Hadoop or Spark, which give you, you know, map, shuffle, group by, reduce, join, but sort of a relatively small number of operations that hit a very common case. Dask originally came out of the more data sciency ecosystem where the complexity of the algorithms was just a little bit greater.
So for example, if you wanted to do some matrix operations, or a nice principal component analysis, or some time series analysis, those things you can do in Hadoop or Spark, but not often with sort of modern algorithms. And so Dask is slightly more flexible in the way that it represents computations, and that allows it to handle messier situations. So a few concrete examples. So, like, time series analysis. Let's say you have data and maybe it's grouped by week, and you wanna group it by month or vice versa. So if your dataset is grouped by week and you want it grouped by month, the weeks and the months don't overlap exactly. There's sort of a few days on either end that you need to share.
And that sort of point-to-point sharing is very easy to write down in something like Dask, but it's maybe harder to write down in something like MapReduce, which only understands, you know, map and then shuffle and aggregate. Same with Spark internally. So if you look at, I think, Two Sigma, a financial company here in New York City, they have actually implemented a fair amount of time series stuff on top of Spark, and have had to dive fairly deeply into the internals to achieve good performance on time series. So it can be done with these tools, but it's a little more complex. And with Dask, those trickier algorithms are just easier. Another good example is just any n-dimensional array operation.
So a lot of climate scientists are now using Dask to, you know, reason about these large data cubes of, say, the temperature everywhere on the earth for the last 50 years. That sort of operation is very easy to write down in Dask, but it's maybe harder in other systems. Yeah. It seems like a lot of the existing big data infrastructure is largely built for cases where you write a processing routine, and then you run it on a schedule or as a continuous operation
[00:08:52] Matthew Rocklin:
so that you can transform the data as it travels from point a to point b. And there are some ad hoc analysis tools that are available in the space, but it seems like most of them are more geared towards building and debugging a single operation or set of operations and then making sure that they're running properly so that the end users can have their data available and properly cleaned for doing the analysis on, whereas it seems that Dask is potentially something that can be used directly against the raw data to do that exploratory analysis, or also on the end result of those other processing operations. So I'm wondering what your opinion is on how it fits in that space and what sorts of
[00:09:37] Tobias Macey:
niches it would fill in. Yeah. So let me take a try at it. So people use Dask for a couple of reasons. First, one of the things is it is just a Python library, and so a lot of people who are using it were coming from Python originally. They wanted to use some parallelism, and Dask was sort of a quick and easy win. It can do all of the standard load my data, map over my data, dump my data somewhere else, and you can do that just fine. Other people are using Dask in the way that they might use something like Luigi or Airflow. So Luigi or Airflow are sort of a nice counterpoint to systems like Hadoop or Spark in that they have the same model of a task scheduler, so there's, you know, a thousand little tasks you wanna run that all depend on each other in some complicated way. So it might be, you know, load my data and then, for every three days of data, combine them in a rolling sample, something like that.
And so systems like Luigi and Airflow have a bit more flexibility to them. Right? They can handle more complex situations, but they often weren't designed for computationally intensive tasks. They don't share data between workers. They, you know, have relatively long latencies, etcetera. And so Dask is something of a mix between a system like Spark and a system like Airflow. It has some of the flexibility of Airflow and some of the scaling and performance of something like Spark. It tends to be used either in data ingest or for parallel problems that are a bit messier to write down in something like Spark. I can give you some examples of that. Would that be useful? Yeah. I definitely think it would be useful to sort of enumerate
[00:11:10] Matthew Rocklin:
what are some of the best use cases for a tool like Dask, particularly in comparison to something like Spark or alternatively something like Airflow. Because as you said, it does kind of straddle the space between the two, where Airflow and Luigi are good for building up a number of tasks that move data between disparate systems, and they have to have a dumping ground in between those various spots where potentially the output of one operation in a Luigi pipeline is used at some stage of the business, and then you need to do a further roll up or aggregation of the data or enrich that data with something else so that somebody else in the business can use it for a different purpose. Whereas it seems that Dask is more useful for being able to provide a continuous operational flow for the data from a source to a destination, and then the intermediate dumping is more of an optional
[00:12:01] Tobias Macey:
attribute of the system. Right. Yeah. So as you're saying, Luigi and Airflow usually depend on some globally accessible data store like S3 or a database or something that all of your workers can talk to. And so a Luigi task or Airflow task tends to dump its result into that global data store. While in Dask, Dask maintains a lot of state in memory. And so when a task completes on a Dask worker, it sits on that worker until some other task needs it. And, ideally, that task moves to where that worker is and runs there. Or, if necessary, Dask will move that data to some other worker, either because, you know, two pieces of data are needed together or because that worker is saturated or what have you. So I think a good use case to sort of wrap your head around Dask is how it's used a little bit within Capital One, the credit card company. They're sort of my favorite example because they are willing to be public about their use of Dask.
And so in Capital One, it's used to figure out if they should offer you a better credit line. So imagine you have a credit card with Capital One. Do you have a credit card with such a company? Yes. Yeah. Great. And so, you know, they might want, like, an email sometime that says, you know, hey, we decided to up your credit limit by $1,000, or maybe it's $10,000. And how they do that is they have lots of little small functions that run. And that function might be, you know, are you a student? Are you graduating soon? If so, what is your expected income? Or maybe you have debt elsewhere.
Can we estimate your debt in some way? And there's all these little functions that answer all these little questions using lots of small machine learning models, and they depend on each other. Right? You know, if they wanna calculate your income, they also calculate your expected debt and then see, you know, based on that information and your job trajectory, if you're likely to be able to pay off your loan in the future. And so there's obviously a lot of parallelism in this problem. You can run all these different functions, but the complexity of all these tasks that depend on each other is also sort of intense, which sort of rules out Hadoop or Spark.
Additionally, they, you know, they often need to make these decisions relatively quickly. So, you know, you might be clicking on a website and trying to get your new credit card right there, and they have, you know, a five second budget or a two second budget in order to turn that around. And that maybe rules out Airflow and Luigi, because you need to have that data in memory all the time. And so Dask sort of sits in between those two. It can do the scalability. It holds data in memory. It operates at, you know, millisecond overhead, but it also handles all of the sort of complexity of Airflow or Luigi.
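A toy sketch of that kind of workload using the distributed scheduler's futures interface; the scoring functions and applicant record below are fabricated stand-ins for the many small models described, not anyone's actual code.

```python
from dask.distributed import Client

client = Client()  # local cluster for illustration; in production this would point at a real scheduler

def estimate_income(applicant):          # stand-ins for the many small models
    return 50_000

def estimate_debt(applicant):
    return 8_000

def decide_credit_limit(income, debt):
    return 1_000 if income > 3 * debt else 0

applicant = {"id": 123}
income = client.submit(estimate_income, applicant)
debt = client.submit(estimate_debt, applicant)
decision = client.submit(decide_credit_limit, income, debt)  # futures passed as arguments form the dependency graph
print(decision.result())                                     # blocks until the dependent tasks finish
```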
And so while Dask has been used for sort of large data frames and large arrays, those sort of use cases are also well served by other systems. If you want a large data frame, you can use Spark. You can use a SQL database. It's really these cases of complex intertwined tasks that also require performance. That seems to be where Dask is really getting picked up, at least,
[00:14:51] Matthew Rocklin:
you know, among enterprise users. Given the fact that all the data stays resident in memory, it seems like that's a potential scaling issue where you need to make sure that you have enough overhead so that if you have one particular customer who happens to have a lot more data than the standard user, then you might find yourself surprised where all of a sudden you hit a memory ceiling on your instances. But you said that one of the capabilities of Dask is that it can ship that data between multiple workers as well. So I'm assuming that there's some capability for balancing that to alleviate any sorts of issues of hitting that memory ceiling. Yeah. We run into problems like
[00:15:31] Tobias Macey:
that all the time, and we have solutions. So one thing is Dask just dumps data to disk. You know, so at a certain point, if your memory starts to blow up, it starts to just serialize things down to disk. But that rarely happens. It's much more common that we'll just move tasks to other machines. So think about what happens whenever a task comes up. In Dask, there's a centralized scheduler which knows all the computers it has, all the workers, and all the data that those workers have. And it has sort of an estimate of the run time of most of the tasks based on how similar tasks have run in the past, and it knows the size of all of the intermediate results. And so when a task comes in, it says, okay, where should I run you? And it will find all the workers that have the data that task needs, and it will make, you know, a decision. It might choose to run a task on a worker that has that data. If that worker is really saturated, it might choose to move that data somewhere else, or it might choose to wait. And all of those decisions that it has to make are sort of what goes into making a nice dynamic task scheduler.
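For reference, the per-worker memory limit that triggers spilling to disk can be set when the workers are created; a minimal local sketch, with illustrative numbers rather than recommendations:

```python
from dask.distributed import Client, LocalCluster

# Each worker starts spilling excess results to disk as it approaches its memory limit.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="4GB")
client = Client(cluster)
print(client)  # shows the workers and the memory available to them
```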
And Dask, generally speaking, makes that decision pretty well most of the time. We run into lots of situations like the one you described, and it handles it pretty well. So at its core, there's a task scheduler which has a bunch of heuristics and makes decisions on where to run tasks well to avoid bottlenecks. And that's really sort of the novel part of Dask. That is the thing that Dask does better than most other systems. It is very, very smart about where to send that small task. And
[00:16:56] Matthew Rocklin:
one of the appeals of things like Luigi and Airflow is that because it has that dependency graph between tasks, if one of the processes in the pipeline fails, then it's intelligent enough to know that subsequent tasks shouldn't run, and you can optionally retry the failing task or alert when it does fail so that somebody can fix it. Does Dask have similar capabilities built into it? Yeah. So if a task fails, we don't run the things downstream.
[00:17:24] Tobias Macey:
If a worker fails, we'll move the task elsewhere. If some other worker comes in with the results, we'll not rerun it. So you bring up a good point. So one of the things that Dask doesn't do is there's no way to say, retry this task if it fails. Dask assumes that if you have an exception, that is the final result. And that sort of shows maybe a cultural difference between Airflow and Luigi and Dask, where Dask was more centered around computational problems, while Airflow and Luigi were more centered around, you know, data engineering problems, where a lot of these features like retries or, you know, Airflow's cron capability are sort of a set of features that are really valuable in those use cases that Dask doesn't have. To return to your point, Dask handles things like failures. It handles things
[00:18:04] Matthew Rocklin:
like exceptions. So it seems like there's even the potential for there to be sort of a hybrid approach where Dask is actually something that gets triggered as part of a pipeline used in Luigi or Airflow. You know, I'm sure that there are cases where that wouldn't make sense, but I can definitely see cases where that would be potentially very beneficial, where you have a lot of preprocessing of data where sometimes there are transient failures due to network issues or just the data is not quite available yet, and so then you have that task retry capability. And then when you need to have the more complex processing where the data streams fully through the task graph, you would then kick off the Dask job to do that. Yeah. Definitely. I would say that if Luigi and Airflow are working well for you, and you don't need a whole lot of inter-node communication,
[00:18:44] Tobias Macey:
you should stick to them, and maybe use Dask, you know, maybe you use just Dask DataFrame in the same way you'd use something like Spark. So if part of your job is a really intense Pandas operation or a really intense scikit-learn operation, you might then kick off Dask to handle that instead. And one of the things that I was noticing when I was going through the documentation for Dask is that because of the way that the
[00:19:06] Matthew Rocklin:
task chains are built up using Python dictionary data structures, it seems like there's the potential that you could accidentally introduce a cycle or other kind of bug into the task chain. So I'm wondering if there's any sort of static analysis or pre-run check that you can run to make sure that you don't introduce any sort of cycles where you would end up with an unexpected result because of that. Yeah. You were obviously in the developer docs. The developer docs. Yes. Yeah. The graph spec is a fun thing in Dask. It's actually pretty accessible if you sort of, you know,
[00:19:39] Tobias Macey:
like basic data structures. You could construct a graph that has a cycle or is in some way illegal. It's very rare for users to use that API directly; if you're using the standard user-facing APIs, that'll never happen. But if it does happen, Dask does do a check. It does a sort of linear-time optimization pass over any graph that gets sent to a scheduler. This is both to check for things like what you mentioned, like cycles, but also to perform standard compiler optimizations like fusion or culling out tasks that aren't necessary, or, in the case of some of the collections like arrays or data frames, some more sophisticated optimizations.
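For readers curious about the low-level graph spec being discussed, it is just a Python dictionary mapping keys to values or task tuples; a tiny sketch, run here with the threaded scheduler:

```python
from operator import add, mul
from dask.threaded import get

# Keys map to either literal values or tuples of (function, *arguments),
# where arguments may themselves be keys in the graph.
dsk = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),
    "w": (mul, "z", 10),
}

print(get(dsk, "w"))  # walks the graph and returns 30
```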
But, yeah, we've certainly run into cases like we described, and they've been resolved. It is harder today to break Dask than
[00:20:22] Matthew Rocklin:
than it was when we were starting. And given the fact that a lot of the computation happens in memory, the obvious analogy there is to Spark. So I'm wondering, from a performance perspective and also from a sort of usability perspective, how the two systems compare. Obviously, you're a little bit biased given that you're the author of one of the tools. But, you know, just taking that as a compare and contrast for somebody who is considering using Spark but is more familiar with the Python ecosystem. You know, what are some of the trade offs that they would be
[00:20:53] Tobias Macey:
facing if they were to go with Dask instead of Spark? I thank you for the premise. I am biased, and even worse, I'm also financially incentivized. Right? So I am paid to work on Dask, and, you know, I like my job, so I'm incentivized here to tell you good things about Dask. That being said, I do try to be, you know, a good citizen in the open source community. So first of all, what are some good things about Spark? I would say that, you know, one of the things that Spark does quite well is that it implements SQL. Right? If what you want is a database plus some stuff, I think Spark is an excellent choice. Spark's also been around for a long time relative to Dask. And so it's more mature. It's, you know, got a larger developer base, etcetera.
You know, you don't get fired for choosing Spark these days. I think that if you are a Python user and you like Pandas or NumPy, you will find Dask array or Dask DataFrame to be sort of drop-in replacements. So one of the reasons why Dask has developed so quickly is because we collaborate very closely with the Pandas developers, with the NumPy developers, with the Jupyter and scikit-learn developers, to build all of these sort of parallel PyData projects on top of Dask. And so the Dask DataFrame looks and feels just like a Pandas DataFrame. You type in dask.dataframe.read_csv, but instead of giving it a single file name, you give it a bunch of files, you know, with a glob string on S3 or in HDFS.
It feels very natural. All of your groupby operations are there, your join operations, the time series. You have an index. So if you're familiar with the Pandas model, it will take you all of a minute to get used to Dask. Performance-wise, I haven't run benchmarks in a while. I know, like, a couple years ago, it sort of depended on what you do. Generally speaking today, Dask offers relatively little overhead on top of Pandas, so you're mostly running at Pandas speed, which is fast. It's actually quite hard to beat Pandas speed. Even sort of good databases don't do it. So I don't know, I haven't run the numbers in a while, but I would expect Dask to be as fast as you can get for things that Pandas does well.
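A minimal sketch of that pandas-style workflow; the bucket and column names are hypothetical, and reading from S3 assumes the s3fs package is installed.

```python
import dask.dataframe as dd

# A glob string instead of a single file name.
df = dd.read_csv("s3://my-bucket/transactions-2016-*.csv")

# Looks and feels like pandas; nothing runs until .compute() is called.
totals = df.groupby("customer_id").amount.sum()
print(totals.compute())
```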
[00:22:55] Matthew Rocklin:
And these days Spark is also pretty good, so maybe they're about comparable. And you mentioned the compatibility with the Pandas API and also some of the NumPy APIs with the plugins to Dask. So I'm wondering, for somebody who already has an existing code base that's leveraging NumPy and/or Pandas to do some analysis, but then they start needing to scale it out beyond the capabilities of a single core or a single machine, what does the integration process look like, and what are some of the edge cases that they need to be aware of as they try to
[00:23:23] Tobias Macey:
replace their usage of Pandas or NumPy with Dask? Yeah. That's a great question. So there really are sort of two possible cases that you're talking about. One is that they're using NumPy or Pandas and they just want a strictly bigger NumPy or Pandas. In this case, you switch out the imports. You should make sure that you're not using particular functions. Like, Dask array, the NumPy clone, doesn't have a sort function, but it does have topk, or it doesn't have median. Right? Some operations like median are actually quite difficult to do exactly in parallel, but there are new operations like maybe approximate quantile that are present. So you just sort of swap out some operations for others.
Up until recently the projects didn't support mutation, we sort of wanted to have a very pure workflow, but that has since been added, so that's available now. Then you operate sort of normally with Pandas or NumPy syntax, and you call a compute method when you're done. So you do, you know, my dataframe, group by this column, give me the sum on some other column, and then dot compute whenever you're finished, and that triggers off the computation to run. You also need to start thinking about when you want to persist data on the cluster or not. So you might want to load your data, do some filtering, and then sort of clamp that down so it's living on the cluster, and then you might do many computations off of that pre-filtered data. Additionally, I would say that you would start not with a cluster but with just a thread pool. So Dask is actually quite good at scaling down.
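The compute-and-persist pattern described here might look roughly like the following; the file path and column names are hypothetical.

```python
import dask.dataframe as dd

df = dd.read_parquet("data/events.parquet")   # lazy; nothing is read yet
recent = df[df.year == 2016].persist()        # pin the filtered data in memory on the cluster (or local pool)

# Many cheap follow-up computations can reuse the persisted intermediate.
print(recent.value.mean().compute())
print(recent.groupby("region").value.sum().compute())
```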
It was originally designed to run on a single machine. You're just on your laptop with threads or with processes. So it's very easy to just pip install Dask, import Dask, and go. You don't need to set anything up. You can get running in the next minute if you're listening to the podcast. It's very easy to start. So I said there were sort of two use cases, and that was the first one: you just want a bigger NumPy or Pandas. And I claim that while that is attractive, that is actually rarely what people end up needing. So most people are using NumPy or Pandas not because they want just NumPy or just Pandas. They're often using it with a bunch of code around NumPy and Pandas.
So they have some custom libraries using scikit-learn on top of it, and they've got a lot of for loops. They've built up a lot of custom work on top of their data frame, on top of their array. And it's this custom code that's actually really awkward or frustrating to handle with sort of big data collections like Dask DataFrames or with Spark or Hadoop or databases. You might ask the question, why hasn't everyone switched from Pandas to Postgres? Postgres arguably offers a more mature database technology, and yet Pandas is still very popular. And that's because of the ability to write all of this stuff around Pandas, its ability to be messy. And so for this, you're exercising not the big data frame aspect of Dask, but instead the dynamic task scheduling part of Dask, the ability to write just arbitrary Python functions and send them to run somewhere. And so for this, the public API is the dask.delayed decorator.
So you can decorate some Python functions, and instead of running, they'll become small lazy tasks. And you can call these functions on top of each other, and you build up a large graph in that way. And so oftentimes, you know, Continuum has a lot of clients, and I can usually go into a codebase and tell them how to use Dask and usually get sort of the computational core of their code parallelized in about an hour or two after they learn it, which takes about an hour. So this involves just decorating functions and then using your exact same previous for-loopy code, but now your for loops are generating tasks rather than actually calling functions.
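A minimal sketch of that for-loop pattern with dask.delayed; the load/clean/summarize functions are fabricated placeholders for whatever custom code a project already has.

```python
import dask

@dask.delayed
def load(filename):
    return [1, 2, 3]                  # stand-in for reading a file

@dask.delayed
def clean(data):
    return [x * 2 for x in data]      # stand-in for custom processing

@dask.delayed
def summarize(pieces):
    return sum(sum(p) for p in pieces)

results = []
for fn in ["a.csv", "b.csv", "c.csv"]:   # the same for loop as before, but each call
    results.append(clean(load(fn)))      # now just builds a lazy task

print(summarize(results).compute())      # the whole graph runs in parallel here
```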
[00:27:03] Matthew Rocklin:
So switching gears a bit, I'm wondering if you can talk a bit about some of the most interesting or unexpected uses of Dask that you've seen. Yeah. So I think that the Capital One example is probably a good example there. Right? You have a large, complex
[00:27:16] Tobias Macey:
graph that is created by lots of different analysts who are producing all of these little models, and it's parallel and needs to run on parallel hardware, but it doesn't fit any particular mold. And that model actually ends up being pretty common inside of a lot of companies. I moved from the Bay Area to New York City in July, and since then I've been interacting with all these finance companies, and it's a very common use case. It seems like many companies have built internally some sort of task scheduler that they are just dying to get rid of because it is a pain to maintain.
So that's one common use case, and there are more fun examples. The Capital One one is sort of the public one. Another really cool use case, I think, is actually climate science. So on top of Dask, there's Dask Array, which looks like NumPy for large n-dimensional arrays. On top of Dask Array, there's a project called Xarray. This was originally by Stephan Hoyer, who was at the Climate Corporation and is now at Google Research. And Xarray is used by a large number of climate scientists to analyze atmospheric data, oceanographic data, etcetera. So this might be the temperature and the pressure and the wind speed everywhere on the earth for every fraction of a degree for the last 50 years. These datasets are 10 gigabytes, 100 gigabytes, many terabytes, and it's actually really cool. So this is sort of the first community that used Dask heavily, and now there are, you know, frequently people at conferences or workshops who are using Dask to compute on 100 gigabyte datasets just using their laptop.
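A rough sketch of the Xarray-on-Dask workflow being described; the file glob, chunk size, and variable name are hypothetical.

```python
import xarray as xr

# chunks= tells xarray to back the arrays with dask, so data is loaded lazily block by block.
ds = xr.open_mfdataset("data/temperature-*.nc", chunks={"time": 365})

# Mean over the whole time period, computed out of core.
climatology = ds["temperature"].mean(dim="time")
print(climatology.compute())
```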
They're reading from disk in a nice smooth way, and they can operate on much larger datasets than they could before. Those are maybe two extremes. So the Capital One case is ad hoc task scheduling, and the climate science case is using all the structured algorithms inside of Dask Array. Maybe another sort of oddball one is the Cesium project. It's a small project at UC Berkeley that is using Dask as a task scheduler. I think they replaced Celery with Dask, and the reason for replacing it was, I think, latency. So Dask has something like a 20 millisecond round trip latency, which was really good because they were serving requests from a website. You click a button, and then that fires off a task, and that task, you know, does a computation, comes back, and they wanted this really tight round trip. They were using task schedulers before, Celery, but Celery had, like, a 500 millisecond delay, which was just very painful for the web experience. And what are some of the most significant problems that you've
[00:29:49] Matthew Rocklin:
been faced with while working on Dask, as well as some that are still
[00:29:55] Tobias Macey:
as well as some issues that still need to be overcome as you move forward? I would say the biggest issue today isn't a technical one. It's a social or cultural one, in that, with most of the engineers I talk with at companies, it's very easy for me to convince technical people of the merits of Dask, if it is of use to them, but it's often hard to get their managers to agree. I think a lot of companies are now deciding how to build out their parallel infrastructure, and the question is sort of Spark or Dask. That seems to be the competition that's arising inside of at least the sort of enterprise financial institutions I've been talking to recently.
And so I need to get the word out there more, and podcasts like yours are very helpful for this. So there's the cultural one. Right? Dask does not have the extent of hype that some of the other big data systems have, primarily because it wasn't mostly targeted towards big business. It was targeted more towards data science. But in terms of technical limitations, things that I'm thinking about right now include some interesting bottlenecks on inter-node communication. It's hard to get above a few hundred megabytes per second across different workers.
This is not normally a problem for most workloads. Data frame people never run into this, but some Dask users are on sort of exotic networks where they have, you know, multi-gigabyte-per-second bandwidth across the network, and so we're running into some issues with Tornado. So the distributed system is built on top of the Python Tornado web framework, which has been excellent. We've been super happy, and they've been super responsive to issues and help. But it just wasn't designed for large data transfer.
And so we're now rethinking how our inter-node communication will work. We're also constantly improving scheduling, so we often run into new classes of algorithms that stress the scheduling heuristics, and then it's, you know, figure out how to make our scheduler better, and that's always very fun. There was a fun exercise I was doing with Olivier Grisel, one of the core developers who works on scikit-learn, and he was doing some sort of cross validation machine learning thing. I'm not an expert there, so I wasn't very familiar. And he was asking questions about how do I tweak things just so to make it just right, and I said, oh, actually, you know, just try it out, and all the naive things worked. And he was very, very happy; he went into this problem expecting to have to do a lot of work, and everything was totally fine. But you never know. You never know when a problem will stress things.
A good example there actually is a lot of users were interested in using GPUs. They might have maybe 100 nodes, 100 machines in their cluster. Ten of those machines have two GPUs each, and they want to mark which tasks require a GPU and make sure that those only run on GPU nodes and that only two GPU tasks are run at once. There's a lot of these sort of interesting, fun scheduling tricks, and it's a challenge to implement all of those features while still keeping the code base nimble so that it can accept more features in the future. Yeah. Scheduling algorithms
[00:32:54] Matthew Rocklin:
are something that are over my head currently because I haven't really had to dig into them much, but it's definitely something that is increasingly of import with the increase in distributed systems and distributed data analytics, but also in terms of things like cluster scheduling for containerized workloads with Mesos or Kubernetes. So I'm wondering what sorts of bodies of knowledge you're able to draw from to be able to come up with appropriate algorithms for the different workloads that you're supporting with Dask? Yeah. That's actually a really good question. So, actually, I did part of my dissertation on
[00:33:29] Tobias Macey:
task scheduling. There's a large body of academic work that is actually virtually useless in this case. It was a real eye opener to interact with this sort of real problem. I would say that task scheduling sort of falls into a few different categories. Either you're doing job scheduling, where there are sort of no dependencies between any of the tasks. You have a bunch of machines. You have a bunch of jobs to run. You want to make sure that they run and, you know, make sure that no one gets unfairly punished.
And there's, you know, some fun research there. It's not of particular interest to me just because there aren't any data dependencies, which are pretty essential for any sort of non-trivial algorithm. Then there's very extreme scheduling at sort of exascale. You'll find lots of people talking about billion-node scheduling and how that will likely work. It's fairly hard to find middle ground published work on task scheduling. If you read the original Spark paper, they say, oh yeah, we just sort of did what Dryad did. Dryad was an earlier project out of Microsoft.
If you look at Dryad, they'll say, oh, we did sort of a naive thing. I think in a lot of big data systems like Spark or Hadoop or Drill, I assume, I haven't looked at their paper or their work, they're operating in a fairly constrained case. They know that they're going to map, or they know they're going to filter, they know that they're going to join. And as a result of that sort of foreknowledge, their task scheduling can be fairly naive, and that's okay because they're able to know much more semantically about what they're trying to do. In Dask, we don't have that luxury. We are scheduling arbitrary graphs, and so it's much more Wild West. And I haven't found a whole lot of papers out there. I think there's something called Ciel, like the French for sky, C-I-E-L.
There's some work there. But generally speaking, this is sort of Wild West territory. There isn't a whole lot of work out there. Or if there is work, it seems to be proprietary and people don't have a strong interest in publishing it. So from the perspective of a data engineer or somebody managing data infrastructure,
[00:35:27] Matthew Rocklin:
what are the particular interfaces to Dask or use cases of Dask that would be most beneficial to that particular area of work? Yeah. So I'd say that
[00:35:38] Tobias Macey:
So first of all, for data engineering generally, there are lots of uses of parallelism, and the core Dask system can be used in many of those. Right? It's very easy to do streaming computation on tasks. It's very easy to do lots of these things. To call out a particular case, I think that when I talk to data engineers who are supporting users, Dask tends to serve as a sort of gateway to their compute resources. So what I mean by that is that they have a bunch of computers that are intended for all the researchers to use to accelerate their research, and a bunch of researchers who are complaining that they don't know how to use the cluster.
And so they're trying to find some medium by which their users can draw on their cluster resources. And, you know, lots of tools can help them do this: job schedulers work, Spark works, etcetera. It's been fun seeing various companies start to use Dask in sort of an institutional kind of way, where it's always running and someone, you know, logs into maybe a Jupyter Notebook, and they click the give-me-more-Dask-workers button, and then they just start running with those extra resources. And you can scale those up as things become more taxing, and scale them down as they start to free things up, and that sort of treating the cluster as just a large set of cores that you can use
[00:36:57] Matthew Rocklin:
is of great value to some of these companies and academic institutions as well. It's also a common problem inside of universities. Yeah, I can definitely see where having sort of a managed task service available for anybody who needs to do arbitrary computations on the available datasets would be pretty useful.
[00:37:14] Tobias Macey:
And from the way you're talking about its use with Jupyter Notebooks, it sounds like there's a kernel available for plugging into Jupyter as well. Is that correct? It's not a kernel. I mean, Dask just runs in Python. So Dask is just a Python library. You just import Dask and you're good to go. So there's no built-in Jupyter integration. Well, the things I was talking about were deploying Dask workers. This has so far been entirely ad hoc in each case. Dask is deployed on lots of different kinds of clusters, so we've deployed it on YARN, on Mesos, on Kubernetes, on older HPC kind of schedulers like SGE or Slurm, and I have actually yet to run into a case where I have two different people who are deploying it in the same way. And so far we've been rewriting this each time, but it would be nice to get to the point where we can make a guide or provide best practices or pull out some of the common things we've been seeing and make some tooling around that.
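Whatever the cluster manager, the user-facing step tends to be the same: point a Client at the scheduler. A minimal sketch with a hypothetical address; the scheduler and workers would have been started separately (for example with the dask-scheduler and dask-worker commands, or by the cluster manager itself).

```python
from dask.distributed import Client

client = Client("tcp://10.0.0.1:8786")  # hypothetical scheduler address
print(client)                           # shows the connected workers and their memory
```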
I wrote a blog post a few months ago on deploying Dask on clusters, which talks about some of these issues. It's a nice read. It talks about both the technical problems and the social problems, and how there are so many different ways people want to use this, or any cluster computing system, that it's very hard to build a single solution.
[00:38:27] Matthew Rocklin:
We just haven't found that solution yet. Yeah, we'll include the link to that article in the show notes. It definitely sounds like it would be relevant for a number of listeners. From a financial perspective, I know that you are sponsored in your work on Dask by Continuum Analytics and that the project itself is held under the umbrella of the PyData organization. So I'm wondering what your thoughts are on the general financial landscape for open source data analytics and distributed computation frameworks, and maybe as a comparison to the broader world of open source projects and some of the issues that are being tackled there in terms of how to fund and maintain open source infrastructure.
[00:39:09] Tobias Macey:
Yeah. No. It's a fascinating question and a topic that, if you go to open source conferences, is a big issue. Actually, just as an early shout out, I'd like to say that if any listeners have funding or have companies that wanna donate anything, I'd highly recommend readthedocs.org. I think it's an organization that is of great use to all of us. I use Read the Docs several times per day, and yet it's something that's very hard to find a good funding model for, so that's sort of a general plea. But, yeah, so I am in the very fortunate situation of getting paid real money to work on a free and open source software project, so I both get to pay rent and have sort of a strong impact on the world. I feel that I do, at least, through, you know, collaborations with climate scientists and such.
On the other hand, though, because I am now financially incentivized, the public should probably stop trusting me. Right? Everything I've said in this show, you should probably not just believe me. You should go and look at it yourself. I sort of want to make Dask sound good. That is good for me. And at some point, you know, this is good because we're funding lots of open source software, but it's also a little bit of a weird situation we're developing, because all these people that we normally trust as being honest scientific researchers now hold all of these conflicts of interest. And so, generally speaking, I think open source software is getting a little bit trickier, especially when that software is useful for businesses, as sort of any distributed computing system is.
So Dask is actually in a pretty good state. I don't feel much pressure. I think this is mostly because most of our funding is through, you know, grants like the XDATA grant from DARPA or the Moore Foundation, both of which seem to act pretty altruistically and care strongly about honesty and open transparency. We do have some nice support contracts with financial companies, but they also are kind of nice in that they don't want us to say anything about them, so there's, again, very few conflicts of interest there. Then also companies like Continuum. So Continuum also funds Dask development, but it's nice in that Continuum doesn't actually make any direct money from Dask. They're sort of only in it because it helps the long term health of the Python ecosystem within which they're invested. I think it would be a different story if Dask was funded by, you know, Dask Incorporated or whatever, if we had a lot of short term pressure. Yeah. It's definitely interesting how,
[00:41:31] Matthew Rocklin:
at least in the big data space, there seems to be a lot more organizational investment in some of the big data tools, with things like DataStax being the primary contributor to Cassandra, or Cloudera being a big supporter of Hadoop along with Hortonworks, and things like that where there's a company that has formed around the open source data processing tool, versus a number of other large open source projects such as Ruby on Rails or Django that are used all over the place, but they don't really have a single organization
[00:42:09] Tobias Macey:
behind it. Right. Yeah. There's also a nice set of companies now, I'm thinking actually of companies like, you know, Bloomberg or Capital One, that are funding open source projects just because it is actually the cheapest way to get their software built. So, like, the JupyterLab development: JupyterLab is sort of a fun next generation of Jupyter. It's being funded by Bloomberg, and Bloomberg, I think, you know, doesn't have any financial benefit from this. They just want that software, and it is just cheaper for them to fund open source developers to build and then subsequently maintain that software than it is for them to build it in house. I think that the sort of costs of software maintenance became very real for a lot of larger companies, and they're realizing that if you can build community-made software, that is a much cheaper alternative.
So there's a lot of good companies that, even though they're acting completely in their own self interest, also act for the benefit of the world, in a way that I think actually creates relatively few conflicts of interest. Also, to defend companies like maybe Cloudera and Hortonworks, what's sort of nice about them, and I think Continuum also falls in this category, is that they're not beholden to a particular technology. They're beholden to a sort of larger ecosystem, and I think that diffusing that need makes them act in a better way.
So, you know, if Hadoop was suddenly not the right thing, I think Cloudera would not be pushing Hadoop. They would be pushing the new thing. They're not particularly tied to any particular system. They're tied to a
[00:43:39] Matthew Rocklin:
larger collection of tools. Yeah. And also, through their stewardship, it makes sure that the existing tools do continue to improve and develop new capabilities, which pushes forward the boundaries of the industry as a whole. Absolutely. Yeah. I would say Continuum's
[00:43:56] Tobias Macey:
enterprise support of Python is one of the reasons why we're seeing Python in the enterprise now, the development of conda, or the fact that you can just download Anaconda, something that really helps increase confidence among a lot of companies, among a lot of government organizations, etcetera. So are there any other topics that you think we should cover? Very briefly, you were talking earlier about why Python is sort of entering into the big data space and what maybe caused that to happen, and I'll add a little bit of a note there. I think it's largely because of technologies like Docker or conda. I think that one of the big reasons why the JVM succeeded in distributed infrastructure is because of the JAR and redeployment.
You can trust that your code will run anywhere, whereas Python was sort of tied to a system Python where it's really annoying to install source libraries, etcetera. And tools like conda that provide binary packages, or Docker environments, gave IT departments a lot more confidence in a broader set of tools, and so it's now fun to see a lot of other systems, you know, like Node,
[00:45:01] Matthew Rocklin:
like Python, like Ruby, now get deployed in the same way that JVM technologies were deployed, with the same faith and confidence. Alright. Well, for anybody who wants to keep in touch, I'll ask you for your preferred contact information, and I'll add that to the show notes so they can refer there. I really appreciate you taking the time out of your day to talk about Dask. It's definitely an interesting project and one that I am going to continue to keep an eye on and hopefully find some use cases for in the not too distant future. I'm sure that a number of listeners will find some relevant use cases for it, and
[00:45:34] Tobias Macey:
I appreciate your time. Yeah. No. This has been a great time. If listeners are interested in trying it out, you can go to dask.pydata.org, and you can also try it out on EC2 using the dask-ec2 project, which there will be a link to there. That'll give you a nice Jupyter notebook, a cluster, and everything running. Thank you, Tobias, for your time and for the opportunity.
Introduction and Host Welcome
Interview with Matthew Rocklin
Matthew Rocklin's Background
Introduction to Dask
Blaze Ecosystem and Dask's Role
Dask vs. Hadoop Ecosystem
Use Cases for Dask
Dask in Financial Services
Handling Memory and Task Scheduling
Error Handling and Task Failures
Integration with Existing Tools
Performance Comparison with Spark
Interesting Use Cases of Dask
Challenges and Future Improvements
Dask for Data Engineers
Deployment and Cluster Management
Funding and Open Source Sustainability
Final Thoughts and Contact Information