From 2e9c585122bc990f3abefd6ab9d17c4d5a557b2e Mon Sep 17 00:00:00 2001 From: lujie Date: Fri, 15 Oct 2021 13:19:52 +0800 Subject: [PATCH] SAMZA-2702:fix resource leak due to Files.list and Files.line --- .../java/org/apache/samza/storage/StorageManagerUtil.java | 7 +++++-- .../org/apache/samza/sql/client/impl/SamzaExecutor.java | 5 +++-- .../main/java/org/apache/samza/sql/util/SqlFileParser.java | 5 +++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java index badeb28c00..59b322f5b9 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.samza.SamzaException; import org.apache.samza.checkpoint.CheckpointId; import org.apache.samza.checkpoint.CheckpointV2; @@ -379,11 +380,13 @@ public List getTaskStoreCheckpointDirs(File storeBaseDir, String storeName String taskStoreName = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode).getName(); if (storeDir.exists()) { // new store or no local state - List checkpointDirs = Files.list(storeDir.toPath()) + try (Stream stream = Files.list(storeDir.toPath())) { + List checkpointDirs = stream .map(Path::toFile) .filter(file -> file.getName().contains(taskStoreName + "-")) .collect(Collectors.toList()); - return checkpointDirs; + return checkpointDirs; + } } else { return Collections.emptyList(); } diff --git a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java index 3e9126dd39..42eb52732c 100755 --- a/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java +++ b/samza-sql-shell/src/main/java/org/apache/samza/sql/client/impl/SamzaExecutor.java @@ -64,6 +64,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.Stream; /** @@ -203,8 +204,8 @@ public List consumeQueryResult(ExecutionContext context, int startRow, public NonQueryResult executeNonQuery(ExecutionContext context, File sqlFile) throws ExecutorException { LOG.info("Sql file path: " + sqlFile.getPath()); List executedStmts; - try { - executedStmts = Files.lines(Paths.get(sqlFile.getPath())).collect(Collectors.toList()); + try (Stream stream = Files.lines(Paths.get(sqlFile.getPath()))) { + executedStmts = stream.collect(Collectors.toList()); } catch (IOException e) { throw new ExecutorException(e); } diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java index d68eba197c..4e03464351 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SqlFileParser.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; @@ -73,8 +74,8 @@ private SqlFileParser() { public static List parseSqlFile(String fileName) { Validate.notEmpty(fileName, "fileName cannot be empty."); List sqlLines; - try { - sqlLines = Files.lines(Paths.get(fileName)).collect(Collectors.toList()); + try (Stream stream = Files.lines(Paths.get(fileName))) { + sqlLines = stream.collect(Collectors.toList()); } catch (IOException e) { String msg = String.format("Unable to parse the sql file %s", fileName); LOG.error(msg, e);