modm_data.kg

 1# Copyright 2022, Niklas Hauser
 2# SPDX-License-Identifier: MPL-2.0
 3
 4from .store import Store
 5from .identifier import DeviceIdentifier
 6
 7__all__ = [
 8    "stmicro",
 9    "Store",
10    "DeviceIdentifier",
11]
class Store:
class Store:
    """In-memory Kùzu graph database tied to one vendor/device archive path.

    The archive location is computed once as
    ``ext_path(f"{vendor}/kg-archive/{device.string}")`` and is used by
    ``load()`` and ``save()`` as the import source / export target.
    """

    def __init__(self, vendor, device):
        """Create an empty in-memory database for ``device`` of ``vendor``.

        :param vendor: vendor name, used as the first path component of the
            archive location.
        :param device: device identifier; its ``string`` attribute names the
            archive directory.
        """
        self.vendor = vendor
        self.device = device
        # NOTE(review): `ext_path` must return a pathlib-like object, because
        # `save()` calls `.exists()` on it.
        self._path = ext_path(f"{vendor}/kg-archive/{device.string}")
        self.db = kuzu.Database(":memory:")
        self.conn = kuzu.Connection(self.db)

    def execute(self, command):
        """Run ``command`` on the connection and hand back its result.

        :param command: query text passed unchanged to ``self.conn.execute``.
        :return: whatever ``self.conn.execute`` returns, so that read queries
            can be consumed by the caller. Callers that ignore the return
            value behave exactly as before.
        """
        return self.conn.execute(command)

    def load(self):
        """Import the archive stored at the device path into the database."""
        self.conn.execute(f"IMPORT DATABASE '{self._path}';")

    def save(self, overwrite: bool = True) -> bool:
        """Export the database as CSV to the device path.

        :param overwrite: when the path already exists, delete it first
            (``True``) or leave it untouched and skip the export (``False``).
        :return: ``True`` if the export statement was executed, ``False`` if
            an existing archive was kept.
        """
        if self._path.exists():
            if not overwrite:
                return False
            # The stale archive directory is removed before exporting.
            shutil.rmtree(self._path)
        self.conn.execute(f"EXPORT DATABASE '{self._path}' (format='csv');")
        return True

    def __repr__(self) -> str:
        return f"Store({self.vendor}/{self.device})"
Store(vendor, device)
11    def __init__(self, vendor, device):
12        self.vendor = vendor
13        self.device = device
14        self._path = ext_path(f"{vendor}/kg-archive/{device.string}")
15        self.db = kuzu.Database(":memory:")
16        self.conn = kuzu.Connection(self.db)
vendor
device
db
conn
def execute(self, command):
18    def execute(self, command):
19        # from textwrap import dedent
20        # print(dedent(command))
21        self.conn.execute(command)
def load(self):
23    def load(self):
24        self.conn.execute(f"IMPORT DATABASE '{self._path}';")
def save(self, overwrite=True) -> bool:
26    def save(self, overwrite=True) -> bool:
27        if self._path.exists():
28            if not overwrite:
29                return False
30            shutil.rmtree(self._path)
31        self.conn.execute(f"EXPORT DATABASE '{self._path}' (format='csv');")
32        return True
class DeviceIdentifier:
class DeviceIdentifier:
    """Identify a device by named properties and an optional naming schema.

    Properties keep their insertion order and can be read with ``get()``,
    item access (``identifier["key"]``) or attribute access
    (``identifier.key``). The formatted name, the canonical comparison string
    and the hash are computed on first use and cached; ``set()`` drops all
    three caches.
    """

    def __init__(self, naming_schema=None):
        """Create an identifier without properties.

        :param naming_schema: format string with ``{property}`` fields used
            by ``string``; ``None`` means the identifier cannot be formatted.
        """
        self.naming_schema = naming_schema
        self._properties = OrderedDict()
        self.__string = None
        self.__ustring = None
        self.__hash = None

    @property
    def _ustring(self):
        """Canonical string used for equality and hashing.

        Built from the key/value pairs in sorted key order, followed by the
        naming schema when one is set.
        """
        if self.__ustring is None:
            props = self._properties
            # f-string formatting keeps this usable for non-string values,
            # which `string` already accepts through the formatter.
            canonical = "".join(f"{key}{props[key]}" for key in sorted(props))
            if self.naming_schema:
                canonical += self.naming_schema
            self.__ustring = canonical
        return self.__ustring

    def copy(self):
        """Return an independent identifier with deep-copied properties.

        The cached strings and hash are carried over to the copy.
        """
        identifier = DeviceIdentifier(self.naming_schema)
        identifier._properties = copy.deepcopy(self._properties)
        identifier.__string = self.__string
        identifier.__ustring = self.__ustring
        identifier.__hash = self.__hash
        return identifier

    def keys(self):
        """Return a view of the property names."""
        return self._properties.keys()

    @property
    def string(self):
        """Name produced by filling the naming schema with the properties.

        Fields that have no matching property are rendered as an empty string.

        :raises ValueError: if no naming schema is set.
        """
        if self.naming_schema is None:
            raise ValueError("Naming schema is missing!")
        if self.__string is None:
            # Passing the mapping positionally (not as **kwargs) avoids a
            # TypeError for property keys that are not strings.
            values = defaultdict(str, self._properties)
            self.__string = string.Formatter().vformat(self.naming_schema, (), values)
        return self.__string

    def set(self, key, value):
        """Store ``value`` under ``key`` and invalidate every cached value."""
        self.__hash = None
        self.__string = None
        self.__ustring = None
        self._properties[key] = value

    def get(self, key, default=None):
        """Return the property ``key``, or ``default`` if it is not set."""
        return self._properties.get(key, default)

    def __getitem__(self, key):
        """Return the property ``key``, or ``None`` if it is not set."""
        return self.get(key, None)

    def __getattr__(self, attr):
        """Expose properties as attributes.

        Only called when regular attribute lookup fails.

        :raises AttributeError: if there is no such property or its value is
            ``None``.
        """
        # `copy.deepcopy` and `pickle` probe attributes on an instance whose
        # `__init__` has not run. Reading `self._properties` there would
        # re-enter `__getattr__` forever, so look in the instance dict instead.
        try:
            properties = self.__dict__["_properties"]
        except KeyError:
            raise AttributeError(attr) from None
        val = properties.get(attr)
        if val is None:
            raise AttributeError(f"'{self!r}' has no property '{attr}'")
        return val

    def __eq__(self, other):
        """Compare by canonical string; defer for unrelated types."""
        if not isinstance(other, DeviceIdentifier):
            # Let Python fall back to its default comparison instead of
            # failing with AttributeError on e.g. `identifier == None`.
            return NotImplemented
        return self._ustring == other._ustring

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        if self.__hash is None:
            self.__hash = hash(self._ustring)
        return self.__hash

    def __str__(self):
        return self.string

    def __repr__(self):
        return self.string if self.naming_schema else f"DeviceId({self._ustring})"
DeviceIdentifier(naming_schema=None)
11    def __init__(self, naming_schema=None):
12        self.naming_schema = naming_schema
13        self._properties = OrderedDict()
14        self.__string = None
15        self.__ustring = None
16        self.__hash = None
naming_schema
def copy(self):
26    def copy(self):
27        identifier = DeviceIdentifier(self.naming_schema)
28        identifier._properties = copy.deepcopy(self._properties)
29        identifier.__string = self.__string
30        identifier.__ustring = self.__ustring
31        identifier.__hash = self.__hash
32        return identifier
def keys(self):
34    def keys(self):
35        return self._properties.keys()
string
37    @property
38    def string(self):
39        # if no naming schema is available, throw up
40        if self.naming_schema is None:
41            raise ValueError("Naming schema is missing!")
42        # Use the naming schema to generate the string
43        if self.__string is None:
44            self.__string = string.Formatter().vformat(self.naming_schema, (), defaultdict(str, **self._properties))
45        return self.__string
def set(self, key, value):
47    def set(self, key, value):
48        self.__hash = None
49        self.__string = None
50        self.__ustring = None
51        self._properties[key] = value
def get(self, key, default=None):
53    def get(self, key, default=None):
54        return self._properties.get(key, default)