modm_data.kg
class
Store:
class Store:
    """Knowledge-graph store for a single device, backed by a kuzu database.

    The database is opened with the ``":memory:"`` path; `load()` and `save()`
    import it from and export it to the archive location
    ``<vendor>/kg-archive/<device.string>`` as resolved by ``ext_path``.
    """

    def __init__(self, vendor, device):
        """Open an empty database for ``device`` of ``vendor``.

        :param vendor: vendor name, used as the first component of the archive path.
        :param device: device identifier; its ``.string`` attribute names the archive.
        """
        self.vendor = vendor
        self.device = device
        # NOTE(review): `ext_path` is defined elsewhere; it is expected to
        # return a path-like object (`save()` calls `.exists()` on it).
        self._path = ext_path(f"{vendor}/kg-archive/{device.string}")
        self.db = kuzu.Database(":memory:")
        self.conn = kuzu.Connection(self.db)

    def execute(self, command):
        """Run ``command`` on the connection and return whatever it returns.

        Returning the result lets callers read the rows of a query; callers
        that only issue statements can keep ignoring the return value.
        """
        return self.conn.execute(command)

    def load(self):
        """Import the archived database at the archive path into this connection."""
        self.conn.execute(f"IMPORT DATABASE '{self._path}';")

    def save(self, overwrite=True) -> bool:
        """Export the database to the archive path in CSV format.

        :param overwrite: when the archive already exists, delete it first if
            true, otherwise leave it untouched.
        :return: ``False`` if an existing archive was kept, ``True`` once the
            export statement has been executed.
        """
        if self._path.exists():
            if not overwrite:
                return False
            # The existing archive directory is removed before exporting.
            shutil.rmtree(self._path)
        self.conn.execute(f"EXPORT DATABASE '{self._path}' (format='csv');")
        return True

    def __repr__(self) -> str:
        return f"Store({self.vendor}/{self.device})"
class
DeviceIdentifier:
class DeviceIdentifier:
    """Named properties identifying a device, plus an optional naming schema.

    Properties are stored by key and can be read with `get()`, item access or
    attribute access. With a naming schema (a ``str.format``-style template)
    the identifier can be rendered via `string`. Equality and hashing are
    based on `_ustring`, a canonical string built from the sorted properties
    and the schema.
    """

    def __init__(self, naming_schema=None):
        """Create an identifier without properties.

        :param naming_schema: optional format template used by `string`.
        """
        self.naming_schema = naming_schema
        self._properties = OrderedDict()
        # Lazily computed caches; `set()` resets all three.
        self.__string = None
        self.__ustring = None
        self.__hash = None

    @property
    def _ustring(self):
        """Canonical string: sorted ``key + value`` pairs followed by the schema."""
        if self.__ustring is None:
            self.__ustring = "".join(k + self._properties[k] for k in sorted(self._properties))
            if self.naming_schema:
                self.__ustring += self.naming_schema
        return self.__ustring

    def copy(self):
        """Return a new identifier with a deep copy of the properties and the cached values."""
        identifier = DeviceIdentifier(self.naming_schema)
        identifier._properties = copy.deepcopy(self._properties)
        identifier.__string = self.__string
        identifier.__ustring = self.__ustring
        identifier.__hash = self.__hash
        return identifier

    def keys(self):
        """Return a view of the property keys."""
        return self._properties.keys()

    @property
    def string(self):
        """Render the naming schema with the properties.

        Fields of the schema that have no matching property render as an
        empty string.

        :raises ValueError: if no naming schema is set.
        """
        # if no naming schema is available, throw up
        if self.naming_schema is None:
            raise ValueError("Naming schema is missing!")
        # Use the naming schema to generate the string
        if self.__string is None:
            self.__string = string.Formatter().vformat(self.naming_schema, (), defaultdict(str, **self._properties))
        return self.__string

    def set(self, key, value):
        """Set property ``key`` to ``value`` and invalidate the cached strings and hash."""
        self.__hash = None
        self.__string = None
        self.__ustring = None
        self._properties[key] = value

    def get(self, key, default=None):
        """Return property ``key``, or ``default`` if it is not set."""
        return self._properties.get(key, default)

    def __getitem__(self, key):
        """Return property ``key``, or ``None`` if it is not set."""
        return self.get(key, None)

    def __getattr__(self, attr):
        """Expose properties as attributes.

        :raises AttributeError: if there is no property ``attr``.
        """
        # Only reached when normal attribute lookup fails. On an instance whose
        # __init__ has not run (as created by copy.copy, copy.deepcopy or
        # unpickling before the state is restored) `_properties` does not exist
        # yet; reading it through `self` would re-enter __getattr__ and recurse
        # without bound, so check the instance dict directly.
        if "_properties" not in self.__dict__:
            raise AttributeError(attr)
        val = self.get(attr, None)
        if val is None:
            raise AttributeError(f"'{self!r}' has no property '{attr}'")
        return val

    def __eq__(self, other):
        # Let Python handle comparison with foreign types instead of failing
        # on the missing `_ustring` attribute.
        if not isinstance(other, DeviceIdentifier):
            return NotImplemented
        return self._ustring == other._ustring

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        if self.__hash is None:
            self.__hash = hash(self._ustring)
        return self.__hash

    def __str__(self):
        return self.string

    def __repr__(self):
        return self.string if self.naming_schema else f"DeviceId({self._ustring})"
string
@property
def string(self):
    """Return the identifier rendered through its naming schema.

    The result is cached after the first call. Schema fields without a
    matching property render as an empty string.

    Raises ValueError if no naming schema is set.
    """
    schema = self.naming_schema
    # Without a naming schema there is nothing to render.
    if schema is None:
        raise ValueError("Naming schema is missing!")
    # Render once and reuse the cached result afterwards.
    if self.__string is None:
        values = defaultdict(str, **self._properties)
        self.__string = string.Formatter().vformat(schema, (), values)
    return self.__string