Skip to content

Commit

Permalink
[plugins] speedup journal collection
Browse files Browse the repository at this point in the history
Instead of generating all the logs and tailing the last 100M,
we get the first 100M of 'journalctl --reverse' that we then
reverse again using our own implementation of tac.

To handle multiline logs we would need to use "tac -brs '^[^ ]'"
that takes ~30s on 100M of logs when plain 'tac' takes ~0.3s.
Our simple implementation in python takes 0.7s,
and avoid an extra dependency.

On journalctl timeout we now get the most recents logs.

During collection logs are now buffered on disk, so we use 2xsizelimit.
While running our tac we could actually truncate the source file to
limit disk usage. Previously buffering was in RAM (also 2xsizelimit).

On my test server, logs plugin runtime goes from 34s to 9.5s.

Signed-off-by: Etienne Champetier <[email protected]>
  • Loading branch information
champtar committed Dec 13, 2024
1 parent 678ea65 commit b4534fd
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 11 deletions.
19 changes: 14 additions & 5 deletions sos/report/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2372,7 +2372,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
binary=False, sizelimit=None, subdir=None,
changes=False, foreground=False, tags=[],
priority=10, cmd_as_tag=False, to_file=False,
container_cmd=False, runas=None):
tac=False, container_cmd=False, runas=None):
"""Execute a command and save the output to a file for inclusion in the
report.
Expand Down Expand Up @@ -2400,6 +2400,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
:param cmd_as_tag: Format command string to tag
:param to_file: Write output directly to file instead
of saving in memory
:param tac: Reverse lines order (need to_file=True)
:param runas: Run the `cmd` as the `runas` user
:returns: dict containing status, output, and filename in the
Expand Down Expand Up @@ -2451,7 +2452,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
cmd, timeout=timeout, stderr=stderr, chroot=root,
chdir=runat, env=_env, binary=binary, sizelimit=sizelimit,
poller=self.check_timeout, foreground=foreground,
to_file=out_file, runas=runas
to_file=out_file, tac=tac, runas=runas
)

end = time()
Expand Down Expand Up @@ -2489,7 +2490,7 @@ def _collect_cmd_output(self, cmd, suggest_filename=None,
result = sos_get_command_output(
cmd, timeout=timeout, chroot=False, chdir=runat,
env=env, binary=binary, sizelimit=sizelimit,
poller=self.check_timeout, to_file=out_file
poller=self.check_timeout, to_file=out_file, tac=tac,
)
run_time = time() - start
self._log_debug(f"could not run '{cmd}': command not found")
Expand Down Expand Up @@ -3083,10 +3084,18 @@ def add_journal(self, units=None, boot=None, since=None, until=None,
if output:
journal_cmd += output_opt % output

fname = journal_cmd
tac = False
if log_size > 0 and is_executable("head"):
journal_cmd = f"sh -c '{journal_cmd} --reverse | " \
"head -c {log_size*1024*1024}'"
log_size = 0
tac = True

self._log_debug(f"collecting journal: {journal_cmd}")
self._add_cmd_output(cmd=journal_cmd, timeout=timeout,
self._add_cmd_output(cmd=journal_cmd, timeout=timeout, tac=tac,
sizelimit=log_size, pred=pred, tags=tags,
priority=priority)
priority=priority, suggest_filename=fname)

def _expand_copy_spec(self, copyspec):
def __expand(paths):
Expand Down
53 changes: 47 additions & 6 deletions sos/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading
import time
import io
import mmap
from contextlib import closing
from collections import deque

Expand Down Expand Up @@ -213,7 +214,7 @@ def is_executable(command, sysroot=None):
def sos_get_command_output(command, timeout=TIMEOUT_DEFAULT, stderr=False,
chroot=None, chdir=None, env=None, foreground=False,
binary=False, sizelimit=None, poller=None,
to_file=False, runas=None):
to_file=False, tac=False, runas=None):
# pylint: disable=too-many-locals,too-many-branches
"""Execute a command and return a dictionary of status and output,
optionally changing root or current working directory before
Expand Down Expand Up @@ -275,8 +276,11 @@ def _check_poller(proc):
else:
expanded_args.append(arg)
if to_file:
# pylint: disable=consider-using-with
_output = open(to_file, 'w', encoding='utf-8')
if tac:
_output = tempfile.TemporaryFile(dir=os.path.dirname(to_file))
else:
# pylint: disable=consider-using-with
_output = open(to_file, 'w', encoding='utf-8')
else:
_output = PIPE
try:
Expand All @@ -285,10 +289,10 @@ def _check_poller(proc):
bufsize=-1, env=cmd_env, close_fds=True,
preexec_fn=_child_prep_fn) as p:

if not to_file:
reader = AsyncReader(p.stdout, sizelimit, binary)
else:
if to_file:
reader = FakeReader(p, binary)
else:
reader = AsyncReader(p.stdout, sizelimit, binary)

if poller:
while reader.running:
Expand All @@ -301,13 +305,19 @@ def _check_poller(proc):
except Exception:
p.terminate()
if to_file:
if tac:
with open(to_file, 'wb') as f_dst:
tac_logs(_output, f_dst)
_output.close()
# until we separate timeouts from the `timeout` command
# handle per-cmd timeouts via Plugin status checks
reader.running = False
return {'status': 124, 'output': reader.get_contents(),
'truncated': reader.is_full}
if to_file:
if tac:
with open(to_file, 'wb') as f_dst:
tac_logs(_output, f_dst)
_output.close()

# wait for Popen to set the returncode
Expand All @@ -332,6 +342,37 @@ def _check_poller(proc):
raise e


def tac_logs(f_src, f_dst):
"""Python implementation of the tac utility with support
for multiline logs (starting with space). It is intended
to reverse the output of 'journalctl --reverse'.
"""
NEWLINE = b'\n'
SPACE = 32
with mmap.mmap(f_src.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# find the last NEWLINE, this skips the last line if it's partial
sep1 = sep2 = mm.rfind(NEWLINE)
while sep2 >= 0:
sep1 = mm.rfind(NEWLINE, 0, sep1)
# multiline logs have a first line not starting with space
# followed by lines starting with spaces
# line 5
# line 4
# multiline 4
# line 3
if mm[sep1+1] == SPACE:
# first line starts with a space
# (this should not happen)
if sep1 == -1:
break
# go find the previous NEWLINE
continue
# write the log line ending with the NEWLINE
f_dst.write(mm[sep1+1:sep2+1])
sep2 = sep1
mm.close()


def import_module(module_fqname, superclasses=None):
"""Imports the module module_fqname and returns a list of defined classes
from that module. If superclasses is defined then the classes returned will
Expand Down

0 comments on commit b4534fd

Please sign in to comment.