#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães "
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import re
import copy
import math
import json
import types
import inspect
import logging
import datetime
from . import meta
from . import util
from . import legacy
from . import common
from . import typesf
from . import observer
from . import validation
from . import exceptions
ITERABLES = tuple(list(legacy.STRINGS) + [dict])
""" The sequence defining the complete set of valid types
for direct evaluation, instead of indirect (recursive)
evaluation, this is required to avoid miss behavior """
RE = lambda v: [i for i in v if not i == ""]
""" Simple lambda function that removes any
empty element from the provided list values """
BUILDERS = {
legacy.UNICODE: lambda v: (
v.decode("utf-8") if isinstance(v, legacy.BYTES) else legacy.UNICODE(v)
),
list: lambda v: (
RE(v)
if isinstance(v, list)
else (json.loads(v) if isinstance(v, legacy.UNICODE) else RE([v]))
),
dict: lambda v: json.loads(v) if isinstance(v, legacy.UNICODE) else dict(v),
bool: lambda v: v if isinstance(v, bool) else not v in ("", "0", "false", "False"),
}
""" The map associating the various types with the
custom builder functions to be used when applying
the types function, this is relevant for the built-in
types that are meant to avoid using the default constructor """
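# Example (illustrative only): the builders coerce raw (typically string)
# based values into the target types, eg: BUILDERS[list]('["a", "b"]')
# returns ["a", "b"] (JSON decoded), BUILDERS[bool]("false") returns False
# and BUILDERS[legacy.UNICODE](b"data") returns "data" (UTF-8 decoded)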
BUILDERS_META = dict(
text=BUILDERS[legacy.UNICODE],
country=BUILDERS[legacy.UNICODE],
longtext=BUILDERS[legacy.UNICODE],
map=BUILDERS[dict],
longmap=BUILDERS[dict],
date=lambda v: float(v),
    datetime=lambda v: float(v),
file=None,
)
""" Map equivalent to the builders map but appliable
for meta based type naming, this map should be used as
an extension to the base builder map """
METAS = dict(
text=lambda v, d, c=None: v,
enum=lambda v, d, c=None: d["enum"].get(v, None),
list=lambda v, d, c=None: json.dumps(
v, ensure_ascii=False, cls=c._encoder() if c else None
),
map=lambda v, d, c=None: json.dumps(
v, ensure_ascii=False, cls=c._encoder() if c else None
),
longmap=lambda v, d, c=None: json.dumps(
v, ensure_ascii=False, cls=c._encoder() if c else None
),
date=lambda v, d, c=None: datetime.datetime.utcfromtimestamp(float(v)).strftime(
"%d %b %Y"
),
datetime=lambda v, d, c=None: datetime.datetime.utcfromtimestamp(float(v)).strftime(
"%d %b %Y %H:%M:%S"
),
)
""" The map that contains the various mapping functions
for the meta types that may be described for a field under
the current model specification, the resulting value for
each of these functions should preferably be a string """
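# Example (illustrative only): the meta mappers convert raw values into
# display oriented ones, eg: METAS["date"](0, {}) returns "01 Jan 1970"
# and METAS["enum"](1, dict(enum={1: "Active"})) returns "Active"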
TYPE_DEFAULTS = {
legacy.BYTES: None,
legacy.UNICODE: None,
int: None,
float: None,
bool: False,
list: lambda: [],
dict: lambda: {},
}
""" The default values to be set when a type
conversion fails for the provided string value
the resulting value may be returned when a validation
fails an so it must be used carefully """
TYPE_META = {
typesf.File: "file",
typesf.Files: "files",
typesf.Reference: "reference",
typesf.References: "references",
legacy.BYTES: "string",
legacy.UNICODE: "string",
int: "number",
float: "float",
bool: "bool",
list: "list",
dict: "map",
}
""" Dictionary that defines the default mapping for each
of the base data types against the associated default meta
values for each for them, these meta type values are going
to be used mostly for presentation purposes """
TYPE_REFERENCES = (typesf.Reference, typesf.References)
""" The various data types that are considered to be references
so that they are lazy loaded from the data source, these kind
of types should be compliant to a common interface so that they
may be used "blindly" from an external entity """
REVERSE = dict(
descending="ascending",
ascending="descending",
)
""" The reverse order dictionary that maps a certain
order direction (as a string) with the opposite one
this may be used to "calculate" the reverse value """
DIRTY_PARAMS = ("map", "rules", "meta", "build", "skip", "limit", "sort", "raise_e")
""" The set containing the complete set of parameter names for
the parameters that are considered to be dirty and that should
be cleaned from any query operation on the data source, otherwise
serious consequences may occur """
OPERATORS = {
"eq": None,
"equals": None,
"ne": "$ne",
"not_equals": "$ne",
"in": "$in",
"nin": "$nin",
"not_in": "$nin",
"like": "$regex",
"likei": "$regex",
"llike": "$regex",
"llikei": "$regex",
"rlike": "$regex",
"rlikei": "$regex",
"gt": "$gt",
"greater": "$gt",
"gte": "$gte",
"greater_equal": "$gte",
"lt": "$lt",
"lesser": "$lt",
"lte": "$lte",
"lesser_equal": "$lte",
"null": None,
"is_null": None,
"not_null": "$ne",
"is_not_null": "$ne",
"contains": "$all",
}
""" The map containing the mapping association between the
normalized version of the operators and the infra-structure
specific value for each of this operations, note that some
of the values don't have a valid mapping for this operations
the operator must be ignored and not used explicitly """
VALUE_METHODS = {
"in": lambda v, t: [t(v) for v in v.split(";")],
"not_in": lambda v, t: [t(v) for v in v.split(";")],
"like": lambda v, t: "^.*" + legacy.UNICODE(re.escape(v)) + ".*$",
"likei": lambda v, t: "^.*" + legacy.UNICODE(re.escape(v)) + ".*$",
"llike": lambda v, t: "^.*" + legacy.UNICODE(re.escape(v)) + "$",
"llikei": lambda v, t: "^.*" + legacy.UNICODE(re.escape(v)) + "$",
"rlike": lambda v, t: "^" + legacy.UNICODE(re.escape(v)) + ".*$",
"rlikei": lambda v, t: "^" + legacy.UNICODE(re.escape(v)) + ".*$",
"null": lambda v, t: None,
"is_null": lambda v, t: None,
"not_null": lambda v, t: None,
"is_not_null": lambda v, t: None,
"contains": lambda v, t: [t(v) for v in v.split(";")],
}
""" Map that associates each of the normalized operations with
an inline function that together with the data type maps the
the base string based value into the target normalized value """
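# Example (illustrative only): for a normalized "in" operation the operator
# maps to "$in" (OPERATORS["in"]) while the value method converts the raw
# string value, eg: VALUE_METHODS["in"]("1;2;3", int) returns [1, 2, 3]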
INSENSITIVE = {"likei": True, "llikei": True, "rlikei": True}
""" The map that associates the various operators with the boolean
values that define if an insensitive base search should be used
instead of the "typical" sensitive search """
EXTRA_CLS = []
""" The sequence that will contain the complete set of extra classes
(mixins) to add base functionality to the Model instances """
if legacy.PYTHON_ASYNC_GEN:
from . import model_a
EXTRA_CLS.append(model_a.ModelAsync)
BUILDERS.update(BUILDERS_META)
class Model(legacy.with_meta(meta.Ordered, observer.Observable, *EXTRA_CLS)):
"""
Abstract model class from which all the models should
directly or indirectly inherit. Should provide the
basic infra-structure for the persistence of data using
a plain key value storage engine.
    The data should always be stored in a dictionary oriented
structure while it's not persisted in the database.
"""
_extra_methods = []
""" Special sequence of tuples (names and functions) that is
used at instance creation time to bind the provided methods
to the newly created instance, this is required for the dynamic
addition of instance methods to models """
def __new__(cls, *args, **kwargs):
instance = super(Model, cls).__new__(cls)
instance.__dict__["_events"] = {}
instance.__dict__["_extras"] = []
instance.__dict__["model"] = {}
instance.__dict__["owner"] = None
instance.__dict__["ref"] = None
return instance
def __init__(self, model=None, **kwargs):
fill = kwargs.pop("fill", True)
model = model or {}
if fill:
model = self.__class__.fill(model)
self.__dict__["model"] = model
self.__dict__["owner"] = common.base().APP or None
self.__dict__["ref"] = kwargs.pop("ref", None)
for name, value in kwargs.items():
setattr(self, name, value)
for name, method in self.__class__._extra_methods:
if legacy.PYTHON_3:
bound_method = types.MethodType(method, self)
else:
bound_method = types.MethodType(method, self, self.__class__)
setattr(self, name, bound_method)
observer.Observable.__init__(self)
def __str__(self):
cls = self.__class__
default = cls.default()
if not default:
return cls._name()
if not default in self.model:
return cls._name()
value = self.model[default]
if value == None:
value = ""
is_string = legacy.is_str(value)
return value if is_string else str(value)
def __unicode__(self):
cls = self.__class__
default = cls.default()
if not default:
return cls._name()
value = self.model[default]
if value == None:
value = ""
is_unicode = legacy.is_unicode(value)
return value if is_unicode else legacy.UNICODE(value)
def __getattribute__(self, name):
try:
model = object.__getattribute__(self, "model")
if name in model:
return model[name]
except AttributeError:
pass
cls = object.__getattribute__(self, "__class__")
definition = cls.definition()
if name in definition:
raise AttributeError("attribute '%s' is not set" % name)
return object.__getattribute__(self, name)
def __setattr__(self, name, value):
is_base = name in self.__dict__
if is_base:
self.__dict__[name] = value
else:
self.model[name] = value
def __delattr__(self, name):
try:
model = object.__getattribute__(self, "model")
if name in model:
del model[name]
except AttributeError:
pass
def __len__(self):
return self.model.__len__()
def __getitem__(self, key):
return self.model.__getitem__(key)
def __setitem__(self, key, value):
self.model.__setitem__(key, value)
def __delitem__(self, key):
self.model.__delitem__(key)
def __contains__(self, item):
return self.model.__contains__(item)
def __nonzero__(self):
return len(self.model) > 0
def __bool__(self):
return len(self.model) > 0
@classmethod
def new(
cls,
model=None,
form=True,
safe=True,
build=False,
fill=True,
new=True,
**kwargs
):
"""
Creates a new instance of the model applying the provided model
map to it after the instantiation of the class.
The optional form flag controls if the form context data should be
used in the case of an invalid/unset model value. This is considered
unsafe in many contexts.
The optional safe flag makes sure that the model attributes marked
as safe are going to be removed and are not valid for apply.
The optional build flag may be used to define if the build operations
that may be defined by the user on override should be called for
this instance of entity (should be used only on retrieval).
        The optional fill flag allows control over whether the model should
        be filled with default values before the apply operation is performed.
        In order to make sure the resulting instance is safe for creation,
        the new flag may be used, as it ensures that the proper identifier
        attributes are not set in the instance after the apply operation is done.
:type model: Dictionary
:param model: The map containing the model that is going to be applied
to the new instance to be created.
:type form: bool
:param form: If in case the provided model is not valid (unset) the model
should be retrieved from the current model in context, this is an unsafe
operation as it may create unwanted behavior (use it carefully).
:type safe: bool
:param safe: If the attributes marked as safe in the model definition
should be removed from the instance after the apply operation.
:type build: bool
:param build: If the "custom" build operation should be performed after
the apply operation is performed so that new custom attributes may be
injected into the resulting instance.
:type fill: bool
:param fill: If the various attributes of the model should be "filled"
with default values (avoiding empty values).
:type new: bool
:param new: In case this value is valid the resulting instance is expected
to be considered as new meaning that no identifier attributes are set.
:rtype: Model
:return: The new model instance resulting from the apply of the provided
model and after the proper validations are performed on it.
"""
if model == None:
model = util.get_object() if form else dict(kwargs)
if fill:
model = cls.fill(model, safe=not new)
instance = cls(fill=False)
instance.apply(model, form=form, safe_a=safe)
if build:
cls.build(instance.model, map=False)
if new:
instance.assert_is_new()
return instance
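    # Example (illustrative only, assuming the `Person` model sketched above):
    # `person = Person.new(model=dict(name="John"), form=False)` builds a
    # filled instance from the provided map (bypassing the unsafe form
    # context retrieval) and asserts that no identifier is set on it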
@classmethod
def old(cls, model=None, form=True, safe=True, build=False):
return cls.new(model=model, form=form, safe=safe, build=build, new=False)
@classmethod
def wrap(cls, models, build=True, handler=None, **kwargs):
"""
"Wraps" the provided sequence (or single set) of model based data into a
sequence of models (or a single model) so that proper business logic may
be used for operations on top of that data.
In case the extra handler argument is passed it's going to be called for
each model that is going to be "wrapped" allowing an extra "layer" for
the transformation of the model.
        The additional named arguments allow extensibility, setting extra
        values in the creation of the wrapped object.
This operation is specially useful for API based environments where client
side business logic is meant to be added to the static data.
:type models: List
:param models: Sequence (or single) set of models that are going to be wrapped
around instances of the current class.
:type build: bool
:param build: If the "custom" build operation should be performed after
the wrap operation is performed so that new custom attributes may be
injected into the resulting instance.
:type handler: Function
:param handler: Handler function that is going to be called for each of the
models after the build process has been performed, allows an extra transform
operation to be performed at runtime.
:rtype: List
:return: The sequence of models (or single model) representing the provided
set of dictionary models that were sent as arguments.
"""
is_sequence = isinstance(models, (list, tuple))
if not is_sequence:
models = [models]
wrapping = []
for model in models:
if not isinstance(model, dict):
continue
_model = cls(model=model, **kwargs)
handler and handler(_model.model)
build and cls.build(_model.model, map=False)
wrapping.append(_model)
if is_sequence:
return wrapping
else:
return wrapping[0] if wrapping else None
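    # Example (illustrative only): wrapping raw dictionaries (eg: coming from
    # an external API) so that model level logic may run on top of them:
    # `people = Person.wrap([dict(name="John"), dict(name="Jane")])`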
@classmethod
def singleton(cls, model=None, form=True, safe=True, build=False, *args, **kwargs):
instance = cls.get(raise_e=False, *args, **kwargs)
if instance:
instance.apply(model, form=form, safe_a=safe)
else:
instance = cls.old(model=model, form=form, safe=safe, build=build)
return instance
@classmethod
def get(cls, *args, **kwargs):
(
fields,
eager,
eager_l,
map,
rules,
meta,
build,
fill,
resolve_a,
skip,
limit,
sort,
raise_e,
) = cls._get_attrs(
kwargs,
(
("fields", None),
("eager", None),
("eager_l", None),
("map", False),
("rules", True),
("meta", False),
("build", True),
("fill", True),
("resolve_a", None),
("skip", 0),
("limit", 0),
("sort", None),
("raise_e", True),
),
)
        # in case there's a sort field and the retrieval is paginated
        # (skip or limit is set) we must add sorting by the `_id` field
        # so that the retrieval is considered to be deterministic, otherwise
        # some DB implementations will not respect the same sorting sequence
        # across different calls
if sort and (skip or limit):
if not isinstance(sort, list):
sort = list(sort)
sort.append(["_id", 1])
if eager_l == None:
eager_l = map
if resolve_a == None:
resolve_a = map
if eager_l:
eager = cls._eager_b(eager)
fields = cls._sniff(fields, rules=rules)
collection = cls._collection()
model = collection.find_one(kwargs, fields, skip=skip, limit=limit, sort=sort)
if not model and raise_e:
is_devel = common.is_devel()
if is_devel:
message = "%s not found for %s" % (cls.__name__, str(kwargs))
else:
message = "%s not found" % cls.__name__
raise exceptions.NotFoundError(message=message)
if not model and not raise_e:
return model
cls.types(model)
if fill:
cls.fill(model, safe=rules)
if build:
cls.build(model, map=map, rules=rules, meta=meta)
if eager:
model = cls._eager(model, eager, map=map)
if resolve_a:
model = cls._resolve_all(model, resolve=False)
return model if map else cls.old(model=model, safe=False)
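    # Example (illustrative only): keyword arguments that are not control
    # parameters are used directly as filters, eg:
    # `person = Person.get(name="John", raise_e=False)` returns None when
    # no match exists, while passing `map=True` would return a plain
    # dictionary instead of a model instance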
@classmethod
def find(cls, *args, **kwargs):
(
fields,
eager,
eager_l,
map,
rules,
meta,
build,
fill,
resolve_a,
skip,
limit,
sort,
raise_e,
) = cls._get_attrs(
kwargs,
(
("fields", None),
("eager", None),
("eager_l", False),
("map", False),
("rules", True),
("meta", False),
("build", True),
("fill", True),
("resolve_a", None),
("skip", 0),
("limit", 0),
("sort", None),
("raise_e", False),
),
)
        # in case there's a sort field and the search is paginated
        # (skip or limit is set) we must add sorting by the `_id` field
        # so that the search is considered to be deterministic, otherwise
        # some DB implementations will not respect the same sorting sequence
        # across different calls
if sort and (skip or limit):
if not isinstance(sort, list):
sort = list(sort)
sort.append(["_id", 1])
if resolve_a == None:
resolve_a = map
if eager_l:
eager = cls._eager_b(eager)
cls._find_s(kwargs)
cls._find_d(kwargs)
fields = cls._sniff(fields, rules=rules)
collection = cls._collection()
models = collection.find(kwargs, fields, skip=skip, limit=limit, sort=sort)
if not models and raise_e:
is_devel = common.is_devel()
if is_devel:
message = "%s not found for %s" % (cls.__name__, str(kwargs))
else:
message = "%s not found" % cls.__name__
raise exceptions.NotFoundError(message=message)
models = [cls.types(model) for model in models]
if fill:
models = [cls.fill(model, safe=rules) for model in models]
if build:
[cls.build(model, map=map, rules=rules, meta=meta) for model in models]
if eager:
models = cls._eager(models, eager, map=map)
if resolve_a:
models = [cls._resolve_all(model, resolve=False) for model in models]
models = (
models if map else [cls.old(model=model, safe=False) for model in models]
)
return models
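    # Example (illustrative only): retrieving a sorted page of model
    # instances using the control parameters consumed by `_get_attrs`:
    # `people = Person.find(skip=10, limit=10, sort=[["name", 1]])`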
@classmethod
def count(cls, *args, **kwargs):
cls._clean_attrs(kwargs)
cls._find_s(kwargs)
cls._find_d(kwargs)
collection = cls._collection()
if kwargs:
if hasattr(collection, "count_documents"):
result = collection.count_documents(kwargs)
else:
result = collection.find(kwargs)
result = result.count()
else:
if hasattr(collection, "count_documents"):
result = collection.count_documents()
else:
result = collection.count()
return result
@classmethod
def paginate(cls, skip=0, limit=1, *args, **kwargs):
# retrieves the reference to the current global request in handling
# this is going to be used for operations on the parameters
request = common.base().get_request()
# counts the total number of references according to the
# current filter value and then uses this value together
# with the skip value to calculate both the number of pages
# available for the current filter and the current page index
        # (note that the index is one based)
total = cls.count(*args, **kwargs)
count = total / float(limit)
count = math.ceil(count)
count = int(count)
index = skip / float(limit)
index = math.floor(index)
index = int(index) + 1
# calculates the proper size of the current page being requested
# taking into account the total number of values and the limit
size = total % limit if index == count else limit
if size == 0 and total > 0:
size = limit
if total == 0:
size = 0
# creates the base structure for the page populating with the
# base values that may be used for display of the page
page = dict(
count=count,
index=index,
start=skip + 1,
end=skip + limit,
size=size,
total=total,
sorter=request.params_f.get("sorter", None),
direction=request.params_f.get("direction", "descending"),
)
def generate(**kwargs):
# creates the linear parameters list that is going to hold multiple
# key and value tuple representing the multiple parameters
params_l = []
# creates a copy of the current definition of the parameters and for each
# of the exclusion parameters removes it from the current structure
params = dict(request.params_f)
if "async" in params:
del params["async"]
# retrieves the "special" sorter keyword based argument and the equivalent
# values for the current page in handling, this values are going to be used
# for the calculus of the direction value (in case it's required)
sorter = kwargs.get("sorter", None)
_sorter = page["sorter"]
direction = page["direction"]
reverse = REVERSE.get(direction, "descending")
            # verifies if the sorter value is defined in the arguments and if
            # that's the case checks if it's the same as the current one; if
            # so, the direction must be reversed, otherwise the default
            # direction is set
if sorter and sorter == _sorter:
params["direction"] = reverse
elif sorter:
params["direction"] = "descending"
# "copies" the complete set of values from the provided keyword
# based arguments into the parameters map, properly converting them
# into the proper string value (avoiding possible problems)
for key, value in kwargs.items():
params[key] = str(value)
# iterates over the complete set of parameters to be sent to linearize
# them into a sequence of tuples ready to be converted into quoted string
for key, value in params.items():
is_list = isinstance(value, (list, tuple))
if not is_list:
value = [value]
for _value in value:
params_l.append((key, _value))
            # converts the multiple parameters to be used into a linear
            # quoted manner so that they may be used as query string values
query = [
util.quote(key) + "=" + util.quote(value) for key, value in params_l
]
query = "&".join(query)
return "?" + query if query else query
# updates the current page structure so that the (query) generation
# method is exposed in order to modify the current query
page["query"] = generate
return page
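    # Example (illustrative only): `page = Person.paginate(skip=20, limit=10)`
    # builds the structure for the third page of ten items, while
    # `page["query"](sorter="name")` generates a query string that sorts by
    # the `name` attribute (reversing the direction when already sorted by it)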
@classmethod
def delete_c(cls, *args, **kwargs):
collection = cls._collection()
collection.remove(kwargs)
@classmethod
def ordered(cls, filter=dict):
is_sequence = isinstance(filter, (list, tuple))
if not is_sequence:
filter = (filter,)
ordered = list(cls._ordered)
for name, value in cls.__dict__.items():
if name.startswith("_"):
continue
if not name == name.lower():
continue
if not isinstance(value, filter):
continue
if name in ordered:
continue
ordered.append(name)
return ordered
@classmethod
def methods(cls):
# in case the methods are already "cached" in the current
# class (fast retrieval) returns immediately
if "_methods" in cls.__dict__:
return cls._methods
# starts the list that will hold the various method names
# for the class, note that this value will be ordered
# according to the class level and the definition order
methods = []
        # retrieves the complete model hierarchy for the current model,
        # which is going to be used to iterate through the class levels
        # in a top to bottom strategy
hierarchy = cls.hierarchy()
# iterates over the complete model hierarchy and retrieves the
# ordered set of attributes from it extending the retrieved methods
# list with the value for each of the model levels
for _cls in hierarchy:
ordered = _cls.ordered(filter=(types.FunctionType, classmethod))
methods.extend(ordered)
# saves the retrieved set of methods in the current model definition
# and then returns the value to the caller method as requested
cls._methods = methods
return methods
@classmethod
def fields(cls):
# in case the fields are already "cached" in the current
# class (fast retrieval) returns immediately
if "_fields" in cls.__dict__:
return cls._fields
# starts the list that will hold the various field names
# for the class, note that this value will be ordered
# according to the class level and the definition order
fields = []
        # retrieves the complete model hierarchy for the current model,
        # which is going to be used to iterate through the class levels
        # in a top to bottom strategy
hierarchy = cls.hierarchy()
# iterates over the complete model hierarchy and retrieves the
# ordered set of attributes from it extending the retrieved fields
# list with the value for each of the model levels
for _cls in hierarchy:
ordered = _cls.ordered()
fields.extend(ordered)
# saves the retrieved set of fields in the current model definition
# and then returns the value to the caller method as requested
cls._fields = fields
return fields
@classmethod
def definition(cls):
# in case the definition is already "cached" in the current
# class (fast retrieval) returns immediately
if "_definition" in cls.__dict__:
return cls._definition
# creates the map that will hold the complete definition of
# the current model
definition = {}
# retrieves the complete model hierarchy for the current model
# this should allow the method to retrieve the complete set
# of fields for the current model
hierarchy = cls.hierarchy()
        # iterates over all the classes in the hierarchy to create the
        # map that will contain the various names of the current model
        # associated with their definition maps
for _cls in hierarchy:
for name, value in _cls.__dict__.items():
if name.startswith("_"):
continue
if not name == name.lower():
continue
if not isinstance(value, dict):
continue
if name in definition:
raise exceptions.OperationalError(
message="Duplicated attribute '%s' in '%s' hierarchy"
% (name, _cls.__name__),
code=412,
)
definition[name] = value
# sets the "default" definition for the based identifier
# (underlying identifier attribute)
definition["_id"] = dict()
# saves the currently generated definition under the current
# class and then returns the contents of it to the caller method
cls._definition = definition
return definition
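    # Example (illustrative only): for the `Person` model sketched above,
    # `Person.definition()` would map "id" and "name" to their definition
    # dictionaries, plus the implicit "_id" entry (empty definition)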
@classmethod
def definition_extended(cls):
# in case the definition extended is already "cached" in the current
# class (fast retrieval) returns immediately
if "_definition_extended" in cls.__dict__:
return cls._definition_extended
        # retrieves the base definition dictionary and duplicates it, adding
        # the elements present in the set of extra definitions (overridable on
        # a per model basis, providing extension)
definition_extended = dict(cls.definition())
definition_extended.update(cls.extra_definition())
# saves the currently generated definition extended under the current
# class and then returns the contents of it to the caller method
cls._definition_extended = definition_extended
return definition_extended
@classmethod
def links(cls):
# in case the links are already "cached" in the current
# class (fast retrieval) returns immediately
if "_links" in cls.__dict__:
return cls._links
# creates the list that will hold the complete set of method
# names for links type methods
links = []
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be link oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are links
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_link"):
continue
reference = hasattr(method, "__self__") and method.__self__
is_instance = False if reference else True
method._link["instance"] = is_instance
links.append(method._link)
# sorts the various links taking into account the name of
# the link, this is considered the pre-defined order
links.sort(key=lambda item: item["name"])
# saves the list of link method names defined under the current
# class and then returns the contents of it to the caller method
cls._links = links
return links
@classmethod
def links_m(cls):
# in case the links are already "cached" in the current
# class (fast retrieval) returns immediately
if "_links_m" in cls.__dict__:
return cls._links_m
# creates the map that will hold the complete set of method
# names for links type methods
links_m = dict()
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be link oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are links
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_link"):
continue
links_m[method.__name__] = method._link
# saves the map of link method names defined under the current
# class and then returns the contents of it to the caller method
cls._links_m = links_m
return links_m
@classmethod
def link(cls, name):
links_m = cls.links_m()
return links_m.get(name, None)
@classmethod
def operations(cls):
# in case the operations are already "cached" in the current
# class (fast retrieval) returns immediately
if "_operations" in cls.__dict__:
return cls._operations
# creates the list that will hold the complete set of method
# names for operations type methods
operations = []
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be operation oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are operations
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_operation"):
continue
reference = hasattr(method, "__self__") and method.__self__
is_instance = False if reference else True
method._operation["instance"] = is_instance
operations.append(method._operation)
# sorts the various operations taking into account the name of
# the operation, this is considered the pre-defined order
operations.sort(key=lambda item: item["name"])
# saves the list of operation method names defined under the current
# class and then returns the contents of it to the caller method
cls._operations = operations
return operations
@classmethod
def operations_m(cls):
# in case the operations are already "cached" in the current
# class (fast retrieval) returns immediately
if "_operations_m" in cls.__dict__:
return cls._operations_m
# creates the map that will hold the complete set of method
# names for operations type methods
operations_m = dict()
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be operation oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are operations
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_operation"):
continue
operations_m[method.__name__] = method._operation
# saves the map of operation method names defined under the current
# class and then returns the contents of it to the caller method
cls._operations_m = operations_m
return operations_m
@classmethod
def operation(cls, name):
operations_m = cls.operations_m()
return operations_m.get(name, None)
@classmethod
def views(cls):
# in case the views are already "cached" in the current
# class (fast retrieval) returns immediately
if "_views" in cls.__dict__:
return cls._views
# creates the list that will hold the complete set of method
# names for views type methods
views = []
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be view oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are views
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_view"):
continue
reference = hasattr(method, "__self__") and method.__self__
is_instance = False if reference else True
method._view["instance"] = is_instance
views.append(method._view)
# sorts the various views taking into account the name of
# the view, this is considered the pre-defined order
views.sort(key=lambda item: item["name"])
# saves the list of view method names defined under the current
# class and then returns the contents of it to the caller method
cls._views = views
return views
@classmethod
def views_m(cls):
# in case the views are already "cached" in the current
# class (fast retrieval) returns immediately
if "_views_m" in cls.__dict__:
return cls._views_m
# creates the map that will hold the complete set of method
# names for views type methods
views_m = dict()
# retrieves the complete set of method names for the current
# class this is going to be used to determine the ones that
# are considered to be view oriented
methods = cls.methods()
# iterates over the complete set of method names for the current
# class hierarchy to determine the ones that are views
for name in methods:
method = getattr(cls, name)
if not hasattr(method, "_view"):
continue
views_m[method.__name__] = method._view
# saves the map of view method names defined under the current
# class and then returns the contents of it to the caller method
cls._views_m = views_m
return views_m
@classmethod
def view(cls, name):
views_m = cls.views_m()
return views_m.get(name, None)
@classmethod
def definition_n(cls, name):
definition = cls.definition_extended()
if not name in definition:
return {}
return definition[name]
@classmethod
def register(cls, lazy=False):
if lazy:
return
cls.setup()
@classmethod
def unregister(cls, lazy=False):
if lazy:
return
cls.teardown()
@classmethod
def setup(cls):
cls._build_indexes()
@classmethod
def teardown(cls):
pass
@classmethod
def validate(cls):
return []
@classmethod
def validate_new(cls):
return cls.validate()
@classmethod
def base_names(cls):
names = cls.fields()
names = [name for name in names if not name.startswith("_")]
return names
@classmethod
def create_names(cls):
names = cls.base_names()
extra = cls.extra_names()
names.extend(extra)
return names
@classmethod
def update_names(cls):
return cls.create_names()
@classmethod
def show_names(cls):
_names = []
names = cls.base_names()
definition = cls.definition()
for name in names:
value = definition.get(name, None)
if value == None:
continue
is_private = value.get("private", False)
if is_private:
continue
_names.append(name)
return _names
@classmethod
def list_names(cls):
return cls.show_names()
@classmethod
def extra_names(cls):
return []
@classmethod
def unique_names(cls):
return []
@classmethod
def order_name(cls):
return None
@classmethod
def extra_definition(cls):
return {}
@classmethod
def build(cls, model, map=False, rules=True, meta=False):
if rules:
cls.rules(model, map)
cls._build(model, map)
if meta:
cls._meta(model, map)
@classmethod
def rules(cls, model, map):
for name, _value in legacy.eager(model.items()):
definition = cls.definition_n(name)
is_private = definition.get("private", False)
if not is_private:
continue
del model[name]
@classmethod
def types(cls, model):
definition = cls.definition()
for name, value in legacy.eager(model.items()):
if name == "_id":
continue
if value == None:
continue
if not name in definition:
continue
model[name] = cls.cast(name, value)
return model
@classmethod
def fill(cls, model=None, safe=False):
"""
Fills the current model with the proper values so that
no values are unset as this would violate the model definition
integrity. This is required when retrieving an object(s) from
the data source (as some of them may be incomplete).
:type model: Model
:param model: The model that is going to have its unset
attributes filled with "default" data, in case none is provided
all of the attributes will be filled with "default" data.
:type safe: bool
        :param safe: If the safe mode should be used for the fill
        operation, meaning that under some conditions no fill
        operation is going to be applied (eg: retrieval operations).
"""
model = model or dict()
definition = cls.definition()
for name, _definition in definition.items():
if name in model:
continue
if name in ("_id",):
continue
private = _definition.get("private", False)
increment = _definition.get("increment", False)
if private and safe:
continue
if increment:
continue
if "initial" in _definition:
initial = _definition["initial"]
model[name] = initial
else:
_type = _definition.get("type")
default = type_d(_type, None)
default = _type._default() if hasattr(_type, "_default") else default
model[name] = default
return model
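    # Example (illustrative only): `Person.fill(dict(name="John"))` returns
    # the same map with every other (non increment) attribute set to its
    # "initial" value or to the default associated with its data type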
@classmethod
def cast(cls, name, value, safe=True):
definition = cls.definition()
if not name in definition:
return value
if value == None:
return value
_definition = cls.definition_n(name)
_type = _definition.get("type", legacy.UNICODE)
builder = BUILDERS.get(_type, _type)
try:
return builder(value) if builder else value
except Exception:
if not safe:
raise
default = type_d(_type, None)
default = _type._default() if hasattr(_type, "_default") else default
return default
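    # Example (illustrative only): for an attribute defined as dict(type=int),
    # `Person.cast("age", "42")` returns 42, while `Person.cast("age", "abc")`
    # falls back to the type's default value (None) as safe is set by default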
@classmethod
def to_description(cls, name):
"""
Converts the provided model attribute/field name into a
description ready to be human readable.
This conversion process may be automated (simple string
based replacement) or manually guided (through
configuration).
        These strings should be readable English.
Some of the process involves caching for performance
reasons (avoids multiple description computation).
:type name: String
:param name: The name of the attribute/field to retrieve
the human readable description.
:rtype: String
:return: The human readable version of the attribute or
field name according to automated or manual rules.
"""
# runs the (possibly) recursive class resolution process
# so that in case the provided name is namespace based,
# meaning that it's meant to be used for references, the
# final leaf class is retrieved instead of the root one
cls, name = cls._res_cls(name)
# tries to retrieve the info dictionary for the attribute
# name and then uses it to retrieve the possible manual
# description value for the field, returning immediately
# the value if there's success
info = cls.definition_n(name)
description = info.get("description", None)
if description:
return description
# because there's no explicit description defined
# runs the automatic underscore to readable conversion
# and sets the value on the field info dictionary
description = util.underscore_to_readable(name, capitalize=True)
info["description"] = description
return description
@classmethod
def to_observations(cls, name):
# runs the (possibly) recursive class resolution process
# so that in case the provided name is namespace based,
# meaning that it's meant to be used for references, the
# final leaf class is retrieved instead of the root one
cls, name = cls._res_cls(name)
# tries to retrieve the info dictionary for the attribute
# name and then uses it to retrieve the observations value
# returning an invalid one in case it's not found
info = cls.definition_n(name)
return info.get("observations", None)
@classmethod
def all_parents(cls):
# in case the all parents are already "cached" in the current
# class (fast retrieval) returns immediately
if "_all_parents" in cls.__dict__:
return cls._all_parents
# creates the list to hold the various parent
# entity classes, populated recursively
all_parents = []
# retrieves the parent entity classes from
# the current class
parents = cls._bases()
# iterates over all the parents to extend
# the all parents list with the parent entities
# from the parent
for parent in parents:
# retrieves the (all) parents from the parents
# and extends the all parents list with them,
# this extension method avoids duplicates
_parents = parent.all_parents()
all_parents += _parents
# extends the all parents list with the parents
# from the current entity class (avoids duplicates)
all_parents += parents
        # caches the all parents element in the class
        # to provide fast access in later accesses
cls._all_parents = all_parents
# returns the list that contains all the parents
# entity classes
return all_parents
@classmethod
def hierarchy(cls):
# in case the hierarchy are already "cached" in the current
# class (fast retrieval) returns immediately
if "_hierarchy" in cls.__dict__:
return cls._hierarchy
# retrieves the complete set of parents for the current class
# and then adds the current class to it
all_parents = cls.all_parents()
hierarchy = all_parents + [cls]
# saves the current hierarchy list under the class and then
# returns the sequence to the caller method
cls._hierarchy = hierarchy
return hierarchy
@classmethod
def increments(cls):
# in case the increments are already "cached" in the current
# class (fast retrieval) returns immediately
if "_increments" in cls.__dict__:
return cls._increments
# creates the list that will hold the various names that are
# meant to be automatically incremented
increments = []
# retrieves the map containing the definition of the class with
# the name of the fields associated with their definition
definition = cls.definition()
        # iterates over all the names in the definition to retrieve their
        # definition and check if they are of type increment
for name in definition:
_definition = cls.definition_n(name)
is_increment = _definition.get("increment", False)
if not is_increment:
continue
increments.append(name)
# saves the increment list under the class and then
# returns the sequence to the caller method
cls._increments = increments
return increments
@classmethod
def indexes(cls):
# in case the indexes are already "cached" in the current
# class (fast retrieval) returns immediately
if "_indexes" in cls.__dict__:
return cls._indexes
# creates the list that will hold the various names that are
# meant to be indexed in the data source
indexes = []
# retrieves the map containing the definition of the class with
# the name of the fields associated with their definition
definition = cls.definition()
        # iterates over all the names in the definition to retrieve their
        # definition and check if they are of type index
for name in definition:
_definition = cls.definition_n(name)
direction = _definition.get("index", False)
if not direction:
continue
indexes.append((name, direction))
# saves the index list under the class and then
# returns the sequence to the caller method
cls._indexes = indexes
return indexes
@classmethod
def safes(cls):
# in case the safes are already "cached" in the current
# class (fast retrieval) returns immediately
if "_safes" in cls.__dict__:
return cls._safes
# creates the list that will hold the various names that are
# meant to be safe values in the data source, the underlying
# identifier is set as the initial safe value of a model
safes = ["_id"]
# retrieves the map containing the definition of the class with
# the name of the fields associated with their definition
definition = cls.definition()
        # iterates over all the names in the definition to retrieve their
        # definition and check if they are of type safe
for name in definition:
_definition = cls.definition_n(name)
is_safe = _definition.get("safe", False)
if not is_safe:
continue
safes.append(name)
# saves the safes list under the class and then
# returns the sequence to the caller method
cls._safes = safes
return safes
@classmethod
def immutables(cls):
# in case the immutables are already "cached" in the current
# class (fast retrieval) returns immediately
if "_immutables" in cls.__dict__:
return cls._immutables
# creates the list that will hold the various names that are
# meant to be immutable values in the data source
immutables = []
# retrieves the map containing the definition of the class with
# the name of the fields associated with their definition
definition = cls.definition()
        # iterates over all the names in the definition to retrieve their
        # definition and check if they are of type immutable
for name in definition:
_definition = cls.definition_n(name)
is_immutable = _definition.get("immutable", False)
if not is_immutable:
continue
immutables.append(name)
# saves the immutables list under the class and then
# returns the sequence to the caller method
cls._immutables = immutables
return immutables
@classmethod
def eagers(cls):
# in case the eagers are already "cached" in the current
# class (fast retrieval) returns immediately
if "_eagers" in cls.__dict__:
return cls._eagers
# creates the list that will hold the various names that are
# meant to be eager values in the data source
eagers = []
# retrieves the map containing the definition of the class with
# the name of the fields associated with their definition
definition = cls.definition()
        # iterates over all the names in the definition to retrieve their
        # definition and check if they are of type eager
for name in definition:
_definition = cls.definition_n(name)
is_eager = _definition.get("eager", False)
if not is_eager:
continue
eagers.append(name)
# saves the eagers list under the class and then
# returns the sequence to the caller method
cls._eagers = eagers
return eagers
@classmethod
def default(cls):
# in case the default are already "cached" in the current
# class (fast retrieval) returns immediately
if "_default" in cls.__dict__:
return cls._default
# retrieves the complete hierarchy of the model to be used
# for the retrieval of the lowest possible default value for
# the current class hierarchy
hierarchy = cls.hierarchy()
# creates the value that is going to store the default value for
# the current class (in case there's one)
default = None
# iterates over all the classes in the current hierarchy, note
# that the iteration is done from the leaf nodes to the root nodes
# so that the lowest possible default value is found
for _cls in reversed(hierarchy):
for name in legacy.eager(_cls.__dict__.keys()):
                # retrieves the definition map for the current name and in
                # case the default value is set considers it the default,
                # otherwise continues the loop, nothing to be done
_definition = cls.definition_n(name)
is_default = _definition.get("default", False)
if not is_default:
continue
# in case the default value is found sets its name in the
# current default value and then breaks the loop
default = name
break
# in case the default value has been found must break the external
# loop as nothing else remains to be found
if default:
break
# saves the default value (name) under the class and then
# returns the sequence to the caller method
cls._default = default
return default
@classmethod
def filter_merge(cls, name, filter, kwargs, operator=None):
# retrieves a possible previous filter defined for the
# provided name in case it does exist must concatenate
# that previous value in a join statement according to
# the currently defined operator
filter_p = kwargs.get(name, None)
if filter_p or not operator == None:
# defaults the operator for the join of the names to the
# value and then ensures that the value of the operator
# is within a valid range of values
operator = operator or "$and"
util.verify(operator in ("$and", "$or"))
# retrieves the and references for the current arguments
# and appends the two filter values (current and previous)
# then deletes the current name reference in the arguments
# and updates the name value to the and value
filter_a = kwargs.get(operator, [])
if filter_p:
filter = filter_a + [{name: filter}, {name: filter_p}]
else:
filter = filter_a + [{name: filter}]
if name in kwargs:
del kwargs[name]
name = operator
# sets the currently defined filter structures in the keyword
# based arguments map for the currently defined name
kwargs[name] = filter
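    # Example (illustrative only): merging two filters over the same name
    # into an explicit "$and" clause:
    #
    #     kwargs = dict(age={"$gt": 18})
    #     Person.filter_merge("age", {"$lt": 65}, kwargs)
    #     # kwargs == {"$and": [{"age": {"$lt": 65}}, {"age": {"$gt": 18}}]}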
@classmethod
def is_abstract(cls):
return False
@classmethod
def is_visible(cls):
return True
@classmethod
def is_attached(cls):
return True
@classmethod
def is_concrete(cls):
if not "is_abstract" in cls.__dict__:
return True
return not cls.is_abstract()
@classmethod
def is_child(cls, parent):
return issubclass(cls, parent)
@classmethod
def is_equal(cls, other):
if not cls._name() == other._name():
return False
if not cls.__name__ == other.__name__:
return False
return True
@classmethod
def assert_is_attached_g(cls):
if cls.is_attached():
return
raise exceptions.OperationalError(message="Model is not attached", code=412)
@classmethod
def assert_is_concrete_g(cls):
if cls.is_concrete():
return
raise exceptions.OperationalError(message="Model is not concrete", code=412)
@classmethod
def assert_is_child_g(cls, parent):
if cls.is_child(parent):
return
raise exceptions.OperationalError(
message="Model is not child of %s" % parent, code=412
)
@classmethod
def _build(cls, model, map):
pass
@classmethod
def _meta(cls, model, map, safe=True):
# iterates over the complete set of keys and values for the
# current model map to try to compute the associated meta value
# for all of its attributes (should use some computation)
for key, value in legacy.items(model):
# verifies if the current value in iteration is either
# unset (none) or an unloaded reference, if that's the
# case the value is considered "invalid" not to be encoded
is_reference = isinstance(value, TYPE_REFERENCES)
is_invalid = value == None or (is_reference and not value.is_resolved())
# retrieves the definition for the current key in iteration
# so that it's possible to apply the mapper
definition = cls.definition_n(key)
# resolves the proper meta value for the key, meaning that if
# the type of the current field is a class all the mro of it
# is going to be percolated trying to find the best meta value,
# and then tries to retrieve the associated mapper function
meta = cls._solve(key)
mapper = METAS.get(meta, None)
# in case there's a valid mapper function and the value is not
# invalid calls the proper mapper function otherwise runs the
# default (fallback) operation for meta retrieval
try:
if mapper and not is_invalid:
value = mapper(value, definition, cls)
else:
value = value if is_invalid else legacy.UNICODE(value)
except Exception:
if not safe:
raise
value = None
            # sets the final meta value in the model, so that it may be
            # later retrieved
model[key + "_meta"] = value
@classmethod
def _sniff(cls, fields, rules=False):
fields = fields or cls.fields()
fields = list(fields)
if not rules:
return fields
for field in list(fields):
definition = cls.definition_n(field)
is_private = definition.get("private", False)
if is_private:
fields.remove(field)
return fields
@classmethod
def _solve(cls, name):
definition = cls.definition_n(name)
type = definition.get("type", legacy.UNICODE)
for cls in type.mro():
base = TYPE_META.get(cls, None)
if base:
break
return definition.get("meta", base)
@classmethod
def _adapter(cls):
return common.base().get_adapter()
@classmethod
def _encoder(cls):
adapter = cls._adapter()
encoder = adapter.encoder()
return encoder
@classmethod
def _collection(cls, name=None):
name = name or cls._name()
adapter = cls._adapter()
collection = adapter.collection(name)
return collection
@classmethod
def _collection_a(cls, name=None):
name = name or cls._name()
adapter = cls._adapter()
collection = adapter.collection_a(name)
return collection
@classmethod
def _name(cls):
# retrieves the class object for the current instance and then
# converts it into lower case value in order to serve as the
# name of the collection to be used
name = cls.__name__.lower()
return name
@classmethod
def _under(cls, plural=True):
return cls._underscore(plural=plural)
@classmethod
def _underscore(cls, plural=True):
camel = cls._plural() if plural else cls._singular()
return util.camel_to_underscore(camel)
@classmethod
def _readable(cls, plural=False):
camel = cls._plural() if plural else cls._singular()
return util.camel_to_readable(camel, capitalize=True)
@classmethod
def _singular(cls):
return cls.__name__
@classmethod
def _plural(cls):
return cls._singular() + "s"
@classmethod
def _eager(cls, model, names, *args, **kwargs):
"""
Working at a model map/dictionary level tries to resolve the
        relations described by the sequence of `.` separated name paths.
Should be able to handle both instance and map associated eager
loading relations.
:type model: Dictionary
:param model: The model map to be used as reference for the eager
loading of relations.
:type names: List
:param names: The list of dot separated name paths to "guide" the
loading of relations (references).
:rtype: Dictionary
:return: The resulting model with the required relations loaded.
"""
# verifies if the provided model instance is a sequence and if
# that's the case runs the recursive eager loading of names and
# returns the resulting sequence to the caller method
is_list = isinstance(model, (list, tuple))
if is_list:
return [cls._eager(_model, names, *args, **kwargs) for _model in model]
# iterates over the complete set of names that are meant to be
# eager loaded from the model and runs the "resolution" process
# for each of them so that they are properly eager loaded
for name in names:
_model = model
for part in name.split("."):
is_sequence = isinstance(_model, (list, tuple))
if is_sequence:
_model = [
cls._res(value, part, *args, **kwargs) for value in _model
]
else:
_model = cls._res(_model, part, *args, **kwargs)
if not _model:
break
# returns the resulting model to the caller method, most of the
# times this model should have not been touched
return model
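    # Example (illustrative only): assuming `Person` defines a `father`
    # reference whose target model defines a `car` reference, calling
    # `model = Person._eager(model, ["father", "father.car"])` resolves the
    # complete relation path (an expensive, data source bound operation)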
@classmethod
def _res(cls, model, part, *args, **kwargs):
"""
Resolves a specific model part taking into account the multiple
possible resolution strategies.
Most of its logic will be associated with reference like types.
This method will also (for map based resolution strategies) change
the owner model, setting its references with the resolved maps, this
is required as maps do not allow reference objects to exist.
:type model: Dictionary
:param model: The model map to be used in the resolution process.
:type part: String
:param part: The name of the model's part to be resolved.
:rtype: Dictionary/Object
:return: The resolved part that may be either a map or an object
depending on the resolution strategy.
"""
        # in case the provided model is not valid returns it (no resolution
        # is possible) otherwise gathers the base value for resolution
if not model:
return model
value = model[part]
# check the data type of the requested name for resolution
# and in case it's not valid and not a reference returns it
# immediately, no resolution to be performed
is_reference = isinstance(value, TYPE_REFERENCES)
if not value and not is_reference:
return value
# in case the value is a reference type object then runs
# the resolve operation effectively resolving the values
# (this is considered a very expensive operation), notice
# that this operation is going to respect the map vs. instance
# kind of resolution process so the data type of the resulting
# value is going to depend on that
if is_reference:
value = value.resolve(eager_l=True, *args, **kwargs)
# in case the map resolution process was requested an explicit
# set of the resolved value is required (implicit resolution
# using `resolve()`) is not enough to ensure proper type structure
if kwargs.get("map", False):
model[part] = value
# returns the "final" (possibly resolved) value to the caller method
# ready to be used for possible merging processes
return value
@classmethod
def _res_cls(cls, name):
"""
        Resolves the possibly namespaced name for the current class,
        resorting to a recursive approach for the attribute resolution.
:type name: String
        :param name: The name of the possibly namespaced attribute that
should be used to retrieve the associated entity class (eg:
product.name, order.id, etc.).
:rtype: Tuple
:return: The target class that represents the reference attribute
described by the namespace (path) based name and the name of the
final attribute contained in it.
"""
# in case the provided name is a simple one this is a direct attribute
# of the current class and the expected tuple is returned immediately
if not "." in name:
return cls, name
# splits the namespace recursive name and then iterates over the multiple
# relation attributes to retrieve their respective targets
name_s = name.split(".")
for part in name_s[:-1]:
info = getattr(cls, part) if hasattr(cls, part) else dict()
part_type = info.get("type", None)
cls = part_type._target()
# returns the final tuple containing the leaf model class and the final
# leaf name that can be used to retrieve the attribute
return cls, name_s[-1]
@classmethod
def _get_attrs(cls, kwargs, attrs):
_attrs = []
for attr, value in attrs:
if not attr in kwargs:
_attrs.append(value)
continue
value = kwargs[attr]
del kwargs[attr]
_attrs.append(value)
return _attrs
@classmethod
def _clean_attrs(cls, kwargs, dirty=DIRTY_PARAMS):
for key in dirty:
if not key in kwargs:
continue
del kwargs[key]
@classmethod
def _find_s(cls, kwargs):
# tries to retrieve the find name value from the provided
# named arguments defaulting to an unset value otherwise
find_n = kwargs.pop("find_n", None)
# retrieves the kind of insensitive strategy that is going
# to be used for the resolution of regular expressions,
# this should affect all the filters and so it should be
# used with some amount of care
if "find_i" in kwargs:
find_i = kwargs["find_i"]
del kwargs["find_i"]
else:
find_i = False
# retrieves the kind of default operation to be performed,
# this may be either: right, left or both and the default
# value is both so that the token is matched in case it
# appears anywhere in the search string
if "find_t" in kwargs:
find_t = kwargs["find_t"]
del kwargs["find_t"]
else:
find_t = "both"
# in case the find string is currently not defined in the
# named arguments map returns immediately as nothing is
# meant to be done in this method
if not "find_s" in kwargs:
return
# retrieves the find string into a local variable, then
# removes the find string from the named arguments map
# so that it's not going to be erroneously used by the
# underlying find infra-structure
find_s = kwargs["find_s"]
del kwargs["find_s"]
# retrieves the "name" of the attribute that is considered
# to be the default (representation) for the model in case
# there's none returns immediately, as it's not possible
# to proceed with the filter creation
default = find_n or cls.default()
if not default:
return
# constructs the proper right and left parts of the regex
# that is going to be constructed for the matching of the
# value, this is achieved by checking the find type
right = "^" if find_t == "right" else ""
left = "$" if find_t == "left" else ""
# retrieves the definition for the default attribute and uses
# it to retrieve its target data type, defaulting to the
# string type in case none is defined in the schema
definition = cls.definition_n(default)
default_t = definition.get("type", legacy.UNICODE)
try:
# in case the target data type for the default field is
# string the both sides wildcard regex is used for the
# search otherwise the search value to be used is the
# exact match of the value (requires type conversion)
if default_t in legacy.STRINGS:
find_v = {
"$regex": right + re.escape(find_s) + left,
"$options": "i" if find_i else "",
}
else:
find_v = default_t(find_s)
except Exception:
# in case there's an error in the conversion for
# the target type value sets the search value as
# invalid (not going to be used in filter)
find_v = None
# in case there's a valid find value to be used sets
# the value in the named arguments map to be used by
# the underlying find infra-structure, note that the
# set is done using a "merge" with the previous values
if not find_v == None:
cls.filter_merge(default, find_v, kwargs)
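# a minimal usage sketch (hypothetical `Person` model whose default
# attribute is `name`), showing how the find string is turned into a
# case insensitive "contains" regex, assuming no previous filter for
# `name` exists in the keyword arguments:
#     kwargs = dict(find_s="joh", find_i=True, find_t="both")
#     Person._find_s(kwargs)
#     # kwargs -> {"name": {"$regex": "joh", "$options": "i"}}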
@classmethod
def _find_d(cls, kwargs):
# in case the find definition is currently not defined in the
# named arguments map returns immediately as nothing is
# meant to be done in this method
if not "find_d" in kwargs:
return
# tries to retrieve the value of the operator that is going
# to be used to "join" the multiple find parts (find values)
find_o = kwargs.pop("find_o", None)
# retrieves the find definition into a local variable, then
# removes the find definition from the named arguments map
# so that it's not going to be erroneously used by the
# underlying find infra-structure
find_d = kwargs["find_d"]
del kwargs["find_d"]
# verifies that the data type of the find definition is a
# valid sequence and in case it's not converts it into one
# so that it may be used in sequence based logic
if not isinstance(find_d, list):
find_d = [find_d]
# iterates over all the filters defined in the filter definition
# so that they may be used to update the provided arguments with
# the filter defined in each of their lines
for filter in find_d:
# in case the filter is not valid (unset or invalid) it's going
# to be ignored as no valid information is present
if not filter:
continue
# splits the filter string into its three main components:
# the name, operator and value, that are going to be processed
# as defined by the specification to create the filter
result = filter.split(":", 2)
if len(result) == 2:
result.append(None)
name, operator, value = result
# retrieves the definition for the filter attribute and uses
# it to retrieve its target data type that is going to be
# used for the proper conversion, note that in case the base
# type resolution method exists it's used (recursive resolution)
definition = cls.definition_n(name)
name_t = definition.get("type", legacy.UNICODE)
if hasattr(name_t, "_btype"):
name_t = name_t._btype()
if name in ("_id",):
name_t = cls._adapter().object_id
# determines if the current filter operation should be performed
# using a case insensitive based approach to the search, by default
# all of the operations are considered to be case sensitive
insensitive = INSENSITIVE.get(operator, False)
# retrieves the method that is going to be used for value mapping
# or conversion based on the current operator and then converts
# the operator into the domain specific operator
value_method = VALUE_METHODS.get(operator, None)
operator = OPERATORS.get(operator, operator)
# in case a custom value method was retrieved uses it to convert
# the string based value into the target specific value for the query
# otherwise uses the data type of the search field for value conversion
if value_method:
value = value_method(value, name_t)
else:
try:
value = name_t(value)
except ValueError:
value = None
# constructs the custom find value using a key and value map value
# in case the operator is defined otherwise (operator not defined)
# the value is used directly, then merges this find value into the
# current set of filters for the provided (keyword) arguments
find_v = {operator: value} if operator else value
if insensitive:
find_v["$options"] = "i"
cls.filter_merge(name, find_v, kwargs, operator=find_o)
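# a minimal usage sketch (hypothetical `Person` model with an integer
# `age` field), assuming the OPERATORS map translates "greater" into
# MongoDB's "$gt" and that no previous filter for `age` exists; each
# definition line follows the "name:operator:value" layout:
#     kwargs = dict(find_d="age:greater:18")
#     Person._find_d(kwargs)
#     # kwargs -> {"age": {"$gt": 18}}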
@classmethod
def _bases(cls, subclass=None):
"""
Retrieves the complete set of base (parent) classes for
the current class, this method is safe as it removes any
class that does not inherit from the entity class.
Please note that this function only retrieves the set of
direct parents of the class and not the complete hierarchy.
:type subclass: Class
:param subclass: The subclass from which the various base
classes must inherit to be considered models, in case the
value is not defined the "basic" model value is used.
:rtype: List/Tuple
:return: The set containing the various base classes for
the current class that are considered valid.
"""
# determines the base sub class value that is going
# to be applied for proper sub class validation
subclass = subclass or Model
# retrieves the complete set of base classes for
# the current class and in case the observable is
# not in the bases set returns the set immediately
# as the top level model has not been reached yet,
# note that the various base classes are filtered
# according to their inheritance from the provided
# (model) class to be applied as filter
bases = cls.__bases__
if subclass:
bases = [base for base in bases if issubclass(base, subclass)]
if not bases == Model.__bases__:
return bases
# returns an empty tuple to the caller method as the
# top level class has been reached and the class is
# considered to have no "valid" base classes
return ()
@classmethod
def _increment(cls, name):
_name = cls._name() + ":" + name
store = cls._collection(name="counters")
value = store.find_and_modify(
{"_id": _name}, {"$inc": {"seq": 1}}, new=True, upsert=True
)
value = value or store.find_one({"_id": _name})
return value["seq"]
@classmethod
def _ensure_min(cls, name, value):
_name = cls._name() + ":" + name
store = cls._collection(name="counters")
value = store.find_and_modify(
{"_id": _name}, {"$max": {"seq": value}}, new=True, upsert=True
)
value = value or store.find_one({"_id": _name})
return value["seq"]
@classmethod
def _build_indexes(cls):
indexes = cls.indexes()
collection = cls._collection()
for index, direction in indexes:
collection.ensure_index(index, direction=direction)
@classmethod
def _destroy_indexes(cls):
collection = cls._collection()
collection.drop_indexes()
@classmethod
def _eager_b(cls, eager):
"""
Builds the provided list of eager values, preparing them
according to the current model rules.
The composition process includes the extension of the provided
sequence of eager values with the base ones defined in the
model, if not handled correctly this is an expensive operation.
:type eager: List/Tuple
:param eager: The base sequence containing the various fields
that should be eagerly loaded for the operation.
:rtype: Tuple
:return: The "final" resolved tuple that may be used for the eager
loaded operation performance.
"""
eager = list(eager) if eager else []
eagers = cls.eagers()
eager.extend(eagers)
if not eager:
return eager
eager = tuple(set(eager))
return eager
@classmethod
def _resolve_all(cls, model, *args, **kwargs):
definition = cls.definition()
for name, value in legacy.eager(model.items()):
if not name in definition:
continue
model[name] = cls._resolve(name, value, *args, **kwargs)
return model
@classmethod
def _resolve(cls, name, value, *args, **kwargs):
# verifies if the current value is an iterable one in case
# it is runs the evaluate method for each of the values to
# try to resolve them into the proper representation
is_iterable = isinstance(value, (list, tuple))
if is_iterable:
return [cls._resolve(name, value, *args, **kwargs) for value in value]
# in case the current instance is a dictionary then, and in case
# there's a target class (typical for reference like types) recursion
# steps must be taken, allowing properly normalized and resolved data
# to exist in the complete deep and nested data hierarchy, this is
# relevant only when the underlying structure is map oriented
if isinstance(value, dict):
info = getattr(cls, name)
part_type = info.get("type", None)
if hasattr(part_type, "_target"):
_cls = part_type._target()
return _cls._resolve_all(value, *args, **kwargs)
# verifies if the map value recursive approach should be used
# for the element and if that's the case calls the proper method
# otherwise uses the provided (raw value)
if not hasattr(value, "map_v"):
return value
return value.map_v(*args, **kwargs)
@classmethod
def _to_meta(cls, base):
if isinstance(base, str):
return base
# verifies the provided base is a class (and not an instance),
# normalizing it to its class when required, then walks the MRO
# looking for the first matching meta type entry
is_class = inspect.isclass(base)
if not is_class:
base = base.__class__
for base_cls in base.mro():
result = TYPE_META.get(base_cls, None)
if not result:
continue
return result
return base
@property
def request(self):
return common.base().get_request()
@property
def session(self):
return common.base().get_session()
@property
def logger(self):
if self.owner:
return self.owner.logger
else:
return logging.getLogger()
def val(self, name, default=None):
return self.model.get(name, default)
def json_v(self, *args, **kwargs):
return self.model
def map_v(self, *args, **kwargs):
cls = self.__class__
clone = kwargs.pop("clone", False)
resolve = kwargs.get("resolve", True)
evaluator = kwargs.get("evaluator", "map_v")
if clone:
base = self.clone(reset=False, deep=True)
else:
base = self
return cls._resolve_all(base.model, resolve=resolve, evaluator=evaluator)
def build_m(self, model=None, rules=True):
"""
Builds the currently defined model, this should run
additional computation for the current model creating
new (calculated) attributes and deleting other.
The extra rules parameter allows for the (security) rules
to be applied to the model, use this with care to avoid
any security problems.
This method should be used carefully to avoid validation
problems and other side effects.
:type model: Dictionary
:param model: The model map to be used for the build
operation in case none is specified the currently set
model is used instead.
:type rules: bool
:param rules: If the (security) rules should be applied to
the model that is going to be built.
"""
cls = self.__class__
model = model or self.model
cls.build(model, rules=rules)
def apply(self, model=None, form=True, safe=None, safe_a=True):
# calls the complete set of event handlers for the current
# apply operation, this should trigger changes in the model
self.pre_apply()
# retrieves the reference to the class associated
# with the current instance
cls = self.__class__
# creates the base safe map from the provided map or
# builds a new map that will hold these values
safe = safe or {}
# verifies if the base safe rules should be applied
# to the current map of safe attributes
if safe_a:
safes = cls.safes()
for _safe in safes:
safe[_safe] = True
# retrieves the object loading it from all the available
# sources and then iterates over all of the model
# values setting the values in the current instance's model
# then runs the type casting/conversion operation on it
if model == None:
model = util.get_object() if form else dict()
for name, value in legacy.eager(model.items()):
is_safe = safe.get(name, False)
if is_safe:
continue
self.model[name] = value
cls = self.__class__
cls.types(self.model)
# calls the complete set of event handlers for the current
# apply operation, this should trigger changes in the model
self.post_apply()
# returns the instance that has just been used for the apply
# operation, this may be used for chaining operations
return self
def copy(self, build=False, rules=True):
cls = self.__class__
_copy = copy.deepcopy(self)
build and cls.build(_copy.model, map=False, rules=rules)
return _copy
def clone(self, reset=True, deep=False):
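# copies the model map when the deep flag is set (otherwise the map
# is shared with the original instance) and, when reset is requested,
# removes the identifier, increment and unique fields so that the
# clone may be persisted as a new entity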
cls = self.__class__
model = dict(self.model) if deep else self.model
if not reset:
return cls(model=model)
indexes = cls.increments()
indexes = indexes + cls.unique_names() + ["_id"]
for index in indexes:
if not index in model:
continue
del model[index]
return cls(model=model)
def validate_extra(self, name):
"""
Adds a new validate method to the current set of
validate methods to be executed on the next validation
execution. Note this validate method will only be used
on the next validation execution, after that execution
it will be removed from the extras validations list.
:type name: String
:param name: The name of the validation method that should
be added to the extras list, note that the real name of
the method should be prefixed with the "validate" prefix.
"""
cls = self.__class__
method = getattr(cls, "validate_" + name)
self._extras.append(method)
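# a minimal usage sketch (hypothetical `validate_password_strength`
# class method on an `Account` model), registering it to run only
# on the next validation cycle:
#     account.validate_extra("password_strength")
#     account.save()  # runs the extra validation once, then drops it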
def is_new(self):
return not "_id" in self.model
def assert_is_new(self):
"""
Ensures that the current model instance is a new one meaning
that the inner identifier of the model is not currently set.
This method may be used to avoid security issues of providing
update permissions for situations where only creation is allowed.
The method raises an exception if the current instance is not
considered to be a new one, causing the current control flow to
be returned to the caller method until the exception is caught.
"""
if self.is_new():
return
raise exceptions.OperationalError(
message="Instance is not new, identifier is set", code=412
)
def assert_is_attached(self):
cls = self.__class__
cls.assert_is_attached_g()
def assert_is_concrete(self):
cls = self.__class__
cls.assert_is_concrete_g()
def assert_is_child(self, parent):
cls = self.__class__
cls.assert_is_child_g(parent)
def save(
self,
validate=True,
verify=True,
is_new=None,
increment_a=None,
immutables_a=None,
pre_validate=True,
pre_save=True,
pre_create=True,
pre_update=True,
post_validate=True,
post_save=True,
post_create=True,
post_update=True,
before_callbacks=[],
after_callbacks=[],
):
# ensures that the current instance is associated with
# a concrete model, ready to be persisted in database
if verify:
self.assert_is_concrete()
# checks if the instance to be saved is a new instance
# or if this is an update operation and then determines
# series of default values taking that into account
if is_new == None:
is_new = self.is_new()
if increment_a == None:
increment_a = is_new
if immutables_a == None:
immutables_a = not is_new
# runs the validation process in the current model, this
# should ensure that the model is ready to be saved in the
# data source, without corruption of it, only run this process
# in case the validate flag is correctly set
validate and self._validate(
pre_validate=pre_validate, post_validate=post_validate
)
# calls the complete set of event handlers for the current
# save operation, this should trigger changes in the model
if pre_save:
self.pre_save()
if pre_create and is_new:
self.pre_create()
if pre_update and not is_new:
self.pre_update()
# filters the values that are present in the current model
# so that only the valid ones are stored, invalid values
# are going to be removed, note that if the operation is an
# update operation and the "immutable rules" also apply, the
# returned value is normalized meaning that for instance if
# any relation is loaded the reference value is returned instead
# of the loaded relation values (required for persistence)
model = self._filter(
increment_a=increment_a, immutables_a=immutables_a, normalize=True
)
# in case the current model is not new must create a new
# model instance and remove the main identifier from it
if not is_new:
_model = copy.copy(model)
del _model["_id"]
# calls the complete set of callbacks that should be called
# before the concrete data store save operation
for callback in before_callbacks:
callback(self, model)
# retrieves the reference to the store object to be used and
# uses it to store the current model data
store = self._get_store()
if is_new:
store.insert(model)
self.apply(model, safe_a=False)
else:
store.update({"_id": model["_id"]}, {"$set": _model})
# calls the complete set of callbacks that should be called
# after the concrete data store save operation
for callback in after_callbacks:
callback(self, model)
# calls the post save event handlers in order to be able to
# execute appropriate post operations
if post_save:
self.post_save()
if post_create and is_new:
self.post_create()
if post_update and not is_new:
self.post_update()
# returns the instance that has just been used for the save
# operation, this may be used for chaining operations
return self
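# a minimal usage sketch of the save workflow (hypothetical `Person`
# model), skipping validation and registering an after callback that
# receives both the instance and the stored model map:
#     person = Person(model=dict(name="john"))
#     person.save(
#         validate=False,
#         after_callbacks=[lambda instance, model: print(model["name"])],
#     )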
def delete(
self,
verify=True,
pre_delete=True,
post_delete=True,
before_callbacks=[],
after_callbacks=[],
):
# ensures that the current instance is associated with
# a concrete model, ready to be persisted in database
if verify:
self.assert_is_concrete()
# calls the complete set of event handlers for the current
# delete operation, this should trigger changes in the model
if pre_delete:
self.pre_delete()
# calls the complete set of callbacks that should be called
# before the concrete data store delete operation
for callback in before_callbacks:
callback(self)
# retrieves the reference to the store object to be able to
# execute the removal command for the current model
store = self._get_store()
store.remove({"_id": self._id})
# calls the underlying delete handler that may be used to extend
# the default delete functionality
self._delete()
# calls the complete set of callbacks that should be called
# after the concrete data store delete operation
for callback in after_callbacks:
callback(self)
# calls the complete set of event handlers for the current
# delete operation, this should trigger changes in the model
if post_delete:
self.post_delete()
def approve(self, model=None, type=None):
# retrieves the class associated with the instance
# that is going to be sent for approval
cls = self.__class__
# determines the proper method naming suffix to be
# used for the requested type of approval/validation
suffix = "_" + type if type else ""
# retrieves the proper (target) method for validation
# and then runs the inner validate method for it
method = getattr(cls, "validate" + suffix)
self._validate(model=model, method=method)
# returns the instance that has just been used for the approve
# operation, this may be used for chaining operations
return self
def advance(self, name, delta=1):
store = self._get_store()
value = store.find_and_modify(
{"_id": self._id}, {"$inc": {name: delta}}, new=True
)
value = value or store.find_one({"_id": self._id})
_value = value[name]
setattr(self, name, _value)
return _value
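# a minimal usage sketch (hypothetical `article` and `product`
# instances), incrementing a counter field atomically in the data
# store and mirroring the new value in the in-memory model:
#     views = article.advance("views")      # +1, returns the new value
#     stock = product.advance("stock", -1)  # negative deltas decrement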
def reload(self, *args, **kwargs):
is_new = self.is_new()
if is_new:
raise exceptions.OperationalError(
message="Can't reload a new model entity", code=412
)
cls = self.__class__
return cls.get(_id=self._id, *args, **kwargs)
def exists(self):
is_new = self.is_new()
if is_new:
return False
entity = self.get(_id=self._id, raise_e=False)
return True if entity else False
def map(self, increment_a=False, resolve=False, all=False, evaluator="map_v"):
model = self._filter(
increment_a=increment_a, resolve=resolve, all=all, evaluator=evaluator
)
return model
def dumps(self, encode=True):
cls = self.__class__
encoder = cls._encoder() if encode else None
return json.dumps(self.model, cls=encoder)
def unwrap(self, **kwargs):
default = kwargs.get("default", False)
if default:
return self.map()
else:
return dict()
def pre_validate(self):
self.trigger("pre_validate")
def pre_save(self):
self.trigger("pre_save")
def pre_create(self):
self.trigger("pre_create")
def pre_update(self):
self.trigger("pre_update")
def pre_delete(self):
self.trigger("pre_delete")
def post_validate(self):
self.trigger("post_validate")
def post_save(self):
self.trigger("post_save")
def post_create(self):
self.trigger("post_create")
def post_update(self):
self.trigger("post_update")
def post_delete(self):
self.trigger("post_delete")
def pre_apply(self):
self.trigger("pre_apply")
def post_apply(self):
self.trigger("post_apply")
def _get_store(self):
return self.__class__._collection()
def _get_store_a(self):
return self.__class__._collection_a()
def _delete(self):
pass
def _validate(self, model=None, method=None, pre_validate=True, post_validate=True):
# calls the event handler for the validation process this
# should setup the operations for a correct validation
if pre_validate:
self.pre_validate()
# starts the model reference with the current model in
# case none is defined
model = model or self.model
# retrieves the class associated with the current instance
# to be able to retrieve the correct validate methods
cls = self.__class__
# checks if the current model is new (create operation)
# and sets the proper validation methods retrieval method
is_new = self.is_new()
if is_new:
method = method or cls.validate_new
else:
method = method or cls.validate
# runs the validation process on the various arguments
# provided to the model and in case an error is returned
# raises a validation error to the upper layers
errors, object = validation.validate(
method, object=model, ctx=self, build=False
)
# iterates over the complete set of extra validate methods
# and runs their validations for the current model context
# adding their errors to the existing map (appending), this
# will make sure that these extra validations remain transparent
# from an end-user point of view (as expected)
for extra in self._extras:
_errors, _object = validation.validate(
extra, object=model, ctx=self, build=False
)
for key, value in _errors.items():
errors_l = errors.get(key, [])
_errors_l = value
errors_l.extend(_errors_l)
errors[key] = errors_l
# empties the extras list so that the methods that have been
# set there are not going to be used in any further validation
del self._extras[:]
# in case the errors map is not empty there are errors
# and they should be encapsulated in a validation
# error and raised to the top layer for handling
if errors:
raise exceptions.ValidationError(errors, self)
# calls the event handler for the validation process this
# should finish the operations from a correct validation
if post_validate:
self.post_validate()
def _filter(
self,
increment_a=True,
immutables_a=False,
normalize=False,
resolve=False,
all=False,
evaluator="json_v",
):
# creates the model that will hold the "filtered" model
# with all the items that conform with the class specification
model = {}
# retrieves the class associated with the current instance
# to be able to retrieve the correct definition methods
cls = self.__class__
# retrieves the (schema) definition for the current model
# to be "filtered" it's going to be used to retrieve the
# various definitions for the model fields
definition = cls.definition()
# retrieves the complete list of fields that are meant to be
# automatically incremented for every save operation
increments = cls.increments()
# gathers the set of elements that are considered immutable and
# that are not meant to be changed if the current operation to
# apply the filter is not a create operation (update operation)
immutables = cls.immutables()
# iterates over all the increment fields and increments their
# values so that a new value is set on the model, note that if
# the increment apply is unset the increment operation is ignored
for name in increments:
if not increment_a:
continue
if name in self.model:
model[name] = cls._ensure_min(name, self.model[name])
else:
model[name] = cls._increment(name)
# iterates over all the model items to filter the ones
# that are not valid for the current class context
for name, value in legacy.eager(self.model.items()):
if not name in definition:
continue
if increment_a and name in increments:
continue
if immutables_a and name in immutables:
continue
value = self._evaluate(name, value, evaluator=evaluator)
model[name] = value
# in case the normalize flag is set must iterate over all
# items to try to normalize the values by retrieving the reference
# value, this returns the reference index value instead of the
# normal value (which would otherwise prevent normalization)
if normalize:
for name, value in legacy.eager(self.model.items()):
if not name in definition:
continue
if not hasattr(value, "ref_v"):
continue
model[name] = value.ref_v()
# in case the resolution flag is set, it means that a recursive
# approach must be performed for the resolution of values that
# implement the map value (recursive resolution) method, this is
# a complex (and possibly computationally expensive) process that
# may imply access to the base data source
if resolve:
for name, value in legacy.eager(self.model.items()):
if not name in definition:
continue
model[name] = cls._resolve(name, value)
# in case the all flag is set the extra fields (not present
# in definition) must also be used to populate the resulting
# (filtered) map so that it contains the complete set of values
# present in the base map of the current instance
if all:
for name, value in legacy.eager(self.model.items()):
if name in model:
continue
model[name] = value
# returns the model containing the "filtered" items resulting
# from the validation of the items against the model class
return model
def _evaluate(self, name, value, evaluator="json_v"):
# verifies if the current value is an iterable one in case
# it is runs the evaluate method for each of the values to
# try to resolve them into the proper representation, note
# that both base iterable values (lists and dictionaries) and
# objects that implement the evaluator method are not considered
# to be iterables and normal operation applies
is_iterable = hasattr(value, "__iter__")
is_iterable = (
is_iterable
and not isinstance(value, ITERABLES)
and (not hasattr(value, evaluator) or not evaluator)
)
if is_iterable:
return [self._evaluate(name, value, evaluator=evaluator) for value in value]
# verifies if the current value's class is a subclass of the model
# class and in case it is extracts the relation name from the
# value and sets it as the value in iteration
is_model = issubclass(value.__class__, Model)
if is_model:
meta = getattr(self.__class__, name)
_type = meta.get("type", legacy.BYTES)
_name = _type._name
value = getattr(value, _name)
# iterates over all the values and retrieves the map value for
# each of them in case the value contains a map value retrieval
# method otherwise uses the normal value returning it to the caller
method = (
getattr(value, evaluator)
if evaluator and hasattr(value, evaluator)
else None
)
value = method(resolve=False) if method else value
return value
def _res_entity(self, name, *args, **kwargs):
"""
Tries to retrieve the relation entity value associated with the
provided namespace based name.
This method is equivalent to ``_res_cls()`` in the sense that it
runs a namespace based name resolution, but returns the final leaf
entity instance instead of the class.
This method should be used carefully as it may imply multiple
resolution of the related attributes (expensive operation).
:type name: String
:param name: The namespace based recursive name that is going to
be used to resolve the "final" leaf entity (if required).
:rtype: Tuple
:return: A tuple containing both the final leaf entity instance and
the final single level name to be used to retrieve the attribute.
"""
# in case the provided name is a simple one this is a direct attribute
# of the current entity and so the expected values are returned
if not "." in name:
return self, name
# sets the initial entity value for iteration as the current instance
# (root node) and then splits the name around its components
entity = self
name_s = name.split(".")
# runs the iterative process to obtain the final leaf entity by using
# the resolve operation around each field
for part in name_s[:-1]:
entity = entity[part].resolve(*args, **kwargs)
# returns the final tuple containing both the leaf entity and the
# final name of the attribute on the leaf node
return entity, name_s[-1]
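# a minimal usage sketch (hypothetical `order` instance with a
# `product` reference), mirroring the class level `_res_cls()`
# resolution but returning the resolved leaf entity instead:
#     entity, attr = order._res_entity("product.name")
#     # entity -> the resolved Product instance, attr -> "name"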
class LocalModel(Model):
"""
Concrete model aimed at cases where data source based
persistence is not required, while the remainder model
features are useful.
Typical use cases involve API based validation or local
transient model usage.
"""
@classmethod
def is_attached(cls):
return False
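# a minimal usage sketch (hypothetical `LoginForm` model), using the
# local (non attached) model strictly for payload validation, with no
# data source persistence involved:
#     class LoginForm(LocalModel):
#         username = field(type=legacy.UNICODE)
#     form = LoginForm(model=dict(username="john"))
#     form._validate()  # raises ValidationError on an invalid payload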
class Field(dict):
"""
Top level field class that should be used for the
definition of the various fields of the model, a
dictionary may be used alternatively but some of the
ordering and other features are going to be lost.
"""
creation_counter = 0
""" The global static creation counter value that
will be used to create an order in the declaration
of attributes for a class "decorated" with the
ordered attributes metaclass """
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
self.creation_counter = Field.creation_counter
Field.creation_counter += 1
def __getattr__(self, name):
if name in self:
return self[name]
raise AttributeError("'%s' not found" % name)
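# a minimal usage sketch: declaring fields through `Field` (or the
# lowercase `field` alias defined at the bottom of this module)
# preserves the declaration order via the creation counter, something
# a plain dictionary would not guarantee:
#     class Person(Model):
#         name = field(type=legacy.UNICODE)
#         age = field(type=int)
#     # Person.name.creation_counter < Person.age.creation_counter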
class Action(dict):
"""
The abstract class that defines an action to be performed
by an end user, this should be used as a placeholder for
the generic logic of any action.
The base interface should conform with the dictionary
interface in order to provide backwards compatibility.
"""
def __getattr__(self, name):
if name in self:
return self[name]
raise AttributeError("'%s' not found" % name)
def cast(self, values, keyword=False):
"""
Runs the "casting" operation for a series of provided
values, the order sequence of the provided values is
relevant and should conform with the specified order
for the operation parameters.
:type values: List
:param values: The sequence containing the various values
that are going to be cast according to the operation spec.
:type keyword: bool
:param keyword: If a keyword based dictionary of values
should be created instead for dynamic binding.
:rtype: List/Dictionary
:return: The sequence of values now cast with the proper
data types as specified in the operation structure or alternatively
the same values under a named dictionary.
"""
# creates the list/map that will hold the "final" cast values,
# it should have the same size as the input values list
casted = {} if keyword else []
# retrieves the reference to the parameters specification
# and then uses it in the iteration for the casting of the
# various "passed" raw values
parameters = self.get("parameters", [])
for value, parameters in zip(values, parameters):
name = parameters[1]
type = parameters[2]
cast = type
is_default = value in (None, "")
cast = BUILDERS.get(cast, cast)
if cast and not is_default:
value = cast(value)
if is_default:
value = type_d(type, value)
if keyword:
casted[name] = value
else:
casted.append(value)
# returns the final list/map of casted values to the caller method
# so that it may be used safely in the context
return casted
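# a minimal usage sketch (hypothetical operation spec), assuming each
# parameter tuple follows the (label, name, type) layout read above:
#     op = Operation(parameters=[
#         ("Amount", "amount", int),
#         ("Note", "note", legacy.UNICODE),
#     ])
#     op.cast(["10", "hello"])                # -> [10, "hello"]
#     op.cast(["10", "hello"], keyword=True)  # -> {"amount": 10, "note": "hello"}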
class Link(Action):
"""
Internal link class used to encapsulate some of the
internal concepts associated with a link, providing
an easy to use structure at runtime.
"""
pass
class Operation(Action):
"""
Logical structure representing an operation, should
provide a simple interface for interaction with the
operation and inputs/outputs of it.
"""
pass
class View(Action):
"""
Definition associated with a model that represents
the information to be used for the display of associated
set of elements/items of a model.
"""
pass
def link(name=None, description=None, parameters=(), context=False, devel=False):
"""
Decorator function to be used to "annotate" the provided
function as a link (string) that is able to change the user
agent to a location of a certain interest.
Proper usage of the link definition/decoration is context
based and should vary based on application.
:type name: String
:param name: The name of the link (in plain english) so that
a better user experience is possible.
:type description: String
:param description: The description of the link (in plain english)
so that a better user experience is possible.
:type parameters: Tuple
:param parameters: The sequence containing tuples that describe
the various parameters to be sent to the link.
:type context: bool
:param context: If the context (target) of models should be set
in the link redirection, this is only applicable for global links.
:type devel: bool
:param devel: If the link should only be used/available under
development like environments (eg: debugging purposes).
:rtype: Function
:return: The decorator function that is going to be used to
generate the final function to be called.
"""
def decorator(function, *args, **kwargs):
function._link = Link(
method=function.__name__,
name=name or function.__name__,
description=description,
parameters=parameters,
context=context,
devel=devel,
)
return function
return decorator
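# a minimal usage sketch (hypothetical `Report` model), decorating a
# method that builds the target URL for the link:
#     class Report(Model):
#         @link(name="Download CSV", devel=True)
#         def csv_url(self):
#             return "https://example.com/reports.csv"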
def operation(
name=None,
description=None,
parameters=(),
kwargs=None,
factory=False,
level=1,
devel=False,
):
"""
Decorator function to be used to "annotate" the provided
function as an operation that is able to change the current
entity state and behavior.
Proper usage of the operation definition/decoration is context
based and should vary based on application.
:type name: String
:param name: The name of the operation (in plain english)
so that a better user experience is possible.
:type description: String
:param description: The description of the operation (in plain english)
so that a better user experience is possible.
:type parameters: Tuple
:param parameters: The sequence containing tuples that describe
the various parameters to be sent to the operation.
:type kwargs: Dictionary
:param kwargs: The keyword based arguments that are going to be used
as the basis for the building of the entity retrieval query, these
values should be compliant with the "model's query language".
:type factory: bool
:param factory: If the operation is considered to be a factory
meaning that a new entity is going to be created and/or updated.
:type level: int
:param level: The severity level of the operation, the higher
values will be considered more severe than the lower ones,
evaluation of the value will be context based.
:type devel: bool
:param devel: If the operation should only be used/available under
development like environments (eg: debugging purposes).
:rtype: Function
:return: The decorator function that is going to be used to
generate the final function to be called.
"""
_kwargs = kwargs
def decorator(function, *args, **kwargs):
function._operation = Operation(
method=function.__name__,
name=name or function.__name__,
description=description,
parameters=parameters,
kwargs=_kwargs,
factory=factory,
level=level,
devel=devel,
)
return function
return decorator
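# a minimal usage sketch (hypothetical `Invoice` model), exposing an
# instance operation with a single typed parameter:
#     class Invoice(Model):
#         @operation(
#             name="Mark Paid",
#             parameters=(("Amount", "amount", float),),
#             level=2,
#         )
#         def mark_paid_s(self, amount):
#             self.paid = True
#             self.save()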
def view(name=None, description=None, parameters=(), devel=False):
"""
Decorator function to be used to "annotate" the provided
function as a view that is able to return a set of configurations
for the proper display of associated information.
Proper usage of the view definition/decoration is context
based and should vary based on application.
:type name: String
:param name: The name of the view (in plain english)
so that a better user experience is possible.
:type description: String
:param description: The description of the view (in plain english)
so that a better user experience is possible.
:type parameters: Tuple
:param parameters: The sequence containing tuples that describe
the various parameters to be sent to the view.
:type devel: bool
:param devel: If the view should only be used/available under
development like environments (eg: debugging purposes).
:rtype: Function
:return: The decorator function that is going to be used to
generate the final function to be called.
"""
def decorator(function, *args, **kwargs):
function._view = View(
method=function.__name__,
name=name or function.__name__,
description=description,
parameters=parameters,
devel=devel,
)
return function
return decorator
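# a minimal usage sketch (hypothetical `Order` model), where the
# returned configuration map is application specific:
#     class Order(Model):
#         @view(name="Recent Orders")
#         def recent_v(self):
#             return dict(sort=[("created", -1)], limit=10)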
def type_d(type, default=None):
"""
Retrieves the default (initial) value for a certain
provided data type falling back to the provided default
value in case it's not possible to retrieve a new valid
default value for the type.
The process of retrieving the default value for a certain
type may include the calling of a lambda function to obtain
a new instance of the default value, this avoids the usage
of global shared structures for the default values, which
could cause extremely confusing situations.
:type type: Class
:param type: The data type object for which to retrieve its
default value.
:type default: Object
:param default: The default value to be returned in case it's
not possible to retrieve a better one.
:rtype: Object
:return: The "final" default value for the data type according
to the best possible strategy.
"""
if not type in TYPE_DEFAULTS:
return default
default = TYPE_DEFAULTS[type]
if not hasattr(default, "__call__"):
return default
return default()
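# a minimal usage sketch, following the TYPE_DEFAULTS contract where
# mutable defaults come from lambda factories (so no instance sharing),
# and assuming `object` is not present in TYPE_DEFAULTS:
#     type_d(bool)                  # -> False
#     type_d(list)                  # -> [] (a fresh list on every call)
#     type_d(list) is type_d(list)  # -> False, distinct instances
#     type_d(object, default=42)    # -> 42 (fallback default)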
def is_unset(value):
"""
Verifies if the provided value is unset trying the multiple
approaches available for such verification, notice that some
of the verifications imply resolution (data source access) and
as such are considered to be very expensive.
:type value: Object
:param value: The value that is going to be verified as a valid
value, if it's a reference an expensive resolution operation
may be performed.
:rtype: bool
:return: If the provided value is unset/invalid according to the
underlying promises of the data type of the provided value.
"""
if value == None:
return True
if isinstance(value, typesf.Reference) and not value.is_resolvable():
return True
return False
field = Field