[WIP] Caching using Parquet and pre-caching files #166

Open · wants to merge 6 commits into base: master
37 changes: 19 additions & 18 deletions massql/msql_cmd.py
@@ -16,14 +16,14 @@

def main():
parser = argparse.ArgumentParser(description="MSQL CMD")
parser.add_argument('filename', help='Input filename')
parser.add_argument('query', help='Input Query')
parser.add_argument('filename', nargs='+', help='Input filename')
parser.add_argument('--query', default="QUERY scaninfo(MS2DATA)", help='Input Query')
parser.add_argument('--output_file', default=None, help='output results filename')
parser.add_argument('--parallel_query', default="NO", help='YES to make it parallel with ray locally, NO is default')
parser.add_argument('--cache', default="YES", help='YES to cache with feather, YES is the default')
parser.add_argument('--original_path', default=None, help='Original absolute path for the filename, useful in proteosafe')
parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file')
parser.add_argument('--extract_json', default=None, help='Extracting spectra found as json file')
parser.add_argument('--extract_mzML', default=None, help='Extracting spectra found as mzML file, not implemented yet')

args = parser.parse_args()

@@ -45,14 +45,16 @@ def main():

# Executing
all_results_list = []
for i, query in enumerate(all_queries):
results_df = msql_engine.process_query(query,
args.filename,
cache=(args.cache == "YES"),
parallel=PARALLEL)
for filename in args.filename:
for i, query in enumerate(all_queries):
results_df = msql_engine.process_query(query,
filename,
cache=(args.cache == "YES"),
parallel=PARALLEL)
results_df["filename"] = os.path.basename(filename)

results_df["query_index"] = i
all_results_list.append(results_df)
results_df["query_index"] = i
all_results_list.append(results_df)

# Merging
results_df = pd.concat(all_results_list)
@@ -70,7 +72,6 @@ def main():
pass

if args.output_file and len(results_df) > 0:
results_df["filename"] = os.path.basename(args.filename)

if args.original_path is not None:
useful_filename = args.original_path
@@ -113,13 +114,13 @@ def main():
else:
results_df.to_csv(args.output_file, index=False, columns=columns)

# Extracting
if args.extract_json is not None and len(results_df) > 0:
print("Extracting {} spectra".format(len(results_df)))
try:
msql_extract._extract_spectra(results_df, os.path.dirname(args.filename), output_json_filename=args.extract_json)
except:
print("Extraction Failed")
# Extracting
if args.extract_json is not None and len(results_df) > 0:
print("Extracting {} spectra".format(len(results_df)))
try:
msql_extract._extract_spectra(results_df, os.path.dirname(args.filename[0]), output_json_filename=args.extract_json)
except:
print("Extraction Failed")


if __name__ == "__main__":
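Note on the revised CLI: the query moves from a positional argument to an optional --query flag (defaulting to "QUERY scaninfo(MS2DATA)"), and filename now accepts several files via nargs='+'. A minimal invocation sketch, with placeholder file names, might look like:

    python massql/msql_cmd.py fileA.mzML fileB.mzML --query "QUERY scaninfo(MS2DATA)" --output_file results.tsv --cache YES

Each per-file, per-query result frame is tagged with query_index and the file's basename before the frames are concatenated.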
36 changes: 33 additions & 3 deletions massql/msql_extract.py
@@ -160,6 +160,34 @@ def _extract_mgf_scan(input_filename, spectrum_identifier_list):

return output_list

def _extract_parquet_scan(input_filename, spectrum_identifier_list):
merged_df = pd.read_parquet(input_filename)

all_spectrum_identifier_set = set(spectrum_identifier_list)

filtered_df = merged_df[merged_df["scan"].isin(all_spectrum_identifier_set)]
grouped_df = filtered_df.groupby("scan")

output_list = []
for group, scan_df in grouped_df:
mz = scan_df["mz"].values
intensity = scan_df["i"].values

peaks_list = list(zip(mz, intensity))
peaks_list = [[float(peak[0]), float(peak[1])] for peak in peaks_list]

spectrum_obj = {}
spectrum_obj["peaks"] = peaks_list
spectrum_obj["mslevel"] = int(scan_df["mslevel"].iloc[0])
spectrum_obj["scan"] = str(scan_df["scan"].iloc[0])

if spectrum_obj["mslevel"] > 1:
spectrum_obj["precursor_mz"] = float(scan_df["precmz"].iloc[0])

output_list.append(spectrum_obj)

return output_list


def _extract_spectra(results_df, input_spectra_folder,
output_mgf_filename=None,
@@ -184,12 +212,14 @@ def _extract_spectra(results_df, input_spectra_folder,
input_spectra_filename = os.path.join(input_spectra_folder, results_by_file_df["filename"].iloc[0])

spectrum_obj_list = []
if ".mzML" in input_spectra_filename:
if input_spectra_filename.endswith(".mzML"):
spectrum_obj_list = _extract_mzML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
if ".mzXML" in input_spectra_filename:
if input_spectra_filename.endswith(".mzXML"):
spectrum_obj_list = _extract_mzXML_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
if ".mgf" in input_spectra_filename:
if input_spectra_filename.endswith(".mgf"):
spectrum_obj_list = _extract_mgf_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))
if input_spectra_filename.endswith(".parquet"):
spectrum_obj_list = _extract_parquet_scan(input_spectra_filename, list(set(results_by_file_df["scan"])))


for spectrum_obj in spectrum_obj_list:
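The new _extract_parquet_scan path assumes a long-format table with one row per peak and columns scan, mz, i, mslevel, and (for MS2 rows) precmz, as used above. A minimal sketch of how it could be exercised — the file name and values here are hypothetical, and writing parquet needs an engine such as pyarrow:

import pandas as pd
from massql import msql_extract

# One row per peak; scan identifiers are kept as strings to match the lookup list.
demo_df = pd.DataFrame({
    "scan":    ["1", "1", "2"],
    "mz":      [100.05, 200.10, 150.02],
    "i":       [1.0e4, 2.5e4, 3.0e3],
    "mslevel": [2, 2, 1],
    "precmz":  [250.12, 250.12, None],
})
demo_df.to_parquet("demo.msql.parquet")

# Returns spectrum dicts with "peaks", "mslevel", "scan", and "precursor_mz" for MS2 scans.
spectra = msql_extract._extract_parquet_scan("demo.msql.parquet", ["1", "2"])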
43 changes: 15 additions & 28 deletions massql/msql_fileloading.py
@@ -22,19 +22,13 @@ def load_data(input_filename, cache=False):
Returns:
[type]: [description]
"""
cache_filename = input_filename + ".msql.parquet"
if cache:
ms1_filename = input_filename + "_ms1.msql.feather"
ms2_filename = input_filename + "_ms2.msql.feather"
if os.path.exists(cache_filename):
cache_df = pd.read_parquet(cache_filename)

if os.path.exists(ms1_filename) or os.path.exists(ms2_filename):
try:
ms1_df = pd.read_feather(ms1_filename)
except:
ms1_df = pd.DataFrame()
try:
ms2_df = pd.read_feather(ms2_filename)
except:
ms2_df = pd.DataFrame()
ms1_df = cache_df[cache_df["mslevel"] == 1]
ms2_df = cache_df[cache_df["mslevel"] == 2]

return ms1_df, ms2_df

@@ -55,27 +49,22 @@ def load_data(input_filename, cache=False):

elif input_filename[-4:].lower() == ".txt":
ms1_df, ms2_df = _load_data_txt(input_filename)

elif input_filename.lower().endswith("parquet"):
merged_df = pd.read_parquet(input_filename)
ms1_df = merged_df[merged_df["mslevel"] == 1]
ms2_df = merged_df[merged_df["mslevel"] == 2]
cache = False
else:
print("Cannot Load File Extension")
raise Exception("File Format Not Supported")


# Saving Cache
if cache:
ms1_filename = input_filename + "_ms1.msql.feather"
ms2_filename = input_filename + "_ms2.msql.feather"

if not (os.path.exists(ms1_filename) or os.path.exists(ms2_filename)):
try:
ms1_df.to_feather(ms1_filename)
except:
pass

try:
ms2_df.to_feather(ms2_filename)
except:
pass
ms1_df["mslevel"] = 1
ms2_df["mslevel"] = 2

cache_df = pd.concat([ms1_df, ms2_df], axis=0)
cache_df.to_parquet(cache_filename)

return ms1_df, ms2_df

@@ -643,6 +632,4 @@ def _load_data_txt(input_filename):
ms1_df['rt'] = 0
ms1_df['polarity'] = "Positive"

print(ms1_df)

return ms1_df, pd.DataFrame()
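For readers skimming the diff, a condensed sketch of the new caching flow in load_data — the loader callable stands in for the per-format _load_data_* helpers and is a hypothetical parameter introduced only for illustration:

import os
import pandas as pd

def load_with_parquet_cache(input_filename, loader, cache=True):
    cache_filename = input_filename + ".msql.parquet"

    # Cache hit: one parquet file holds both MS levels; split it on read.
    if cache and os.path.exists(cache_filename):
        cache_df = pd.read_parquet(cache_filename)
        return cache_df[cache_df["mslevel"] == 1], cache_df[cache_df["mslevel"] == 2]

    # Cache miss: parse the raw file, tag each frame, and persist one merged cache file.
    ms1_df, ms2_df = loader(input_filename)
    if cache:
        ms1_df["mslevel"] = 1
        ms2_df["mslevel"] = 2
        pd.concat([ms1_df, ms2_df], axis=0).to_parquet(cache_filename)
    return ms1_df, ms2_df

This replaces the two per-level feather files with a single <input>.msql.parquet per input file, which the caching workflow below can pre-compute.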
6 changes: 6 additions & 0 deletions workflow/Makefile
@@ -1,3 +1,5 @@
# Workflow Tests

run_test:
nextflow run workflow.nf -c test.config

@@ -12,6 +14,10 @@ run_test_iron:
nextflow run workflow.nf -c test.config --input_spectra="./test/isa_9_fe.mzML" --query="QUERY scaninfo(MS1DATA) WHERE MS1MZ=X-2:INTENSITYMATCH=Y*0.063:INTENSITYMATCHPERCENT=25 AND MS1MZ=X:INTENSITYMATCH=Y:INTENSITYMATCHREFERENCE:INTENSITYPERCENT=5 AND MS2PREC=X" --resume


# Caching Tests
run_caching_test:
nextflow run workflow_caching.nf -c test.config

cluster_test:
nextflow run workflow.nf -c cluster.config \
--input_spectra="/data/massive/MSV000082622/ccms_peak/**.mzML" \
4 changes: 2 additions & 2 deletions workflow/workflow.nf
@@ -37,7 +37,7 @@ if(params.parallel_files == "YES"){
"""
$params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
"$input_spectrum" \
"${params.query}" \
--query "${params.query}" \
--output_file "${mangled_output_filename}_output.tsv" \
--parallel_query $params.parallel_query \
--cache NO \
@@ -67,7 +67,7 @@ else{
"""
$params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
"$input_spectrum" \
"${params.query}" \
--query "${params.query}" \
--output_file "${mangled_output_filename}_output.tsv" \
--parallel_query $params.parallel_query \
--cache NO \
34 changes: 34 additions & 0 deletions workflow/workflow_caching.nf
@@ -0,0 +1,34 @@
#!/usr/bin/env nextflow

params.input_spectra = 'data/GNPS00002_A3_p.mzML'

_spectra_ch = Channel.fromPath( params.input_spectra )
_spectra_ch.into{_spectra_ch1;_spectra_ch2}

_spectra_ch3 = _spectra_ch1.map { file -> tuple(file, file.toString().replaceAll("/", "_").replaceAll(" ", "_"), file) }

TOOL_FOLDER = "$baseDir/bin"
params.publishdir = "nf_output"
params.PYTHONRUNTIME = "python" // this is a hack because CCMS cluster does not have python installed

process cacheData {
errorStrategy 'ignore'
time '4h'
//maxRetries 3
//memory { 6.GB * task.attempt }
memory { 12.GB }

publishDir "$params.publishdir/msql_cache", mode: 'copy'

input:
set val(filepath), val(mangled_output_filename), file(input_spectrum) from _spectra_ch3

output:
file "*parquet"

"""
$params.PYTHONRUNTIME $TOOL_FOLDER/msql_cmd.py \
"$input_spectrum"
mv ${input_spectrum}.msql.parquet ${mangled_output_filename}.msql.parquet
"""
}
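As a usage sketch (paths assumed from the defaults above and the Makefile target), the pre-caching workflow could be run directly with:

    nextflow run workflow_caching.nf -c test.config --input_spectra="data/GNPS00002_A3_p.mzML"

Each input file is run through msql_cmd.py with the default scaninfo(MS2DATA) query purely to trigger cache creation; the resulting <input>.msql.parquet is then renamed to the mangled output name and published under nf_output/msql_cache.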