Values in file trees

And how to work with them.

Files and subtrees in a FileTree can have any value attached to them (not necessarily a value loaded from the file itself). You can map and reduce over these values using mapvalues and reducevalues, or combine them by merging with other trees or by collapsing subtrees.

All these operations are lazy if the files were loaded lazily. Calling exec, compute or save on a lazy value, or on a tree containing lazy values, causes the dependent values to be loaded and computed.

Loading values

FileTrees.load can be used to load values into a tree.

using DataFrames, CSV, FileTrees

taxi_dir = FileTree("taxi-data")

dfs = FileTrees.load(taxi_dir) do file
    DataFrame(CSV.File(path(file)))
end
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (9×20 DataFrame)
│  │  └─ yellow.csv (9×18 DataFrame)
│  └─ 02/
│     ├─ green.csv (9×20 DataFrame)
│     └─ yellow.csv (9×18 DataFrame)
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (9×20 DataFrame)
   │  └─ yellow.csv (9×18 DataFrame)
   └─ 02/
      ├─ green.csv (9×20 DataFrame)
      └─ yellow.csv (9×18 DataFrame)

It's probably obvious, but the loader function can return anything: you can load any file type, or even attach values that have nothing to do with the file's contents.
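
For example, a minimal sketch (reusing the taxi_dir tree from above; the texts and labels names are just illustrative):

# Attach the raw text of each file instead of a parsed DataFrame.
texts = FileTrees.load(file -> read(string(path(file)), String), taxi_dir)

# Attach a value that ignores the contents entirely: the path string.
labels = FileTrees.load(file -> string(path(file)), taxi_dir)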

Loading can be made lazy using the lazy=true flag. More on laziness here.

lazy_dfs = FileTrees.load(taxi_dir; lazy=true) do file
    DataFrame(CSV.File(path(file)))
end
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (Thunk(#3, (File(taxi-data/2019/01/green.csv),)))
│  │  └─ yellow.csv (Thunk(#3, (File(taxi-data/2019/01/yellow.csv),)))
│  └─ 02/
│     ├─ green.csv (Thunk(#3, (File(taxi-data/2019/02/green.csv),)))
│     └─ yellow.csv (Thunk(#3, (File(taxi-data/2019/02/yellow.csv),)))
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (Thunk(#3, (File(taxi-data/2020/01/green.csv),)))
   │  └─ yellow.csv (Thunk(#3, (File(taxi-data/2020/01/yellow.csv),)))
   └─ 02/
      ├─ green.csv (Thunk(#3, (File(taxi-data/2020/02/green.csv),)))
      └─ yellow.csv (Thunk(#3, (File(taxi-data/2020/02/yellow.csv),)))

Below we will see the effect of each value-manipulating function when called with both lazy and non-lazy trees.

mapvalues

mapvalues(f, t::FileTree) applies f to every node in t which has a value loaded into it. It returns a new tree with the resultant values in place of the original ones.

Let's drop all but the first 5 columns of the dataframes we have loaded:

small_dfs = mapvalues(df->df[:, 1:5], dfs)
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (9×5 DataFrame)
│  │  └─ yellow.csv (9×5 DataFrame)
│  └─ 02/
│     ├─ green.csv (9×5 DataFrame)
│     └─ yellow.csv (9×5 DataFrame)
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (9×5 DataFrame)
   │  └─ yellow.csv (9×5 DataFrame)
   └─ 02/
      ├─ green.csv (9×5 DataFrame)
      └─ yellow.csv (9×5 DataFrame)

mapvalues on a lazy tree creates another lazy tree; on exec, its values are the computed results.

small_dfs_lazy = mapvalues(df->df[:, 1:5], lazy_dfs)
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (Thunk(#71, (Thunk(#3, ...),)))
│  │  └─ yellow.csv (Thunk(#71, (Thunk(#3, ...),)))
│  └─ 02/
│     ├─ green.csv (Thunk(#71, (Thunk(#3, ...),)))
│     └─ yellow.csv (Thunk(#71, (Thunk(#3, ...),)))
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (Thunk(#71, (Thunk(#3, ...),)))
   │  └─ yellow.csv (Thunk(#71, (Thunk(#3, ...),)))
   └─ 02/
      ├─ green.csv (Thunk(#71, (Thunk(#3, ...),)))
      └─ yellow.csv (Thunk(#71, (Thunk(#3, ...),)))

This map call should be near-instantaneous, since it does not actually carry out the computation; instead it returns a tree of lazy tasks that still need to be carried out.

exec will materialize these values:

exec(small_dfs_lazy)
taxi-data/
├─ 2019/
│  ├─ 01/
│  │  ├─ green.csv (9×5 DataFrame)
│  │  └─ yellow.csv (9×5 DataFrame)
│  └─ 02/
│     ├─ green.csv (9×5 DataFrame)
│     └─ yellow.csv (9×5 DataFrame)
└─ 2020/
   ├─ 01/
   │  ├─ green.csv (9×5 DataFrame)
   │  └─ yellow.csv (9×5 DataFrame)
   └─ 02/
      ├─ green.csv (9×5 DataFrame)
      └─ yellow.csv (9×5 DataFrame)

reducevalues

reducevalues(f, t::FileTree) reduces the values of all nodes in t into a single value by successively applying f. f is assumed to be associative, and an ordering that is optimal for parallelism is chosen. If f is not associative, pass the associative=false keyword argument.

first(reducevalues(vcat, small_dfs[r"yellow.csv$"]), 12)
12×5 DataFrame
│ Row │ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │ passenger_count │ trip_distance │
│     │ Int64    │ String               │ String                │ Int64           │ Float64       │
├─────┼──────────┼──────────────────────┼───────────────────────┼─────────────────┼───────────────┤
│ 1   │ 1        │ 2019-01-01 00:46:40  │ 2019-01-01 00:53:20   │ 1               │ 1.5           │
│ 2   │ 1        │ 2019-01-01 00:59:47  │ 2019-01-01 01:18:59   │ 1               │ 2.6           │
│ 3   │ 2        │ 2018-12-21 13:48:30  │ 2018-12-21 13:52:40   │ 3               │ 0.0           │
│ 4   │ 2        │ 2018-11-28 15:52:25  │ 2018-11-28 15:55:45   │ 5               │ 0.0           │
│ 5   │ 2        │ 2018-11-28 15:56:57  │ 2018-11-28 15:58:33   │ 5               │ 0.0           │
│ 6   │ 2        │ 2018-11-28 16:25:49  │ 2018-11-28 16:28:26   │ 5               │ 0.0           │
│ 7   │ 2        │ 2018-11-28 16:29:37  │ 2018-11-28 16:33:43   │ 5               │ 0.0           │
│ 8   │ 1        │ 2019-01-01 00:21:28  │ 2019-01-01 00:28:37   │ 1               │ 1.3           │
│ 9   │ 1        │ 2019-01-01 00:32:01  │ 2019-01-01 00:45:39   │ 1               │ 3.7           │
│ 10  │ 1        │ 2019-02-01 00:59:04  │ 2019-02-01 01:07:27   │ 1               │ 2.1           │
│ 11  │ 1        │ 2019-02-01 00:33:09  │ 2019-02-01 01:03:58   │ 1               │ 9.8           │
│ 12  │ 1        │ 2019-02-01 00:09:03  │ 2019-02-01 00:09:16   │ 1               │ 0.0           │
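
reducevalues also composes with mapvalues for reductions over derived values. A minimal sketch (reusing small_dfs from above; nrow comes from DataFrames) that counts the total number of rows across the yellow files:

# `+` is associative, so the default parallel-friendly ordering applies.
total_rows = reducevalues(+, mapvalues(nrow, small_dfs[r"yellow.csv$"]))

Called on the lazy tree, the reduction is likewise deferred:
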
reducevalues(vcat, small_dfs_lazy[r"yellow.csv$"])
Thunk(vcat, (Thunk(vcat, ...), Thunk(vcat, ...)))

This returned a delayed task that will compute the result. exec will compute it:

first(exec(reducevalues(vcat, small_dfs_lazy[r"yellow.csv$"])), 12)
12×5 DataFrame
│ Row │ VendorID │ tpep_pickup_datetime │ tpep_dropoff_datetime │ passenger_count │ trip_distance │
│     │ Int64    │ String               │ String                │ Int64           │ Float64       │
├─────┼──────────┼──────────────────────┼───────────────────────┼─────────────────┼───────────────┤
│ 1   │ 1        │ 2019-01-01 00:46:40  │ 2019-01-01 00:53:20   │ 1               │ 1.5           │
│ 2   │ 1        │ 2019-01-01 00:59:47  │ 2019-01-01 01:18:59   │ 1               │ 2.6           │
│ 3   │ 2        │ 2018-12-21 13:48:30  │ 2018-12-21 13:52:40   │ 3               │ 0.0           │
│ 4   │ 2        │ 2018-11-28 15:52:25  │ 2018-11-28 15:55:45   │ 5               │ 0.0           │
│ 5   │ 2        │ 2018-11-28 15:56:57  │ 2018-11-28 15:58:33   │ 5               │ 0.0           │
│ 6   │ 2        │ 2018-11-28 16:25:49  │ 2018-11-28 16:28:26   │ 5               │ 0.0           │
│ 7   │ 2        │ 2018-11-28 16:29:37  │ 2018-11-28 16:33:43   │ 5               │ 0.0           │
│ 8   │ 1        │ 2019-01-01 00:21:28  │ 2019-01-01 00:28:37   │ 1               │ 1.3           │
│ 9   │ 1        │ 2019-01-01 00:32:01  │ 2019-01-01 00:45:39   │ 1               │ 3.7           │
│ 10  │ 1        │ 2019-02-01 00:59:04  │ 2019-02-01 01:07:27   │ 1               │ 2.1           │
│ 11  │ 1        │ 2019-02-01 00:33:09  │ 2019-02-01 01:03:58   │ 1               │ 9.8           │
│ 12  │ 1        │ 2019-02-01 00:09:03  │ 2019-02-01 00:09:16   │ 1               │ 0.0           │

mapsubtrees + value operations

mapsubtrees is a powerful function: it lets you recursively apply tree operations to subtrees of a tree.

See more about it here.
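
As a rough sketch of how mapsubtrees combines with the value operations above (reusing dfs from earlier, and assuming the Glob package is loaded for the glob"" pattern string macro), each month directory can be collapsed by reducing over the files it contains:

using Glob

# Match the month directories (two levels down) and reduce each one.
per_month = mapsubtrees(dfs, glob"*/*") do subtree
    reducevalues(vcat, subtree)
end

Each matched month subtree is thus reduced to one concatenated DataFrame.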