-
Notifications
You must be signed in to change notification settings - Fork 36
/
metamanager.py
302 lines (250 loc) · 10.4 KB
/
metamanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# *****************************************************************************
#
# Copyright (c) 2019, the jupyter-fs authors.
#
# This file is part of the jupyter-fs library, distributed under the terms of
# the Apache License 2.0. The full license can be found in the LICENSE file.
#
import inspect
import json
import re
from hashlib import md5

from fs.errors import FSError
from fs.opener.errors import OpenerError, ParseError
from jupyter_server.base.handlers import APIHandler
from jupyter_server.services.contents.manager import AsyncContentsManager
from tornado import web
from traitlets import default

from .auth import substituteAsk, substituteEnv, substituteNone
from .config import JupyterFs as JupyterFsConfig
from .fsmanager import FSManager
from .pathutils import (
    path_first_arg,
    path_second_arg,
    path_kwarg,
    path_old_new,
    getDrive,
    isDrive,
    stripDrive,
)
# Public names exported by a star-import of this module.
__all__ = ["MetaManager", "MetaManagerHandler"]
class MetaManager(AsyncContentsManager):
    """Contents manager that routes each request to a per-drive manager.

    The drive of a path (parsed by ``getDrive``) selects the manager that
    handles it. The blank drive ``""`` maps to the default root manager built
    from ``JupyterFs.root_manager_class``. Every other drive is an
    ``FSManager`` wrapping a PyFilesystem url, named by the first 8 hex
    characters of the md5 of that url (see ``initResource``).
    """

    # Matches an existing "-Copy<digits>." marker. ``copy`` replaces it with
    # "." before generating a new copy name, so copies of copies don't stack
    # markers.
    copy_pat = re.compile(r"\-Copy\d*\.")

    @default("files_handler_params")
    def _files_handler_params_default(self):
        """Default for the ``files_handler_params`` trait: the root manager's root_dir."""
        return {"path": self.root_dir}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._jupyterfsConfig = JupyterFsConfig(config=self.config)
        self._kwargs = kwargs
        self.resources = []

        self._default_root_manager = self._jupyterfsConfig.root_manager_class(
            **self._kwargs
        )
        self._managers = {"": self._default_root_manager}

        # Copy kwargs to _pyfs_kw, removing kwargs not relevant to pyfs.
        # "parent" in particular must go: initResource passes parent=self
        # explicitly alongside **self._pyfs_kw.
        self._pyfs_kw = {
            k: v for k, v in kwargs.items() if k not in ("config", "log", "parent")
        }

        self.initResource(*self._jupyterfsConfig.resources)

    def initResource(self, *resources, options=None):
        """Initialize PyFilesystem resources and their contents managers.

        Each resource is a spec dict with at least a ``"url"`` key. Optional keys:

        - ``"auth"``: ``"ask"`` (the default), ``"env"``, or anything else for
          no substitution. It selects ``substituteAsk``, ``substituteEnv`` or
          ``substituteNone``.
        - ``"defaultWritable"``: defaults to True.

        The caller's dicts are copied, not modified.

        Args:
            *resources: resource spec dicts.
            options: optional dict. ``"cache"`` (default True) reuses managers
                already created for the same url. ``"verbose"`` (default False)
                prints a one-line summary.

        Returns:
            The new ``self.resources`` list. Each entry is a copy of its spec
            plus:

            - ``"drive"``: the url's 8-char md5 prefix, or ``"_NOT_INIT"`` when
              tokens are missing.
            - ``"init"``: whether a manager is available.
            - ``"missingTokens"``: when substitution ran.
            - ``"errors"``: when ``surface_init_errors`` is configured.

            ``self._managers`` is replaced by the managers for exactly these
            resources (plus the root manager).

        Raises:
            ValueError: if a ``"tokenDict"`` key would be echoed back in a result.
        """
        if options is None:
            options = {}
        cache = options.get("cache", True)
        verbose = options.get("verbose", False)

        self.resources = []
        managers = {"": self._default_root_manager}

        for spec in resources:
            # Work on a shallow copy so the caller's dicts (possibly the server
            # config's own resource list) are never mutated.
            resource = dict(spec)
            # server side resources don't have a default 'auth' key
            resource.setdefault("auth", "ask")

            # deterministic hash of the (unsubstituted) PyFilesystem url
            _hash = md5(resource["url"].encode("utf-8")).hexdigest()[:8]
            init = False
            missingTokens = None
            errors = []

            # NOTE(review): the two cache branches skip the substitute* helpers.
            # A tokenDict sent for an already-initialized url therefore reaches
            # the sanity check below. Confirm that is intended.
            if _hash in self._managers and cache:
                # reuse existing cm
                managers[_hash] = self._managers[_hash]
                init = True
            elif _hash in managers and cache:
                # don't add redundant cm
                init = True
            else:
                if resource["auth"] == "ask":
                    substitute = substituteAsk
                elif resource["auth"] == "env":
                    substitute = substituteEnv
                else:
                    substitute = substituteNone
                urlSubbed, missingTokens = substitute(resource)

                if missingTokens:
                    # skip trying to init any resource with missing info
                    _hash = "_NOT_INIT"
                else:
                    # create new cm
                    try:
                        managers[_hash] = FSManager(
                            urlSubbed,
                            default_writable=resource.get("defaultWritable", True),
                            parent=self,
                            **self._pyfs_kw,
                        )
                    except (FSError, OpenerError, ParseError) as e:
                        self.log.exception(
                            "Failed to create manager for resource %r",
                            resource.get("name"),
                        )
                        errors.append(str(e))
                    else:
                        init = True

            # assemble resource from spec + hash
            newResource = {**resource, "drive": _hash, "init": init}
            if self._jupyterfsConfig.surface_init_errors:
                newResource["errors"] = errors
            if missingTokens is not None:
                newResource["missingTokens"] = missingTokens

            if "tokenDict" in newResource:
                # sanity check: tokenDict should not make the round trip
                # (presumably the substitute* helpers remove it - TODO confirm)
                raise ValueError("tokenDict not removed from resource by initResource")

            self.resources.append(newResource)

        # replace existing contents managers with new
        self._managers = managers

        if verbose:
            print(
                "jupyter-fs initialized: {} file system resources, {} managers".format(
                    len(self.resources), len(self._managers)
                )
            )

        return self.resources

    @property
    def root_manager(self):
        """The manager for the blank drive (in jlab, the root drive prefix is blank)."""
        return self._managers.get("")

    @property
    def root_dir(self):
        """The root directory of the root manager."""
        return self.root_manager.root_dir

    @staticmethod
    async def _maybe_await(value):
        """Return *value*, awaiting it first if it is awaitable.

        This lets ``copy`` work whether the delegated/inherited methods it
        calls are coroutine functions or plain functions.
        """
        if inspect.isawaitable(value):
            return await value
        return value

    async def copy(self, from_path, to_path=None):
        """Copy an existing file and return its new model.

        If to_path is not specified, it will be the parent directory of
        from_path. If to_path is a directory, the filename is incremented as
        `from_path-Copy#.ext`. For multi-part extensions, the Copy# part is
        placed before the first dot for all extensions except `ipynb`. For
        notebooks it is placed before the last dot, for easier manual searching.
        from_path must be a full path to a file.

        Raises:
            tornado.web.HTTPError: 400 if from_path is a directory.
        """
        path = from_path.strip("/")
        if to_path is not None:
            to_path = to_path.strip("/")

        # "" for from_dir when path has no slash
        from_dir, _, from_name = path.rpartition("/")

        # get/dir_exists/save/increment_filename are async on this manager;
        # they must be awaited rather than used as raw coroutines.
        model = await self._maybe_await(self.get(path))
        model.pop("path", None)
        model.pop("name", None)
        if model["type"] == "directory":
            raise web.HTTPError(400, "Can't copy directories")

        if to_path is None:
            to_path = from_dir
        if await self._maybe_await(self.dir_exists(to_path)):
            # ensure that any drives are stripped from the resulting filename
            name = self.copy_pat.sub(".", stripDrive(from_name))
            to_name = await self._maybe_await(
                self.increment_filename(name, to_path, insert="-Copy")
            )
            # separate path and filename with a slash if to_path is not just a drive string
            if isDrive(to_path):
                to_path = f"{to_path}{to_name}"
            else:
                to_path = f"{to_path}/{to_name}"

        return await self._maybe_await(self.save(model, to_path))

    def _getManagerForPath(self, path):
        """Return ``(manager, path_without_drive)`` for a drive-prefixed *path*.

        Raises:
            tornado.web.HTTPError: 404 if no manager is registered for the drive.
        """
        drive = getDrive(path)
        mgr = self._managers.get(drive)
        if mgr is None:
            raise web.HTTPError(404, f"Couldn't find manager {drive} for {path}")
        return mgr, stripDrive(path)

    # Delegate these ContentsManager methods to the manager that owns the
    # path's drive, via wrappers from .pathutils. The boolean flag presumably
    # marks methods returning a model whose path needs the drive restored
    # (TODO confirm). get_kernel_path is wrapped as a synchronous call.
    is_hidden = path_first_arg("is_hidden", False)
    dir_exists = path_first_arg("dir_exists", False)
    file_exists = path_kwarg("file_exists", "", False)
    exists = path_first_arg("exists", False)

    save = path_second_arg("save", "model", True)
    rename = path_old_new("rename", False)

    get = path_first_arg("get", True)
    delete = path_first_arg("delete", False)

    get_kernel_path = path_first_arg("get_kernel_path", False, sync=True)

    create_checkpoint = path_first_arg("create_checkpoint", False)
    list_checkpoints = path_first_arg("list_checkpoints", False)
    restore_checkpoint = path_second_arg(
        "restore_checkpoint",
        "checkpoint_id",
        False,
    )
    delete_checkpoint = path_second_arg(
        "delete_checkpoint",
        "checkpoint_id",
        False,
    )
class MetaManagerHandler(APIHandler):
    """REST handler for the contents manager's jupyter-fs resources.

    GET returns ``contents_manager.resources``. POST re-initializes the
    resources from a JSON body of the form
    ``{"resources": [...], "options": {...}}``.
    """

    # Lazily built by ``fsconfig``; assigned on the instance on first access.
    _jupyterfsConfig = None

    @property
    def fsconfig(self):
        """The ``JupyterFs`` config, built from ``self.config`` on first access."""
        # TODO: This pattern will not pick up changes to config after this!
        if self._jupyterfsConfig is None:
            self._jupyterfsConfig = JupyterFsConfig(config=self.config)
        return self._jupyterfsConfig

    def _config_changed(self):
        """Drop the cached config so the next ``fsconfig`` access rebuilds it."""
        self._jupyterfsConfig = None

    def _validate_resource(self, resource):
        """Return True if a client-supplied *resource* may be initialized.

        The resource must be a dict with a string ``"url"``. When
        ``resource_validators`` is configured, the url must also fully match
        at least one of those regular expressions.
        """
        url = resource.get("url") if isinstance(resource, dict) else None
        if not isinstance(url, str):
            # Don't log the resource itself: it may carry credentials.
            self.log.warning("Resource failed validation: missing or non-string url")
            return False

        validators = self.fsconfig.resource_validators
        if validators and not any(re.fullmatch(v, url) is not None for v in validators):
            self.log.warning(
                "Resource failed validation: %r vs %r",
                url,
                validators,
            )
            return False
        return True

    @web.authenticated
    async def get(self):
        """Return the resources currently initialized on the contents manager.

        The response is the JSON-encoded ``contents_manager.resources`` list,
        one dict per resource (as built by ``MetaManager.initResource``). It
        lets the frontend instantiate one file tree per available drive.
        """
        self.finish(json.dumps(self.contents_manager.resources))

    @web.authenticated
    async def post(self):
        """Re-initialize resources from the request body and return the result.

        Body keys:

        - ``"resources"``: a list of resource dicts. Default: empty.
        - ``"options"``: a dict forwarded to ``initResource``. Default: empty.
          The handler itself reads ``"_addServerside"`` to also include the
          server-configured resources.

        If user resources are not allowed by config, client resources are
        ignored and the server-configured resources are used instead.

        Raises:
            tornado.web.HTTPError: 400 if the body, ``resources`` or
                ``options`` has the wrong JSON type.
        """
        body = self.get_json_body()
        if not isinstance(body, dict):
            raise web.HTTPError(400, "Request body must be a JSON object")

        options = body.get("options") or {}
        if not isinstance(options, dict):
            raise web.HTTPError(400, "'options' must be a JSON object")
        client_resources = body.get("resources") or []
        if not isinstance(client_resources, list):
            raise web.HTTPError(400, "'resources' must be a JSON array")

        if not self.fsconfig.allow_user_resources:
            if client_resources:
                self.log.warning("User not allowed to configure resources, ignoring")
            resources = self.fsconfig.resources
        else:
            valid_resources = [r for r in client_resources if self._validate_resource(r)]
            if options.get("_addServerside"):
                resources = [*self.fsconfig.resources, *valid_resources]
            else:
                resources = valid_resources

        self.finish(
            json.dumps(self.contents_manager.initResource(*resources, options=options))
        )