Laziness and parallelism

If load is called with lazy=true flag, data is not immediately loaded in memory, but a task is created at each file node for loading the file.

In contrast, load without lazy=true simply loads the data one file at a time eagerly.

Lazy-loading allows you to save precious memory if you're not going to use most of the data. (e.g. If you just want to look at yellow taxi data but you end up loading the whole dataset, it's ok when in lazy mode).

When you lazy-load and chain operations on the lazy loaded data, you are also telling FileTrees about the dependency of tasks involved in the computation. mapvalues or reducevalues on lazy-loaded data will themselves return trees with lazy values or a lazy value respectively. To compute lazy values, you can call the exec function. This will do the computation in parallel.

using Distributed # for @everywhere
@everywhere using FileTrees
@everywhere using DataFrames, CSV

taxi_dir = FileTree("taxi-data")

lazy_dfs = FileTrees.load(taxi_dir; lazy=true) do file
    DataFrame(CSV.File(path(file)))
end
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (Thunk(#1, (File(taxi-data/2019/01/green.csv),)))
│  │  └─ yellow.csv (Thunk(#1, (File(taxi-data/2019/01/yellow.csv),)))
│  └─ 02/
│     ├─ green.csv (Thunk(#1, (File(taxi-data/2019/02/green.csv),)))
│     └─ yellow.csv (Thunk(#1, (File(taxi-data/2019/02/yellow.csv),)))
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (Thunk(#1, (File(taxi-data/2020/01/green.csv),)))
   │  └─ yellow.csv (Thunk(#1, (File(taxi-data/2020/01/yellow.csv),)))
   └─ 02/
      ├─ green.csv (Thunk(#1, (File(taxi-data/2020/02/green.csv),)))
      └─ yellow.csv (Thunk(#1, (File(taxi-data/2020/02/yellow.csv),)))
yellow′ = mv(lazy_dfs,
             r"(.*)/(.*)/yellow.csv",
             s"yellow/\1/\2.csv")["yellow"]
yellow/
├─ 2019/
│  ├─ 01.csv (Thunk(#1, (File(taxi-data/2019/01/yellow.csv),)))
│  └─ 02.csv (Thunk(#1, (File(taxi-data/2019/02/yellow.csv),)))
└─ 2020/
   ├─ 01.csv (Thunk(#1, (File(taxi-data/2020/01/yellow.csv),)))
   └─ 02.csv (Thunk(#1, (File(taxi-data/2020/02/yellow.csv),)))
yellowdf = exec(reducevalues(vcat, yellow′))

first(yellowdf, 15)
15×18 DataFrame
│ Row │ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │ passenger_count │ trip_distance │ RatecodeID │ store_and_fwd_flag │ PULocationID │ DOLocationID │ payment_type │ fare_amount │ extra   │ mta_tax │ tip_amount │ tolls_amount │ improvement_surcharge │ total_amount │ congestion_surcharge │
│     │ Int64    │ String               │ String                │ Int64           │ Float64       │ Int64      │ String             │ Int64        │ Int64        │ Int64        │ Float64     │ Float64 │ Float64 │ Float64    │ Float64      │ Float64               │ Float64      │ Float64?             │
├─────┼──────────┼──────────────────────┼───────────────────────┼─────────────────┼───────────────┼────────────┼────────────────────┼──────────────┼──────────────┼──────────────┼─────────────┼─────────┼─────────┼────────────┼──────────────┼───────────────────────┼──────────────┼──────────────────────┤
│ 1   │ 1        │ 2019-01-01 00:46:40  │ 2019-01-01 00:53:20   │ 1               │ 1.5           │ 1          │ N                  │ 151          │ 239          │ 1            │ 7.0         │ 0.5     │ 0.5     │ 1.65       │ 0.0          │ 0.3                   │ 9.95         │ missing              │
│ 2   │ 1        │ 2019-01-01 00:59:47  │ 2019-01-01 01:18:59   │ 1               │ 2.6           │ 1          │ N                  │ 239          │ 246          │ 1            │ 14.0        │ 0.5     │ 0.5     │ 1.0        │ 0.0          │ 0.3                   │ 16.3         │ missing              │
│ 3   │ 2        │ 2018-12-21 13:48:30  │ 2018-12-21 13:52:40   │ 3               │ 0.0           │ 1          │ N                  │ 236          │ 236          │ 1            │ 4.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 5.8          │ missing              │
│ 4   │ 2        │ 2018-11-28 15:52:25  │ 2018-11-28 15:55:45   │ 5               │ 0.0           │ 1          │ N                  │ 193          │ 193          │ 2            │ 3.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 7.55         │ missing              │
│ 5   │ 2        │ 2018-11-28 15:56:57  │ 2018-11-28 15:58:33   │ 5               │ 0.0           │ 2          │ N                  │ 193          │ 193          │ 2            │ 52.0        │ 0.0     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 55.55        │ missing              │
│ 6   │ 2        │ 2018-11-28 16:25:49  │ 2018-11-28 16:28:26   │ 5               │ 0.0           │ 1          │ N                  │ 193          │ 193          │ 2            │ 3.5         │ 0.5     │ 0.5     │ 0.0        │ 5.76         │ 0.3                   │ 13.31        │ missing              │
│ 7   │ 2        │ 2018-11-28 16:29:37  │ 2018-11-28 16:33:43   │ 5               │ 0.0           │ 2          │ N                  │ 193          │ 193          │ 2            │ 52.0        │ 0.0     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 55.55        │ missing              │
│ 8   │ 1        │ 2019-01-01 00:21:28  │ 2019-01-01 00:28:37   │ 1               │ 1.3           │ 1          │ N                  │ 163          │ 229          │ 1            │ 6.5         │ 0.5     │ 0.5     │ 1.25       │ 0.0          │ 0.3                   │ 9.05         │ missing              │
│ 9   │ 1        │ 2019-01-01 00:32:01  │ 2019-01-01 00:45:39   │ 1               │ 3.7           │ 1          │ N                  │ 229          │ 7            │ 1            │ 13.5        │ 0.5     │ 0.5     │ 3.7        │ 0.0          │ 0.3                   │ 18.5         │ missing              │
│ 10  │ 1        │ 2019-02-01 00:59:04  │ 2019-02-01 01:07:27   │ 1               │ 2.1           │ 1          │ N                  │ 48           │ 234          │ 1            │ 9.0         │ 0.5     │ 0.5     │ 2.0        │ 0.0          │ 0.3                   │ 12.3         │ 0.0                  │
│ 11  │ 1        │ 2019-02-01 00:33:09  │ 2019-02-01 01:03:58   │ 1               │ 9.8           │ 1          │ N                  │ 230          │ 93           │ 2            │ 32.0        │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 33.3         │ 0.0                  │
│ 12  │ 1        │ 2019-02-01 00:09:03  │ 2019-02-01 00:09:16   │ 1               │ 0.0           │ 1          │ N                  │ 145          │ 145          │ 2            │ 2.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 3.8          │ 0.0                  │
│ 13  │ 1        │ 2019-02-01 00:45:38  │ 2019-02-01 00:51:10   │ 1               │ 0.8           │ 1          │ N                  │ 95           │ 95           │ 2            │ 5.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 6.8          │ 0.0                  │
│ 14  │ 1        │ 2019-02-01 00:25:30  │ 2019-02-01 00:28:14   │ 1               │ 0.8           │ 1          │ N                  │ 140          │ 263          │ 2            │ 5.0         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 6.3          │ 0.0                  │
│ 15  │ 1        │ 2019-02-01 00:38:02  │ 2019-02-01 00:40:57   │ 1               │ 0.8           │ 1          │ N                  │ 229          │ 141          │ 2            │ 4.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 5.8          │ 0.0                  │

Here calling exec computes all the values required to compute the result. This means the green taxi data is never loaded into memory in this particular case.

Parallel invocation

To obtain parallelism you need to start julia in a parallel way:

export JULIA_NUM_THREADS=10   # 10 concurrent tasks per process (will use multi-threading)
julia -p 8                    # 8 OS pocesses

In the REPL:

using Distributed, .Threads
@everywhere using FileTrees, CSV, DataFrames

lazy_dfs = FileTrees.load(taxi_dir; lazy=true) do file
    # println("Loading $(path(file)) on $(myid()) on thread $(threadid())")
    DataFrame(CSV.File(path(file)))
end

first(exec(reducevalues(vcat, lazy_dfs[r"yellow.csv$"])), 15)
15×18 DataFrame
│ Row │ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │ passenger_count │ trip_distance │ RatecodeID │ store_and_fwd_flag │ PULocationID │ DOLocationID │ payment_type │ fare_amount │ extra   │ mta_tax │ tip_amount │ tolls_amount │ improvement_surcharge │ total_amount │ congestion_surcharge │
│     │ Int64    │ String               │ String                │ Int64           │ Float64       │ Int64      │ String             │ Int64        │ Int64        │ Int64        │ Float64     │ Float64 │ Float64 │ Float64    │ Float64      │ Float64               │ Float64      │ Float64?             │
├─────┼──────────┼──────────────────────┼───────────────────────┼─────────────────┼───────────────┼────────────┼────────────────────┼──────────────┼──────────────┼──────────────┼─────────────┼─────────┼─────────┼────────────┼──────────────┼───────────────────────┼──────────────┼──────────────────────┤
│ 1   │ 1        │ 2019-01-01 00:46:40  │ 2019-01-01 00:53:20   │ 1               │ 1.5           │ 1          │ N                  │ 151          │ 239          │ 1            │ 7.0         │ 0.5     │ 0.5     │ 1.65       │ 0.0          │ 0.3                   │ 9.95         │ missing              │
│ 2   │ 1        │ 2019-01-01 00:59:47  │ 2019-01-01 01:18:59   │ 1               │ 2.6           │ 1          │ N                  │ 239          │ 246          │ 1            │ 14.0        │ 0.5     │ 0.5     │ 1.0        │ 0.0          │ 0.3                   │ 16.3         │ missing              │
│ 3   │ 2        │ 2018-12-21 13:48:30  │ 2018-12-21 13:52:40   │ 3               │ 0.0           │ 1          │ N                  │ 236          │ 236          │ 1            │ 4.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 5.8          │ missing              │
│ 4   │ 2        │ 2018-11-28 15:52:25  │ 2018-11-28 15:55:45   │ 5               │ 0.0           │ 1          │ N                  │ 193          │ 193          │ 2            │ 3.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 7.55         │ missing              │
│ 5   │ 2        │ 2018-11-28 15:56:57  │ 2018-11-28 15:58:33   │ 5               │ 0.0           │ 2          │ N                  │ 193          │ 193          │ 2            │ 52.0        │ 0.0     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 55.55        │ missing              │
│ 6   │ 2        │ 2018-11-28 16:25:49  │ 2018-11-28 16:28:26   │ 5               │ 0.0           │ 1          │ N                  │ 193          │ 193          │ 2            │ 3.5         │ 0.5     │ 0.5     │ 0.0        │ 5.76         │ 0.3                   │ 13.31        │ missing              │
│ 7   │ 2        │ 2018-11-28 16:29:37  │ 2018-11-28 16:33:43   │ 5               │ 0.0           │ 2          │ N                  │ 193          │ 193          │ 2            │ 52.0        │ 0.0     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 55.55        │ missing              │
│ 8   │ 1        │ 2019-01-01 00:21:28  │ 2019-01-01 00:28:37   │ 1               │ 1.3           │ 1          │ N                  │ 163          │ 229          │ 1            │ 6.5         │ 0.5     │ 0.5     │ 1.25       │ 0.0          │ 0.3                   │ 9.05         │ missing              │
│ 9   │ 1        │ 2019-01-01 00:32:01  │ 2019-01-01 00:45:39   │ 1               │ 3.7           │ 1          │ N                  │ 229          │ 7            │ 1            │ 13.5        │ 0.5     │ 0.5     │ 3.7        │ 0.0          │ 0.3                   │ 18.5         │ missing              │
│ 10  │ 1        │ 2019-02-01 00:59:04  │ 2019-02-01 01:07:27   │ 1               │ 2.1           │ 1          │ N                  │ 48           │ 234          │ 1            │ 9.0         │ 0.5     │ 0.5     │ 2.0        │ 0.0          │ 0.3                   │ 12.3         │ 0.0                  │
│ 11  │ 1        │ 2019-02-01 00:33:09  │ 2019-02-01 01:03:58   │ 1               │ 9.8           │ 1          │ N                  │ 230          │ 93           │ 2            │ 32.0        │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 33.3         │ 0.0                  │
│ 12  │ 1        │ 2019-02-01 00:09:03  │ 2019-02-01 00:09:16   │ 1               │ 0.0           │ 1          │ N                  │ 145          │ 145          │ 2            │ 2.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 3.8          │ 0.0                  │
│ 13  │ 1        │ 2019-02-01 00:45:38  │ 2019-02-01 00:51:10   │ 1               │ 0.8           │ 1          │ N                  │ 95           │ 95           │ 2            │ 5.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 6.8          │ 0.0                  │
│ 14  │ 1        │ 2019-02-01 00:25:30  │ 2019-02-01 00:28:14   │ 1               │ 0.8           │ 1          │ N                  │ 140          │ 263          │ 2            │ 5.0         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 6.3          │ 0.0                  │
│ 15  │ 1        │ 2019-02-01 00:38:02  │ 2019-02-01 00:40:57   │ 1               │ 0.8           │ 1          │ N                  │ 229          │ 141          │ 2            │ 4.5         │ 0.5     │ 0.5     │ 0.0        │ 0.0          │ 0.3                   │ 5.8          │ 0.0                  │

If running in an environment with 8 procs with 10 threads each, 80 tasks will work on them in parallel (they are ultimately scheduled by the OS). Once a task has finished, the data required to execute the task is freed from memory if no longer required by any other task. So in this example, the DataFrames loaded from disk are freed from memory right after they've been reduced with vcat.

reducevalues performs an associative reduce to aide in the freeing of memory: the first two files are loaded, vcat is called on them, and the input dataframes are freed from memory. And then when the next two files have been similarly vcated, the two resulting values are then vcated and freed, and so on.

If you wish to compute on more data than you have memory to hold, the following information should help you:

As discussed in this example, there are 80 concurrent tasks at any given time executing a task in the graph. So at any given time, the peak memory usage will be the peak memory usage of 80 of the tasks in the task graph. Hence one can plan how many processes and threads should be started at the beginning of a computation so as to keep the memory usage manageable.

It is also necessary to keep in mind what amount of memory a call to exec will produce, since that memory allocation cannot be avoided. This means reducevalues where the reduction computes a small value (such as sum or mean) works best.

Caching

The compute function is different from the exec function in that, it will compute the results of the tasks in the tree and leave the data on remote processes rather than fetch it to the master process. Calling compute on a tree will also cause any subsequent requests to compute the same tasks to be served from a cache memory rather than recomputed.