Laziness and parallelism

If load is called with the lazy=true flag, the data is not loaded into memory immediately; instead, a task for loading the file is created at each file node.

In contrast, load without lazy=true eagerly loads the data, one file at a time.

Lazy loading saves precious memory when you are not going to use most of the data. For example, if you only want to look at the yellow taxi data, lazy mode makes it fine to load the whole dataset, since only the files you actually use are ever read.

When you chain operations on lazily loaded data, you are also telling FileTrees about the dependencies between the tasks involved in the computation. mapvalues on a lazily loaded tree returns a tree of lazy values, and reducevalues returns a single lazy value. To compute lazy values, call the exec function; it performs the computation in parallel.

using FileTrees, DataFrames, CSV

taxi_dir = FileTree("taxi-data")

lazy_dfs = FileTrees.load(taxi_dir; lazy=true) do file
    DataFrame(CSV.File(path(file)))
end
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (FileTrees.Thunk)
│  │  └─ yellow.csv (FileTrees.Thunk)
│  └─ 02/
│     ├─ green.csv (FileTrees.Thunk)
│     └─ yellow.csv (FileTrees.Thunk)
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (FileTrees.Thunk)
   │  └─ yellow.csv (FileTrees.Thunk)
   └─ 02/
      ├─ green.csv (FileTrees.Thunk)
      └─ yellow.csv (FileTrees.Thunk)
yellow′ = mv(lazy_dfs,
             r"(.*)/(.*)/yellow.csv",
             s"yellow/\1/\2.csv")["yellow"]
yellow/
├─ 2019/
│  ├─ 01.csv (FileTrees.Thunk)
│  └─ 02.csv (FileTrees.Thunk)
└─ 2020/
   ├─ 01.csv (FileTrees.Thunk)
   └─ 02.csv (FileTrees.Thunk)
yellowdf = exec(reducevalues(vcat, yellow′))

first(yellowdf, 15)
15×18 DataFrame
 Row │ VendorID  tpep_pickup_datetime  tpep_dropoff_datetime  passenger_count  trip_distance  RatecodeID  store_and_fwd_flag  PULocationID  DOLocationID  payment_type  fare_amount  extra    mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount  congestion_surcharge
     │ Int64     String31              String31               Int64            Float64        Int64       String1             Int64         Int64         Int64         Float64      Float64  Float64  Float64     Float64       Float64                Float64       Float64?
─────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
   1 │        1  2019-01-01 00:46:40   2019-01-01 00:53:20                  1            1.5           1  N                            151           239             1          7.0      0.5      0.5        1.65          0.0                     0.3          9.95             missing
   2 │        1  2019-01-01 00:59:47   2019-01-01 01:18:59                  1            2.6           1  N                            239           246             1         14.0      0.5      0.5        1.0           0.0                     0.3         16.3              missing
   3 │        2  2018-12-21 13:48:30   2018-12-21 13:52:40                  3            0.0           1  N                            236           236             1          4.5      0.5      0.5        0.0           0.0                     0.3          5.8              missing
   4 │        2  2018-11-28 15:52:25   2018-11-28 15:55:45                  5            0.0           1  N                            193           193             2          3.5      0.5      0.5        0.0           0.0                     0.3          7.55             missing
   5 │        2  2018-11-28 15:56:57   2018-11-28 15:58:33                  5            0.0           2  N                            193           193             2         52.0      0.0      0.5        0.0           0.0                     0.3         55.55             missing
   6 │        2  2018-11-28 16:25:49   2018-11-28 16:28:26                  5            0.0           1  N                            193           193             2          3.5      0.5      0.5        0.0           5.76                    0.3         13.31             missing
   7 │        2  2018-11-28 16:29:37   2018-11-28 16:33:43                  5            0.0           2  N                            193           193             2         52.0      0.0      0.5        0.0           0.0                     0.3         55.55             missing
   8 │        1  2019-01-01 00:21:28   2019-01-01 00:28:37                  1            1.3           1  N                            163           229             1          6.5      0.5      0.5        1.25          0.0                     0.3          9.05             missing
   9 │        1  2019-01-01 00:32:01   2019-01-01 00:45:39                  1            3.7           1  N                            229             7             1         13.5      0.5      0.5        3.7           0.0                     0.3         18.5              missing
  10 │        1  2019-02-01 00:59:04   2019-02-01 01:07:27                  1            2.1           1  N                             48           234             1          9.0      0.5      0.5        2.0           0.0                     0.3         12.3                    0.0
  11 │        1  2019-02-01 00:33:09   2019-02-01 01:03:58                  1            9.8           1  N                            230            93             2         32.0      0.5      0.5        0.0           0.0                     0.3         33.3                    0.0
  12 │        1  2019-02-01 00:09:03   2019-02-01 00:09:16                  1            0.0           1  N                            145           145             2          2.5      0.5      0.5        0.0           0.0                     0.3          3.8                    0.0
  13 │        1  2019-02-01 00:45:38   2019-02-01 00:51:10                  1            0.8           1  N                             95            95             2          5.5      0.5      0.5        0.0           0.0                     0.3          6.8                    0.0
  14 │        1  2019-02-01 00:25:30   2019-02-01 00:28:14                  1            0.8           1  N                            140           263             2          5.0      0.5      0.5        0.0           0.0                     0.3          6.3                    0.0
  15 │        1  2019-02-01 00:38:02   2019-02-01 00:40:57                  1            0.8           1  N                            229           141             2          4.5      0.5      0.5        0.0           0.0                     0.3          5.8                    0.0

Here, calling exec computes exactly the values required to produce the result. This means the green taxi data is never loaded into memory in this particular case.

Executors

FileTrees lets you control how exec runs the computation by supplying an executor as its first argument. This allows you to trade off parallelism against scheduling overhead.

The available Executors can be found in the Executor submodule. The following executors are always available:

Additionally, the following executors are available in extension modules:

Parallel invocation with Dagger

Let's look at how to use Dagger for massive parallelism.

To obtain parallelism, you need to start Julia with multiple threads and/or processes:

export JULIA_NUM_THREADS=10   # 10 threads per process (enables multi-threading)
julia -p 8                    # 8 worker OS processes

In the REPL:

using Distributed, Base.Threads
@everywhere using FileTrees, CSV, DataFrames, Dagger

lazy_dfs = FileTrees.load(taxi_dir; lazy=true) do file
    # println("Loading $(path(file)) on $(myid()) on thread $(threadid())")
    DataFrame(CSV.File(path(file)))
end

first(exec(Executor.Dagger(), reducevalues(vcat, lazy_dfs[r"yellow.csv$"])), 15)
15×18 DataFrame
 Row │ VendorID  tpep_pickup_datetime  tpep_dropoff_datetime  passenger_count  trip_distance  RatecodeID  store_and_fwd_flag  PULocationID  DOLocationID  payment_type  fare_amount  extra    mta_tax  tip_amount  tolls_amount  improvement_surcharge  total_amount  congestion_surcharge
     │ Int64     String31              String31               Int64            Float64        Int64       String1             Int64         Int64         Int64         Float64      Float64  Float64  Float64     Float64       Float64                Float64       Float64?
─────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
   1 │        1  2019-01-01 00:46:40   2019-01-01 00:53:20                  1            1.5           1  N                            151           239             1          7.0      0.5      0.5        1.65          0.0                     0.3          9.95             missing
   2 │        1  2019-01-01 00:59:47   2019-01-01 01:18:59                  1            2.6           1  N                            239           246             1         14.0      0.5      0.5        1.0           0.0                     0.3         16.3              missing
   3 │        2  2018-12-21 13:48:30   2018-12-21 13:52:40                  3            0.0           1  N                            236           236             1          4.5      0.5      0.5        0.0           0.0                     0.3          5.8              missing
   4 │        2  2018-11-28 15:52:25   2018-11-28 15:55:45                  5            0.0           1  N                            193           193             2          3.5      0.5      0.5        0.0           0.0                     0.3          7.55             missing
   5 │        2  2018-11-28 15:56:57   2018-11-28 15:58:33                  5            0.0           2  N                            193           193             2         52.0      0.0      0.5        0.0           0.0                     0.3         55.55             missing
   6 │        2  2018-11-28 16:25:49   2018-11-28 16:28:26                  5            0.0           1  N                            193           193             2          3.5      0.5      0.5        0.0           5.76                    0.3         13.31             missing
   7 │        2  2018-11-28 16:29:37   2018-11-28 16:33:43                  5            0.0           2  N                            193           193             2         52.0      0.0      0.5        0.0           0.0                     0.3         55.55             missing
   8 │        1  2019-01-01 00:21:28   2019-01-01 00:28:37                  1            1.3           1  N                            163           229             1          6.5      0.5      0.5        1.25          0.0                     0.3          9.05             missing
   9 │        1  2019-01-01 00:32:01   2019-01-01 00:45:39                  1            3.7           1  N                            229             7             1         13.5      0.5      0.5        3.7           0.0                     0.3         18.5              missing
  10 │        1  2019-02-01 00:59:04   2019-02-01 01:07:27                  1            2.1           1  N                             48           234             1          9.0      0.5      0.5        2.0           0.0                     0.3         12.3                    0.0
  11 │        1  2019-02-01 00:33:09   2019-02-01 01:03:58                  1            9.8           1  N                            230            93             2         32.0      0.5      0.5        0.0           0.0                     0.3         33.3                    0.0
  12 │        1  2019-02-01 00:09:03   2019-02-01 00:09:16                  1            0.0           1  N                            145           145             2          2.5      0.5      0.5        0.0           0.0                     0.3          3.8                    0.0
  13 │        1  2019-02-01 00:45:38   2019-02-01 00:51:10                  1            0.8           1  N                             95            95             2          5.5      0.5      0.5        0.0           0.0                     0.3          6.8                    0.0
  14 │        1  2019-02-01 00:25:30   2019-02-01 00:28:14                  1            0.8           1  N                            140           263             2          5.0      0.5      0.5        0.0           0.0                     0.3          6.3                    0.0
  15 │        1  2019-02-01 00:38:02   2019-02-01 00:40:57                  1            0.8           1  N                            229           141             2          4.5      0.5      0.5        0.0           0.0                     0.3          5.8                    0.0

If running in an environment with 8 processes of 10 threads each, up to 80 tasks can work in parallel (they are ultimately scheduled by the OS). Once a task has finished, the data required to execute it is freed from memory, provided no other task still needs it. So in this example, the DataFrames loaded from disk are freed from memory right after they have been reduced with vcat.

reducevalues performs an associative reduce to aid in the freeing of memory: the first two files are loaded, vcat is called on them, and the input dataframes are freed from memory. When the next two files have been combined with vcat in the same way, the two intermediate results are in turn combined and freed, and so on.
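The shape of this reduction can be sketched in plain Julia. The treereduce helper below is hypothetical, not part of the FileTrees API, but it mirrors how an associative reduce structures its work:

```julia
# A pairwise (tree-shaped) reduce: each intermediate result is built from
# exactly two inputs, which can then be freed. The function f must be
# associative for this to give the same answer as a left-to-right fold.
function treereduce(f, xs)
    n = length(xs)
    n == 1 && return xs[1]
    m = n ÷ 2
    f(treereduce(f, xs[1:m]), treereduce(f, xs[m+1:end]))
end

treereduce(vcat, [[1], [2], [3], [4]])  # == [1, 2, 3, 4]
```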

If you wish to compute on more data than fits in memory, the following information should help you:

As discussed in this example, at most 80 tasks execute concurrently in this setup, so peak memory usage is bounded by the combined peak usage of 80 tasks in the task graph. You can therefore plan how many processes and threads to start so as to keep memory usage manageable.

It is also necessary to keep in mind how much memory the value returned by exec itself occupies, since that allocation cannot be avoided. This means reducevalues works best when the reduction produces a small value, such as a sum or a mean.
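For example, rather than vcat-ing every file into one giant DataFrame, you can reduce to a single number. This sketch (assuming the lazy_dfs tree from earlier) computes the total number of rows across all files:

```julia
# Map each lazy DataFrame to its row count, then sum the counts.
# Each DataFrame can be freed as soon as nrow has been taken from it,
# so peak memory stays close to the size of a single file.
total_rows = exec(reducevalues(+, mapvalues(nrow, lazy_dfs)))
```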

Caching

Dagger's compute function differs from the exec function in that it computes the results of the tasks in the tree and leaves the data on the remote processes rather than fetching it to the master process. Calling compute on a tree also causes any subsequent requests to compute the same tasks to be served from a cache rather than recomputed.
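A sketch of this workflow, assuming the lazy_dfs tree from earlier:

```julia
# compute leaves each result on the process that produced it; the
# returned tree holds references to remote data rather than Thunks.
computed = compute(lazy_dfs)

# A subsequent exec on the same tree reuses the cached results
# instead of re-reading the CSV files from disk.
yellowdf = exec(reducevalues(vcat, computed[r"yellow.csv$"]))
```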