I finally found some time to work on something I wanted to start immediately when I discovered Nim: A data frame API allowing lazy out-of-core data analytics. You can find a first working prototype over at GitHub (under the uninspired name NimData). So far I'm pretty happy with what is possible already now. In fact I got carried away a little bit when creating the readme example based on this Bundesliga data set ;).
Quick motivation: The project is targeted between the worlds of Pandas/R and map-reduce successors like Spark/Flink. In the Pandas/R world, the general assumption is that your data fits into memory, and things get awkward when the data size exceeds this threshold. Spark/Flink offer a different API design, which makes distributed out-of-core processing easier, and many companies are moving in this direction now. However, this can be a heavy investment because Spark/Flink basically require that you build and operate a cluster (=> costly/complicated) -- the performance of running in local mode is often not so great (partly due to required abstractions + JVM). NimData uses a very similar API design allowing out-of-core processing, but for the time being, the focus is just on running locally. In other words, the target is "medium" sized data, i.e., data which may not fit into memory entirely, but does not justify to invest in a cluster. I haven't started benchmarking yet, but I could imagine that Nim's native speed might even keep up with a small cluster for certain problems.
In terms of the design I'm at a point where I could use some help from the Nim veterans to verify if this is a good path. I went for a design based on dynamic dispatching, and I was facing some of problems related to both method dispatch and iterators. The biggest showstopper was that I could not mix generic and specific methods (which didn't allow to actually read from files :(), but then I found the work-around documented in #5325. But there are still some issues left, and I'm not sure if they are either a mistake on my part or a known/unknown Nim issue. Some of the issues are tricky, because they disappear in isolated examples. I have prepared the following minimal example which illustrates both the general design and the outstanding issues:
import times
import typetraits
import strutils
import sequtils
import future
import macros
type
DataFrame[T] = ref object of RootObj
CachedDataFrame[T] = ref object of DataFrame[T]
data: seq[T]
MappedDataFrame[U, T] = ref object of DataFrame[T]
orig: DataFrame[U]
f: proc(x: U): T
FilteredDataFrame[T] = ref object of DataFrame[T]
orig: DataFrame[T]
f: proc(x: T): bool
proc newCachedDataFrame[T](data: seq[T]): DataFrame[T] =
result = CachedDataFrame[T](data: data)
# -----------------------------------------------------------------------------
# Transformations
# -----------------------------------------------------------------------------
# Issue 1: I'm getting a deprecation warning for this function:
# `Warning: generic method not attachable to object type is deprecated`
# I don't understand why it is not attachable, T and U are both unambiguous
# from the mapping proc. Is this a showstopper, because it will break
# in a future version of Nim?
method map[U, T](df: DataFrame[U], f: proc(x: U): T): DataFrame[T] {.base.} =
result = MappedDataFrame[U, T](orig: df, f: f)
method filter[T](df: DataFrame[T], f: proc(x: T): bool): DataFrame[T] {.base.} =
result = FilteredDataFrame[T](orig: df, f: f)
# -----------------------------------------------------------------------------
# Iterators
# -----------------------------------------------------------------------------
# Issue 2: I don't understand why this dummy wrapper is required below.
# In the `for x in ...` lines below I was trying two variants:
#
# - `for x in it:` This gives a compilation error:
# `Error: type mismatch: got (iterator (): int{.closure.})`
# which is surprising because that is exactly the type required, isn't it?
# - `for x in it():` This compiles, but it leads to bugs!
# When chaining e.g. two `map` calls the resulting iterator will
# just return zero elements, irrespective of what the original
# iterator is.
#
# For some strange reason converting the closure iterator to an inline
# iterator can serve as a work around.
iterator toIterBugfix[T](closureIt: iterator(): T): T {.inline.} =
for x in closureIt():
yield x
method iter[T](df: DataFrame[T]): (iterator(): T) {.base.} =
quit "base method called (DataFrame.iter)"
method iter[T](df: CachedDataFrame[T]): (iterator(): T) =
result = iterator(): T =
for x in df.data:
yield x
method iter[U, T](df: MappedDataFrame[U, T]): (iterator(): T) =
result = iterator(): T =
var it = df.orig.iter()
for x in toIterBugfix(it): # why not just `it` or `it()`?
yield df.f(x)
method iter[T](df: FilteredDataFrame[T]): (iterator(): T) =
result = iterator(): T =
var it = df.orig.iter()
for x in toIterBugfix(it): # why not just `it` or `it()`?
if df.f(x):
yield x
# -----------------------------------------------------------------------------
# Actions
# -----------------------------------------------------------------------------
# Issue 3: I'm getting multiple warnings for this line, like:
# df_design_02.nim(87, 8) Warning: method has lock level <unknown>, but another method has 0 [LockLevel]
# df_design_02.nim(81, 8) Warning: method has lock level 0, but another method has <unknown> [LockLevel]
# df_design_02.nim(81, 8) Warning: method has lock level 0, but another method has <unknown> [LockLevel]
# df_design_02.nim(81, 8) Warning: method has lock level 0, but another method has <unknown> [LockLevel]
# df_design_02.nim(81, 8) Warning: method has lock level 0, but another method has <unknown> [LockLevel]
# Where the first warning points to the third definition of collect (the one for MappedDataFrame).
# I'm confused because none of them has an explicit locklevel.
method collect[T](df: DataFrame[T]): seq[T] {.base.} =
quit "base method called (DataFrame.collect)"
method collect[T](df: CachedDataFrame[T]): seq[T] =
result = df.data
method collect[S, T](df: MappedDataFrame[S, T]): seq[T] =
result = newSeq[T]()
let it = df.orig.iter()
for x in it():
result.add(df.f(x))
# This issue is triggered by client code looking like this (also
# demonstrates the iterator bug):
let data = newCachedDataFrame[int](@[1, 2, 3])
let mapped = data.map(x => x*2)
echo mapped.collect()
echo data.map(x => x*2).collect()
echo data.map(x => x*2).map(x => x*2).collect()
echo data.filter(x => x mod 2 == 1).map(x => x * 100).collect()
# Issue 4:
# The following errors with
# `Error: method is not a base`
# but only when there is client code calling it with different
# types T, e.g. int and string. Not a big problem right now since
# I can just make it a proc (no overloads required at the moment),
# but still a bit worrying, because later overloading might be necessary.
when false:
method count*[T](df: DataFrame[T]): int {.base.} =
result = 0
let it = df.iter()
for x in toIterBugfix(it):
result += 1
echo newCachedDataFrame[int](@[1, 2, 3]).count()
echo newCachedDataFrame[string](@["1", "2", "3"]).count()
# Issue 5: Trying to define generic numerical actions like mean/min/max fails
# with a compilation error:
# `Error: type mismatch: got (string) but expected 'float'`
# even though the method is never called with T being a string.
# Work around is also to use a proc, but overloading may be
# desirable.
when false:
method mean*[T](df: DataFrame[T]): float =
result = 0
var count = 0
let it = df.iter()
for x in it():
count += 1
result += x.float
result /= count.float
# To get the error it suffices (and is required) to just have a DataFrame[string]
# in scope:
let strData = newCachedDataFrame[string](@["A", "B"])
Any ideas what is happening here? I feel like Issues 3 + 4 might be related to #5325. And do you think the overall design makes sense and is in line with the Nim 1.0 roadmap (I'm a bit concerned about the deprecation warning of Issue 1)? I though about the following alternatives, not sure if they would work at all:
Regarding performance, I'm a bit concerned about the double indirection of having to wrap the closure iterators in an inline iterator. I want to start with benchmarking soon, and eventually it would be nice if the design is smart enough to "collapse" certain operations. For instance, if the transformation chain has a map+map, filter+filter, map+filter, ..., they could be merged together, so that they will be compiled in a single loop without another function call to a closure iterator. Any ideas of how to squeeze out the maximum performance are highly appreciated. Or rather any kind of contribution :).
Well your solution does work I think. It feels a bit like what Scala is doing in their collection library. But on the other hand Scala is on the JVM and the JVM is as badass that it can inline virtual methods. I think in the real world it really is a lot of overhead, that you do not want in your approach. I tried once to implement the same with just iterators, but sadly I was limited by the language:
iterator filter[T](arg: iterator(): T; cond: proc(arg: T): bool): T =
for x in arg:
if cond(x):
yield arg
iterator map[T,U](arg: iterator(): T; fun: proc(arg: T): U): U =
for x in arg:
yield fun(x)
let data = [1,2,3,4,5,6,7,8,9]
for x in filter(data.items(), proc(arg: int): bool = arg mod 2 == 1) ## undeclared field items, meh...
The language should just have some automatic feature to raise any iterator type to a closure iterator bundled with their arguments. Otherwise it is just meh to use, and I will avoid to design an API that is based on it, because it just feels not thought through to the end.
Something that might actually work very well, is when you write macros to define your own language within Nim.
macro dataLanguage(arg: untyped): untyped =
[...]
dataLanguage:
var result = myData.filter(_.name.len < 7).filter(_.age > 30).map(_.birthday)
Then you would have total control on how these expressions get compiled. They can be reduced to a single loop. You could inject commands for memory mapping, and even inject commands for managing a cloud of computers over a network. It is certainly the most powerful solution, but also very hard to do the right thing. I could totally understand, when you do not want to do that. I used scala notation here for the macro, and that would be possible. The _ is where the argument is inserted.
I think that issue #1 is a bug if https://github.com/nim-lang/Nim/blob/devel/tests/method/tgeneric_methods.nim is intended to continue to work.
Edit: I'm not as sure now - after doing some primitive debugging:
# Line 34 of your test - the 'map' method
@[map, , [U, T], (df: DataFrame[U]; f: proc (x: U): T): DataFrame[T], {.base.}, [[U, T]],
result = MappedDataFrame[U, T](orig: df, f: f)]
[U, T]
# This is 'result.sons' and 'results.sons[genericParamsPos]' in semstmts.nim
test.nim(34, 1) Warning: generic method not attachable to object type is deprecated [Deprecated]
# Filter - Line 38
@[filter, , [T], (df: DataFrame[T]; f: proc (x: T): bool): DataFrame[T], {.base.}, [[T]],
result = FilteredDataFrame[T](orig: df, f: f)]
[T]
@[iter, , [T], (df: DataFrame[T]): (iterator (): T), {.base.}, [[T]],
quit "base method called (DataFrame.iter)"]
[T]
@[iter, , [T], (df: CachedDataFrame[T]): (iterator (): T), , [[T]],
result = iterator (): T =
for x in df.data:
yield x
]
[T]
I think that it's giving this warning correctly because there is a generic parameter attached to the proc and not the DataFrame type. The others don't error because the generic parameters are attached to the DataFrame[T].
I found https://forum.nim-lang.org/t/2146 which gives some rationale for deprecating methods not attached to object types.
@krux: Thanks for the feedback! I was thinking about your dataLanguage idea as well. But I don't know if this would allow good composability in larger projects, i.e., you want data frames to be something that can be passed around etc. What I can probably do in the end is to write a macro which takes the iteration body:
iterate(dataFrame):
echo x
and the macro would analyze the processing pipeline and generate a single for loop internally. The good thing is that the user side of the API only exposes transformations and actions. So it doesn't really matter for now if I use closure iterators, and I can still switch the iteration logic internally later.
And the good news is that I get exceptionally good performance with the closure iterator approach already (I pushed a few first benchmarks results). I have optimized the CSV parsing macro a little bit, and CSV parsing is now a factor of 2 faster than Pandas, which is known to have a very fast parser. As expected for data which is still too small for Spark to shine, Nim is faster by a factor of 10 (although Spark already runs on 4 cores).
I also made some good progress in terms of features and updated the documentation a lot, so this is reaching a state where it is actually pretty much usable.
@perturbation2: Yes very good point, and thanks for the link. If Nim will not feature multi dispatch this might become a problem. For now I don't need multi dispatch for the higher order functions like map though (and maybe I never will), so I will simply stick to using a proc (which I hope will continue to work).
I also understand issue 3 now, for the record: The compiler is basically complaining that the base method has a provable lock level of 0, while one of the overloaded methods calls a procvar, and potentially, this procvar can lock. The locking system needs to know lock levels at compile time though, which is ruined by the dynamic dispatching in this case. There are two ways out: Convince the compiler that the procvar does not lock, or tell the compiler in the base method that there will be overloads of unknown lock level. The former would be nicer, but I can't really get it to work for now -- for the latter I have opened a tiny PR to allow doing that in Nim.
This is a really cool project, thanks a lot for your efforts! 👍
Kind regards, Axel
I just looked at the next steps...
Plotting via Bokeh would be awesome, I already use that library a lot from Python/Jupyter.
@chemist69: Thanks! And by the way, I forgot to mention the most important feature from a practical perspective: There already is a simple browser viewer available via df.openInBrowser(). No plotting yet, but at least handy for quick data inspection.
@jlp765: Handling different types is already possible, as illustrated in the readme example as well. The main difference between to dynamically typed APIs like Pandas is that you once have to tell the compiler about your schema -- but then you can fully benefit from type-safety in the processing pipeline. Let's say your CSV has columns name (string), age (int), height (float), birthday (date), then your code would look like this:
const schema = [
col(StrCol, "name"),
col(IntCol, "age"),
col(FloatCol, "height"),
col(DateCol, "birthday") # DateCol not yet implemented, but coming soon
]
let df = DF.fromText("data.csv.gz").map(schemaParser(schema, ','))
What happens here, is that the schemaParser macro builds a parser proc which takes a string as input and returns a named tuple of type tuple[name: string, age: int64, height: float, birthday: SomeDateTimeTypeTBD] (note that this allows to generate highly customized machine code, which is why the parser can be much faster than generic parsers). So yes, the data frame only holds a single type, but that type is heterogenous, and you can extract the individual "columns" back by e.g. df.map(x => x.name) giving you a DataFrame[string] instead of the full tuple.
Having to specify the schema might look tedious from a Pandas perspective. But the big benefit is that you can never get the column names or types wrong. In Pandas you see a lot of code which just says def preprocess_data(df), and it is neither clear what df really contains nor what assumptions preprocess_data makes on the data. This can be solved by extensive documentation & testing, but is still difficult to maintain in big projects. With a type safe schema the assumptions about the data become explicit in the code, and the compiler can ensure that they are satisfied.
Global aggregation is already available. You could do for instance df.map(x => x.age).mean() to get the average age. There is also reduce/fold, which allows to implement custom aggregation functions. What's still missing is groupBy and join, but they are high priority for me as well, so I hope I can add them soon.
Congratulations!
I like to use Pandas quite a bit for various data chores and I am really impressed by this work.
The API is nice and simple. I really like your work here.
I have to say I am a little confused about the aim of the library.
The whole point of the data frame abstraction - as I understand it - is that they are a columnar representation of data, usually in memory. What you are describing here seems more like lazy sequences. Most implementation I know of data frames have eager operations. For instance, I would expect df.map(x => x.age) to be a constant time operation (because the age column is already in memory).
In what ways does this library differ from other lazy sequence implementations such as lazy, iterutils, or mangle?
I do not want to sound dismissive - I am happy to see this library, but I am not sure what is the direction it wants to take.
@andrea: You are right, that is probably not yet very clear from the docs. They currently rely too much on being familiar with the concept of caching in frameworks like Spark (in fact, the API is modelled after Spark's RDDs, but for now without the distributed aspect).
The idea is basically to offer both lazy operations and eager operations. If your code looks like this
let df = DF.fromFile("data.csv").map(schemaParser, ',')
let averageAge = df.map(x => x.age).mean()
let maxHeight = df.map(x => x.height).max()
let longestName = df.map(x = x.name.len).max()
the operations are completely lazy, i.e., you would read the file three times from scratch, apply the parser each time, and continue with the respective operation. You would use this approach when your data is huge and it can't fit into memory entirely. This is obviously inefficient for small data which would fit into memory.
If you write the exact same code, but replace the first line by
let df = DF.fromFile("data.csv").map(schemaParser, ',').cache()
the result of the mapping operation would be persisted in memory (using a seq[T] internally). Thus, all following operations will use the cache, and you would have to read and parse the file only once. Caching gives very good control over trading off memory usage over recomputation. You can cache the computation pipeline at any point, as long as you have the required memory. Typically you would use cache after having done some (potentially complex) computations which are required for multiple consecutive computations like repeated iteration in machine learning algorithms. In some use cases the data can also be preprocessed to make it fit into memory, e.g., by filtering to the interesting bits of the data, downsampling, or simply projecting the input data down to a single column.
Other differences to purely lazy libraries are that data frames require operations like sort or groupby, which require some sort of internal persistence. For now I'm just using an implementation which falls back on storing the entire data in memory, but I hope that I can add some spill-to-disk features later.