modm_data.kg.stmicro

 1# Copyright 2022, Niklas Hauser
 2# SPDX-License-Identifier: MPL-2.0
 3
 4from .identifier import did_from_string
 5from .schema import kg_create
 6from .model import (
 7    kg_from_cubemx,
 8    kg_from_header,
 9)
10
11__all__ = [
12    "did_from_string",
13    "kg_create",
14    "kg_from_cubemx",
15    "kg_from_header",
16]
def did_from_string(string) -> modm_data.kg.DeviceIdentifier:
 8def did_from_string(string) -> DeviceIdentifier:
 9    string = string.lower()
10
11    if string.startswith("stm32"):
12        schema = "{platform}{family}{name}{pin}{size}{package}{temperature}{variant}"
13        if "@" in string:
14            schema += "@{core}"
15        i = DeviceIdentifier(schema)
16        i.set("platform", "stm32")
17        i.set("family", string[5:7])
18        i.set("name", string[7:9])
19        i.set("pin", string[9])
20        i.set("size", string[10])
21        i.set("package", string[11])
22        i.set("temperature", string[12])
23        if "@" in string:
24            string, core = string.split("@")
25            i.set("core", core)
26        if len(string) >= 14:
27            i.set("variant", string[13])
28        else:
29            i.set("variant", "")
30        return i
31
32    raise ValueError(f"Unknown identifier '{string}'!")
def kg_create(did):
 9def kg_create(did):
10    db = Store("stmicro", did)
11    schema = ""
12
13    # --------------------------- DEVICE IDENTIFIER ---------------------------
14    schema += "CREATE NODE TABLE DeviceIdentifier(full STRING PRIMARY KEY);\n"
15
16    # ------------------------------ PERIPHERALS ------------------------------
17    schema += "CREATE NODE TABLE Peripheral(name STRING PRIMARY KEY, type STRING, instance UINT8, version STRING);\n"
18
19    # --------------------------------- PINS ----------------------------------
20    schema += "CREATE NODE TABLE Package(name STRING PRIMARY KEY, pins UINT16);\n"
21    schema += "CREATE NODE TABLE Pin(name STRING PRIMARY KEY, type STRING, variant STRING, port STRING, pin UINT8);\n"
22    schema += "CREATE REL TABLE hasPin(FROM Package TO Pin, location STRING);\n"
23
24    # -------------------------------- SIGNALS --------------------------------
25    schema += "CREATE NODE TABLE Signal(name STRING PRIMARY KEY);\n"
26    schema += "CREATE REL TABLE hasSignal(FROM Peripheral TO Signal, shift UINT8, mask UINT8);\n"
27    schema += "CREATE REL TABLE hasSignalPin(FROM Signal TO Pin, peripheral STRING, index UINT8);\n"
28
29    # ------------------------------- MEMORIES --------------------------------
30    schema += "CREATE NODE TABLE Memory(name STRING PRIMARY KEY, address UINT32, size UINT32, access STRING);\n"
31
32    # ------------------------------ INTERRUPTS -------------------------------
33    schema += "CREATE NODE TABLE InterruptVector(name STRING PRIMARY KEY, position INT16);\n"
34    schema += "CREATE REL TABLE hasInterruptVector(FROM Peripheral TO InterruptVector);\n"
35
36    # ----------------------------- FLASH LATENCY -----------------------------
37    schema += "CREATE NODE TABLE FlashWaitState(minVoltage UINT16 PRIMARY KEY, maxFrequency UINT32[]);\n"
38
39    # Enforce unique name+type combinations due to Kuzu limitations
40    members = {}
41    for name, typename in re.findall(r"[\(,] ?([a-z]\w+) ([A-Z0-9\[\]]+)", schema):
42        assert members.get(name, typename) == typename, (
43            f"Member '{name} {typename}' has different type than '{name} {members[name]}'!"
44        )
45        members[name] = typename
46
47    db.execute(schema)
48    return db
def kg_from_cubemx(db, data):
  8def kg_from_cubemx(db, data):
  9    did = data["id"]
 10    print(did.string)
 11    # core = f", core: '{did.core}'" if did.get("core", False) else ""
 12    variant = f", variant: '{did.variant}'" if did.get("variant", False) else ""
 13    db.execute(f"CREATE (:DeviceIdentifier {{full: '{did.string.lower()}'}});")
 14    # db.execute(f"""
 15    #     CREATE (:DeviceIdentifier {{
 16    #         full: '{did.string.lower()}',
 17    #         schema: '{did.naming_schema}',
 18    #         platform: '{did.platform}',
 19    #         family: '{did.family}',
 20    #         name: '{did.name}',
 21    #         pin: '{did.pin}',
 22    #         size: '{did.size}',
 23    #         package: '{did.package}',
 24    #         temperature: '{did.temperature}'
 25    #         {variant}
 26    #         {core}
 27    #     }});
 28    # """)
 29
 30    # Add internal memories
 31    for memory in sorted(data["memories"], key=lambda m: m["start"]):
 32        db.execute(f"""
 33            CREATE (:Memory {{
 34                name: '{memory["name"].upper()}',
 35                address: {memory["start"]},
 36                size: {memory["size"]},
 37                access: '{memory["access"]}'
 38            }});
 39        """)
 40
 41    # Add the peripherals and their type
 42    for pbase, name, version, ptype, features, stype in sorted(data["modules"]):
 43        instance = "NULL"
 44        if pbase != name:
 45            try:
 46                instance = int(name.replace(pbase, ""))
 47            except ValueError:
 48                pass
 49        db.execute(f"""
 50            CREATE (:Peripheral {{
 51                name: '{name.upper()}',
 52                type: '{pbase.upper()}',
 53                instance: {instance},
 54                version: '{ptype}'
 55            }});
 56        """)
 57
 58    # Add package
 59    db.execute(f"""
 60        CREATE (:Package {{
 61            name: '{data["package"]}',
 62            pins: {int(data["pin-count"])}
 63        }});
 64    """)
 65
 66    # Add pinout for package
 67    for pin in sorted(data["pinout"], key=lambda p: p["name"]):
 68        variant, port, number = "", "", ""
 69        if pin["type"] == "I/O" and (pnumber := re.search(r"P\w(\d+)", pin["name"])):
 70            port = f", port: '{pin['name'][1]}'"
 71            number = f", pin: {int(pnumber.group(1))}"
 72        if var := pin.get("variant"):
 73            variant = f", variant: '{var}'"
 74        db.execute(f"""
 75            MERGE (:Pin {{
 76                name: '{pin["name"]}',
 77                type: '{pin["type"].lower()}'
 78                {variant}
 79                {port}
 80                {number}
 81            }});
 82            MATCH (pa:Package), (pi:Pin {{name: '{pin["name"]}'}})
 83            CREATE (pa)-[:hasPin {{location: '{pin["position"]}'}}]->(pi);
 84        """)
 85
 86    # Add signals
 87    for signal in sorted(data["signals"]):
 88        db.execute(f"CREATE (:Signal {{name: '{signal.upper()}'}});")
 89
 90    # Add alternate and additional functions to pins
 91    for port, number, signals in sorted(data["gpios"]):
 92        for signal in sorted(signals, key=lambda s: s["name"]):
 93            peripheral = (signal["driver"] or "").upper() + (signal["instance"] or "")
 94            name = signal["name"].upper()
 95            db.execute(f"""
 96                MATCH (p:Peripheral {{name: '{peripheral}'}}), (s:Signal {{name: '{name}'}})
 97                MERGE (p)-[:hasSignal]->(s);
 98            """)
 99            index = ""
100            if af := signal["af"]:
101                index = f", index: {af}"
102            db.execute(f"""
103                MATCH (p:Pin {{port: '{port.upper()}', pin: {number}}}), (s:Signal {{name: '{name}'}})
104                MERGE (s)-[:hasSignalPin {{peripheral: '{peripheral}'{index}}}]->(p);
105            """)
106
107    for peripheral, remap in data["remaps"].items():
108        for index, groups in remap["groups"].items():
109            for signal in groups:
110                sname = signal["name"].upper()
111                pname = peripheral.upper()
112                db.execute(f"""
113                    MATCH (p:Peripheral {{name: '{pname}'}}), (s:Signal {{name: '{sname}'}})
114                    MERGE (p)-[:hasSignal {{shift: {remap["position"]}, mask: {remap["mask"]}}}]->(s);
115
116                    MATCH (p:Pin {{port: '{signal["port"].upper()}', pin: {signal["pin"]}}}), (s:Signal {{name: '{sname}'}})
117                    MERGE (s)-[:hasSignalPin {{peripheral: '{pname}', index: {index}}}]->(p);
118                """)
119
120    for mV, freqs in data["flash_latency"].items():
121        db.execute(f"""
122            CREATE (:FlashWaitState {{
123                minVoltage: {mV},
124                maxFrequency: [{", ".join(map(str, freqs))}]
125            }});
126        """)
def kg_from_header(db, header):
129def kg_from_header(db, header):
130    # Add interrupt vector table
131    for position, name in sorted(header["irqs"]):
132        db.execute(f"""
133            CREATE (:InterruptVector {{
134                name: '{name}',
135                position: {position}
136            }});
137            MATCH (p:Peripheral), (i:InterruptVector {{name: '{name}'}})
138            WHERE '{name}' CONTAINS p.name
139            CREATE (p)-[:hasInterruptVector]->(i);
140        """)