From fc0d43093b2d5347693e1fd7b1638fa72d883aa3 Mon Sep 17 00:00:00 2001 From: mihran113 Date: Tue, 14 Feb 2023 04:30:00 +0400 Subject: [PATCH 1/3] Add `excluded-artifacts` option to exclude artifacts based on glob expression --- aimlflow/cli.py | 7 ++++--- aimlflow/utils.py | 14 +++++++++++--- aimlflow/watcher.py | 4 +++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/aimlflow/cli.py b/aimlflow/cli.py index 413fb8f..45f5138 100644 --- a/aimlflow/cli.py +++ b/aimlflow/cli.py @@ -25,7 +25,8 @@ def cli_entry_point(): writable=True)) @click.option('--mlflow-tracking-uri', required=False, default=None) @click.option('--experiment', '-e', required=False, default=None) -def sync(aim_repo, mlflow_tracking_uri, experiment): +@click.option('--excluded-artifacts', required=False, default=None) +def sync(aim_repo, mlflow_tracking_uri, experiment, excluded_artifacts): repo_path = clean_repo_path(aim_repo) or Repo.default_repo_path() repo_inst = Repo.from_path(repo_path) @@ -33,10 +34,10 @@ def sync(aim_repo, mlflow_tracking_uri, experiment): if not mlflow_tracking_uri: raise ClickException('MLFlow tracking URI must be provided either through ENV or CLI.') - watcher = MLFlowWatcher(repo_inst, mlflow_tracking_uri, experiment) + watcher = MLFlowWatcher(repo_inst, mlflow_tracking_uri, experiment, excluded_artifacts) click.echo('Converting existing MLflow logs.') - convert_existing_logs(repo_inst, mlflow_tracking_uri, experiment) + convert_existing_logs(repo_inst, mlflow_tracking_uri, experiment, excluded_artifacts) click.echo(f'Starting watcher on {mlflow_tracking_uri}.') watcher.start() diff --git a/aimlflow/utils.py b/aimlflow/utils.py index 777ecac..e10e148 100644 --- a/aimlflow/utils.py +++ b/aimlflow/utils.py @@ -1,3 +1,4 @@ +import fnmatch import click import mlflow import json @@ -114,7 +115,10 @@ def collect_run_params(aim_run, mlflow_run): } -def collect_artifacts(aim_run, mlflow_run, mlflow_client): +def collect_artifacts(aim_run, mlflow_run, mlflow_client, excluded_artifacts): + if excluded_artifacts == '*': + return + run_id = mlflow_run.info.run_id artifacts_cache_key = '_mlflow_artifacts_cache' @@ -136,6 +140,10 @@ def collect_artifacts(aim_run, mlflow_run, mlflow_client): continue else: artifacts_cache.append(file_info.path) + + if fnmatch.fnmatch(file_info.path, excluded_artifacts): + continue + downloaded_path = mlflow_client.download_artifacts(run_id, file_info.path, dst_path=temp_path) if file_info.path.endswith(HTML_EXTENSIONS): if not __html_warning_issued: @@ -196,7 +204,7 @@ def collect_metrics(aim_run, mlflow_run, mlflow_client, timestamp=None): aim_run.track(m.value, step=m.step, name=m.key) -def convert_existing_logs(repo_inst, tracking_uri, experiment=None, no_cache=False): +def convert_existing_logs(repo_inst, tracking_uri, experiment=None, excluded_artifacts=None, no_cache=False): client = mlflow.tracking.client.MlflowClient(tracking_uri=tracking_uri) experiments = get_mlflow_experiments(client, experiment) @@ -215,7 +223,7 @@ def convert_existing_logs(repo_inst, tracking_uri, experiment=None, no_cache=Fal collect_metrics(aim_run, run, client) # Collect artifacts - collect_artifacts(aim_run, run, client) + collect_artifacts(aim_run, run, client, excluded_artifacts) run_cache.refresh() diff --git a/aimlflow/watcher.py b/aimlflow/watcher.py index 7ba6226..2b887a2 100644 --- a/aimlflow/watcher.py +++ b/aimlflow/watcher.py @@ -28,6 +28,7 @@ def __init__(self, repo: 'Repo', tracking_uri: str, experiment: str = None, + excluded_artifacts: str = None, interval: Union[int, float] = WATCH_INTERVAL_DEFAULT, ): @@ -38,6 +39,7 @@ def __init__(self, self._client = MlflowClient(tracking_uri) self._experiments = get_mlflow_experiments(self._client, experiment) + self._excluded_artifacts = excluded_artifacts self._repo = repo self._th_collector = Thread(target=self._watch, daemon=True) @@ -77,7 +79,7 @@ def _process_single_run(self, aim_run, mlflow_run): collect_metrics(aim_run, mlflow_run, self._client, timestamp=self._last_watch_time) # Collect artifacts - collect_artifacts(aim_run, mlflow_run, self._client) + collect_artifacts(aim_run, mlflow_run, self._client, self._excluded_artifacts) def _process_runs(self): watch_started_time = time.time() From 5c87d2b6445d468016eeda964f3ab88b41921ced Mon Sep 17 00:00:00 2001 From: mihran113 Date: Mon, 20 Feb 2023 21:54:44 +0400 Subject: [PATCH 2/3] Add support for multiple patterns for `--exclude-artifacts` --- CHANGELOG.md | 1 + aimlflow/cli.py | 9 +++++---- aimlflow/utils.py | 14 ++++++++++---- aimlflow/watcher.py | 6 +++--- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9527dd8..850bda3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Set mlflow experiment name as `aim.Run`'s experiment and parse the mlflow run name (mihran113) - Try to parse "string-ified" params values (sirykd) +- Add "--exclude-artifacts" option to ## 0.1.1 - Fix MLFlow active runs' query (mihran113) diff --git a/aimlflow/cli.py b/aimlflow/cli.py index 45f5138..4a612b2 100644 --- a/aimlflow/cli.py +++ b/aimlflow/cli.py @@ -25,8 +25,9 @@ def cli_entry_point(): writable=True)) @click.option('--mlflow-tracking-uri', required=False, default=None) @click.option('--experiment', '-e', required=False, default=None) -@click.option('--excluded-artifacts', required=False, default=None) -def sync(aim_repo, mlflow_tracking_uri, experiment, excluded_artifacts): +@click.option('--exclude-artifacts', multiple=True, required=False) +def sync(aim_repo, mlflow_tracking_uri, experiment, exclude_artifacts): + repo_path = clean_repo_path(aim_repo) or Repo.default_repo_path() repo_inst = Repo.from_path(repo_path) @@ -34,10 +35,10 @@ def sync(aim_repo, mlflow_tracking_uri, experiment, excluded_artifacts): if not mlflow_tracking_uri: raise ClickException('MLFlow tracking URI must be provided either through ENV or CLI.') - watcher = MLFlowWatcher(repo_inst, mlflow_tracking_uri, experiment, excluded_artifacts) + watcher = MLFlowWatcher(repo_inst, mlflow_tracking_uri, experiment, exclude_artifacts) click.echo('Converting existing MLflow logs.') - convert_existing_logs(repo_inst, mlflow_tracking_uri, experiment, excluded_artifacts) + convert_existing_logs(repo_inst, mlflow_tracking_uri, experiment, exclude_artifacts) click.echo(f'Starting watcher on {mlflow_tracking_uri}.') watcher.start() diff --git a/aimlflow/utils.py b/aimlflow/utils.py index 7b5fa2b..61196bf 100644 --- a/aimlflow/utils.py +++ b/aimlflow/utils.py @@ -118,8 +118,8 @@ def collect_run_params(aim_run, mlflow_run): } -def collect_artifacts(aim_run, mlflow_run, mlflow_client, excluded_artifacts): - if excluded_artifacts == '*': +def collect_artifacts(aim_run, mlflow_run, mlflow_client, exclude_artifacts): + if '*' in exclude_artifacts: return run_id = mlflow_run.info.run_id @@ -144,8 +144,14 @@ def collect_artifacts(aim_run, mlflow_run, mlflow_client, excluded_artifacts): else: artifacts_cache.append(file_info.path) - if fnmatch.fnmatch(file_info.path, excluded_artifacts): - continue + if exclude_artifacts: + exclude = False + for expr in exclude_artifacts: + if fnmatch.fnmatch(file_info.path, expr): + exclude = True + break + if exclude: + continue downloaded_path = mlflow_client.download_artifacts(run_id, file_info.path, dst_path=temp_path) if file_info.path.endswith(HTML_EXTENSIONS): diff --git a/aimlflow/watcher.py b/aimlflow/watcher.py index 43dcbf9..53e8147 100644 --- a/aimlflow/watcher.py +++ b/aimlflow/watcher.py @@ -28,7 +28,7 @@ def __init__(self, repo: 'Repo', tracking_uri: str, experiment: str = None, - excluded_artifacts: str = None, + exclude_artifacts: str = None, interval: Union[int, float] = WATCH_INTERVAL_DEFAULT, ): @@ -39,7 +39,7 @@ def __init__(self, self._client = MlflowClient(tracking_uri) - self._excluded_artifacts = excluded_artifacts + self._exclude_artifacts = exclude_artifacts self._experiment = experiment self._experiments = get_mlflow_experiments(self._client, self._experiment) self._repo = repo @@ -84,7 +84,7 @@ def _process_single_run(self, aim_run, mlflow_run): collect_metrics(aim_run, mlflow_run, self._client, timestamp=self._last_watch_time) # Collect artifacts - collect_artifacts(aim_run, mlflow_run, self._client, self._excluded_artifacts) + collect_artifacts(aim_run, mlflow_run, self._client, self._exclude_artifacts) def _process_runs(self): watch_started_time = time.time() From 3d2619dbc4f9d9057d81d5c78f4930fce00d3153 Mon Sep 17 00:00:00 2001 From: mihran113 Date: Mon, 20 Feb 2023 21:56:27 +0400 Subject: [PATCH 3/3] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 850bda3..5349bce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,9 @@ # Changelog ## Unreleased +- Add "--exclude-artifacts" option to sync command (mihran113) - Set mlflow experiment name as `aim.Run`'s experiment and parse the mlflow run name (mihran113) - Try to parse "string-ified" params values (sirykd) -- Add "--exclude-artifacts" option to ## 0.1.1 - Fix MLFlow active runs' query (mihran113)