library(conflicted)
library(tidyverse)
## ── Attaching core tidyverse packages ──────────────────────── tidyverse 2.0.0 ──
## ✔ dplyr 1.1.4 ✔ readr 2.1.5
## ✔ forcats 1.0.0 ✔ stringr 1.5.1
## ✔ ggplot2 3.5.0 ✔ tibble 3.2.1
## ✔ lubridate 1.9.3 ✔ tidyr 1.3.1
## ✔ purrr 1.0.2
conflicts_prefer(dplyr::filter)
## [conflicted] Will prefer dplyr::filter over any other package.
We want to keep track of the state of a table at different points in
time. The table has a primary key id and a column x that we want to
keep track of. The id column is essential to identify rows across
different points in time, and the x column is a proxy for arbitrary
payload data. In this example, V1 is the initial state of the table,
V2 is the state of the table after adding a row, V3 is the state of
the table after modifying a row, and V4 is the state of the table
after deleting a row.
V1 <- tibble(id = 1L, x = letters[1])
V1
## # A tibble: 1 × 2
## id x
## <int> <chr>
## 1 1 a
# Adding a row
V2 <- tibble(id = 1:2, x = letters[1:2])
V2
## # A tibble: 2 × 2
## id x
## <int> <chr>
## 1 1 a
## 2 2 b
# Modifying a row
V3 <- tibble(id = 1:2, x = letters[3:2])
V3
## # A tibble: 2 × 2
## id x
## <int> <chr>
## 1 1 c
## 2 2 b
# Deleting a row
V4 <- tibble(id = 2L, x = letters[2])
V4
## # A tibble: 1 × 2
## id x
## <int> <chr>
## 1 2 b
At each point in time, there is a table H that contains the history of
the table V at that point in time. The table H has columns from and to
that define the time interval for which the row is valid. The table H
also contains the details from table V at that point in time.
H0 <- tibble(from = integer(), to = integer(), V1[integer(), ])
H1 <- tibble(from = 1L, to = NA_integer_, V1)
H2 <- tibble(from = 1:2, to = NA_integer_, V2)
H3 <- tibble(
  from = 1:3,
  to = c(3L, NA_integer_, NA_integer_),
  bind_rows(V1[1, ], V3[2:1, ])
)
H4 <- tibble(
  from = 1:3,
  to = c(3L, NA_integer_, 4L),
  bind_rows(V1[1, ], V3[2:1, ])
)
H4 is smaller than V1, V2, V3, and V4 combined because we do not store
the same data multiple times:
nrow(H4)
## [1] 3
nrow(V1) + nrow(V2) + nrow(V3) + nrow(V4)
## [1] 6
With that, we can define a function at_time() that takes a history
table and a point in time, and returns the observation table at that
point in time.
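# Return the rows of the history table H that are valid at `time`,
# without the from and to columns. A missing `to` means "still valid".
at_time <- function(H, time) {
  H |>
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}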
H1 |>
at_time(1) |>
waldo::compare(V1)
## ✔ No differences
H2 |>
at_time(2) |>
waldo::compare(V2)
## ✔ No differences
H2 |>
at_time(1) |>
waldo::compare(V1)
## ✔ No differences
H3 |>
at_time(3) |>
waldo::compare(V3)
## ✔ No differences
H3 |>
at_time(2) |>
waldo::compare(V2)
## ✔ No differences
H3 |>
at_time(1) |>
waldo::compare(V1)
## ✔ No differences
H4 |>
at_time(4) |>
waldo::compare(V4)
## ✔ No differences
H4 |>
at_time(3) |>
waldo::compare(V3)
## ✔ No differences
H4 |>
at_time(2) |>
waldo::compare(V2)
## ✔ No differences
H4 |>
at_time(1) |>
waldo::compare(V1)
## ✔ No differences
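The checks above can also be run in one go. The following is a minimal sketch that rebuilds every version from H4 alone; the at_time() definition and the contents of H4 are repeated so the block runs on its own, and the name snapshots is made up for this example.

```r
library(dplyr)

at_time <- function(H, time) {
  H |>
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}

# Same content as H4 in the text
H4 <- tibble(
  from = 1:3,
  to = c(3L, NA_integer_, 4L),
  id = c(1L, 2L, 1L),
  x = c("a", "b", "c")
)

# One snapshot per point in time; element i corresponds to version i
snapshots <- lapply(1:4, \(time) at_time(H4, time))
snapshots[[3]]
```

Each element of snapshots should then match the corresponding V table.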
The history tables can be decomposed into two tables: O (observation)
and D (difference). The observation table contains the details from
the history table at the point in time, and is identical to the data at
that point in time, save for the from and to columns. The difference
table contains the changes that happened compared to the prior point in
time.
Because we want to avoid storing the same data multiple times, we omit rows in a difference table that are identical to rows found in previous difference tables.
O1 <- H1
O1 |>
select(-from, -to) |>
waldo::compare(V1)
## ✔ No differences
D1 <- H2[0, ]
O2 <- H2
O2 |>
select(-from, -to) |>
waldo::compare(V2)
## ✔ No differences
D2 <- H3[1, ]
O3 <- H3[3:2, ]
O3 |>
select(-from, -to) |>
waldo::compare(V3)
## ✔ No differences
# This does not contain H4[1, ], on purpose:
D3 <- H4[3, ]
O4 <- H4[2, ]
O4 |>
select(-from, -to) |>
waldo::compare(V4)
## ✔ No differences
Binding an observation table with the past difference tables gives exactly the history table at that point in time.
bind_rows(O1) |>
arrange(from, id) |>
waldo::compare(H1)
## ✔ No differences
bind_rows(O2, D1) |>
arrange(from, id) |>
waldo::compare(H2)
## ✔ No differences
bind_rows(O3, D2, D1) |>
arrange(from, id) |>
waldo::compare(H3)
## ✔ No differences
bind_rows(O4, D3, D2, D1) |>
arrange(from, id) |>
waldo::compare(H4)
## ✔ No differences
Therefore, the at_time() function also works when combining an
observation table with difference tables.
O4 |>
at_time(4)
## # A tibble: 1 × 2
## id x
## <int> <chr>
## 1 2 b
bind_rows(O4, D3) |>
at_time(3)
## # A tibble: 2 × 2
## id x
## <int> <chr>
## 1 1 c
## 2 2 b
bind_rows(O4, D3, D2) |>
at_time(2)
## # A tibble: 2 × 2
## id x
## <int> <chr>
## 1 1 a
## 2 2 b
bind_rows(O4, D3, D2, D1) |>
at_time(1)
## # A tibble: 1 × 2
## id x
## <int> <chr>
## 1 1 a
Because an observation table together with the difference tables contains every row of the corresponding history table, combining, e.g., one observation table and two difference tables allows reconstructing the original data for the three most recent points in time.
bind_rows(O4, D3, D2, D1) |>
at_time(1) |>
waldo::compare(V1)
## ✔ No differences
bind_rows(O4, D3, D2, D1) |>
at_time(2) |>
waldo::compare(V2)
## ✔ No differences
bind_rows(O4, D3, D2) |>
at_time(2) |>
waldo::compare(V2)
## ✔ No differences
bind_rows(O4, D3, D2) |>
at_time(3) |>
waldo::compare(V3)
## ✔ No differences
bind_rows(O4, D3) |>
at_time(3) |>
waldo::compare(V3)
## ✔ No differences
bind_rows(O4, D3) |>
at_time(4) |>
waldo::compare(V4)
## ✔ No differences
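The pattern used in these checks can be wrapped in a small helper. This is only a sketch: the name snapshot() and its D_list argument are hypothetical, and the tables are retyped here with the same contents as O4, D3, and D2 so that the block is self-contained.

```r
library(dplyr)

at_time <- function(H, time) {
  H |>
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}

# Hypothetical helper: stack an observation table and a list of
# difference tables, then query the result like a history table
snapshot <- function(O, D_list, time) {
  bind_rows(c(list(O), D_list)) |>
    at_time(time)
}

# Same contents as O4, D3, and D2 in the text
O4 <- tibble(from = 2L, to = NA_integer_, id = 2L, x = "b")
D3 <- tibble(from = 3L, to = 4L, id = 1L, x = "c")
D2 <- tibble(from = 1L, to = 3L, id = 1L, x = "a")

snapshot(O4, list(D3, D2), 2)
```

The further back in time a query goes, the more difference tables have to be included in D_list.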
How to construct O4 and D3 from O3, D2, D1, and V4? Same question for
constructing O3 and D2 from O2, D1, and V3? Or for constructing O2 and
D1 from O1 and V2? Or for the initialization, constructing O1 from V1?
We know that we can reconstruct the history table from the observation
and difference tables. This then boils down to the question of how to
construct O4 and D3 from O3, H3, and V4.
O4
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 2 NA 2 b
D3
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 3 4 1 c
O3
## # A tibble: 2 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 3 NA 1 c
## 2 2 NA 2 b
H3
## # A tibble: 3 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 1 3 1 a
## 2 2 NA 2 b
## 3 3 NA 1 c
V4
## # A tibble: 1 × 2
## id x
## <int> <chr>
## 1 2 b
We know how to extract V3 from O3:
O3 |>
at_time(3) |>
waldo::compare(V3)
## ✔ No differences
We can then compute the new or updated rows and the deleted rows. We
also define V0 and O0 as empty tables.
V0 <- V1[0, ]
O0 <- O1[0, ]
compute_diff <- function(old, new, time) {
  # Contains both new and updated rows
  P <-
    new |>
    anti_join(old, by = names(new)) |>
    mutate(from = as.integer(!!time), to = NA_integer_, .before = 1)

  # The id values of the deleted rows
  M <-
    old |>
    anti_join(new, by = "id") |>
    select(id)

  # The id values of the changed (new, updated, or deleted) rows
  PM <-
    P |>
    select(id) |>
    bind_rows(M)

  list(P = P, M = M, PM = PM)
}
X4 <- compute_diff(H3, V4, 4)
X4
## $P
## # A tibble: 0 × 4
## # ℹ 4 variables: from <int>, to <int>, id <int>, x <chr>
##
## $M
## # A tibble: 2 × 1
## id
## <int>
## 1 1
## 2 1
##
## $PM
## # A tibble: 2 × 1
## id
## <int>
## 1 1
## 2 1
X3 <- compute_diff(H2, V3, 3)
X3
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 3 NA 1 c
##
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
##
## $PM
## # A tibble: 1 × 1
## id
## <int>
## 1 1
X2 <- compute_diff(H1, V2, 2)
X2
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 2 NA 2 b
##
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
##
## $PM
## # A tibble: 1 × 1
## id
## <int>
## 1 2
X1 <- compute_diff(H0, V1, 1)
X1
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 1 NA 1 a
##
## $M
## # A tibble: 0 × 1
## # ℹ 1 variable: id <int>
##
## $PM
## # A tibble: 1 × 1
## id
## <int>
## 1 1
The observation table is the same as the new table with from and to
set to the relevant points in time. For new and updated rows, from is
set to the current point in time; otherwise, the point in time from the
old observation table is used. The to column is always set to missing.
Deleted rows must be removed from the observation table.
X4$P |>
bind_rows(O3) |>
distinct(id, .keep_all = TRUE) |>
anti_join(X4$M, by = "id") |>
arrange(id) |>
waldo::compare(O4)
## ✔ No differences
X3$P |>
bind_rows(O2) |>
distinct(id, .keep_all = TRUE) |>
anti_join(X3$M, by = "id") |>
arrange(id) |>
waldo::compare(O3)
## ✔ No differences
X2$P |>
bind_rows(O1) |>
distinct(id, .keep_all = TRUE) |>
anti_join(X2$M, by = "id") |>
arrange(id) |>
waldo::compare(O2)
## ✔ No differences
X1$P |>
bind_rows(O0) |>
distinct(id, .keep_all = TRUE) |>
anti_join(X1$M, by = "id") |>
arrange(id) |>
waldo::compare(O1)
## ✔ No differences
The new difference table consists of the most recent history row for
each changed id, with to set to the current point in time.
H3 |>
semi_join(X4$PM, by = "id") |>
filter(.by = id, row_number(from) == n()) |>
mutate(to = 4L) |>
waldo::compare(D3)
## ✔ No differences
H2 |>
semi_join(X3$PM, by = "id") |>
filter(.by = id, row_number(from) == n()) |>
mutate(to = 3L) |>
waldo::compare(D2)
## ✔ No differences
H1 |>
semi_join(X2$PM, by = "id") |>
filter(.by = id, row_number(from) == n()) |>
mutate(to = 2L) |>
waldo::compare(D1)
## ✔ No differences
The first observation table is the same as the first table with from
set to the first point in time and to set to missing.
V1 |>
mutate(from = 1L, to = NA_integer_, .before = 1) |>
waldo::compare(O1)
## ✔ No differences
This defines a process for efficiently maintaining the observation and difference tables as new data arrives.
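The individual steps can be combined into one update function. The sketch below is a variation rather than the exact procedure shown above: it compares the new delivery against the latest observation table only, instead of the full history table, which gives the same results for the toy data. The name advance_flat() and its arguments are hypothetical.

```r
library(dplyr)

# Hypothetical helper: from the latest observation table and a new
# delivery, derive the next observation table and the new difference table
advance_flat <- function(O_old, new, time) {
  time <- as.integer(time)

  # New and updated rows, valid from now on
  P <- new |>
    anti_join(O_old, by = names(new)) |>
    mutate(from = !!time, to = NA_integer_, .before = 1)

  # The id values of the deleted rows
  M <- O_old |>
    anti_join(new, by = "id") |>
    select(id)

  # The id values of all changed rows
  PM <- bind_rows(select(P, id), M)

  # Next observation table: changed rows win over old rows, deleted rows go
  O_new <- P |>
    bind_rows(O_old) |>
    distinct(id, .keep_all = TRUE) |>
    anti_join(M, by = "id") |>
    arrange(id)

  # New difference table: old rows that stop being valid now
  D_new <- O_old |>
    semi_join(PM, by = "id") |>
    mutate(to = !!time)

  list(O = O_new, D = D_new)
}

# Same contents as O3 and V4 in the text
O3 <- tibble(from = c(3L, 2L), to = NA_integer_, id = 1:2, x = c("c", "b"))
V4 <- tibble(id = 2L, x = "b")

step4 <- advance_flat(O3, V4, 4)
step4$O
step4$D
```

With flat files, step4$O would replace the stored observation table and step4$D would be written as an additional file.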
The approach above is useful if the data is stored in multiple flat
files. Given H3 and V4, how can we update H3 in the most efficient way
so that it becomes H4? Can we use a variant of compute_diff() and a
combination of rows_append(), rows_update(), rows_upsert(), and/or
rows_delete() for this task?
compute_diff_history <- function(old, new, time) {
  # Contains both new and updated rows
  P <-
    new |>
    anti_join(old, by = names(new)) |>
    mutate(from = as.integer(!!time), to = NA_integer_, .before = 1)

  # The most recent history row for each id
  last <-
    old |>
    select(id, from) |>
    arrange(from) |>
    filter(.by = id, row_number(from) == n())

  deleted <- anti_join(last, new, by = "id")
  changed <- semi_join(last, P, by = "id")

  # The keys (id, from) of the rows whose validity ends now,
  # because the row was deleted or superseded by an updated row
  C <-
    bind_rows(deleted, changed) |>
    mutate(to = as.integer(!!time))

  list(P = P, C = C)
}
Y4 <- compute_diff_history(H3, V4, 4)
Y4
## $P
## # A tibble: 0 × 4
## # ℹ 4 variables: from <int>, to <int>, id <int>, x <chr>
##
## $C
## # A tibble: 1 × 3
## id from to
## <int> <int> <int>
## 1 1 3 4
H3 |>
rows_patch(Y4$C, by = c("id", "from")) |>
rows_append(Y4$P) |>
waldo::compare(H4)
## ✔ No differences
Y3 <- compute_diff_history(H2, V3, 3)
Y3
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 3 NA 1 c
##
## $C
## # A tibble: 1 × 3
## id from to
## <int> <int> <int>
## 1 1 1 3
H2 |>
rows_patch(Y3$C, by = c("id", "from")) |>
rows_append(Y3$P) |>
waldo::compare(H3)
## ✔ No differences
Y2 <- compute_diff_history(H1, V2, 2)
Y2
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 2 NA 2 b
##
## $C
## # A tibble: 0 × 3
## # ℹ 3 variables: id <int>, from <int>, to <int>
H1 |>
rows_patch(Y2$C, by = c("id", "from")) |>
rows_append(Y2$P) |>
waldo::compare(H2)
## ✔ No differences
Y1 <- compute_diff_history(H0, V1, 1)
Y1
## $P
## # A tibble: 1 × 4
## from to id x
## <int> <int> <int> <chr>
## 1 1 NA 1 a
##
## $C
## # A tibble: 0 × 3
## # ℹ 3 variables: id <int>, from <int>, to <int>
H0 |>
rows_patch(Y1$C, by = c("id", "from")) |>
rows_append(Y1$P) |>
waldo::compare(H1)
## ✔ No differences
Y0 <- compute_diff_history(H0, V0, 0)
Because rows_patch() and rows_append() work on data frames and
databases alike, and can persist the changes to a database with
in_place = TRUE, the approach above defines a process for efficiently
maintaining the history table as new data arrives. Using a single
rows_upsert() call is possible but worse because this would mean that
the payload would be overwritten for old rows.
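The update step can be wrapped in a function and applied to each delivery in turn. The following sketch works on in-memory data frames only; the names update_history() and versions are made up for this example, and compute_diff_history() is repeated in shortened form so that the block runs on its own.

```r
library(dplyr)

compute_diff_history <- function(old, new, time) {
  P <- new |>
    anti_join(old, by = names(new)) |>
    mutate(from = as.integer(!!time), to = NA_integer_, .before = 1)
  last <- old |>
    select(id, from) |>
    arrange(from) |>
    filter(.by = id, row_number(from) == n())
  C <- bind_rows(
    anti_join(last, new, by = "id"),
    semi_join(last, P, by = "id")
  ) |>
    mutate(to = as.integer(!!time))
  list(P = P, C = C)
}

# Hypothetical wrapper: close outdated rows, then append new ones
update_history <- function(H, new, time) {
  Y <- compute_diff_history(H, new, time)
  H |>
    rows_patch(Y$C, by = c("id", "from")) |>
    rows_append(Y$P)
}

# Same contents as V1 to V4 in the text
versions <- list(
  tibble(id = 1L, x = "a"),
  tibble(id = 1:2, x = c("a", "b")),
  tibble(id = 1:2, x = c("c", "b")),
  tibble(id = 2L, x = "b")
)

# Start from the empty history table and feed in one delivery at a time
H <- tibble(from = integer(), to = integer(), id = integer(), x = character())
for (time_point in seq_along(versions)) {
  H <- update_history(H, versions[[time_point]], time_point)
}
H
```

After the loop, H should contain the same rows as H4.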
The example above assumes that only a few rows change with each data delivery. In real-world datasets, situations can occur where a few columns change with each data delivery across the entire dataset. In this case, no compression can be achieved by storing only the changed rows. A viable solution is to store the ever-changing columns in a separate table and join them with the history table when needed.
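As a rough illustration of this idea, the following sketch splits a delivery into a stable part, which would be tracked in the history table as before, and a volatile part that is stored separately and joined back on demand. The column loaded_at and all variable names are invented for this example.

```r
library(dplyr)

# Hypothetical delivery: loaded_at changes for every row on every delivery
delivery <- tibble(id = 1:2, x = c("a", "b"), loaded_at = c(10L, 10L))
volatile_cols <- "loaded_at"

# Stable part: tracked in the history table as before
stable <- delivery |>
  select(-all_of(volatile_cols))

# Volatile part: stored in a separate table, keyed by id
volatile <- delivery |>
  select(id, all_of(volatile_cols))

# Join the two parts when the full row is needed
full <- left_join(stable, volatile, by = "id")
full
```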
The naive approach to maintaining different versions of a table is to
store the entire table for each version (V# in our example). This is
inefficient in terms of storage but offers the best performance for
querying.
A good compromise is to maintain a history or temporal table (H# in
our example). This requires each row to be identified by a unique
identifier (the id column). The id column can be an integer, a GUID,
or any other unique identifier. Composite keys are also possible. A
temporal table contains two extra columns, from and to, that define
the time period during which a row is valid. These columns can be of any
ordered type, such as integers, dates, or timestamps. The at_time()
function provides a way to query such a table at a specific point in
time.
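To illustrate that from and to do not have to be integers, here is a small sketch with Date columns. The table H_dates and its dates are invented for this example; at_time() is repeated so that the block is self-contained.

```r
library(dplyr)

at_time <- function(H, time) {
  H |>
    filter(coalesce(from <= !!time, TRUE), coalesce(to > !!time, TRUE)) |>
    select(-from, -to) |>
    arrange(id)
}

# Hypothetical history table with the structure of H4, using dates
H_dates <- tibble(
  from = as.Date(c("2024-01-01", "2024-02-01", "2024-03-01")),
  to = as.Date(c("2024-03-01", NA, "2024-04-01")),
  id = c(1L, 2L, 1L),
  x = c("a", "b", "c")
)

# State of the table in mid-February
at_time(H_dates, as.Date("2024-02-15"))
```

The comparison operators work on dates in the same way as on integers, so at_time() needs no changes.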
The maintenance of a temporal table as new data arrives is slightly
different depending on the storage medium because they have different
trade-offs. Flat files are easy to work with but require the entire
table to be read and written. To maintain efficiency, the history table
can be split into observation (O#) and difference (D#) tables. In
contrast, a database table can be changed in-place but requires the
changesets to be specified in bulk for efficiency.
For flat files, the compute_diff() function provides a way to
efficiently maintain the observation and difference tables as new data
arrives. For each new data delivery, only the most recent observation
table must be replaced with a new difference table, and the new delivery
essentially becomes the new observation table.
The compute_diff_history() function provides a way to efficiently
maintain a history table in a database as new data arrives. It specifies
precisely the rows to be updated and appended. For updated rows, the
payload x is never touched.