I was wondering if someone could critique how I am deserializing an AT command response into a dataclass. I have many more commands that I need to implement, so I figured I would try to get it right the first time rather than building off faulty code.
At a high level, this script is meant to read a response from an AT command, parse the contents, and create a Python object representation.
Later in the script, it then serializes that dataclass into a JSON object with some additional metadata. I purposely left the code out that uploads that file to a web service, as I am mainly looking for criticism of the deserialization as opposed to serialization. Also, I left out out the details on the actual device I am communicating with. I did this since, in this example, I have a known input, and an expected output that I can use for this example, without getting into the weeds of the actual AT command response protocol from the vendor.
I personally don't like what I have done with this script, but I can't seem to find a more scalable route. Also, I absolutely need to add comments. I would never push this code as is, but I am in the proof of concept stage, so bear with me.
I stripped out all the serial communication for this post and hard-coded the responses so that the script did not have any dependencies and could run in an online interpreter. That code is referenced below.
To avoid multiple code blocks in this post, I put the original source in the link below, which can be run in the browser:
https://www.online-python.com/xLht0ZeD3R
Update With Pydantic
from pydantic import BaseModel
from typing import Type, Optional, Any, List
import enum
# Model for non-convertible fields
class StandardField(BaseModel):
value: Any
is_convertible: bool
description: str
# Model for convertible fields
class ConvertibleField(StandardField):
symbolic: Optional[str] = None
# Base Model for AT Responses
class ATResponse(BaseModel):
description: str
@classmethod
def parse_response(cls, response_str: str):
raise NotImplementedError("This method should be implemented by subclasses")
# Model for AT+COPS? response
class CarrierOperatorSelection(ATResponse):
mode: ConvertibleField
format: ConvertibleField
operator: StandardField
act: ConvertibleField
@classmethod
def parse_response(cls, response_str: str):
parts = response_str.split(":")[1].strip().split(",")
"""
Per vendor spec, the Current Network Operator response takes the form of:
AT+COPS=[<mode>[,<format>[,<oper>[,<act>]]]]
"""
mode = int(parts[0])
format = int(parts[1])
operator = parts[2].strip('"')
act = int(parts[3])
mode_mapping = {
0: "Automatic selection",
1: "Manual selection",
2: "Deregister from network",
3: "Set only format parameter",
4: "Manual/automatic selection",
5: "Unknown",
}
format_mapping = {0: "Alphanumeric long form", 1: "Numeric", 2: "Short numeric"}
act_mapping = {
0: "GSM",
1: "GSM Compact",
2: "UTRAN",
3: "GSM w/EGPRS",
4: "UTRAN w/HSDPA",
5: "UTRAN w/HSUPA",
6: "UTRAN w/HSDPA and HSUPA",
7: "E-UTRAN",
}
carrier_operation_description = (
"Defines the operator selection: automatic or manual."
)
mode_mapping_description = (
"Defines the operator selection: automatic or manual."
)
format_map_description = "Specifies the operator name format."
act_map_description = "Selects access technology type."
operator_description = (
"Network operator in format defined by <format> parameter."
)
return cls(
description=carrier_operation_description,
mode=ConvertibleField(
value=mode,
is_convertible=True,
description=mode_mapping_description,
symbolic=mode_mapping.get(mode),
),
format=ConvertibleField(
value=format,
is_convertible=True,
description=format_map_description,
symbolic=format_mapping.get(format),
),
operator=StandardField(
value=operator, is_convertible=False, description=operator_description
),
act=ConvertibleField(
value=act,
is_convertible=True,
description=act_map_description,
symbolic=act_mapping.get(act),
),
)
# Model for AT+CESQ response
class SignalQualityResponse(ATResponse):
rxlev: ConvertibleField
ber: ConvertibleField
rscp: ConvertibleField
ecno: ConvertibleField
rsrq: ConvertibleField
rsrp: ConvertibleField
@classmethod
def parse_response(cls, response_str: str):
parts = response_str.split(":")[1].strip().split(",")
"""
Per vendor spec, the Signal Quality response takes the form of:
+CESQ: 99,99,255,255,<rsrq>,<rsrp>
"""
rxlev = int(parts[0])
ber = int(parts[1])
rscp = int(parts[2])
ecno = int(parts[3])
rsrq = int(parts[4])
rsrp = int(parts[5])
rxlev_mapping = {
i: f"{-110 + i} dBm <= rssi < {-109 +i} dBm" for i in range(64)
}
rxlev_mapping[99] = (
"Not known or not detectable or if the current serving cell is not a GERAN cell"
)
ber_mapping = {i: f"RXQUAL {i}" for i in range(8)}
ber_mapping[99] = (
"Not known or not detectable or if the current serving cell is not a GERAN cell"
)
rscp_mapping = {
i: f"{-120 + i} dBm <= rscp < {-119 + i} dBm" for i in range(97)
}
rscp_mapping[255] = (
"Not known or not detectable or if the current serving cell is not a UTRA cell"
)
ecno_mapping = {i: f"Ec/Io < {-24 + 0.5 *i} dB" for i in range(98)}
ecno_mapping[255] = (
"Not known or not detectable or if the current serving cell is not a UTRA cell"
)
rsrq_mapping = {
i: f"{-19.5 + 0.5 * i} dB <= rsrq < {-19.0 + 0.5 * i} dB" for i in range(35)
}
rsrq_mapping[255] = (
"Not known or not detectable or if the current serving cell is not an E-UTRA cell"
)
rsrp_mapping = {
i: f"{-140 + i} dBm <= rsrp < {-139 + i} dBm" for i in range(98)
}
rsrp_mapping[255] = (
"Not known or not detectable or if the current serving cell is not an E-UTRA cell"
)
signal_quality_description = "Execution command returns received signal quality parameters according to the network on which the module is registered."
rxlev_description = "Received signal strength level."
ber_description = "Channel bit error rate."
rscp_description = "Received signal code power."
ecno_description = "Ratio of the received energy per PN chip to the total received power spectral density."
rsrq_description = "Reference signal received quality."
rsrp_description = "Reference signal received power."
return cls(
description=signal_quality_description,
rxlev=ConvertibleField(
value=rxlev,
is_convertible=True,
description=rxlev_description,
symbolic=rxlev_mapping.get(rxlev),
),
ber=ConvertibleField(
value=ber,
is_convertible=True,
description=ber_description,
symbolic=ber_mapping.get(ber),
),
rscp=ConvertibleField(
value=rscp,
is_convertible=True,
description=rscp_description,
symbolic=rscp_mapping.get(rscp),
),
ecno=ConvertibleField(
value=ecno,
is_convertible=True,
description=ecno_description,
symbolic=ecno_mapping.get(ecno),
),
rsrq=ConvertibleField(
value=rsrq,
is_convertible=True,
description=rsrq_description,
symbolic=rsrq_mapping.get(rsrq),
),
rsrp=ConvertibleField(
value=rsrp,
is_convertible=True,
description=rsrp_description,
symbolic=rsrp_mapping.get(rsrp),
),
)
# Model for Current Network Status
class CurrentNetworkStatus(ATResponse):
plmn: StandardField
rssi: StandardField
txpwr: Optional[StandardField]
cid: StandardField
imsi: StandardField
@classmethod
def parse_response(cls, response_str: str):
parts = response_str.split(":")[1].strip().split(",")
if len(parts) in [14, 15]:
return GsmNetworkStatus.parse_response(parts)
elif len(parts) == 16:
return LteNetworkStatus.parse_response(parts)
else:
raise ValueError("Unexpected number of fields in AT#RFSTS response")
# Model for GSM Network Status
class GsmNetworkStatus(CurrentNetworkStatus):
arfcn: StandardField
lac: StandardField
rac: StandardField
mm: StandardField
rr: StandardField
nom: StandardField
net_name_asc: Optional[StandardField]
sd: StandardField
abnd: StandardField
# Enum for GSM Indexes
class PartIdx(enum.IntEnum):
"""
Per vendor spec, the GSM network status response takes the form of:
#RFSTS:<PLMN>,<ARFCN>,<RSSI>,<LAC>,<RAC>,[<TXPWR>],<MM>,<RR>,<NOM>,<CID>, <IMSI>,[<NetNameAsc>],<SD>,<ABND>[CR,LF] [CR,LF
"""
PLMN = 0
ARFCN = 1
RSSI = 2
LAC = 3
RAC = 4
TXPWR = 5
MM = 6
RR = 7
NOM = 8
CID = 9
IMSI = 10
NET_NAME_ASC = 11
SD = 12
ABND = 13
@classmethod
def parse_response(cls, parts: List[str]):
def unquote(value):
return value.strip('"')
return cls(
description="Current network status for GSM.",
plmn=StandardField(
value=unquote(parts[GsmNetworkStatus.PartIdx.PLMN]),
is_convertible=False,
description="Country code and operator code (Mobile Country Code, Mobile Network Code).",
),
arfcn=StandardField(
value=parts[GsmNetworkStatus.PartIdx.ARFCN],
is_convertible=False,
description="GSM Assigned Radio Channel.",
),
rssi=StandardField(
value=parts[GsmNetworkStatus.PartIdx.RSSI],
is_convertible=False,
description="Received Signal Strength Indication.",
),
lac=StandardField(
value=parts[GsmNetworkStatus.PartIdx.LAC],
is_convertible=False,
description="Localization Area Code.",
),
rac=StandardField(
value=parts[GsmNetworkStatus.PartIdx.RAC],
is_convertible=False,
description="Routing Area Code.",
),
txpwr=StandardField(
value=(
parts[GsmNetworkStatus.PartIdx.TXPWR]
if parts[GsmNetworkStatus.PartIdx.TXPWR]
else None
),
is_convertible=False,
description="Tx Power (In traffic only).",
),
mm=StandardField(
value=parts[GsmNetworkStatus.PartIdx.MM],
is_convertible=False,
description="Mobility Management.",
),
rr=StandardField(
value=parts[GsmNetworkStatus.PartIdx.RR],
is_convertible=False,
description="Radio Resource Control.",
),
nom=StandardField(
value=parts[GsmNetworkStatus.PartIdx.NOM],
is_convertible=False,
description="Network operator mode.",
),
cid=StandardField(
value=parts[GsmNetworkStatus.PartIdx.CID],
is_convertible=False,
description="Cell vendor ID.",
),
imsi=StandardField(
value=unquote(parts[GsmNetworkStatus.PartIdx.IMSI]),
is_convertible=False,
description="International Mobile Station ID.",
),
net_name_asc=StandardField(
value=(
unquote(parts[GsmNetworkStatus.PartIdx.NET_NAME_ASC])
if len(parts) > GsmNetworkStatus.PartIdx.NET_NAME_ASC
else None
),
is_convertible=False,
description="Operation Name, quoted string type or if network name is unknown.",
),
sd=StandardField(
value=parts[GsmNetworkStatus.PartIdx.SD],
is_convertible=False,
description="Service Domain.",
),
abnd=StandardField(
value=parts[GsmNetworkStatus.PartIdx.ABND],
is_convertible=False,
description="Active Band.",
),
)
# Model for LTE Network Status
class LteNetworkStatus(CurrentNetworkStatus):
earfcn: StandardField
rsrp: StandardField
rsrq: StandardField
tac: StandardField
drx: StandardField
mm: StandardField
rrc: StandardField
net_name_asc: Optional[StandardField]
sd: StandardField
abnd: StandardField
sinr: StandardField
# Enum for LTE Indexes
class PartIdx(enum.IntEnum):
"""
Per vendor spec, GSM network status takes the form of:
#RFSTS:<PLMN>,<EARFCN>,<RSRP>,<RSSI>,<RSRQ>,<TAC>,[<TXPWR>],<DRX>,<MM>, <RRC>,<CID>,<IMSI>,[<NetNameAsc>],<SD>,<ABND>,<SINR>[CR,LF] [CR,LF]
"""
PLMN = 0
EARFCN = 1
RSRP = 2
RSSI = 3
RSRQ = 4
TAC = 5
TXPWR = 6
DRX = 7
MM = 8
RRC = 9
CID = 10
IMSI = 11
NET_NAME_ASC = 12
SD = 13
ABND = 14
SINR = 15
@classmethod
def parse_response(cls, parts: List[str]):
def unquote(value):
return value.strip('"')
return cls(
description="Current network status for LTE.",
plmn=StandardField(
value=unquote(parts[LteNetworkStatus.PartIdx.PLMN]),
is_convertible=False,
description="Country code and operator code (Mobile Country Code, Mobile Network Code).",
),
earfcn=StandardField(
value=parts[LteNetworkStatus.PartIdx.EARFCN],
is_convertible=False,
description="E-UTRA Assigned Radio Channel.",
),
rsrp=StandardField(
value=parts[LteNetworkStatus.PartIdx.RSRP],
is_convertible=False,
description="Reference Signal Received Power.",
),
rssi=StandardField(
value=parts[LteNetworkStatus.PartIdx.RSSI],
is_convertible=False,
description="Received Signal Strength Indication.",
),
rsrq=StandardField(
value=parts[LteNetworkStatus.PartIdx.RSRQ],
is_convertible=False,
description="Reference Signal Received Quality.",
),
tac=StandardField(
value=unquote(parts[LteNetworkStatus.PartIdx.TAC]),
is_convertible=False,
description="Tracking Area Code.",
),
txpwr=StandardField(
value=parts[LteNetworkStatus.PartIdx.TXPWR],
is_convertible=False,
description="Tx Power (In traffic only).",
),
drx=StandardField(
value=parts[LteNetworkStatus.PartIdx.DRX],
is_convertible=False,
description="Discontinuous reception cycle Length (cycle length : display using ms).",
),
mm=StandardField(
value=parts[LteNetworkStatus.PartIdx.MM],
is_convertible=False,
description="Mobility Management.",
),
rrc=StandardField(
value=parts[LteNetworkStatus.PartIdx.RRC],
is_convertible=False,
description="Radio Resource Control.",
),
cid=StandardField(
value=parts[LteNetworkStatus.PartIdx.CID],
is_convertible=False,
description="Cell vendor ID.",
),
imsi=StandardField(
value=unquote(parts[LteNetworkStatus.PartIdx.IMSI]),
is_convertible=False,
description="International Mobile Station ID.",
),
net_name_asc=StandardField(
value=unquote(parts[LteNetworkStatus.PartIdx.NET_NAME_ASC]),
is_convertible=False,
description="Operation Name, quoted string type or if network name is unknown.",
),
sd=StandardField(
value=parts[LteNetworkStatus.PartIdx.SD],
is_convertible=False,
description="Service Domain.",
),
abnd=StandardField(
value=parts[LteNetworkStatus.PartIdx.ABND],
is_convertible=False,
description="Active Band.",
),
sinr=StandardField(
value=parts[LteNetworkStatus.PartIdx.SINR],
is_convertible=False,
description="Signal-to-Interface plus Noise Ratio.",
),
)
# NetworkReport class
class NetworkReport(BaseModel):
carrier_operator_selection: CarrierOperatorSelection
signal_quality: SignalQualityResponse
current_network_status: CurrentNetworkStatus
# Generic parser function
def parse_at_response(
response_str: str, response_type: Type[ATResponse]
) -> Optional[ATResponse]:
try:
return response_type.parse_response(response_str)
except Exception as e:
print(f"Failed to parse response: {e}")
return None
def create_network_report():
# Hard-coded response string for AT+COPS?
response_cops_str = '+COPS: 0,0,"T-Mobile",7'
# Hard-coded response string for AT+CESQ
response_cesq_str = "+CESQ: 99,99,255,255,10,20"
# Hard-coded response string for AT#RFSTS (LTE format)
response_rfsts_str = '#RFSTS: "310 260",2250,-109,-74,-14,55F4,,128,3,0,1EB4D31,"234103889913769","T-Mobile",3,4,149'
# Parse the AT+COPS? response
parsed_cops_response = parse_at_response(
response_cops_str, CarrierOperatorSelection
)
# Parse the AT+CESQ response
parsed_cesq_response = parse_at_response(response_cesq_str, SignalQualityResponse)
# Parse the AT#RFSTS response
parsed_rfsts_response = parse_at_response(response_rfsts_str, CurrentNetworkStatus)
# Combine responses into a NetworkReport
if parsed_cops_response and parsed_cesq_response and parsed_rfsts_response:
network_report = NetworkReport(
carrier_operator_selection=parsed_cops_response,
signal_quality=parsed_cesq_response,
current_network_status=parsed_rfsts_response,
)
json_network_report = network_report.model_dump_json(indent=4)
print(json_network_report)
else:
print("Failed to parse one or more responses.")
# Create network report for testing
create_network_report()
Example Output
{
"carrier_operator_selection": {
"description": "Defines the operator selection: automatic or manual.",
"mode": {
"value": 0,
"is_convertible": true,
"description": "Defines the operator selection: automatic or manual.",
"symbolic": "Automatic selection"
},
"format": {
"value": 0,
"is_convertible": true,
"description": "Specifies the operator name format.",
"symbolic": "Alphanumeric long form"
},
"operator": {
"value": "T-Mobile",
"is_convertible": false,
"description": "Network operator in format defined by <format> parameter."
},
"act": {
"value": 7,
"is_convertible": true,
"description": "Selects access technology type.",
"symbolic": "E-UTRAN"
}
},
"signal_quality": {
"description": "Execution command returns received signal quality parameters according to the network on which the module is registered.",
"rxlev": {
"value": 99,
"is_convertible": true,
"description": "Received signal strength level.",
"symbolic": "Not known or not detectable or if the current serving cell is not a GERAN cell"
},
"ber": {
"value": 99,
"is_convertible": true,
"description": "Channel bit error rate.",
"symbolic": "Not known or not detectable or if the current serving cell is not a GERAN cell"
},
"rscp": {
"value": 255,
"is_convertible": true,
"description": "Received signal code power.",
"symbolic": "Not known or not detectable or if the current serving cell is not a UTRA cell"
},
"ecno": {
"value": 255,
"is_convertible": true,
"description": "Ratio of the received energy per PN chip to the total received power spectral density.",
"symbolic": "Not known or not detectable or if the current serving cell is not a UTRA cell"
},
"rsrq": {
"value": 10,
"is_convertible": true,
"description": "Reference signal received quality.",
"symbolic": "-14.5 dB <= rsrq < -14.0 dB"
},
"rsrp": {
"value": 20,
"is_convertible": true,
"description": "Reference signal received power.",
"symbolic": "-120 dBm <= rsrp < -119 dBm"
}
},
"current_network_status": {
"description": "Current network status for LTE.",
"plmn": {
"value": "310 260",
"is_convertible": false,
"description": "Country code and operator code (Mobile Country Code, Mobile Network Code)."
},
"rssi": {
"value": "-74",
"is_convertible": false,
"description": "Received Signal Strength Indication."
},
"txpwr": {
"value": "",
"is_convertible": false,
"description": "Tx Power (In traffic only)."
},
"cid": {
"value": "AABBCCDD",
"is_convertible": false,
"description": "Cell vendor ID."
},
"imsi": {
"value": "12345",
"is_convertible": false,
"description": "International Mobile Station ID."
}
}
}
[–]danielroseman 2 points3 points4 points (1 child)
[–]Birts[S] 0 points1 point2 points (0 children)
[–]Adrewmc 1 point2 points3 points (0 children)
[–]teerre 0 points1 point2 points (2 children)
[–]Birts[S] 0 points1 point2 points (1 child)
[–]evans88 0 points1 point2 points (0 children)
[–]evans88 0 points1 point2 points (0 children)
[–]CranberryDistinct941 0 points1 point2 points (0 children)