from __future__ import annotations
import functools
import importlib
import json
from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Tuple, Type, Union
from datalookup.exceptions import FieldNotFound, FilterDoesNotExist, ObjectNotFound
from datalookup.fields import DatasetField, Field, get_field_class
from datalookup.utils import LOOKUP_SEP, REPR_OUTPUT_SIZE, hash_list
if TYPE_CHECKING: # pragma: no cover
from datalookup.lookup import Lookup
__all__ = ["Dataset", "Node", "__version__", "version_tuple"]
try:
from ._version import version as __version__
from ._version import version_tuple
except ImportError: # pragma: no cover
# broken installation, we don't even try
# "unknown" only works because we do a poor man's version compare
__version__ = "unknown"
version_tuple = (0, 0, "unknown") # type:ignore[assignment]
@dataclass
class Filter:
    """Store the information needed to test one field of a node when filtering."""

    # Right-hand side handed to the lookup; it is whatever the caller passed as
    # the keyword value (e.g. a list for ``books__genre__in=["Fantasy"]``).
    value: Any
    # Name of the node field the lookup is applied to.
    field_name: str
    # Name of the lookup to resolve on that field (e.g. "exact", "in").
    lookup_name: str
class Dataset:
    """Base class that stores a set of nodes that can be manipulated and filtered."""

    def __init__(self, data: Union[dict, list]) -> None:
        """Build the dataset from one dictionary or a list of dictionaries."""
        self.nodes: list[Node] = []
        self._populate(data)

    def __repr__(self):
        # Fetch one element more than is displayed so truncation can be detected.
        data = list(self[: REPR_OUTPUT_SIZE + 1])
        if len(data) > REPR_OUTPUT_SIZE:
            data[-1] = "...(remaining elements truncated)..."
        return "<{} {}>".format(self.__class__.__name__, repr(data))

    def __eq__(self, __o: object) -> bool:
        # Order and duplicates are ignored: only the set of nodes is compared.
        return isinstance(__o, Dataset) and set(self.nodes) == set(__o.nodes)

    def __hash__(self) -> int:
        return hash_list(self.nodes)

    def __len__(self):
        return len(self.nodes)

    def __iter__(self) -> Iterator[Node]:
        yield from self.nodes

    def __getitem__(self, indices: Union[int, slice]) -> Union[Node, list[Node]]:
        """
        Retrieve an item or slice from the set of results.

        Raises ``TypeError`` when ``indices`` is neither an int nor a slice and
        ``ValueError`` when an index or a slice bound is negative.
        """
        if not isinstance(indices, (int, slice)):
            raise TypeError(
                "Dataset indices must be integers or slices, not {}.".format(
                    type(indices).__name__
                )
            )
        if isinstance(indices, int):
            negative = indices < 0
        else:
            negative = (indices.start is not None and indices.start < 0) or (
                indices.stop is not None and indices.stop < 0
            )
        if negative:
            raise ValueError("Negative indexing is not supported.")
        # Index the backing list directly; slicing already returns a new list.
        return self.nodes[indices]

    def __or__(self, other: Dataset):
        """
        Combine two datasets.

        Returns a new Dataset holding the distinct nodes of ``self`` followed by
        the nodes of ``other`` that are not already present.
        """
        if not isinstance(other, Dataset):
            # Let Python raise its standard "unsupported operand" TypeError.
            return NotImplemented
        dataset = self.distinct()
        for node in other.nodes:
            if node not in dataset.nodes:
                dataset.nodes.append(node)
        return dataset

    @classmethod
    def from_json(cls, file: Union[str, Path]):
        """Create a Dataset based on a json file"""
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # default text encoding.
        with open(str(file), "r", encoding="utf-8") as fp:
            content = json.load(fp)
        return cls(content)

    @classmethod
    def from_nodes(cls, nodes: list[Node]):
        """
        Create a Dataset based on a list of Node.

        The given list is used as-is (not copied) as the dataset's node list.
        Raises ``TypeError`` when ``nodes`` is not a list of ``Node``.
        """
        if not isinstance(nodes, list):
            raise TypeError("'nodes' must be a list")
        if not all(isinstance(n, Node) for n in nodes):
            raise TypeError("'nodes' must be a list of Node")
        dataset = cls([])
        dataset.nodes = nodes
        return dataset

    def _populate(self, data: Union[dict, list]) -> None:
        """
        Populate the dataset with Nodes.

        A dict is appended as a single Node; a list of dicts replaces the node
        list. Anything else raises ``ValueError``.
        """
        if isinstance(data, dict):
            self.nodes.append(Node(data))
        elif isinstance(data, list):
            if not all(isinstance(d, dict) for d in data):
                raise ValueError(
                    "Cannot create a Dataset based on a list where all the "
                    "element are not a dictionary"
                )
            self.nodes = [Node(d) for d in data]
        else:
            raise ValueError("'data' must be of type 'dict' or 'list'")

    def values(self) -> list:
        """Returns a list of dictionaries instead of a Dataset."""
        return [node.values() for node in self]

    @staticmethod
    def _filter(nodes: list[Node], **kwargs: Any) -> Dataset:
        """Return a Dataset of the given nodes whose ``filter(**kwargs)`` succeeds."""
        data: list[Node] = []
        for node in nodes:
            try:
                filtered_node = node.filter(**kwargs)
            except ObjectNotFound:
                # The node does not match: leave it out of the result.
                continue
            data.append(filtered_node)
        return Dataset.from_nodes(data)

    def filter(self, **kwargs) -> Dataset:
        """
        Returns a new Dataset containing objects that match the given filter
        parameters.

        The filter parameters (``**kwargs``) should be in the following format
        ``field__lookuptype=value``. Example::

            data = [
                {
                    "author": "J. K. Rowling",
                    "books": {
                        "genre": "Fantasy"
                    }
                }
            ]
            books = Dataset(data)
            books.filter(author__exact="J. K. Rowling")
            books.filter(books__genre__in=["Fantasy"])
        """
        return self._filter(self.nodes, **kwargs)

    def _search_related_node(self, name: str, nodes: list[Node]) -> list[Node]:
        """
        Search in the dataset a related node with the given name. This method
        is recursive and will search in every dataset. Return a list of Nodes.

        ``name`` is a dot-separated path of field names; each segment is
        resolved on the nodes reached through the previous one.
        """
        related_node: list[Node] = []
        fields_name = name.split(".")
        for node in nodes:
            field = node.get_field(fields_name[0])
            # Fields without a "related_node" attribute contribute nothing.
            field_nodes = getattr(field, "related_node", [])
            if len(fields_name) == 1:
                related_node.extend(field_nodes)
            else:
                related_node.extend(
                    self._search_related_node(".".join(fields_name[1:]), field_nodes)
                )
        return related_node

    def exclude(self, **kwargs) -> Dataset:
        """
        Returns a new Dataset containing objects that do not match the given filter
        parameters.

        The filter parameters (``**kwargs``) should be in the following format
        ``field__lookuptype=value``. Example::

            data = [
                {
                    "author": "J. K. Rowling",
                    "books": {
                        "genre": "Fantasy"
                    }
                }
            ]
            books = Dataset(data)
            books.exclude(author__exact="J. K. Rowling")
            books.exclude(books__genre__in=["Fantasy"])
        """
        if not kwargs:
            # Nothing to exclude: still hand back a new Dataset (as documented,
            # and as ``filter()`` does) so the result never aliases ``self``.
            return Dataset.from_nodes(list(self.nodes))
        data: list[Node] = []
        for node in self:
            try:
                node.filter(**kwargs)
            except ObjectNotFound:
                # The node does not match the filter, so it is kept.
                data.append(node)
        return Dataset.from_nodes(data)

    def distinct(self) -> Dataset:
        """Returns a new Dataset without duplicate entry."""
        distinct_nodes: list[Node] = []
        for node in self.nodes:
            if node not in distinct_nodes:
                distinct_nodes.append(node)
        return Dataset.from_nodes(distinct_nodes)

    def on_cascade(self) -> Dataset:
        """
        Must be followed by :meth:`filter()`, :meth:`exclude()` or other filtering
        methods (like books.on_cascade().filter(...)). This method will not only
        filter the current dataset but also the related field dataset. Example::

            # Filter the author but also the books of the author
            authors = books.on_cascade().filter(
                books__name="Harry Potter and the Chamber of Secrets"
            )

        The flag is set in place on every node of this dataset and the same
        dataset is returned.
        """
        for node in self.nodes:
            node.activate_on_cascade()
        return self
[docs]class Node:
"""
Base class that represent a dictionary where the value of a
key is a specific Field
"""
def __init__(self, data: dict) -> None:
self.fields: OrderedDict[str, Field] = OrderedDict()
self.on_cascade = False
self._populate(data)
def __eq__(self, __o: object) -> bool:
if not isinstance(__o, Node):
return False
if set(self.fields.values()) != set(__o.fields.values()):
return False
return True
def __hash__(self) -> int:
hash_list = []
for element in self.fields.values():
hash_list.append(hash(element))
return hash("-".join([str(i) for i in sorted(hash_list)]))
def __getattr__(self, name: str) -> Any:
try:
return self.get_field(name).get_value()
except FieldNotFound:
raise AttributeError(f"{name} attribute does not exist")
def __repr__(self):
return "<{}: {}>".format(self.__class__.__name__, self)
def __str__(self):
try:
field_value = list(self.fields.values())[0].get_value()
except IndexError:
field_value = "None"
return "{} object ({})".format(self.__class__.__name__, field_value)
def activate_on_cascade(self):
"""Activate filtering on cascade"""
self.on_cascade = True
for field in self.fields.values():
for node in getattr(field, "related_node", []):
node.activate_on_cascade()
def _populate(self, data: dict) -> None:
"""Populate the node with the given data"""
if not isinstance(data, dict):
raise ValueError("'data' must be of type dict")
for key, value in data.items():
self.fields[key] = get_field_class(key, value)
def get_field(self, name: str) -> Field:
"""Return a field of the current node"""
for field in self.fields.values():
if field.get_name() == name:
return field
raise FieldNotFound("{} field not found in {}".format(name, self.values()))
def get_lookup(self, field: Field, lookup_name: str) -> Type[Lookup]:
"""Return the lookup class of a field"""
lookup = field.get_lookup(lookup_name)
if not lookup:
raise LookupError(
"'{}' lookup not found in: {}".format(lookup_name, field.class_lookups)
)
return lookup
@staticmethod
def get_node_filters(node: Node, parent: str) -> set[str]:
"""Return a set of filters for the current node"""
filters = set()
for child_filter in node.get_filters():
filters.add(f"{parent}{LOOKUP_SEP}{child_filter}")
return filters
@functools.lru_cache(maxsize=None)
def get_filters(self) -> set[str]:
filters = set()
for key, field in self.fields.items():
filters.add(key)
for node in getattr(field, "related_node", []):
filters = filters.union(self.get_node_filters(node, key))
return filters
def get_match_filter(self, param: str) -> str:
"""Return a filter if the value starts with an existing filter"""
partition = {}
for filter in self.get_filters():
rpart = param.rpartition(filter)
if not rpart[1] or rpart[0]:
continue
partition.update({rpart[1]: len(rpart[1].split(LOOKUP_SEP))})
return max(partition, key=lambda k: partition[k])
def get_lookup_name(self, param: str, filter: str) -> str:
"""Return the lookup based on a param and it's filter"""
_, _, lookup_name = param.rpartition(filter)
if not lookup_name:
lookup_name = "exact"
else:
lookup_name = lookup_name.strip(LOOKUP_SEP)
return lookup_name
def _parser_filters(self, **kwargs: Any) -> Tuple[list[Filter], dict]:
"""
Return a list of Filter that are use to filter the current
instance and a dictionary of filters for it's childs.
"""
current_filters = []
next_filters: defaultdict[str, dict] = defaultdict(dict)
for param, value in kwargs.items():
filter = self.get_match_filter(param)
splitted_filter = filter.split(LOOKUP_SEP)
if len(splitted_filter) == 1:
lookup_name = self.get_lookup_name(param, filter)
current_filters.append(Filter(value, splitted_filter[0], lookup_name))
else:
# Remove the parent node from the kwargs attribute. This is for
# related next filtering
parent_node = splitted_filter[0]
new_param = param.replace(parent_node + LOOKUP_SEP, "", 1)
next_filters[parent_node].update({new_param: value})
return current_filters, next_filters
[docs] def filter(self, **kwargs: Any) -> Node:
"""
Returns the current :class:`Node` or raise an ``ObjectNotFound`` exception.
The filter parameters (``**kwargs``) should be in the following format
``field__lookuptype=value``. Example::
data = {
"author": "J. K. Rowling",
"books": {
"genre": "Fantasy"
}
}
node = Node(data)
node.filter(author__exact="J. K. Rowling")
node.filter(books__genre__in=["Fantasy"])
"""
# If there is no filter given. There is no need to proceed.
# We can just return the actual node
if not kwargs:
return self
filters = self.get_filters()
for key in kwargs.keys():
if not any(key.startswith(f) for f in filters):
raise FilterDoesNotExist("{} filter does not exist".format(key))
# Get attribute filter. This is to filter current instance
current_filters, next_filters = self._parser_filters(**kwargs)
# Check if one of the field of this node correspond to the current filter
for filter in current_filters:
field = self.get_field(filter.field_name)
lookup_class = self.get_lookup(field, filter.lookup_name)
lookup = lookup_class(field.get_value(), filter.value)
if not lookup.resolve():
raise ObjectNotFound()
for parent, new_params in next_filters.items():
field = self.get_field(parent)
node_or_dataset = field.get_value()
data = node_or_dataset.filter(**new_params)
if isinstance(field, DatasetField):
if len(data) == 0:
raise ObjectNotFound()
if self.on_cascade is True:
setattr(field, field.get_name(), data)
return self
[docs] def values(self) -> dict:
"""
Return a dictionary, with the keys corresponding to the attribute
names of the node object.
"""
data = {}
for key, field in self.fields.items():
data[key] = field.deserialize()
return data
# Import lookup here to avoid import recursion and load all fields lookup
importlib.import_module("datalookup.lookup")