What are helpful optimizations in R for big data sets?
What best practices can I apply and, in particular, what can I do to make these types of functions optimized for large datasets?
Use the data.table package:
library(data.table)
d1 = as.data.table(dataframe)
d2 = as.data.table(dataframe_two)
1.
Grouping by many columns is something that data.table is excellent at. See the barchart at the very bottom of the second plot for a comparison against dplyr, spark and others for exactly this kind of grouping: https://h2oai.github.io/db-benchmark
by_cols = paste("key", c("a","b","c","d","e","f","g","h","i"), sep="_")
a1 = d1[, .(min_date = min(date_sequence)), by=by_cols]
Note that I changed date to date_sequence; I think you meant that as a column name.
2.
It is unclear on which fields you want to merge the tables; dataframe_two does not have those fields specified, so the query is invalid. Please clarify.
3.
data.table has a very useful type of join called a rolling join, which does exactly what you need:
a3 = d2[d1, on=c("key_a","date_sequence"), roll="nearest"]
# Error in vecseq(f__, len__, if (allow.cartesian || notjoin || !anyDuplicated(f__,  :
#   Join results in more than 2^31 rows (internal vecseq reached physical limit).
#   Very likely misspecified join. Check for duplicate key values in i each of which
#   join to the same group in x over and over again. If that's ok, try by=.EACHI to
#   run j for each group to avoid the large allocation. Otherwise, please search for
#   this error message in the FAQ, Wiki, Stack Overflow and data.table issue tracker
#   for advice.
This results in an error. The error is in fact very useful: on your real data it may work perfectly fine, as the reason behind the error (the cardinality of matching rows) may be related to the process of generating the sample data. It is very tricky to have good dummy data for joining.
If you are getting the same error on your real data, you may want to review the design of that query, as it causes a row explosion by doing a many-to-many join, even after considering only a single date_sequence identity (with roll taken into account). I don't see this kind of query as valid for that data (strictly speaking, given the cardinalities of the join fields). You may want to introduce a data quality checks layer in your workflow to ensure there are no duplicates on key_a and date_sequence combined, as sketched below.
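For example, a minimal sketch of such a check (reusing the d1 table defined above; this is my own illustration, not from the question):
# count rows per key_a / date_sequence pair and flag any duplicates
dup_keys = d1[, .N, by = .(key_a, date_sequence)][N > 1L]
if (nrow(dup_keys) > 0L) {
  warning(sprintf("%d key_a/date_sequence combinations occur more than once",
                  nrow(dup_keys)))
}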
By default, R works with data in memory. When your data gets significantly larger, R can throw out-of-memory errors or, depending on your setup, use the pagefile (see here), but pagefiles are slow as they involve reading and writing to disk.
1. batching
From just a computation perspective, you may find improvements by batching your processing. Your examples include summarising the dataset down, so presumably your summarised dataset is much smaller than your input (if not, it would be worth considering other approaches to producing the same final dataset). This means that you can batch by your grouping variables.
I often do this by taking modulo of a numeric index:
library(dplyr)

num_batches = 50
output = list()

for (i in 0:(num_batches - 1)) {
  # process one batch of groups at a time
  subset = df %>% filter(numeric_key %% num_batches == i)
  this_summary = subset %>%
    group_by(numeric_key, other_keys) %>%
    summarise(result = min(col))
  output[[i + 1]] = this_summary   # R lists are 1-indexed
}

final_output = bind_rows(output)
You can develop a similar approach for text-based keys.
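For example (my own sketch, with hypothetical columns text_key, other_keys and col), you can map each distinct key to an integer code and batch on its modulo:
library(dplyr)

num_batches = 50
# give each distinct text key an integer code, then batch on its modulo
df = df %>% mutate(batch_id = as.integer(factor(text_key)) %% num_batches)

output = list()
for (i in 0:(num_batches - 1)) {
  output[[i + 1]] = df %>%
    filter(batch_id == i) %>%
    group_by(text_key, other_keys) %>%
    summarise(result = min(col))
}
final_output = bind_rows(output)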
2. reduce data size
Storing text requires more memory than storing numeric data. An easy option here is to replace strings with numeric codes, or store strings as factors. This will use less memory, and hence the computer has less information to read when grouping/joining.
Note that depending on your version of R, stringsAsFactors may default to TRUE or FALSE, so it is probably best to set it explicitly (discussed here).
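A minimal sketch of both options (with a hypothetical file and column name):
# set stringsAsFactors explicitly rather than relying on the R-version default
df = read.csv("big_file.csv", stringsAsFactors = FALSE)

df$key_a = as.factor(df$key_a)        # store the key as a factor
df$key_a_code = as.integer(df$key_a)  # or keep only the integer codes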
3. move to disk
Beyond some size it is worth having data on disk and letting R manage reading to and from disk. This is part of the idea behind several existing R packages including bigmemory, ff and ffbase, and a host of parallelisation packages.
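As a rough sketch (assuming the ff package is installed and using a hypothetical file name), an ffdf keeps the columns on disk and only loads the slices you ask for:
library(ff)

# read into a disk-backed ffdf rather than an in-memory data.frame
df_on_disk = read.csv.ffdf(file = "big_file.csv")

head(df_on_disk[1:1000, ])   # only this slice is pulled into RAM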
Beyond relying on R alone, you can push tasks to a database. While a database will never perform as quickly as in-memory data, databases are designed for handling large quantities of data. PostgreSQL is free and open source (getting started guide here), and you can run it on the same machine as R - it does not have to be a dedicated server. R also has a package specifically for PostgreSQL (RPostgreSQL). There are also several other packages designed for working with databases, including dbplyr, DBI and RODBC, if you want other options for interacting with databases.
While there is some overhead setting up a database, dplyr and dbplyr will translate your R code into SQL for you, so you don't have to learn a new language. The downside is that you are limited to core dplyr commands as translations from R to SQL are only defined for the standard procedures.
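A minimal sketch of that workflow (using an in-memory SQLite database via RSQLite in place of PostgreSQL, purely so the example is self-contained):
library(DBI)
library(dplyr)
library(dbplyr)

con = dbConnect(RSQLite::SQLite(), ":memory:")
copy_to(con, mtcars, "mtcars_db", temporary = TRUE)  # stand-in for your large table

query = tbl(con, "mtcars_db") %>%
  group_by(cyl) %>%
  summarise(mean_mpg = mean(mpg, na.rm = TRUE))

show_query(query)         # inspect the SQL that dbplyr generated
result = collect(query)   # run it in the database and pull the summary back into R

dbDisconnect(con)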
Expanding on @jangorecki's answer.
Data:
library(lubridate)
library(dplyr)
library(conflicted)
library(data.table)
dataframe = data.frame(bigint,
date_sequence = date_sequence[1:(1*10^7)],
key_a = key_a[1:(1*10^7)],
key_b = key_b[1:(1*10^7)],
key_c = key_c[1:(1*10^7)],
key_d = key_d[1:(1*10^7)],
key_e = key_e[1:(1*10^7)],
key_f = key_f[1:(1*10^7)],
key_g = key_g[1:(1*10^7)],
key_h = key_h[1:(1*10^7)],
key_i = key_i[1:(1*10^7)])
dataframe_two = dataframe %>% mutate(date_sequence1 = ymd(date_sequence) + days(1))
dataframe_two$date_sequence = NULL
Benchmarks:
1.
dplyr result of 2 runs: 2.2639 secs; 2.2205 secs
st = Sys.time()
a1 = dataframe %>%
group_by(key_a, key_b, key_c,
key_d, key_e, key_f,
key_g, key_h, key_i) %>%
summarize(min_date = min(date_sequence)) %>% ungroup()
Sys.time() - st
data.table results: 1.0987 secs; 0.9825 secs
setDT(dataframe)
by_cols = paste("key", c("a","b","c","d","e","f","g","h","i"), sep="_")
st = Sys.time()
a2 = dataframe[, .(min_date = min(date_sequence)), by=by_cols]
Sys.time() - st
2.
dplyr
setDF(dataframe)
st = Sys.time()
df3 = merge(dataframe,
dataframe_two,
by = c("key_a", "key_b", "key_c",
"key_d", "key_e", "key_f",
"key_g", "key_h", "key_i"),
all.x = T) %>% as_tibble()
Sys.time() - st
# Error in merge.data.frame(dataframe, dataframe_two, by = c("key_a", "key_b", :
# negative length vectors are not allowed
data.table
setDT(dataframe)
setDT(dataframe_two)
st = Sys.time()
df3 = merge(dataframe,
dataframe_two,
by = c("key_a", "key_b", "key_c",
"key_d", "key_e", "key_f",
"key_g", "key_h", "key_i"),
all.x = T)
Sys.time() - st
# Error in vecseq(f__, len__, if (allow.cartesian || notjoin || !anyDuplicated(f__,  :
# Join results in more than 2^31 rows (internal vecseq reached physical limit).
# Very likely misspecified join. Check for duplicate key values in i each of which
# join to the same group in x over and over again. If that's ok, try by=.EACHI to
# run j for each group to avoid the large allocation. Otherwise, please search for
# this error message in the FAQ, Wiki, Stack Overflow and data.table issue tracker
# for advice.
This error is helpful, and running the following:
uniqueN(dataframe_two, by = c("key_a", "key_b", "key_c",
"key_d", "key_e", "key_f",
"key_g", "key_h", "key_i"))
gives
12
When I am working with datasets containing about 10 million rows and 15 columns, I convert strings to factors before merging and have seen performance gains from approx. 30 secs to 10 secs for an inner join. To my surprise, setkey() was not as effective as converting strings to factors in that particular case.
EDIT: Reproducible example of data.table merge in 3 flavours (on character column, setkey, strings to factors)
Create tables:
x = 1e6
ids = x:(2*x-1)
chrs = rep(LETTERS[1:10], x)
quant_1 = sample(ids, x, replace = T)
quant_2 = sample(ids, x, replace = T)
ids_c = paste0(chrs, as.character(ids))
dt1 = data.table(unique(ids_c), quant_1)
dt2 = data.table(unique(ids_c), quant_2)
(i) on character column
system.time({result_chr = merge(dt1, dt2, by = 'V1')})
# user system elapsed
# 10.66 5.18 18.64
(ii) using setkey
system.time(setkey(dt1, V1))
# user system elapsed
# 3.37 1.55 5.66
system.time(setkey(dt2, V1))
# user system elapsed
# 3.42 1.67 5.85
system.time({result_setkey = merge(dt1, dt2, by = 'V1')})
# user system elapsed
# 0.17 0.00 0.16
(iii) strings to factors
dt3 = data.table(unique(ids_c), quant_1)
dt4 = data.table(unique(ids_c), quant_2)
system.time({dt3[, V1 := as.factor(V1)]})
# user system elapsed
# 8.16 0.00 8.20
system.time({dt4[, V1 := as.factor(V1)]})
# user system elapsed
# 8.04 0.00 8.06
system.time({result_fac = merge(dt3, dt4, by = 'V1')})
# user system elapsed
# 0.32 0.01 0.28
In this case, setkey is overall the fastest, with a total of 11.67 seconds. However, if the data is ingested with strings converted to factors on read (i.e. stringsAsFactors = TRUE), there is no need to use setkey.
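For example (a sketch with a hypothetical file name), data.table::fread can do the conversion at read time:
dt_in = fread("table_one.csv", stringsAsFactors = TRUE)  # character columns become factors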
Example 2: Suppose your data comes in one file with rows separated by an attribute, for example date, and you need to split them first and then do a join.
Data:
dt5 = data.table(date = '202009', id = unique(ids_c), quant = quant_1)
dt6 = data.table(date = '202010', id = unique(ids_c), quant = quant_2)
# Original data comes combined
dt = rbindlist(list(dt5, dt6))
(i) setkey
system.time(setkey(dt, id))
# user system elapsed
# 5.78 3.39 10.78
dt5 = dt[date == '202009']
dt6 = dt[date == '202010']
system.time({result_setkey = merge(dt5, dt6, by = 'id')})
# user system elapsed
# 0.17 0.00 0.17
(ii) strings as factors
dt5 = data.table(date = '202009', id = unique(ids_c), quant = quant_1)
dt6 = data.table(date = '202010', id = unique(ids_c), quant = quant_2)
dt = rbindlist(list(dt5, dt6))
system.time({dt[, id := as.factor(id)]})
# user system elapsed
# 8.17 0.00 8.20
dt5 = dt[date == '202009']
dt6 = dt[date == '202010']
system.time({result_fac = merge(dt5, dt6, by = 'id')})
# user system elapsed
# 0.34 0.00 0.33
In this case, strings to factors is faster at 8.53 seconds vs 10.95. However, when shuffling the keys before creating the tables with ids_c = sample(ids_c, replace = F), setkey performs 2x faster.
Also, note that not every function in data.table is faster than a combination of base functions. For example:
# data.table
system.time(uniqueN(ids_c))
# user system elapsed
# 10.63 4.21 16.88
# base R
system.time(length(unique(ids_c)))
# user system elapsed
# 0.78 0.08 0.94
It is important to note that uniqueN() consumes 4x less memory, so it would be better to use if RAM size is a constraint. I used the profvis package for this flame graph (from a different run than above).
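For reference, a minimal sketch of how such a profile can be produced with profvis (the flame graph image itself is not reproduced here):
library(profvis)

# profile both calls; the flame graph shows time spent and memory allocated
profvis({
  uniqueN(ids_c)
  length(unique(ids_c))
})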
Finally, if working with datasets larger than RAM, have a look at disk.frame.