Skip to content

Commit

Permalink
Merge pull request #141 from lxl66566/master
Browse files Browse the repository at this point in the history
fix(utils): improve unzip/extract
  • Loading branch information
dvershinin authored Jan 26, 2024
2 parents 705ccaa + 6e916d4 commit d9cd7b6
Showing 1 changed file with 71 additions and 53 deletions.
124 changes: 71 additions & 53 deletions src/lastversion/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
import sys
import tarfile
import tempfile
import zipfile
from pathlib import Path
from urllib.parse import unquote

import distro
import requests
import tqdm

from .exceptions import TarPathTraversalException
from lastversion.exceptions import TarPathTraversalException

PY7ZR_AVAILABLE = False
try:
Expand Down Expand Up @@ -320,55 +321,77 @@ def download_file(url, local_filename=None):
return local_filename


def is_within_directory(directory, target):
"""Check if the target path is within the directory path."""
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)

prefix = os.path.commonprefix([abs_directory, abs_target])

return prefix == abs_directory
def check_if_tar_safe(tar_file: tarfile.TarFile, to_dir) -> bool:
"""CVE-2007-4559"""
all_members = tar_file.getnames()
root_dir = Path(all_members[0]).parent.resolve()
for member in all_members:
if not Path(member).resolve().is_relative_to(root_dir):
return False
return True


def safe_extract(tar, path=".", members=None):
"""Safe extract .tar.gz to workaround CVE-2007-4559. CVE-2007-4559
Args:
tar ():
path ():
members ():
def extract_tar_and_zip(buffer: io.BytesIO, to_dir):
"""Extract a tar/zip archive to dir.
If archive has only one top dir, it will be stripped.
"""
for member in tar.getmembers():
member_path = os.path.join(path, member.name)
if not is_within_directory(path, member_path):
raise TarPathTraversalException("Attempted Path Traversal in Tar File")

tar.extractall(path, members)
def extract_archive(file: zipfile.ZipFile | tarfile.TarFile):
def get_contents(x) -> list:
return getattr(x, "getmembers", getattr(x, "infolist", None))()

def getname(x) -> str:
return getattr(x, "name", getattr(x, "filename", None))

def is_dir(x) -> bool:
return getattr(x, "isdir", getattr(x, "is_dir", None))()

contents = get_contents(file)
assert contents, "Empty archive"
top_dir = Path(getname(contents[0])).resolve()
only_one_top_dir = True
if not is_dir(contents[0]):
only_one_top_dir = False
if only_one_top_dir:
for item in contents[1:]:
if not Path(getname(item)).resolve().parent.is_relative_to(top_dir):
only_one_top_dir = False
break
log.info(f"only one top dir: {only_one_top_dir}")
if only_one_top_dir:
for item in contents[1:]:
new_path = str(Path(getname(item)).resolve().relative_to(top_dir))
if getattr(item, "name", False):
item.name = new_path
elif getattr(item, "filename", False):
item.filename = new_path
file.extract(item, to_dir)
else:
file.extractall(path=to_dir)

try:
with tarfile.open(fileobj=buffer, mode="r") as file:
if not check_if_tar_safe(file, to_dir):
raise TarPathTraversalException("Attempted Path Traversal in Tar File")
extract_archive(file)
except tarfile.ReadError:
with zipfile.ZipFile(buffer, "r") as file:
extract_archive(file)


def extract_7z(buffer: io.BytesIO, to_dir):
"""Extract a 7z archive to dir.
py7zr maybe hard to strip the top level dir.
"""
if not PY7ZR_AVAILABLE:
log.critical("pip install py7zr to support .7z archives")
return
with py7zr.SevenZipFile(buffer) as file:
file.extractall(path=to_dir)

def extract_tar(buffer, mode):
"""Extract a tar archive while stripping the top level dir.

Args:
buffer ():
mode ():
"""
smart_members = []
with tarfile.open(fileobj=buffer, mode=mode) as tar_file:
all_members = tar_file.getmembers()
if not all_members:
log.critical("No or not an archive")
root_dir = all_members[0].path
root_dir_with_slash_len = len(root_dir) + 1
for member in tar_file.getmembers():
if member.path.startswith(root_dir + "/"):
member.path = member.path[root_dir_with_slash_len:]
smart_members.append(member)
safe_extract(tar_file, members=smart_members)


def extract_file(url):
"""Extract an archive while stripping the top level dir."""
def extract_file(url: str, to_dir="."):
"""Extract an archive from url to dir, stripping the top level dir by default."""
if url.endswith(".7z") and not PY7ZR_AVAILABLE:
log.critical("pip install py7zr to support .7z archives")
return
Expand Down Expand Up @@ -403,15 +426,10 @@ def extract_file(url):
buffer.seek(0)
# Process the buffer (e.g., extract its contents)

mode = "r:gz"
if url.endswith(".tar.xz"):
mode = "r:xz"
elif url.endswith(".7z"):
if PY7ZR_AVAILABLE:
with py7zr.SevenZipFile(buffer) as archive:
archive.extractall()
return # Exit the function after extracting
extract_tar(buffer, mode)
if url.endswith(".7z"):
extract_7z(buffer, to_dir=to_dir)
else:
extract_tar_and_zip(buffer, to_dir=to_dir)
except KeyboardInterrupt:
pbar.close()
log.warning("Cancelled")
Expand Down

0 comments on commit d9cd7b6

Please sign in to comment.