Skip to content

Commit

Permalink
add line buffering and text mode to exec
Browse files Browse the repository at this point in the history
  • Loading branch information
azliu0 committed Nov 7, 2024
1 parent aa6f099 commit 8831064
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 11 deletions.
22 changes: 20 additions & 2 deletions modal/container_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class _ContainerProcess:
_stdout: _StreamReader
_stderr: _StreamReader
_stdin: _StreamWriter
_text: bool
_by_line: bool
_returncode: Optional[int] = None

def __init__(
Expand All @@ -27,14 +29,30 @@ def __init__(
client: _Client,
stdout: StreamType = StreamType.PIPE,
stderr: StreamType = StreamType.PIPE,
text: bool = True,
by_line: bool = False,
) -> None:
self._process_id = process_id
self._client = client
self._text = text
self._by_line = by_line
self._stdout = _StreamReader(
api_pb2.FILE_DESCRIPTOR_STDOUT, process_id, "container_process", self._client, stream_type=stdout
api_pb2.FILE_DESCRIPTOR_STDOUT,
process_id,
"container_process",
self._client,
stream_type=stdout,
text=text,
by_line=by_line,
)
self._stderr = _StreamReader(
api_pb2.FILE_DESCRIPTOR_STDERR, process_id, "container_process", self._client, stream_type=stderr
api_pb2.FILE_DESCRIPTOR_STDERR,
process_id,
"container_process",
self._client,
stream_type=stderr,
text=text,
by_line=by_line,
)
self._stdin = _StreamWriter(process_id, "container_process", self._client)

Expand Down
16 changes: 12 additions & 4 deletions modal/io_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(
object_type: Literal["sandbox", "container_process"],
client: _Client,
stream_type: StreamType = StreamType.PIPE,
by_line: bool = False, # if True, streamed logs are further processed into complete lines.
text: bool = True,
by_line: bool = False,
) -> None:
"""mdmd:hidden"""

Expand All @@ -91,6 +92,7 @@ def __init__(
self._stream = None
self._last_entry_id = None
self._line_buffer = ""
self._text = text
self._by_line = by_line
# Whether the reader received an EOF. Once EOF is True, it returns
# an empty string for any subsequent reads (including async for)
Expand All @@ -116,7 +118,7 @@ def __init__(
def file_descriptor(self):
return self._file_descriptor

async def read(self) -> str:
async def read(self) -> Union[str, bytes]:
"""Fetch and return contents of the entire stream. If EOF was received,
return an empty string.
Expand All @@ -139,7 +141,10 @@ async def read(self) -> str:
break
data += message

return data
if self._text:
return data
else:
return bytes(data, "utf-8")

async def _consume_container_process_stream(self):
"""
Expand Down Expand Up @@ -277,7 +282,10 @@ async def __anext__(self):
if value is None:
raise StopAsyncIteration

return value
if self._text:
return value
else:
return bytes(value, "utf-8")


MAX_BUFFER_SIZE = 2 * 1024 * 1024
Expand Down
10 changes: 5 additions & 5 deletions modal/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,12 @@ async def _get_task_id(self):
async def exec(
self,
*cmds: str,
# Deprecated: internal use only
pty_info: Optional[api_pb2.PTYInfo] = None,
pty_info: Optional[api_pb2.PTYInfo] = None, # Deprecated: internal use only
stdout: StreamType = StreamType.PIPE,
stderr: StreamType = StreamType.PIPE,
# Internal option to set terminal size and metadata
_pty_info: Optional[api_pb2.PTYInfo] = None,
text: bool = True, # Encode output as text
by_line: bool = False, # Line-buffered output
_pty_info: Optional[api_pb2.PTYInfo] = None, # Internal option to set terminal size and metadata
):
"""Execute a command in the Sandbox and return
a [`ContainerProcess`](/docs/reference/modal.ContainerProcess#modalcontainer_process) handle.
Expand Down Expand Up @@ -433,7 +433,7 @@ async def exec(
runtime_debug=config.get("function_runtime_debug"),
)
)
return _ContainerProcess(resp.exec_id, self._client, stdout=stdout, stderr=stderr)
return _ContainerProcess(resp.exec_id, self._client, stdout=stdout, stderr=stderr, text=text, by_line=by_line)

@property
def stdout(self) -> _StreamReader:
Expand Down
30 changes: 30 additions & 0 deletions test/io_streams_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,33 @@ async def sandbox_get_logs(servicer, stream):
out.append(line)

assert out == ["foobar\n", "baz"]


def test_stream_reader_bytes_mode(servicer, client):
"""Test that the stream reader works in bytes mode."""

async def sandbox_get_logs(servicer, stream):
await stream.recv_message()

log = api_pb2.TaskLogs(
data="foo\n",
file_descriptor=api_pb2.FILE_DESCRIPTOR_STDOUT,
)
await stream.send_message(api_pb2.TaskLogsBatch(entry_id="0", items=[log]))

# send EOF
await stream.send_message(api_pb2.TaskLogsBatch(eof=True))

with servicer.intercept() as ctx:
ctx.set_responder("SandboxGetLogs", sandbox_get_logs)

with enable_output():
stdout = StreamReader(
file_descriptor=api_pb2.FILE_DESCRIPTOR_STDOUT,
object_id="sb-123",
object_type="sandbox",
client=client,
text=False,
)

assert stdout.read() == b"foo\n"
14 changes: 14 additions & 0 deletions test/sandbox_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,20 @@ async def test_sandbox_async_for(app, servicer):
assert await sb.stderr.read.aio() == ""


@skip_non_linux
def test_sandbox_stdout_bytes_mode(app, servicer):
"""Test that the stream reader works in bytes mode."""

sb = Sandbox.create(app=app)

p = sb.exec("echo", "foo", text=False)
assert p.stdout.read() == b"foo\n"

p = sb.exec("echo", "foo", text=False)
for line in p.stdout:
assert line == b"foo\n"


@skip_non_linux
def test_app_sandbox(client, servicer):
image = Image.debian_slim().pip_install("xyz")
Expand Down

0 comments on commit 8831064

Please sign in to comment.