#!/usr/bin/env python3
import bz2
import gzip
import lzma
import urllib.request
import os
import shutil
import tarfile
import multiprocessing
import subprocess
import sys
import socket
from contextlib import contextmanager
from pmotools.utils.color_text import ColorText as CT
class Utils:
    """
    A small utility class to hold static methods for various 1-off tools
    """

    @staticmethod
    def isMac():
        """Return True when ``sys.platform`` is ``"darwin"`` (macOS)."""
        return sys.platform == "darwin"

    @staticmethod
    def connectedInternet():
        """
        Best-effort check for a working internet connection.

        Resolves ``www.google.com`` and opens a TCP connection to port 80 with a
        2 second timeout.

        :return: True if both the lookup and the connection succeed, otherwise False
        """
        # from http://stackoverflow.com/questions/20913411/test-if-an-internet-connection-is-present-in-python
        try:
            # see if we can resolve the host name -- tells us if there is
            # a DNS listening
            host = socket.gethostbyname("www.google.com")
            # connect to the host -- tells us if the host is actually
            # reachable; the with-block closes the probe socket again
            with socket.create_connection((host, 80), 2):
                return True
        except (OSError, AttributeError, ValueError, IndexError, KeyError):
            # socket.gaierror and timeouts are OSError subclasses, so being
            # offline ends up here instead of propagating to the caller
            return False

    @staticmethod
    def which(program):
        """
        Locate an executable, similar to the shell ``which`` command.

        :param program: a program name, or a path to a program
        :return: the path to the executable, or None if none was found
        """

        # from http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
        def is_exe(fnp):
            return os.path.isfile(fnp) and os.access(fnp, os.X_OK)

        fpath, _ = os.path.split(program)
        if fpath:
            # a directory component was given, so only test that exact path
            if is_exe(program):
                return program
        else:
            # fall back to os.defpath when PATH is not set rather than
            # raising KeyError
            search_path = os.environ.get("PATH", os.defpath)
            for path in search_path.split(os.pathsep):
                exe_file = os.path.join(path.strip('"'), program)
                if is_exe(exe_file):
                    return exe_file
        return None

    @staticmethod
    def hasProgram(program):
        """Return True if ``Utils.which(program)`` finds an executable."""
        return Utils.which(program) is not None

    @staticmethod
    def run_in_dir(cmd, d):
        """
        Run a shell command from within directory d.

        The command is wrapped as ``cd <d> && <cmd> && cd -``, printed, and then
        handed to ``Utils.run``.

        :param cmd: the shell command to run
        :param d: the directory to run it in (shell-quoted before use)
        :return: None
        """
        cmd = "cd " + Utils.shellquote(d) + " && " + cmd + " && cd -"
        print(CT.boldGreen(cmd))
        Utils.run(cmd)

    @staticmethod
    def run(cmd):
        """
        Run a shell command, echoing its output line by line as it is produced.

        stderr is merged into stdout.

        :param cmd: the shell command to run (executed with ``shell=True``)
        :return: the combined output decoded as utf-8
        :raises Exception: with args ``(cmd, exitCode, output)`` if the exit code is not 0
        """
        # from http://stackoverflow.com/a/4418193
        process = subprocess.Popen(
            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        lines = []
        # leaving the with-block closes the pipe and waits for the process,
        # so returncode is set afterwards
        with process:
            for raw_line in process.stdout:
                nextline = raw_line.decode("utf-8")
                sys.stdout.write(nextline)
                sys.stdout.flush()
                lines.append(nextline)
        output = "".join(lines)
        exitCode = process.returncode
        if exitCode == 0:
            return output
        raise Exception(cmd, exitCode, output)

    @staticmethod
    def runAndCapture(cmd):
        """
        Run a shell command and capture its output without echoing it.

        stderr is merged into stdout.

        :param cmd: the shell command to run (executed with ``shell=True``)
        :return: the combined output decoded as utf-8
        :raises Exception: with args ``(cmd, exitCode, output, errors)`` if the exit code is not 0
        """
        # from http://stackoverflow.com/a/4418193
        process = subprocess.Popen(
            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        # stderr is redirected into stdout above, so errors is always None
        output, errors = process.communicate()
        exitCode = process.returncode
        if exitCode == 0:
            return output.decode("utf-8")
        raise Exception(cmd, exitCode, output.decode("utf-8"), errors)

    @staticmethod
    def shellquote(s):
        """
        Wrap a string in single quotes, escaping any single quotes inside it.

        :param s: the string to quote
        :return: the quoted string
        """
        # from http://stackoverflow.com/a/35857
        return "'" + s.replace("'", "'\\''") + "'"

    @staticmethod
    def num_cores():
        """Return ``multiprocessing.cpu_count()``."""
        return multiprocessing.cpu_count()

    @staticmethod
    def mkdir(d):
        """mkdir if it doesn't already exist"""
        if not os.path.exists(d):
            print(CT.boldText("mkdir"), CT.boldGreen(d))
            # exist_ok covers the directory being created between the check
            # above and this call
            os.makedirs(d, exist_ok=True)

    @staticmethod
    def get_file(url, d):
        """get file from url and put it into directory d, return new name"""
        fn = url.split("/")[-1]
        out_fnp = os.path.join(d, fn)
        urllib.request.urlretrieve(url, out_fnp)
        return out_fnp

    @staticmethod
    def get_file_if_size_diff(url, d):
        """
        only download the file if it's needed, not completely fail proof since it is
        just a size check but fairly likely not to be the same for a difference

        If the server sends no Content-Length header the size counts as unknown
        and the file is downloaded.

        :param url: the url to download from
        :param d: the directory to put the file into
        :return: the path of the file within d
        """
        fn = url.split("/")[-1]
        out_fnp = os.path.join(d, fn)
        # close the response once the headers have been read
        with urllib.request.urlopen(url) as response:
            content_length = response.info()["Content-Length"]
        net_file_size = None if content_length is None else int(content_length)
        if os.path.exists(out_fnp):
            fn_size = os.path.getsize(out_fnp)
            if fn_size == net_file_size:
                print("skipping download of", CT.boldGreen(fn))
                return out_fnp
            print(
                "files sizes differed:",
                "on disk:",
                fn_size,
                "from net:",
                net_file_size,
            )
        print("retrieving", CT.boldGreen(fn), "from", CT.boldBlue(url))
        urllib.request.urlretrieve(url, out_fnp)
        return out_fnp

    @staticmethod
    def rm_rf(d):
        """remove directory forcibly"""
        if os.path.exists(d):
            print(CT.boldText("rm -rf"), CT.boldRed(d))
            shutil.rmtree(d)

    @staticmethod
    def untar(fnp, d):
        """
        un pack compressed file, guessing format based on extension

        Recognized extensions: .tar.gz, .tgz, .tar.bz2, .tar.xz, .txz and .tar

        :param fnp: the path of the archive
        :param d: the directory to extract into
        :raises Exception: if the extension is not one of the recognized ones
        """
        if fnp.endswith((".tar.gz", ".tgz")):
            mode = "r:gz"
        elif fnp.endswith(".tar.bz2"):
            mode = "r:bz2"
        elif fnp.endswith((".tar.xz", ".txz")):
            mode = "r:xz"
        elif fnp.endswith(".tar"):
            mode = "r"
        else:
            raise Exception("invalid file? " + fnp)
        # the with-block closes the archive even if extraction fails
        with tarfile.open(fnp, mode) as tar:
            print("untarring", CT.boldGreen(fnp), "to", CT.boldBlue(d))
            # NOTE(review): extractall trusts the member paths inside the
            # archive; only use this on archives from a trusted source
            tar.extractall(d)

    @staticmethod
    def getStrFromStrOrList(inputArg):
        """
        Convert the input into a single string.

        :param inputArg: a list, a str, or any other object
        :return: str() of the first element for a list, the input itself for a str, str(inputArg) otherwise
        """
        if isinstance(inputArg, list):
            return str(inputArg[0])
        if isinstance(inputArg, str):
            return inputArg
        return str(inputArg)

    @staticmethod
    def clear_dir(d):
        """forcibly delete directory and then re-make it"""
        Utils.rm_rf(d)
        Utils.mkdir(d)

    @staticmethod
    def appendStrAsNeeded(input: str, ending: str):
        """
        if a string doesn't end with a specific ending, append it, this is useful for ensuring file extensions are in output names without accidentally doubling it

        :param input: the string to be appended
        :param ending: the desired ending
        :return: the string with the ending appended if it doesn't already end with it
        """
        if input.endswith(ending):
            return input
        return input + ending

    @staticmethod
    def appendStrAsNeededDoubleEnding(input: str, ending1: str, ending2: str):
        """
        if a string doesn't end with a specific combination of endings (e.g. if ending of input does not equal ending1 + ending2), append it, this is useful for ensuring file extensions plus .gz for zipped files are in output names without accidentally doubling it

        :param input: the string to be appended
        :param ending1: the first part of the desired ending
        :param ending2: the second part of the desired ending
        :return: the string with the ending1 + ending2 appended if it doesn't already end with it
        """
        full_ending = ending1 + ending2
        if input.endswith(full_ending):
            return input
        if input.endswith(ending1):
            # first part already present, only the second part is missing
            return input + ending2
        return input + full_ending

    @staticmethod
    def process_delimiter_and_output_extension(
        delim: str, output_extension: str = ".txt", gzip: bool = False
    ) -> tuple[str, str]:
        """
        Process delimiter and extension, this allows for delim to be given as the word tab or comma, which is replaced by the actual character, and a tab or comma delimiter also sets the extension to .tsv or .csv

        :param delim: the delimiter to process
        :param output_extension: the output extension, used when delim is neither tab nor comma
        :param gzip: whether or not to add .gz to the output extension as well
        :return: delimiter, extension
        """
        out_delim = delim
        out_output_extension = output_extension
        if delim in ("tab", "\t"):
            out_delim = "\t"
            out_output_extension = ".tsv"
        elif delim in ("comma", ","):
            out_delim = ","
            out_output_extension = ".csv"
        if gzip:
            out_output_extension += ".gz"
        return out_delim, out_output_extension

    @staticmethod
    def outputfile_check(output_file: str, overwrite: bool = False):
        """
        Check to see if the output file exists if overwrite is turned on or not

        :param output_file: the output file that will be written to, "STDOUT" is never checked
        :param overwrite: whether or not the output file can be overwritten
        :return: None
        :raises Exception: if the output file exists and overwrite is False
        """
        # only overwrite an existing file if --overwrite is on
        if "STDOUT" != output_file and os.path.exists(output_file) and not overwrite:
            raise Exception(
                "Output file "
                + output_file
                + " already exists, use overwrite=T (or --overwrite if running from command line interface) to overwrite it"
            )

    @staticmethod
    @contextmanager
    def smart_open_write(filename):
        """
        Context manager for writing to a file, stdout, or a gzip-compressed file.

        Args:
            filename (str): Output filename, "STDOUT" for standard output,
                            or a filename ending in ".gz" for gzip compression.

        Yields:
            file object: A writable file-like object.
        """
        if filename == "STDOUT":
            # sys.stdout is yielded as-is and is not closed on exit
            yield sys.stdout
        elif filename.endswith(".gz"):
            with gzip.open(filename, "wt", encoding="utf-8") as f:
                yield f
        else:
            with open(filename, "w", encoding="utf-8") as f:
                yield f

    @staticmethod
    @contextmanager
    def smart_open_read_by_ext(filename):
        """
        Context manager for reading a file, using extension-based detection.

        Args:
            filename (str): "STDIN", or a filename ending in .gz, .bz2, .xz, .lzma, or uncompressed.

        Yields:
            file object: Readable file-like object.
        """
        if filename == "STDIN":
            # sys.stdin is yielded as-is and is not closed on exit
            yield sys.stdin
        elif filename.endswith(".gz"):
            with gzip.open(filename, "rt", encoding="utf-8") as f:
                yield f
        elif filename.endswith(".bz2"):
            with bz2.open(filename, "rt", encoding="utf-8") as f:
                yield f
        elif filename.endswith((".xz", ".lzma")):
            with lzma.open(filename, "rt", encoding="utf-8") as f:
                yield f
        else:
            with open(filename, "r", encoding="utf-8") as f:
                yield f

    @staticmethod
    @contextmanager
    def smart_open_read_autodetect(filename):
        """
        Context manager for reading a file, using magic number autodetection of compression type.
        Supports gzip, bzip2, lzma/xz, or plain text regardless of extension.

        Args:
            filename (str): "STDIN" or a file path

        Yields:
            file object: Readable file-like object.
        """
        if filename == "STDIN":
            yield sys.stdin
            return

        # Read magic number
        with open(filename, "rb") as raw_file:
            magic = raw_file.read(6)

        if magic.startswith(b"\x1f\x8b"):  # gzip
            opener = gzip.open
        elif magic.startswith(b"BZh"):  # bz2
            opener = bz2.open
        elif magic.startswith((b"\xfd7zXZ", b"\x5d\x00\x00")):  # xz/lzma
            opener = lzma.open
        else:
            opener = open

        with opener(filename, "rt", encoding="utf-8") as f:
            yield f