diff --git a/cycledash/api/bams.py b/cycledash/api/bams.py index febb9ec..1009643 100644 --- a/cycledash/api/bams.py +++ b/cycledash/api/bams.py @@ -10,7 +10,6 @@ from cycledash import db from cycledash.helpers import abort_if_none_for from cycledash.validations import Doc -import workers.indexer import projects from . import Resource, marshal_with, validate_with diff --git a/workers/indexer.py b/workers/indexer.py deleted file mode 100644 index 81468c2..0000000 --- a/workers/indexer.py +++ /dev/null @@ -1,40 +0,0 @@ -"""Index BAM Index (BAI) files - -This accepts a BAM HDFS path. If there's no BAM Index Index (.bam.bai.json) -file already available, it will generate one and put it on HDFS. -""" - -import json - -import bai_indexer -from StringIO import StringIO - -from workers.shared import (get_contents_from_hdfs, worker, - put_new_file_to_hdfs, does_hdfs_file_exist, - HdfsFileAlreadyExistsError, register_running_task, - DATABASE_URI, initialize_database) - -@worker.task(bind=True) -def index(self, bam_id): - engine, connection, metadata = initialize_database(DATABASE_URI) - bams_table = metadata.tables.get('bams') - bam = bams_table.select().where(bams_table.c.id == bam_id).execute().fetchone() - bam_path = bam['uri'] - - if '.bam' not in bam_path: - raise ValueError('Expected path to BAM file, got %s' % bam_path) - - bai_path = bam_path.replace('.bam', '.bam.bai') - bai_json_path = bam_path.replace('.bam', '.bam.bai.json') - - if does_hdfs_file_exist(bai_json_path): - return # nothing to do -- it's already been created - - contents = get_contents_from_hdfs(bai_path) - index_json = bai_indexer.index_stream(StringIO(contents)) - index_json_str = json.dumps(index_json) - - try: - put_new_file_to_hdfs(bai_json_path, index_json_str) - except HdfsFileAlreadyExistsError: - pass # we lost the race! (e.g. two runs were submitted simultaneously)