modm_data.owl

 1# Copyright 2022, Niklas Hauser
 2# SPDX-License-Identifier: MPL-2.0
 3
 4from .store import Store
 5from .identifier import DeviceIdentifier
 6
 7__all__ = [
 8    "stmicro",
 9    "Store",
10    "DeviceIdentifier",
11]
class Store:
39class Store:
40    def __init__(self, vendor, device):
41        self.base_url = "https://data.modm.io/kg"
42        self.vendor = vendor
43        self.device = device
44        self._path = ext_path(f"{vendor}/owl")
45        # Add the directory to the search path
46        owl.onto_path.append(self._path)
47        self.ontology = owl.get_ontology(f"{self.base_url}/{vendor}/{device}")
48
49    # def set_device_iri(self, device):
50    #     self.ontology.set_base_iri(f"{self.base_url}/{self.vendor}/{device}", rename_entities=False)
51
52    def namespace(self, name):
53        return self.ontology.get_namespace(f"{self.vendor}/{name}")
54
55    def load(self, name=None):
56        if name is None:
57            name = "ontology"
58        fileobj = open(self._path / f"{name}.owl", "rb")
59        self.ontology.load(only_local=True, fileobj=fileobj, reload=True)
60
61    def save(self, name=None):
62        self._path.mkdir(exist_ok=True, parents=True)
63        if name is None:
64            name = "ontology"
65        file = str(self._path / f"{name}.owl")
66        self.ontology.save(file=file)
67
68        # dom = ET.parse(file)
69        # xslt = ET.XML(XSLT_SORT)
70        # transform = ET.XSLT(xslt)
71        # newdom = transform(dom)
72        # Path(file).write_bytes(ET.tostring(newdom, pretty_print=True))
73
74        # owl.default_world.set_backend(filename=str(self._path / f"{name}.sqlite3"))
75        # owl.default_world.save()
76
77    def clear(self):
78        # self.ontology._destroy_cached_entities()
79        self.ontology.world._destroy_cached_entities()
80        # self.ontology.destroy()
81        # self.ontology = owl.get_ontology(f"{self.base_url}/{self.vendor}")
82
83    def __repr__(self) -> str:
84        return f"Store({self.vendor}/{self.device})"
Store(vendor, device)
40    def __init__(self, vendor, device):
41        self.base_url = "https://data.modm.io/kg"
42        self.vendor = vendor
43        self.device = device
44        self._path = ext_path(f"{vendor}/owl")
45        # Add the directory to the search path
46        owl.onto_path.append(self._path)
47        self.ontology = owl.get_ontology(f"{self.base_url}/{vendor}/{device}")
base_url
vendor
device
ontology
def namespace(self, name):
52    def namespace(self, name):
53        return self.ontology.get_namespace(f"{self.vendor}/{name}")
def load(self, name=None):
55    def load(self, name=None):
56        if name is None:
57            name = "ontology"
58        fileobj = open(self._path / f"{name}.owl", "rb")
59        self.ontology.load(only_local=True, fileobj=fileobj, reload=True)
def save(self, name=None):
61    def save(self, name=None):
62        self._path.mkdir(exist_ok=True, parents=True)
63        if name is None:
64            name = "ontology"
65        file = str(self._path / f"{name}.owl")
66        self.ontology.save(file=file)
67
68        # dom = ET.parse(file)
69        # xslt = ET.XML(XSLT_SORT)
70        # transform = ET.XSLT(xslt)
71        # newdom = transform(dom)
72        # Path(file).write_bytes(ET.tostring(newdom, pretty_print=True))
73
74        # owl.default_world.set_backend(filename=str(self._path / f"{name}.sqlite3"))
75        # owl.default_world.save()
def clear(self):
77    def clear(self):
78        # self.ontology._destroy_cached_entities()
79        self.ontology.world._destroy_cached_entities()
80        # self.ontology.destroy()
81        # self.ontology = owl.get_ontology(f"{self.base_url}/{self.vendor}")
class DeviceIdentifier:
10class DeviceIdentifier:
11    def __init__(self, naming_schema=None):
12        self.naming_schema = naming_schema
13        self._properties = OrderedDict()
14        self.__string = None
15        self.__ustring = None
16        self.__hash = None
17
18    @property
19    def _ustring(self):
20        if self.__ustring is None:
21            self.__ustring = "".join([k + self._properties[k] for k in sorted(self._properties.keys())])
22            if self.naming_schema:
23                self.__ustring += self.naming_schema
24        return self.__ustring
25
26    def copy(self):
27        identifier = DeviceIdentifier(self.naming_schema)
28        identifier._properties = copy.deepcopy(self._properties)
29        identifier.__string = self.__string
30        identifier.__ustring = self.__ustring
31        identifier.__hash = self.__hash
32        return identifier
33
34    def keys(self):
35        return self._properties.keys()
36
37    @property
38    def string(self):
39        # if no naming schema is available, throw up
40        if self.naming_schema is None:
41            raise ValueError("Naming schema is missing!")
42        # Use the naming schema to generate the string
43        if self.__string is None:
44            self.__string = string.Formatter().vformat(self.naming_schema, (), defaultdict(str, **self._properties))
45        return self.__string
46
47    def set(self, key, value):
48        self.__hash = None
49        self.__string = None
50        self.__ustring = None
51        self._properties[key] = value
52
53    def get(self, key, default=None):
54        return self._properties.get(key, default)
55
56    def __getitem__(self, key):
57        return self.get(key, None)
58
59    def __getattr__(self, attr):
60        val = self.get(attr, None)
61        if val is None:
62            raise AttributeError(f"'{self!r}' has no property '{attr}'")
63        return val
64
65    def __eq__(self, other):
66        return self._ustring == other._ustring
67
68    def __ne__(self, other):
69        return not self == other
70
71    def __hash__(self):
72        if self.__hash is None:
73            self.__hash = hash(self._ustring)
74        return self.__hash
75
76    def __str__(self):
77        return self.string
78
79    def __repr__(self):
80        return self.string if self.naming_schema else f"DeviceId({self._ustring})"
DeviceIdentifier(naming_schema=None)
11    def __init__(self, naming_schema=None):
12        self.naming_schema = naming_schema
13        self._properties = OrderedDict()
14        self.__string = None
15        self.__ustring = None
16        self.__hash = None
naming_schema
def copy(self):
26    def copy(self):
27        identifier = DeviceIdentifier(self.naming_schema)
28        identifier._properties = copy.deepcopy(self._properties)
29        identifier.__string = self.__string
30        identifier.__ustring = self.__ustring
31        identifier.__hash = self.__hash
32        return identifier
def keys(self):
34    def keys(self):
35        return self._properties.keys()
string
37    @property
38    def string(self):
39        # if no naming schema is available, throw up
40        if self.naming_schema is None:
41            raise ValueError("Naming schema is missing!")
42        # Use the naming schema to generate the string
43        if self.__string is None:
44            self.__string = string.Formatter().vformat(self.naming_schema, (), defaultdict(str, **self._properties))
45        return self.__string
def set(self, key, value):
47    def set(self, key, value):
48        self.__hash = None
49        self.__string = None
50        self.__ustring = None
51        self._properties[key] = value
def get(self, key, default=None):
53    def get(self, key, default=None):
54        return self._properties.get(key, default)