modm_data.kg.stmicro
1# Copyright 2022, Niklas Hauser 2# SPDX-License-Identifier: MPL-2.0 3 4from .identifier import did_from_string 5from .schema import kg_create 6from .model import ( 7 kg_from_cubemx, 8 kg_from_header, 9) 10 11__all__ = [ 12 "did_from_string", 13 "kg_create", 14 "kg_from_cubemx", 15 "kg_from_header", 16]
def did_from_string(string) -> DeviceIdentifier:
    """
    Parse an STM32 device name into a `DeviceIdentifier`.

    The name is lower-cased and then read positionally: the ``stm32`` prefix,
    two characters of family, two characters of name and one character each
    for pin, size, package and temperature. A 14th character, if present, is
    stored as the variant (otherwise the variant is the empty string), and an
    optional ``@core`` suffix is stored as the core. For example
    ``stm32f103c8t6`` yields family ``f1``, name ``03``, pin ``c``, size ``8``,
    package ``t``, temperature ``6`` and an empty variant.

    :param string: device name, compared case-insensitively.
    :return: the populated `DeviceIdentifier`.
    :raises ValueError: if the name does not start with ``stm32``, is too short
        to contain all positional fields, or contains more than one ``@``.
    """
    string = string.lower()

    if string.startswith("stm32"):
        # Separate the optional "@core" suffix *before* reading any positional
        # field, so that a short device name can never pick up "@" or core
        # characters as its package/temperature.
        device, has_core, core = string.partition("@")
        if "@" in core:
            raise ValueError(f"Identifier '{string}' contains more than one '@'!")
        # Index 12 (temperature) is the last mandatory character.
        if len(device) < 13:
            raise ValueError(
                f"Identifier '{string}' is too short: expected at least 13 "
                f"characters before any '@core' suffix!"
            )

        schema = "{platform}{family}{name}{pin}{size}{package}{temperature}{variant}"
        if has_core:
            schema += "@{core}"
        i = DeviceIdentifier(schema)
        i.set("platform", "stm32")
        i.set("family", device[5:7])
        i.set("name", device[7:9])
        i.set("pin", device[9])
        i.set("size", device[10])
        i.set("package", device[11])
        i.set("temperature", device[12])
        if has_core:
            i.set("core", core)
        # Only a single variant character is read; anything after it is ignored.
        i.set("variant", device[13] if len(device) >= 14 else "")
        return i

    raise ValueError(f"Unknown identifier '{string}'!")
def
kg_create(did):
def kg_create(did):
    """
    Open the "stmicro" store for a device and create the graph schema in it.

    :param did: device identifier, passed through to `Store`.
    :return: the `Store` after the schema statements were executed on it.
    :raises AssertionError: if the same member name is declared with two
        different types anywhere in the schema.
    """
    db = Store("stmicro", did)

    statements = (
        # Device identifier
        "CREATE NODE TABLE DeviceIdentifier(full STRING PRIMARY KEY);\n",
        # Peripherals
        "CREATE NODE TABLE Peripheral(name STRING PRIMARY KEY, type STRING, instance UINT8, version STRING);\n",
        # Pins
        "CREATE NODE TABLE Package(name STRING PRIMARY KEY, pins UINT16);\n",
        "CREATE NODE TABLE Pin(name STRING PRIMARY KEY, type STRING, variant STRING, port STRING, pin UINT8);\n",
        "CREATE REL TABLE hasPin(FROM Package TO Pin, location STRING);\n",
        # Signals
        "CREATE NODE TABLE Signal(name STRING PRIMARY KEY);\n",
        "CREATE REL TABLE hasSignal(FROM Peripheral TO Signal, shift UINT8, mask UINT8);\n",
        "CREATE REL TABLE hasSignalPin(FROM Signal TO Pin, peripheral STRING, index UINT8);\n",
        # Memories
        "CREATE NODE TABLE Memory(name STRING PRIMARY KEY, address UINT32, size UINT32, access STRING);\n",
        # Interrupts
        "CREATE NODE TABLE InterruptVector(name STRING PRIMARY KEY, position INT16);\n",
        "CREATE REL TABLE hasInterruptVector(FROM Peripheral TO InterruptVector);\n",
        # Flash latency
        "CREATE NODE TABLE FlashWaitState(minVoltage UINT16 PRIMARY KEY, maxFrequency UINT32[]);\n",
    )
    schema = "".join(statements)

    # Due to Kuzu limitations a member name must map to one type across all
    # tables, so remember the first type seen per name and compare the rest.
    seen_types = {}
    for member, member_type in re.findall(r"[\(,] ?([a-z]\w+) ([A-Z0-9\[\]]+)", schema):
        known_type = seen_types.setdefault(member, member_type)
        assert known_type == member_type, (
            f"Member '{member} {member_type}' has different type than '{member} {known_type}'!"
        )

    db.execute(schema)
    return db
def
kg_from_cubemx(db, data):
def kg_from_cubemx(db, data):
    """
    Fill the knowledge graph with the device description found in `data`.

    Every item is written by formatting a query string and handing it to
    ``db.execute``. The values are interpolated without quoting or escaping.

    :param db: object providing ``execute(query)``.
    :param data: mapping with the keys read below:
        ``id`` (device identifier with a ``string`` attribute),
        ``memories`` (dicts with ``name``, ``start``, ``size``, ``access``),
        ``modules`` (6-tuples starting with peripheral base name and name),
        ``package``, ``pin-count``,
        ``pinout`` (dicts with ``name``, ``type``, ``position`` and optionally
        ``variant``),
        ``signals`` (signal names),
        ``gpios`` (``(port, number, signals)`` tuples whose signals are dicts
        with ``driver``, ``instance``, ``name``, ``af``),
        ``remaps`` (peripheral name to dict with ``position``, ``mask`` and
        ``groups``) and
        ``flash_latency`` (voltage to list of frequencies).
    """
    did = data["id"]
    print(did.string)
    # Only the full identifier string is stored. The helpers and the query
    # below are a disabled sketch for storing the individual identifier fields.
    # core = f", core: '{did.core}'" if did.get("core", False) else ""
    # variant = f", variant: '{did.variant}'" if did.get("variant", False) else ""
    db.execute(f"CREATE (:DeviceIdentifier {{full: '{did.string.lower()}'}});")
    # db.execute(f"""
    #     CREATE (:DeviceIdentifier {{
    #         full: '{did.string.lower()}',
    #         schema: '{did.naming_schema}',
    #         platform: '{did.platform}',
    #         family: '{did.family}',
    #         name: '{did.name}',
    #         pin: '{did.pin}',
    #         size: '{did.size}',
    #         package: '{did.package}',
    #         temperature: '{did.temperature}'
    #         {variant}
    #         {core}
    #     }});
    # """)

    # Add internal memories
    for memory in sorted(data["memories"], key=lambda m: m["start"]):
        db.execute(f"""
            CREATE (:Memory {{
                name: '{memory["name"].upper()}',
                address: {memory["start"]},
                size: {memory["size"]},
                access: '{memory["access"]}'
            }});
        """)

    # Add the peripherals and their type
    # NOTE(review): the `version` property is filled from `ptype` while the
    # tuple's own third field is unused; confirm this is intended.
    for pbase, name, _version, ptype, _features, _stype in sorted(data["modules"]):
        # The instance number is whatever remains of the name once the base
        # name is removed; it stays NULL when that remainder is not an integer.
        instance = "NULL"
        if pbase != name:
            try:
                instance = int(name.replace(pbase, ""))
            except ValueError:
                pass
        db.execute(f"""
            CREATE (:Peripheral {{
                name: '{name.upper()}',
                type: '{pbase.upper()}',
                instance: {instance},
                version: '{ptype}'
            }});
        """)

    # Add package
    db.execute(f"""
        CREATE (:Package {{
            name: '{data["package"]}',
            pins: {int(data["pin-count"])}
        }});
    """)

    # Add pinout for package
    for pin in sorted(data["pinout"], key=lambda p: p["name"]):
        # Optional properties are rendered as ", key: value" fragments so that
        # they can simply be left empty when they do not apply.
        variant, port, number = "", "", ""
        if pin["type"] == "I/O" and (pnumber := re.search(r"P\w(\d+)", pin["name"])):
            port = f", port: '{pin['name'][1]}'"
            number = f", pin: {int(pnumber.group(1))}"
        if var := pin.get("variant"):
            variant = f", variant: '{var}'"
        # MERGE rather than CREATE: the same pin name may occur more than once
        # in the pinout, each occurrence adding its own hasPin location.
        db.execute(f"""
            MERGE (:Pin {{
                name: '{pin["name"]}',
                type: '{pin["type"].lower()}'
                {variant}
                {port}
                {number}
            }});
            MATCH (pa:Package), (pi:Pin {{name: '{pin["name"]}'}})
            CREATE (pa)-[:hasPin {{location: '{pin["position"]}'}}]->(pi);
        """)

    # Add signals
    for signal in sorted(data["signals"]):
        db.execute(f"CREATE (:Signal {{name: '{signal.upper()}'}});")

    # Add alternate and additional functions to pins
    for port, number, signals in sorted(data["gpios"]):
        for signal in sorted(signals, key=lambda s: s["name"]):
            peripheral = (signal["driver"] or "").upper() + (signal["instance"] or "")
            name = signal["name"].upper()
            db.execute(f"""
                MATCH (p:Peripheral {{name: '{peripheral}'}}), (s:Signal {{name: '{name}'}})
                MERGE (p)-[:hasSignal]->(s);
            """)
            # NOTE(review): a falsy `af` (None, "" or integer 0) omits the
            # index property; confirm that `af` is never the integer 0.
            index = ""
            if af := signal["af"]:
                index = f", index: {af}"
            db.execute(f"""
                MATCH (p:Pin {{port: '{port.upper()}', pin: {number}}}), (s:Signal {{name: '{name}'}})
                MERGE (s)-[:hasSignalPin {{peripheral: '{peripheral}'{index}}}]->(p);
            """)

    # Add remapped signals: the remap position/mask go onto hasSignal and the
    # remap group key becomes the index of hasSignalPin.
    for peripheral, remap in data["remaps"].items():
        for index, groups in remap["groups"].items():
            for signal in groups:
                sname = signal["name"].upper()
                pname = peripheral.upper()
                db.execute(f"""
                    MATCH (p:Peripheral {{name: '{pname}'}}), (s:Signal {{name: '{sname}'}})
                    MERGE (p)-[:hasSignal {{shift: {remap["position"]}, mask: {remap["mask"]}}}]->(s);

                    MATCH (p:Pin {{port: '{signal["port"].upper()}', pin: {signal["pin"]}}}), (s:Signal {{name: '{sname}'}})
                    MERGE (s)-[:hasSignalPin {{peripheral: '{pname}', index: {index}}}]->(p);
                """)

    # Add flash wait states per minimum voltage
    for mV, freqs in data["flash_latency"].items():
        db.execute(f"""
            CREATE (:FlashWaitState {{
                minVoltage: {mV},
                maxFrequency: [{", ".join(map(str, freqs))}]
            }});
        """)
def
kg_from_header(db, header):
def kg_from_header(db, header):
    """
    Add the interrupt vector table found in `header` to the knowledge graph.

    For every ``(position, name)`` pair in ``header["irqs"]``, visited in
    sorted order, one query is executed that creates the `InterruptVector`
    node and links it to every `Peripheral` whose name occurs as a substring
    of the interrupt name.

    :param db: object providing ``execute(query)``.
    :param header: mapping whose ``irqs`` entry holds ``(position, name)`` pairs.
    """
    for irq_position, irq_name in sorted(header["irqs"]):
        query = f"""
            CREATE (:InterruptVector {{
                name: '{irq_name}',
                position: {irq_position}
            }});
            MATCH (p:Peripheral), (i:InterruptVector {{name: '{irq_name}'}})
            WHERE '{irq_name}' CONTAINS p.name
            CREATE (p)-[:hasInterruptVector]->(i);
        """
        db.execute(query)