Source code for container_guts.utils.fileio

__author__ = "Vanessa Sochat"
__copyright__ = "Copyright 2021-2024, Vanessa Sochat"
__license__ = "MPL 2.0"

import errno
import hashlib
import json
import os
import re
import shutil
import stat
import tempfile

from container_guts.logger import logger


def can_be_deleted(path, ignore_files=None):
    """
    Decide whether a directory is safe to remove.

    A path qualifies when it exists and either has no entries at all, or
    every entry it holds is named in ignore_files. A path that does not
    exist never qualifies.
    """
    if not os.path.exists(path):
        return False

    entries = os.listdir(path)

    # Only build the ignore set when there is something to compare against
    return not entries or set(ignore_files or []).issuperset(entries)
def creation_date(filename):
    """
    Return the creation timestamp of a file.

    Not every platform exposes a birth time on the stat result, so the
    modification time is returned when it is missing.
    """
    info = os.stat(filename)
    if hasattr(info, "st_birthtime"):
        return info.st_birthtime
    return info.st_mtime
def mkdirp(dirnames):
    """
    Ensure every directory in an iterable of paths exists.

    Each entry is handed to mkdir_p in turn, so dirnames must be an
    iterable of paths rather than a single path string.
    """
    for target in dirnames:
        mkdir_p(target)
def mkdir_p(path):
    """
    Create a directory and any missing parents, like ``mkdir -p``.

    An already existing directory is not an error. Any other failure is
    reported through logger.exit.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        # Tolerate only the "directory is already there" case
        already_there = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_there:
            logger.exit("Error creating path %s, exiting." % path)
def remove_to_base(path, base_path):
    """
    Delete path, then prune its now-empty parents up to base_path.

    The tree (or file, or link) at path is removed first. Each parent
    directory between path and base_path is then removed as long as
    can_be_deleted reports it empty (a lone ".version" file is ignored).
    base_path itself is only removed when path and base_path are the same
    location.

    Both arguments are normalized to absolute paths before comparison, so
    trailing slashes or "./" segments cannot make the upward walk miss
    base_path, and containment is checked on whole path components rather
    than on a raw string prefix.

    Errors (base_path is not a directory, or path is not under base_path)
    are reported through logger.exit.
    """
    if not os.path.isdir(base_path):
        logger.exit("Error: %s is not a directory" % base_path)

    base = os.path.abspath(base_path)
    target = os.path.abspath(path)

    # Require a separator after the base so that "/x/base2" is not treated
    # as living under "/x/base"
    base_prefix = base.rstrip(os.sep) + os.sep
    if target != base and not target.startswith(base_prefix):
        logger.exit("Error: %s is not a parent of %s" % (base_path, path))

    if os.path.islink(target) or os.path.isfile(target):
        os.unlink(target)
    elif os.path.isdir(target):
        shutil.rmtree(target)

    # If directories above it are empty, remove them too
    while target != base:
        if os.path.exists(target):
            if not can_be_deleted(target, [".version"]):
                break
            shutil.rmtree(target)
        parent = os.path.dirname(target)
        # dirname is a fixed point at the filesystem root: never loop there
        if parent == target:
            break
        target = parent
def get_tmpfile(tmpdir=None, prefix=""):
    """
    Create an empty temporary file and return its path.

    The file is placed inside a directory obtained from get_tmpdir, which
    is rooted at the user requested tmpdir when one is given. Only the
    final component of prefix is used for the file name. The file
    descriptor is closed before returning, so the caller receives just a
    path to an existing, empty file.
    """
    # First priority for the base goes to the user requested.
    tmpdir = get_tmpdir(tmpdir)

    if tmpdir:
        # Pass the directory explicitly: folding it into the prefix only
        # works for absolute paths, because mkstemp joins the prefix onto
        # the system temp directory.
        fd, tmp_file = tempfile.mkstemp(
            dir=tmpdir, prefix=os.path.basename(prefix)
        )
    else:
        fd, tmp_file = tempfile.mkstemp(prefix=prefix)
    os.close(fd)
    return tmp_file
def get_tmpdir(tmpdir=None, prefix="", create=True):
    """
    Build a uniquely named temporary directory path.

    The directory lives under tmpdir (or the system temp directory) and is
    named "<prefix>.<random>", with prefix defaulting to "guts-tmp". It is
    created on disk only when create is exactly True.
    """
    base = tmpdir or tempfile.gettempdir()
    stem = prefix or "guts-tmp"

    # NOTE: relies on a private tempfile helper for the random component
    token = next(tempfile._get_candidate_names())
    target = os.path.join(base, "%s.%s" % (stem, token))

    if create is True and not os.path.exists(target):
        os.mkdir(target)
    return target
def recursive_find(base, pattern=None):
    """
    Walk base recursively and yield the absolute path of each file.

    When pattern is given (and non-empty), only paths for which
    re.search(pattern, path) matches are yielded.
    """
    for dirpath, _subdirs, filenames in os.walk(base):
        for name in filenames:
            candidate = os.path.abspath(os.path.join(dirpath, name))
            if not pattern or re.search(pattern, candidate):
                yield candidate
def get_file_hash(image_path, algorithm="sha256"):
    """
    Return the hex digest of a file's contents.

    algorithm names a constructor on the hashlib module (sha256 by
    default). The file is read in binary mode in fixed-size chunks, so it
    is never loaded into memory all at once. An unknown algorithm name is
    reported through the logger, together with the list of algorithms
    hashlib guarantees.
    """
    try:
        hasher = getattr(hashlib, algorithm)()
    except AttributeError:
        # Interpolate the offending name so the message is actionable
        logger.error("%s is an invalid algorithm." % algorithm)
        logger.exit(" ".join(hashlib.algorithms_guaranteed))

    with open(image_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
def copyfile(source, destination, force=True):
    """
    Copy a file from a source to its destination and return destination.

    When source and destination name the same location there is nothing
    to copy, and destination is returned untouched. Otherwise, with force
    set to True an existing destination is removed before the copy.
    """
    # Case 1: source and destination are the same path. Removing the
    # destination here would delete the only copy of the data, so this is
    # a no-op regardless of force. Paths are normalized so spellings such
    # as "./a" and "a" are recognized as the same location.
    if os.path.abspath(source) == os.path.abspath(destination):
        return destination

    # Case 2: It's already there, we ARE replacing it :)
    if os.path.exists(destination) and force is True:
        os.remove(destination)

    shutil.copyfile(source, destination)
    return destination
def write_file(filename, content, mode="w", exec=False):
    """
    Write content to filename and return the filename.

    content is passed to writelines, so it may be a single string or an
    iterable of strings. With exec set, execute permission is added for
    user, group and others on top of the file's existing mode.
    """
    with open(filename, mode) as handle:
        handle.writelines(content)

    if exec:
        # Add the execute bits without disturbing the other permissions
        exec_bits = stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
        os.chmod(filename, os.stat(filename).st_mode | exec_bits)
    return filename
def write_json(json_obj, filename, mode="w"):
    """
    Serialize json_obj as indented JSON into filename and return filename.

    The object is serialized with json.dumps directly, so this function
    does not depend on any separate pretty-printing helper being defined.
    """
    with open(filename, mode) as filey:
        filey.write(json.dumps(json_obj, indent=4))
    return filename
def read_file(filename, mode="r"):
    """
    Return the entire content of filename, opened with the given mode.
    """
    with open(filename, mode) as handle:
        return handle.read()
def read_json(filename, mode="r"):
    """
    Read a json file and return the decoded object.

    mode is the mode the file is opened with; both text ("r") and binary
    ("rb") modes are accepted by the JSON decoder.
    """
    # Honor the caller's mode instead of always falling back to the default
    with open(filename, mode) as filey:
        return json.loads(filey.read())