Skip to content

Commit

Permalink
2024 bug fixes (#206)
Browse files Browse the repository at this point in the history
* Add support for Oracle "start" keyword, Fixes #205
* Add error handling for invalid file paths, Fixes #203
  • Loading branch information
burrowse authored Dec 13, 2024
1 parent e59d1c7 commit a0d36ce
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 199 deletions.
143 changes: 75 additions & 68 deletions R/LoadSyntheaTables.r
Original file line number Diff line number Diff line change
Expand Up @@ -33,80 +33,87 @@ LoadSyntheaTables <-
syntheaFileLoc,
bulkLoad = FALSE)
{
csvList <- list.files(syntheaFileLoc, pattern = "*.csv")

conn <- DatabaseConnector::connect(connectionDetails)
if (file.exists(syntheaFileLoc)) {
csvList <- list.files(syntheaFileLoc, pattern = "*.csv")

for (csv in csvList) {
syntheaTable <-
data.table::fread(
file = paste0(syntheaFileLoc, "/", csv),
stringsAsFactors = FALSE,
header = TRUE,
sep = ",",
na.strings = ""
)
conn <- DatabaseConnector::connect(connectionDetails)

writeLines(paste0("Loading: ", csv))
for (csv in csvList) {
syntheaTable <-
data.table::fread(
file = paste0(syntheaFileLoc, "/", csv),
stringsAsFactors = FALSE,
header = TRUE,
sep = ",",
na.strings = ""
)

# experiencing type conversion errors and need to explicitly cast some columns
if ("START" %in% colnames(syntheaTable)) {
syntheaTable$START <-
as.Date(syntheaTable$START, format = "%Y-%m-%d")
}
if ("STOP" %in% colnames(syntheaTable)) {
syntheaTable$STOP <-
as.Date(syntheaTable$STOP, format = "%Y-%m-%d")
}
if ("DATE" %in% colnames(syntheaTable)) {
syntheaTable$DATE <-
as.Date(syntheaTable$DATE, format = "%Y-%m-%d")
}
if ("START_DATE" %in% colnames(syntheaTable)) {
syntheaTable$START_DATE <-
as.Date(syntheaTable$START_DATE, format = "%Y-%m-%d")
}
if ("END_DATE" %in% colnames(syntheaTable)) {
syntheaTable$END_DATE <-
as.Date(syntheaTable$END_DATE, format = "%Y-%m-%d")
}
if ("BIRTHDATE" %in% colnames(syntheaTable)) {
syntheaTable$BIRTHDATE <-
as.Date(syntheaTable$BIRTHDATE, format = "%Y-%m-%d")
}
if ("DEATHDATE" %in% colnames(syntheaTable)) {
syntheaTable$DEATHDATE <-
as.Date(syntheaTable$DEATHDATE, format = "%Y-%m-%d")
}
if ("CODE" %in% colnames(syntheaTable)) {
syntheaTable$CODE <- as.character(syntheaTable$CODE)
}
if ("REASONCODE" %in% colnames(syntheaTable)) {
syntheaTable$REASONCODE <-
as.character(syntheaTable$REASONCODE)
}
if ("PHONE" %in% colnames(syntheaTable)) {
syntheaTable$PHONE <-
as.character(syntheaTable$PHONE)
}
if ("UTILIZATION" %in% colnames(syntheaTable)) {
syntheaTable$UTILIZATION <-
as.numeric(syntheaTable$UTILIZATION)
writeLines(paste0("Loading: ", csv))

# experiencing type conversion errors and need to explicitly cast some columns
if ("START" %in% colnames(syntheaTable)) {
syntheaTable$START <-
as.Date(syntheaTable$START, format = "%Y-%m-%d")
}
if ("STOP" %in% colnames(syntheaTable)) {
syntheaTable$STOP <-
as.Date(syntheaTable$STOP, format = "%Y-%m-%d")
}
if ("DATE" %in% colnames(syntheaTable)) {
syntheaTable$DATE <-
as.Date(syntheaTable$DATE, format = "%Y-%m-%d")
}
if ("START_DATE" %in% colnames(syntheaTable)) {
syntheaTable$START_DATE <-
as.Date(syntheaTable$START_DATE, format = "%Y-%m-%d")
}
if ("END_DATE" %in% colnames(syntheaTable)) {
syntheaTable$END_DATE <-
as.Date(syntheaTable$END_DATE, format = "%Y-%m-%d")
}
if ("BIRTHDATE" %in% colnames(syntheaTable)) {
syntheaTable$BIRTHDATE <-
as.Date(syntheaTable$BIRTHDATE, format = "%Y-%m-%d")
}
if ("DEATHDATE" %in% colnames(syntheaTable)) {
syntheaTable$DEATHDATE <-
as.Date(syntheaTable$DEATHDATE, format = "%Y-%m-%d")
}
if ("CODE" %in% colnames(syntheaTable)) {
syntheaTable$CODE <- as.character(syntheaTable$CODE)
}
if ("REASONCODE" %in% colnames(syntheaTable)) {
syntheaTable$REASONCODE <-
as.character(syntheaTable$REASONCODE)
}
if ("PHONE" %in% colnames(syntheaTable)) {
syntheaTable$PHONE <-
as.character(syntheaTable$PHONE)
}
if ("UTILIZATION" %in% colnames(syntheaTable)) {
syntheaTable$UTILIZATION <-
as.numeric(syntheaTable$UTILIZATION)
}

suppressWarnings({
DatabaseConnector::insertTable(
conn,
tableName = paste0(syntheaSchema, ".", strsplit(csv, "[.]")[[1]][1]),
data = as.data.frame(syntheaTable),
dropTableIfExists = FALSE,
createTable = FALSE,
bulkLoad = bulkLoad,
progressBar = TRUE
)
})
}

suppressWarnings({
DatabaseConnector::insertTable(
conn,
tableName = paste0(syntheaSchema, ".", strsplit(csv, "[.]")[[1]][1]),
data = as.data.frame(syntheaTable),
dropTableIfExists = FALSE,
createTable = FALSE,
bulkLoad = bulkLoad,
progressBar = TRUE
)
})
on.exit(DatabaseConnector::disconnect(conn))
}else {
stop(
paste0("Synthea File Location specified is invalid: ", syntheaFileLoc, ". Please provide a valid fully qualified (absolute) path to the directory.")
)
}

on.exit(DatabaseConnector::disconnect(conn))

}
192 changes: 100 additions & 92 deletions R/LoadVocabFromCsv.r
Original file line number Diff line number Diff line change
Expand Up @@ -37,111 +37,119 @@ LoadVocabFromCsv <-
"drug_strength.csv"
)

fileList <- list.files(vocabFileLoc)
if (file.exists(vocabFileLoc)) {

fileList <- fileList[which(tolower(fileList) %in% csvList)]
fileList <- list.files(vocabFileLoc)

conn <- DatabaseConnector::connect(connectionDetails)
fileList <- fileList[which(tolower(fileList) %in% csvList)]

for (csv in fileList) {
writeLines(paste0("Working on file ", paste0(vocabFileLoc, "/", csv)))
conn <- DatabaseConnector::connect(connectionDetails)

writeLines(" - reading file ")
vocabTable <-
data.table::fread(
file = paste0(vocabFileLoc, "/", csv),
stringsAsFactors = FALSE,
header = TRUE,
sep = delimiter,
na.strings = ""
)

if (tolower(csv) == "concept.csv" || tolower(csv) == "concept_relationship.csv" || tolower(csv) == "drug_strength.csv") {
writeLines(" - handling dates")
vocabTable$valid_start_date <-
as.Date(as.character(vocabTable$valid_start_date), "%Y%m%d")
vocabTable$valid_end_date <-
as.Date(as.character(vocabTable$valid_end_date), "%Y%m%d")
vocabTable <- dplyr::tibble(vocabTable)
}
for (csv in fileList) {
writeLines(paste0("Working on file ", paste0(vocabFileLoc, "/", csv)))

writeLines(" - type converting")
vocabTable <- readr::type_convert(df = vocabTable,
col_types = readr::cols(),
na = c("")) %>%
dplyr::tibble()

if (tolower(csv) == "drug_strength.csv") {
vocabTable <- vocabTable %>%
mutate_at(
vars(
"amount_value",
"amount_unit_concept_id",
"numerator_value",
"numerator_unit_concept_id",
"denominator_value",
"denominator_unit_concept_id",
"box_size"
),
~ replace(., is.na(.), 0)
writeLines(" - reading file ")
vocabTable <-
data.table::fread(
file = paste0(vocabFileLoc, "/", csv),
stringsAsFactors = FALSE,
header = TRUE,
sep = delimiter,
na.strings = ""
)
}

chunkSize <- 1e7
numberOfRowsInVocabTable <- nrow(vocabTable)
numberOfChunks <-
ceiling(x = numberOfRowsInVocabTable / chunkSize)

writeLines(
paste0(
" - uploading ",
numberOfRowsInVocabTable,
" rows of data in ",
numberOfChunks,
" chunks."
)
)
if (tolower(csv) == "concept.csv" || tolower(csv) == "concept_relationship.csv" || tolower(csv) == "drug_strength.csv") {
writeLines(" - handling dates")
vocabTable$valid_start_date <-
as.Date(as.character(vocabTable$valid_start_date), "%Y%m%d")
vocabTable$valid_end_date <-
as.Date(as.character(vocabTable$valid_end_date), "%Y%m%d")
vocabTable <- dplyr::tibble(vocabTable)
}

sql <-
"DELETE FROM @table_name;"
DatabaseConnector::renderTranslateExecuteSql(
connection = conn,
sql = sql,
table_name = paste0(cdmSchema, ".", strsplit(csv, "[.]")[[1]][1])
)
writeLines(" - type converting")
vocabTable <- readr::type_convert(df = vocabTable,
col_types = readr::cols(),
na = c("")) %>%
dplyr::tibble()

startRow <- 1
for (j in (1:numberOfChunks)) {
if (numberOfRowsInVocabTable >= startRow) {
maxRows <- min(numberOfRowsInVocabTable,
startRow + chunkSize)
chunk <- vocabTable[startRow:maxRows, ]
writeLines(
paste0(
" - chunk uploading started on ",
Sys.time(),
" for rows ",
startRow,
" to ",
maxRows
if (tolower(csv) == "drug_strength.csv") {
vocabTable <- vocabTable %>%
mutate_at(
vars(
"amount_value",
"amount_unit_concept_id",
"numerator_value",
"numerator_unit_concept_id",
"denominator_value",
"denominator_unit_concept_id",
"box_size"
),
~ replace(., is.na(.), 0)
)
}

chunkSize <- 1e7
numberOfRowsInVocabTable <- nrow(vocabTable)
numberOfChunks <-
ceiling(x = numberOfRowsInVocabTable / chunkSize)

writeLines(
paste0(
" - uploading ",
numberOfRowsInVocabTable,
" rows of data in ",
numberOfChunks,
" chunks."
)
suppressWarnings({
DatabaseConnector::insertTable(
connection = conn,
tableName = paste0(cdmSchema, ".", strsplit(csv, "[.]")[[1]][1]),
data = chunk,
dropTableIfExists = FALSE,
createTable = FALSE,
bulkLoad = bulkLoad,
progressBar = TRUE
)

sql <-
"DELETE FROM @table_name;"
DatabaseConnector::renderTranslateExecuteSql(
connection = conn,
sql = sql,
table_name = paste0(cdmSchema, ".", strsplit(csv, "[.]")[[1]][1])
)

startRow <- 1
for (j in (1:numberOfChunks)) {
if (numberOfRowsInVocabTable >= startRow) {
maxRows <- min(numberOfRowsInVocabTable,
startRow + chunkSize)
chunk <- vocabTable[startRow:maxRows, ]
writeLines(
paste0(
" - chunk uploading started on ",
Sys.time(),
" for rows ",
startRow,
" to ",
maxRows
)
)
})
startRow <- maxRows + 1
suppressWarnings({
DatabaseConnector::insertTable(
connection = conn,
tableName = paste0(cdmSchema, ".", strsplit(csv, "[.]")[[1]][1]),
data = chunk,
dropTableIfExists = FALSE,
createTable = FALSE,
bulkLoad = bulkLoad,
progressBar = TRUE
)
})
startRow <- maxRows + 1
}
}
writeLines(" - Success")
}
writeLines(" - Success")
}

on.exit(DatabaseConnector::disconnect(conn))
on.exit(DatabaseConnector::disconnect(conn))
}
else {
stop(
paste0("Vocabulary File Location specified is invalid: ", vocabFileLoc, ". Please provide a valid fully qualified (absolute) path to the directory.")
)
}
}
Loading

0 comments on commit a0d36ce

Please sign in to comment.