Source code for hpotk.util._io

import gzip
import io
import logging
import pathlib
import ssl
import sys
import typing
import warnings
from urllib.request import urlopen

import certifi


def looks_like_url(file: str) -> bool:
    """
    Tell whether `file` appears to be a URL of a remote resource.

    Only the prefix is inspected; the rest of the string is not validated.

    :param file: the string to inspect.
    :return: `True` if `file` begins with `http://` or `https://`, `False` otherwise.
    """
    url_prefixes = ("http://", "https://")
    return file.startswith(url_prefixes)
def looks_gzipped(file: str) -> bool:
    """
    Guess from the file name whether the content is gzip-compressed.

    The decision is based purely on the suffix; the file content is never read.

    :param file: file path (or URL) to inspect.
    :return: `True` if `file` ends with `.gz`, `False` otherwise.
    """
    gzip_suffix = ".gz"
    return file.endswith(gzip_suffix)
def _parse_encoding(
    encoding: typing.Optional[str],
    logger: logging.Logger,
) -> str:
    """
    Resolve the encoding to use and log the decision at debug level.

    :param encoding: the encoding requested by the caller or `None` to fall back to the default.
    :param logger: logger that receives the debug message describing the choice.
    :return: `encoding` unchanged if it was provided, otherwise the value of `sys.getdefaultencoding()`.
    """
    if encoding is not None:
        logger.debug("Using provided encoding '%s'", encoding)
        return encoding

    fallback = sys.getdefaultencoding()
    logger.debug("Using default encoding '%s'", fallback)
    return fallback
def open_text_io_handle_for_reading(
    fh: typing.Union[typing.TextIO, typing.BinaryIO, pathlib.Path, str],
    timeout: int = 30,
    encoding: typing.Optional[str] = None,
) -> typing.TextIO:
    """
    Open a text file handle for reading based on `fh`.

    :param fh: a `str`, a `pathlib.Path`, or an IO handle to read from.
      A `str`/`pathlib.Path` is interpreted as a path to a local file or as a URL of a remote resource
      (`http` and `https` protocols are supported). The content is uncompressed on the fly
      if the name ends with `.gz`.
      A binary IO handle is wrapped into a text wrapper that uses the given encoding.
      A text IO handle is returned as is.
    :param timeout: timeout in seconds used when accessing a remote resource.
    :param encoding: encoding used to decode the input. If unset, the value
      of `sys.getdefaultencoding()` is used.
    :return: a text IO handle.
    :raises ValueError: if `fh` looks like a URL and `timeout` is not a positive `int`,
      or if `fh` is of an unsupported type.
    """
    logger = logging.getLogger("hpotk.util")
    encoding = _parse_encoding(encoding, logger)

    logger.debug("Opening %s", fh)
    if isinstance(fh, (pathlib.Path, str)):
        # Can be a path to local file or URL
        fp = str(fh)
        if looks_like_url(fp):
            logger.debug("Looks like a URL: %s", fp)
            # Validate the arguments before allocating any resources.
            if not isinstance(timeout, int) or timeout <= 0:
                raise ValueError(f"If {fp} looks like URL then timeout {timeout} must be a positive `int`")
            ctx = ssl.create_default_context(cafile=certifi.where())
            logger.debug("Downloading with timeout=%ds", timeout)
            handle = urlopen(
                fp,
                timeout=timeout,
                context=ctx,
            )
        else:
            logger.debug("Looks like a local file: %s", fp)
            handle = open(fp, "rb")

        try:
            if looks_gzipped(fp):
                logger.debug("Looks like a gzipped data, decompressing on the fly")
                return gzip.open(handle, mode="rt", newline="", encoding=encoding)
            logger.debug("Looks like decompressed data")
            return io.TextIOWrapper(handle, encoding=encoding)
        except Exception:
            # Wrapping failed (e.g. unknown encoding) - do not leak the raw handle.
            handle.close()
            raise

    # Concrete file objects (e.g. results of `open`, `io.BytesIO`, `io.StringIO`) derive from
    # the `io` base classes and are *not* instances of the `typing.IO` family,
    # hence we check both to support real handles as well as explicit `typing` subclasses.
    if isinstance(fh, (io.BufferedIOBase, io.RawIOBase, typing.BinaryIO)):
        logger.debug("Looks like a binary IO")
        return io.TextIOWrapper(fh, encoding=encoding)
    if isinstance(fh, (io.TextIOBase, typing.TextIO)):
        return fh

    raise ValueError(f"Unexpected type {type(fh)}")
def open_text_io_handle(
    fh: typing.Union[typing.TextIO, typing.BinaryIO, str],
    timeout: int = 30,
    encoding: typing.Optional[str] = None,
) -> typing.TextIO:
    """
    Deprecated alias of `open_text_io_handle_for_reading`.

    Emits a `DeprecationWarning` attributed to the caller and forwards all arguments
    to `open_text_io_handle_for_reading` unchanged.

    :param fh: a `str` or an IO handle to read from, passed through as is.
    :param timeout: timeout in seconds used when accessing a remote resource, passed through as is.
    :param encoding: encoding used to decode the input, passed through as is.
    :return: whatever `open_text_io_handle_for_reading` returns.
    """
    # REMOVE(v1.0.0)
    deprecation_note = (
        "The method has been deprecated and will be removed in v1.0.0. "
        "Use `open_text_io_handle_for_reading` instead"
    )
    # `stacklevel=2` makes the warning point at the caller instead of this shim.
    warnings.warn(deprecation_note, DeprecationWarning, stacklevel=2)

    return open_text_io_handle_for_reading(fh, timeout=timeout, encoding=encoding)
def open_text_io_handle_for_writing(
    fh: typing.Union[typing.TextIO, typing.BinaryIO, pathlib.Path, str],
    encoding: typing.Optional[str] = None,
) -> typing.TextIO:
    """
    Open a text file handle for writing based on `fh`.

    :param fh: a `str`, a `pathlib.Path`, or an IO handle to write to.
      A `str`/`pathlib.Path` is interpreted as a path to a local file. The content is compressed
      on the fly if the file name ends with `.gz`.
      A binary IO handle is wrapped into a text wrapper that uses the given encoding.
      A text IO handle is returned as is.
    :param encoding: encoding used to encode the output. If unset, the value
      of `sys.getdefaultencoding()` is used.
    :return: a text IO handle.
    :raises ValueError: if `fh` is of an unsupported type.
    """
    logger = logging.getLogger("hpotk.util")
    encoding = _parse_encoding(encoding, logger)

    if isinstance(fh, (pathlib.Path, str)):
        fp = str(fh)
        if looks_gzipped(fp):
            logger.debug("Looks like gzipped data, compressing on the fly")
            return gzip.open(fh, mode="wt", newline="", encoding=encoding)
        # Honor the resolved encoding for plain files too, consistently with the gzip branch.
        return open(fh, "w", encoding=encoding)

    # Concrete file objects (e.g. results of `open`, `io.BytesIO`, `io.StringIO`) derive from
    # the `io` base classes and are *not* instances of the `typing.IO` family,
    # hence we check both to support real handles as well as explicit `typing` subclasses.
    if isinstance(fh, (io.BufferedIOBase, io.RawIOBase, typing.BinaryIO)):
        logger.debug("Looks like a binary IO")
        return io.TextIOWrapper(fh, encoding=encoding)
    if isinstance(fh, (io.TextIOBase, typing.TextIO)):
        return fh

    raise ValueError(f"Unexpected type {type(fh)}")