"""
Common Utilities
================
Define the common utilities objects that don't fall in any specific category.
References
----------
- :cite:`DjangoSoftwareFoundation2022` : Django Software Foundation. (2022).
slugify. Retrieved June 1, 2022, from https://github.com/django/django/\
blob/0dd29209091280ccf34e07c9468746c396b7778e/django/utils/text.py#L400
- :cite:`Kienzle2011a` : Kienzle, P., Patel, N., & Krycka, J. (2011).
refl1d.numpyerrors - Refl1D v0.6.19 documentation. Retrieved January 30,
2015, from
http://www.reflectometry.org/danse/docs/refl1d/_modules/refl1d/\
numpyerrors.html
"""
from __future__ import annotations
import functools
import inspect
import os
import re
import types
import unicodedata
import warnings
from contextlib import contextmanager
from copy import copy
from pprint import pformat
import numpy as np
from colour.constants import THRESHOLD_INTEGER
from colour.hints import (
Any,
Callable,
DTypeBoolean,
Generator,
Iterable,
Literal,
Mapping,
Sequence,
TypeVar,
)
from colour.utilities import CanonicalMapping, Lookup, is_xxhash_installed
__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "colour-developers@colour-science.org"
__status__ = "Production"
__all__ = [
"is_caching_enabled",
"set_caching_enable",
"caching_enable",
"CacheRegistry",
"CACHE_REGISTRY",
"handle_numpy_errors",
"ignore_numpy_errors",
"raise_numpy_errors",
"print_numpy_errors",
"warn_numpy_errors",
"ignore_python_warnings",
"attest",
"batch",
"disable_multiprocessing",
"multiprocessing_pool",
"is_iterable",
"is_numeric",
"is_integer",
"is_sibling",
"filter_kwargs",
"filter_mapping",
"first_item",
"copy_definition",
"validate_method",
"optional",
"slugify",
"int_digest",
]
# Initial caching state, derived from the environment at import time: caching
# starts enabled when "COLOUR_SCIENCE__DISABLE_CACHING" is unset or empty, and
# disabled when it holds any non-empty value (the raw string is only tested for
# truthiness, so values such as "0" or "False" also disable caching).
_CACHING_ENABLED: bool = not os.environ.get("COLOUR_SCIENCE__DISABLE_CACHING", False)
"""
Global variable storing the current *Colour* caching enabled state.
"""
[docs]
def is_caching_enabled() -> bool:
    """
    Return whether *Colour* caching is enabled.

    Returns
    -------
    :class:`bool`
        Current value of the module-level caching state.

    Examples
    --------
    >>> with caching_enable(False):
    ...     is_caching_enabled()
    False
    >>> with caching_enable(True):
    ...     is_caching_enabled()
    True
    """

    # The state is read at call time so that changes made through
    # "set_caching_enable" are always reflected.
    current_state = _CACHING_ENABLED

    return current_state
[docs]
def set_caching_enable(enable: bool):
    """
    Set *Colour* caching enabled state.

    The new state is stored in the module-level caching flag and stays in
    effect until it is changed again.

    Parameters
    ----------
    enable
        Whether to enable *Colour* caching.

    Examples
    --------
    >>> with caching_enable(True):
    ...     print(is_caching_enabled())
    ...     set_caching_enable(False)
    ...     print(is_caching_enabled())
    True
    False
    """

    # Rebinding the module-level flag requires the "global" declaration.
    global _CACHING_ENABLED  # noqa: PLW0603

    _CACHING_ENABLED = enable
[docs]
class caching_enable:
    """
    Define a context manager and decorator temporarily setting *Colour* caching
    enabled state.

    The caching state that is in effect when the context is *entered* is the
    one restored when the context is exited. This holds for each entry, so the
    same instance can be entered several times, nested, or used to decorate a
    recursive definition.

    Parameters
    ----------
    enable
        Whether to enable or disable *Colour* caching.

    Examples
    --------
    >>> with caching_enable(False):
    ...     is_caching_enabled()
    False
    >>> @caching_enable(False)
    ... def fn():
    ...     return is_caching_enabled()
    >>> fn()
    False
    """

    def __init__(self, enable: bool) -> None:
        self._enable = enable
        # State at construction time, only used as a fallback if "__exit__"
        # is ever invoked without a matching "__enter__".
        self._previous_state = is_caching_enabled()
        # One entry per active "__enter__": capturing the state on entry
        # (rather than at construction) prevents a decorator created at
        # import time from restoring a stale state after each call.
        self._previous_states: list[bool] = []

    def __enter__(self) -> caching_enable:
        """
        Set the *Colour* caching enabled state upon entering the context
        manager, remembering the state to restore on exit.
        """

        self._previous_states.append(is_caching_enabled())
        set_caching_enable(self._enable)

        return self

    def __exit__(self, *args: Any):
        """
        Restore the *Colour* caching enabled state recorded by the matching
        :meth:`__enter__` call upon exiting the context manager.
        """

        if self._previous_states:
            previous_state = self._previous_states.pop()
        else:
            previous_state = self._previous_state

        set_caching_enable(previous_state)

    def __call__(self, function: Callable) -> Callable:
        """
        Decorate given definition so that each call runs inside the context
        manager.
        """

        @functools.wraps(function)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Wrap given function."""

            with self:
                return function(*args, **kwargs)

        return wrapper
[docs]
class CacheRegistry:
    """
    A registry for mapping-based caches.

    Each registered cache is a plain :class:`dict` stored under a name; the
    registry allows clearing them individually or all at once.

    Attributes
    ----------
    -   :attr:`~colour.utilities.CacheRegistry.registry`

    Methods
    -------
    -   :meth:`~colour.utilities.CacheRegistry.__init__`
    -   :meth:`~colour.utilities.CacheRegistry.__str__`
    -   :meth:`~colour.utilities.CacheRegistry.register_cache`
    -   :meth:`~colour.utilities.CacheRegistry.unregister_cache`
    -   :meth:`~colour.utilities.CacheRegistry.clear_cache`
    -   :meth:`~colour.utilities.CacheRegistry.clear_all_caches`

    Examples
    --------
    >>> cache_registry = CacheRegistry()
    >>> cache_a = cache_registry.register_cache("Cache A")
    >>> cache_a["Foo"] = "Bar"
    >>> cache_b = cache_registry.register_cache("Cache B")
    >>> cache_b["John"] = "Doe"
    >>> cache_b["Luke"] = "Skywalker"
    >>> print(cache_registry)
    {'Cache A': '1 item(s)', 'Cache B': '2 item(s)'}
    >>> cache_registry.clear_cache("Cache A")
    >>> print(cache_registry)
    {'Cache A': '0 item(s)', 'Cache B': '2 item(s)'}
    >>> cache_registry.unregister_cache("Cache B")
    >>> print(cache_registry)
    {'Cache A': '0 item(s)'}
    >>> print(cache_b)
    {}
    """

    def __init__(self) -> None:
        # Maps each cache name to the dict acting as the cache.
        self._registry: dict = {}

    @property
    def registry(self) -> dict:
        """
        Getter property for the cache registry.

        Returns
        -------
        :class:`dict`
            Cache registry, i.e. the mapping of cache names to caches.
        """

        return self._registry

    def __str__(self) -> str:
        """
        Return a formatted string representation of the cache registry.

        Returns
        -------
        :class:`str`
            Formatted string representation listing, for each cache name in
            sorted order, the number of items it holds.
        """

        summary = {}
        for name in sorted(self._registry):
            summary[name] = f"{len(self._registry[name])} item(s)"

        return pformat(summary)

    def register_cache(self, name: str) -> dict:
        """
        Register a new cache with given name in the registry.

        Parameters
        ----------
        name
            Cache name for the registry.

        Returns
        -------
        :class:`dict`
            Registered cache.

        Notes
        -----
        -   Registering a name that already exists replaces the cache stored
            under that name with a new empty one.

        Examples
        --------
        >>> cache_registry = CacheRegistry()
        >>> cache_a = cache_registry.register_cache("Cache A")
        >>> cache_a["Foo"] = "Bar"
        >>> cache_b = cache_registry.register_cache("Cache B")
        >>> cache_b["John"] = "Doe"
        >>> cache_b["Luke"] = "Skywalker"
        >>> print(cache_registry)
        {'Cache A': '1 item(s)', 'Cache B': '2 item(s)'}
        """

        cache: dict = {}
        self._registry[name] = cache

        return cache

    def unregister_cache(self, name: str):
        """
        Unregister cache with given name in the registry.

        Parameters
        ----------
        name
            Cache name in the registry.

        Raises
        ------
        :class:`KeyError`
            If no cache is registered under given name.

        Notes
        -----
        -   The cache is cleared before being unregistered.

        Examples
        --------
        >>> cache_registry = CacheRegistry()
        >>> cache_a = cache_registry.register_cache("Cache A")
        >>> cache_a["Foo"] = "Bar"
        >>> cache_b = cache_registry.register_cache("Cache B")
        >>> cache_b["John"] = "Doe"
        >>> cache_b["Luke"] = "Skywalker"
        >>> print(cache_registry)
        {'Cache A': '1 item(s)', 'Cache B': '2 item(s)'}
        >>> cache_registry.unregister_cache("Cache B")
        >>> print(cache_registry)
        {'Cache A': '1 item(s)'}
        >>> print(cache_b)
        {}
        """

        # Clearing first empties the dict still referenced by the callers.
        self.clear_cache(name)
        self._registry.pop(name)

    def clear_cache(self, name: str):
        """
        Clear the cache with given name.

        Parameters
        ----------
        name
            Cache name in the registry.

        Raises
        ------
        :class:`KeyError`
            If no cache is registered under given name.

        Examples
        --------
        >>> cache_registry = CacheRegistry()
        >>> cache_a = cache_registry.register_cache("Cache A")
        >>> cache_a["Foo"] = "Bar"
        >>> print(cache_registry)
        {'Cache A': '1 item(s)'}
        >>> cache_registry.clear_cache("Cache A")
        >>> print(cache_registry)
        {'Cache A': '0 item(s)'}
        """

        cache = self._registry[name]
        cache.clear()

    def clear_all_caches(self):
        """
        Clear all the caches in the registry.

        The caches stay registered, only their content is removed.

        Examples
        --------
        >>> cache_registry = CacheRegistry()
        >>> cache_a = cache_registry.register_cache("Cache A")
        >>> cache_a["Foo"] = "Bar"
        >>> cache_b = cache_registry.register_cache("Cache B")
        >>> cache_b["John"] = "Doe"
        >>> cache_b["Luke"] = "Skywalker"
        >>> print(cache_registry)
        {'Cache A': '1 item(s)', 'Cache B': '2 item(s)'}
        >>> cache_registry.clear_all_caches()
        >>> print(cache_registry)
        {'Cache A': '0 item(s)', 'Cache B': '0 item(s)'}
        """

        for name in self._registry:
            self.clear_cache(name)
# Registry instance created once, when this module is imported.
CACHE_REGISTRY: CacheRegistry = CacheRegistry()
"""
*Colour* cache registry referencing all the caches used for repetitive or long
processes.
"""
[docs]
def handle_numpy_errors(**kwargs: Any) -> Callable:
    """
    Decorate a function to handle *Numpy* errors.

    The returned decorator runs the decorated function inside a
    :class:`numpy.errstate` context configured with given keyword arguments.

    Other Parameters
    ----------------
    kwargs
        Keywords arguments passed to :class:`numpy.errstate`.

    Returns
    -------
    Callable
        Decorator applying the error handling to a function.

    References
    ----------
    :cite:`Kienzle2011a`

    Examples
    --------
    >>> import numpy
    >>> @handle_numpy_errors(all="ignore")
    ... def f():
    ...     1 / numpy.zeros(3)
    >>> f()
    """

    # Kept under a distinct name because the innermost function defines its
    # own "kwargs" parameter.
    errstate_settings = kwargs

    def wrapper(function: Callable) -> Callable:
        """Wrap given function wrapper."""

        @functools.wraps(function)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            """Wrap given function."""

            with np.errstate(**errstate_settings):
                result = function(*args, **kwargs)

            return result

        return wrapped

    return wrapper
# Ready-made decorators, one per "numpy.errstate" handling mode applied to all
# the floating-point error categories.
ignore_numpy_errors = handle_numpy_errors(all="ignore")
raise_numpy_errors = handle_numpy_errors(all="raise")
print_numpy_errors = handle_numpy_errors(all="print")
warn_numpy_errors = handle_numpy_errors(all="warn")
[docs]
def ignore_python_warnings(function: Callable) -> Callable:
    """
    Decorate a function to ignore *Python* warnings.

    The warnings filters are only altered for the duration of each call of the
    decorated function and are restored afterwards.

    Parameters
    ----------
    function
        Function to decorate.

    Returns
    -------
    Callable
        Decorated function.

    Examples
    --------
    >>> @ignore_python_warnings
    ... def f():
    ...     warnings.warn("This is an ignored warning!")
    >>> f()
    """

    @functools.wraps(function)
    def wrapped(*args: Any, **kwargs: Any) -> Any:
        """Wrap given function."""

        # "catch_warnings" restores the previous filters when the block exits.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            result = function(*args, **kwargs)

        return result

    return wrapped
[docs]
def attest(condition: bool | DTypeBoolean, message: str = ""):
    """
    Provide the `assert` statement functionality without being disabled by
    optimised Python execution.

    Parameters
    ----------
    condition
        Condition to attest/assert.
    message
        Message to display when the assertion fails.

    Raises
    ------
    :class:`AssertionError`
        If the condition is falsy.

    Examples
    --------
    >>> attest(1 + 1 == 2, "Arithmetic is broken!")
    """

    if condition:
        return

    raise AssertionError(message)
[docs]
def batch(sequence: Sequence, k: int | Literal[3] = 3) -> Generator:
    """
    Return a batch generator from given sequence.

    Parameters
    ----------
    sequence
        Sequence to create batches from.
    k
        Batch size.

    Yields
    ------
    Generator
        Consecutive slices of the sequence holding at most ``k`` items, the
        last one being shorter when the sequence length is not a multiple of
        ``k``.

    Examples
    --------
    >>> batch(tuple(range(10)), 3)  # doctest: +ELLIPSIS
    <generator object batch at 0x...>
    >>> list(batch(tuple(range(10)), 3))
    [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]
    """

    length = len(sequence)

    for start in range(0, length, k):
        stop = start + k

        yield sequence[start:stop]
# Toggled by "disable_multiprocessing" and read by "multiprocessing_pool" to
# choose between a real "multiprocessing.Pool" and an in-process dummy pool.
_MULTIPROCESSING_ENABLED: bool = True
"""*Colour* multiprocessing state."""
[docs]
class disable_multiprocessing:
    """
    Define a context manager and decorator to temporarily disabling *Colour*
    multiprocessing state.

    The multiprocessing state in effect when the context is entered is the one
    restored when it is exited, thus nested contexts keep multiprocessing
    disabled until the outermost one is exited.

    Examples
    --------
    >>> with disable_multiprocessing():
    ...     with disable_multiprocessing():
    ...         pass
    ...     _MULTIPROCESSING_ENABLED
    False
    """

    def __init__(self) -> None:
        # One entry per active "__enter__", so that the instance can be
        # entered several times, e.g. when decorating a recursive definition.
        self._previous_states: list[bool] = []

    def __enter__(self) -> disable_multiprocessing:
        """
        Disable *Colour* multiprocessing state upon entering the context
        manager, remembering the state to restore on exit.
        """

        global _MULTIPROCESSING_ENABLED  # noqa: PLW0603

        self._previous_states.append(_MULTIPROCESSING_ENABLED)
        _MULTIPROCESSING_ENABLED = False

        return self

    def __exit__(self, *args: Any):
        """
        Restore the *Colour* multiprocessing state recorded by the matching
        :meth:`__enter__` call upon exiting the context manager.
        """

        global _MULTIPROCESSING_ENABLED  # noqa: PLW0603

        # Falls back to enabling multiprocessing if "__exit__" is invoked
        # without a matching "__enter__".
        if self._previous_states:
            _MULTIPROCESSING_ENABLED = self._previous_states.pop()
        else:
            _MULTIPROCESSING_ENABLED = True

    def __call__(self, function: Callable) -> Callable:
        """
        Decorate given definition so that each call runs with multiprocessing
        disabled.
        """

        @functools.wraps(function)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            """Wrap given function."""

            with self:
                return function(*args, **kwargs)

        return wrapper
def _initializer(kwargs: Any):
    """
    Initialize a multiprocessing pool.

    It is used to ensure that processes on *Windows* inherit correctly from the
    current domain-range scale.

    Parameters
    ----------
    kwargs
        Initialisation arguments, a mapping that may define the *scale*,
        *sdiv_mode* and *spow_enabled* keys; missing keys default to
        *"reference"*, *"Ignore Zero Conversion"* and *True* respectively.
    """

    # NOTE: No coverage information is available as this code is executed in
    # sub-processes.

    import colour.utilities.array  # pragma: no cover
    import colour.algebra.common  # pragma: no cover

    scale = kwargs.get("scale", "reference")  # pragma: no cover
    sdiv_mode = kwargs.get(
        "sdiv_mode", "Ignore Zero Conversion"
    )  # pragma: no cover
    spow_enabled = kwargs.get("spow_enabled", True)  # pragma: no cover

    # The private module-level states are overwritten directly.
    colour.utilities.array._DOMAIN_RANGE_SCALE = scale  # pragma: no cover
    colour.algebra.common._SDIV_MODE = sdiv_mode  # pragma: no cover
    colour.algebra.common._SPOW_ENABLED = spow_enabled  # pragma: no cover
[docs]
@contextmanager
def multiprocessing_pool(*args: Any, **kwargs: Any) -> Generator:
    """
    Define a context manager providing a multiprocessing pool.

    When *Colour* multiprocessing is disabled, a dummy pool mapping the
    function in the current process is provided instead. The pool is
    terminated upon exiting the context manager.

    Other Parameters
    ----------------
    args
        Arguments passed to the pool constructor.
    kwargs
        Keywords arguments passed to the pool constructor. The *initializer*
        and *initargs* keyword arguments are always overwritten.

    Yields
    ------
    Generator
        Multiprocessing pool.

    Examples
    --------
    >>> from functools import partial
    >>> def _add(a, b):
    ...     return a + b
    >>> with multiprocessing_pool() as pool:
    ...     pool.map(partial(_add, b=2), range(10))
    ... # doctest: +SKIP
    [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    """

    from colour.algebra import get_sdiv_mode, is_spow_enabled
    from colour.utilities import get_domain_range_scale

    class _DummyPool:
        """
        A dummy multiprocessing pool that does not perform multiprocessing.

        Other Parameters
        ----------------
        args
            Arguments, ignored.
        kwargs
            Keywords arguments, ignored.
        """

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            pass

        def map(self, func, iterable, chunksize=None):  # noqa: ARG002
            """Apply given function to each element of given iterable."""

            return [func(element) for element in iterable]

        def terminate(self):
            """Terminate the process."""

    # Snapshot of the current states, handed over to each worker process
    # through the pool initializer.
    worker_states = {
        "scale": get_domain_range_scale(),
        "sdiv_mode": get_sdiv_mode(),
        "spow_enabled": is_spow_enabled(),
    }
    kwargs["initializer"] = _initializer
    kwargs["initargs"] = (worker_states,)

    pool_factory: Callable
    if not _MULTIPROCESSING_ENABLED:
        pool_factory = _DummyPool
    else:
        import multiprocessing

        pool_factory = multiprocessing.Pool

    pool = pool_factory(*args, **kwargs)

    try:
        yield pool
    finally:
        pool.terminate()
[docs]
def is_iterable(a: Any) -> bool:
    """
    Return whether given variable :math:`a` is iterable.

    Parameters
    ----------
    a
        Variable :math:`a` to check the iterability.

    Returns
    -------
    :class:`bool`
        Whether variable :math:`a` is iterable, i.e. whether it is a
        :class:`str` or exposes a truthy ``__iter__`` attribute.

    Examples
    --------
    >>> is_iterable([1, 2, 3])
    True
    >>> is_iterable(1)
    False
    """

    if isinstance(a, str):
        return True

    iterator_factory = getattr(a, "__iter__", False)

    return bool(iterator_factory)
[docs]
def is_numeric(a: Any) -> bool:
    """
    Return whether given variable :math:`a` is a :class:`Real`-like
    variable.

    Parameters
    ----------
    a
        Variable :math:`a` to test.

    Returns
    -------
    :class:`bool`
        Whether variable :math:`a` is a :class:`Real`-like variable.

    Notes
    -----
    -   The *Python* :class:`int`, :class:`float` and :class:`complex` types
        are accepted along with the *Numpy* integer, floating point,
        single-precision complex and double-precision complex scalar types.

    Examples
    --------
    >>> is_numeric(1)
    True
    >>> is_numeric((1,))
    False
    """

    # "np.integer" and "np.floating" are the abstract bases of all the sized
    # Numpy integer (signed and unsigned) and floating point scalar types.
    numeric_types = (
        int,
        float,
        complex,
        np.integer,
        np.floating,
        np.csingle,
        np.cdouble,
    )

    return isinstance(a, numeric_types)
[docs]
def is_integer(a: Any) -> bool:
    """
    Return whether given variable :math:`a` is an :class:`numpy.integer`-like
    variable under given threshold.

    Parameters
    ----------
    a
        Variable :math:`a` to test.

    Returns
    -------
    :class:`bool`
        Whether variable :math:`a` is an :class:`numpy.integer`-like variable.

    Notes
    -----
    -   The determination threshold is defined by the
        :attr:`colour.constants.THRESHOLD_INTEGER` attribute.
    -   The variable is considered integer-like when its absolute distance to
        the nearest integer, as computed by :func:`numpy.around`, is less than
        or equal to the threshold.

    Examples
    --------
    >>> is_integer(1)
    True
    >>> is_integer(1.01)
    False
    """

    nearest_integer = np.around(a)
    distance = abs(a - nearest_integer)

    return distance <= THRESHOLD_INTEGER
[docs]
def is_sibling(element: Any, mapping: Mapping) -> bool:
    """
    Return whether given element type is present in given mapping types.

    Parameters
    ----------
    element
        Element to check whether its type is present in the mapping types.
    mapping
        Mapping types.

    Returns
    -------
    :class:`bool`
        Whether given element type is present in given mapping types, i.e.
        whether the element is an instance of the type of any of the mapping
        values.

    Examples
    --------
    >>> mapping = {"a": 1, "b": 2.0}
    >>> is_sibling(3, mapping)
    True
    >>> is_sibling("3", mapping)
    False
    """

    # The set removes the duplicate types before the "isinstance" check.
    sibling_types = {type(value) for value in mapping.values()}

    return isinstance(element, tuple(sibling_types))
[docs]
def filter_kwargs(function: Callable, **kwargs: Any) -> dict:
    """
    Filter keyword arguments incompatible with the given function signature.

    Parameters
    ----------
    function
        Callable to filter the incompatible keyword arguments.

    Other Parameters
    ----------------
    kwargs
        Keywords arguments.

    Returns
    -------
    dict
        Filtered keyword arguments, i.e. the keyword arguments whose names are
        parameters of the function signature. An empty :class:`dict` is
        returned if the signature cannot be determined.

    Examples
    --------
    >>> def fn_a(a):
    ...     return a
    >>> def fn_b(a, b=0):
    ...     return a, b
    >>> def fn_c(a, b=0, c=0):
    ...     return a, b, c
    >>> fn_a(1, **filter_kwargs(fn_a, b=2, c=3))
    1
    >>> fn_b(1, **filter_kwargs(fn_b, b=2, c=3))
    (1, 2)
    >>> fn_c(1, **filter_kwargs(fn_c, b=2, c=3))
    (1, 2, 3)
    """

    try:
        parameters = inspect.signature(function).parameters
    except ValueError:  # pragma: no cover
        return {}

    # A new dict is built so that the passed keyword arguments ordering is
    # preserved for the retained names.
    return {name: value for name, value in kwargs.items() if name in parameters}
[docs]
def filter_mapping(mapping: Mapping, names: str | Sequence[str]) -> dict:
    """
    Filter given mapping with given names.

    Parameters
    ----------
    mapping
        Mapping to filter.
    names
        Name for given mapping elements or a list of names.

    Returns
    -------
    dict
        Filtered mapping elements.

    Notes
    -----
    -   If the mapping passed is a :class:`colour.utilities.CanonicalMapping`
        class instance, then the lower, slugified and canonical keys are also
        used for matching.
    -   To honour the filterers ordering, the return value is a :class:`dict`
        class instance.
    -   Each matched element is returned under the first key of the mapping
        referencing it.

    Examples
    --------
    >>> class Element:
    ...     pass
    >>> mapping = {
    ...     "Element A": Element(),
    ...     "Element B": Element(),
    ...     "Element C": Element(),
    ...     "Not Element C": Element(),
    ... }
    >>> filter_mapping(mapping, "Element A")  # doctest: +ELLIPSIS
    {'Element A': <colour.utilities.common.Element object at 0x...>}
    """

    # Keys usable for matching: the mapping keys, extended with their
    # alternative spellings for a "CanonicalMapping".
    candidate_keys = list(mapping.keys())
    if isinstance(mapping, CanonicalMapping):
        candidate_keys.extend(mapping.lower_keys())
        candidate_keys.extend(mapping.slugified_keys())
        candidate_keys.extend(mapping.canonical_keys())

    # Used to retrieve the key under which a matched element is stored.
    lookup = Lookup(mapping)

    filterers = [str(names)] if isinstance(names, str) else names

    filtered_mapping = {}
    for filterer in filterers:
        for key in candidate_keys:
            if filterer == key:
                element = mapping[key]
                filtered_mapping[lookup.first_key_from_value(element)] = element

    return filtered_mapping
[docs]
def first_item(a: Iterable) -> Any:
    """
    Return the first item of given iterable.

    Parameters
    ----------
    a
        Iterable to get the first item from.

    Returns
    -------
    :class:`object`
        First item produced when iterating over the iterable.

    Raises
    ------
    :class:`StopIteration`
        If the iterable is empty.

    Examples
    --------
    >>> a = range(10)
    >>> first_item(a)
    0
    """

    iterator = iter(a)

    return next(iterator)
[docs]
def copy_definition(definition: Callable, name: str | None = None) -> Callable:
    """
    Copy a definition using the same code, globals, defaults, closure, and
    name.

    Parameters
    ----------
    definition
        Definition to be copied.
    name
        Optional definition copy name.

    Returns
    -------
    Callable
        Definition copy.

    Notes
    -----
    -   The keyword-only argument defaults and the definition attributes
        dictionary are copied onto the new definition, thus the copy can be
        called exactly like the original definition.

    Examples
    --------
    >>> def fn_a(a, b=1, *, c=2):
    ...     return a + b + c
    >>> fn_b = copy_definition(fn_a, "fn_b")
    >>> fn_b.__name__
    'fn_b'
    >>> fn_b(1)
    4
    """

    # Named so as not to shadow the module-level "copy" import.
    definition_copy = types.FunctionType(
        definition.__code__,
        definition.__globals__,
        str(name or definition.__name__),
        definition.__defaults__,
        definition.__closure__,
    )

    # "types.FunctionType" only receives the positional defaults: without the
    # keyword-only defaults, calling the copy while omitting a keyword-only
    # argument would raise a "TypeError". The dict is duplicated so that the
    # copy and the original definition do not share a mutable object.
    keyword_defaults = definition.__kwdefaults__
    if keyword_defaults is not None:
        definition_copy.__kwdefaults__ = dict(keyword_defaults)

    definition_copy.__dict__.update(definition.__dict__)

    return definition_copy
[docs]
@functools.cache
def validate_method(
    method: str,
    valid_methods: tuple,
    message: str = '"{0}" method is invalid, it must be one of {1}!',
    as_lowercase: bool = True,
) -> str:
    """
    Validate whether given method exists in the given valid methods and
    optionally returns the method lower cased.

    Parameters
    ----------
    method
        Method to validate.
    valid_methods
        Valid methods.
    message
        Message for the exception.
    as_lowercase
        Whether to convert the given method to lower case or not.

    Returns
    -------
    :class:`str`
        Method optionally lower cased.

    Raises
    ------
    :class:`ValueError`
        If the method does not exist.

    Notes
    -----
    -   The comparison is case-insensitive.
    -   The results are cached, the arguments must therefore be hashable.

    Examples
    --------
    >>> validate_method("Valid", ("Valid", "Yes", "Ok"))
    'valid'
    >>> validate_method("Valid", ("Valid", "Yes", "Ok"), as_lowercase=False)
    'Valid'
    """

    valid_methods = tuple(str(valid_method) for valid_method in valid_methods)

    method_lower = method.lower()

    is_valid = any(
        method_lower == valid_method.lower() for valid_method in valid_methods
    )
    if not is_valid:
        raise ValueError(message.format(method, valid_methods))

    if as_lowercase:
        return method_lower

    return method
# Generic type variable tying the "optional" definition argument types to its
# return type.
T = TypeVar("T")
[docs]
def optional(value: T | None, default: T) -> T:
    """
    Handle optional argument value by providing a default value.

    Parameters
    ----------
    value
        Optional argument value.
    default
        Default argument value if ``value`` is *None*.

    Returns
    -------
    T
        Argument value.

    Notes
    -----
    -   Only *None* triggers the default value: falsy values such as *0* or an
        empty string are returned as they are.

    Examples
    --------
    >>> optional("Foo", "Bar")
    'Foo'
    >>> optional(None, "Bar")
    'Bar'
    """

    return default if value is None else value
[docs]
def slugify(object_: Any, allow_unicode: bool = False) -> str:
    """
    Generate a *SEO* friendly and human-readable slug from given object.

    Convert to ASCII if ``allow_unicode`` is *False*. Convert spaces or
    repeated dashes to single dashes. Remove characters that aren't
    alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip
    leading and trailing whitespace, dashes, and underscores.

    Parameters
    ----------
    object_
        Object to convert to a slug.
    allow_unicode
        Whether to allow unicode characters in the generated slug.

    Returns
    -------
    :class:`str`
        Generated slug.

    References
    ----------
    :cite:`DjangoSoftwareFoundation2022`

    Examples
    --------
    >>> slugify(" Jack & Jill like numbers 1,2,3 and 4 and silly characters ?%.$!/")
    'jack-jill-like-numbers-123-and-4-and-silly-characters'
    """

    text = str(object_)

    if not allow_unicode:
        # The compatibility decomposition separates the base characters from
        # their combining marks, the latter being dropped by the ASCII
        # round-trip.
        decomposed = unicodedata.normalize("NFKD", text)
        text = decomposed.encode("ascii", "ignore").decode("ascii")
    else:
        text = unicodedata.normalize("NFKC", text)

    # Drop everything but the word characters, whitespace and hyphens.
    cleaned = re.sub(r"[^\w\s-]", "", text.lower())

    # Collapse the runs of whitespace and hyphens to a single hyphen.
    dashed = re.sub(r"[-\s]+", "-", cleaned)

    return dashed.strip("-_")
# "int_digest" is bound to the *xxhash* implementation when available and to
# the builtin "hash" otherwise.
if is_xxhash_installed():
    import xxhash

    from colour.utilities.documentation import is_documentation_building

    int_digest = xxhash.xxh3_64_intdigest

    # NOTE(review): While the documentation is being built, "int_digest" is
    # rebound to the placeholder below, presumably so that the documentation
    # tooling has a Python-level signature and docstring to render - confirm.
    if is_documentation_building():  # pragma: no cover
        import array

        def int_digest(
            args: (  # noqa: ARG001
                str | bytes | bytearray | memoryview | array.ArrayType[int]
            ),
            seed: int = 0,  # noqa: ARG001
        ) -> int:
            """
            Generate an integer digest for given argument using *xxhash* if
            available or falling back to :func:`hash` if not.

            Parameters
            ----------
            args
                Argument to generate the int digest of.
            seed
                Seed used to alter result predictably.

            Returns
            -------
            :class:`int`
                Integer digest.

            Notes
            -----
            -   This definition is a placeholder that ignores its arguments
                and always returns *-1*.
            """

            return -1

else:
    # NOTE: The builtin "hash" accepts a single argument, the "seed" argument
    # is thus not supported by this fallback.
    int_digest = hash  # pyright: ignore # pragma: no cover