OPC UA 方法調用:動態檢測與轉換自定義輸入參數

OPC UA 方法調用:動態檢測與轉換自定義輸入參數

本文檔旨在指導開發者在使用 OPC UA 客戶端調用方法時,如何動態檢測并正確轉換自定義輸入參數。通過讀取方法的 InputArguments 屬性,獲取參數的數據類型信息,并將其映射到相應的 python 類,從而實現參數的自動轉換,避免手動指定數據類型的繁瑣過程,提高代碼的靈活性和可維護性。

在 OPC UA 通信中,調用方法時,需要將參數轉換為服務端期望的數據類型。對于標準數據類型,這通常不是問題。但當涉及到自定義數據類型時,客戶端需要預先知道參數的具體結構,這在動態客戶端場景下變得困難。本文將介紹如何利用 asyncua 庫動態獲取方法參數類型,并進行相應的類型轉換

獲取方法參數類型信息

OPC UA 方法的參數信息存儲在其 0:InputArguments 變量中。通過讀取該變量的值,我們可以獲得一個 Argument 對象的列表,每個對象包含了參數的名稱、數據類型等信息。

from asyncua import Client from asyncua.ua import ObjectIds  async def get_method_input_arguments(client, method_node):     """     獲取 OPC UA 方法的輸入參數信息。      Args:         client: asyncua.Client 實例。         method_node: 方法節點的 Node 對象。      Returns:         一個包含 Argument 對象的列表。     """     input_arguments_node = await client.get_node(method_node.nodeid.to_string() + ".InputArguments")     arguments = await input_arguments_node.read_value()     return arguments  # 示例用法 async def main():     async with Client("opc.tcp://your_opcua_server:4840") as client:         # 假設 method_node 是你要調用的方法節點         method_node = await client.get_node("ns=2;i=1234")  # 替換為你的方法節點 ID         arguments = await get_method_input_arguments(client, method_node)         for arg in arguments:             print(f"參數名稱: {arg.Name}, 數據類型: {arg.DataType}")  if __name__ == "__main__":     import asyncio     asyncio.run(main())

將 DataType NodeId 映射到 Python 類

Argument 對象中的 DataType 字段是一個 NodeId,它標識了參數的數據類型。我們需要將這個 NodeId 映射到相應的 Python 類,以便創建正確類型的參數值。

from asyncua.ua import ObjectIds from asyncua import ua  base_type_list = {     ua.NodeId(ObjectIds.Null): None,     ua.NodeId(ObjectIds.Boolean): bool,     ua.NodeId(ObjectIds.SByte): ua.SByte,     ua.NodeId(ObjectIds.Byte): ua.Byte,     ua.NodeId(ObjectIds.Int16): ua.Int16,     ua.NodeId(ObjectIds.UInt16): ua.UInt16,     ua.NodeId(ObjectIds.Int32): ua.Int32,     ua.NodeId(ObjectIds.UInt32): ua.UInt32,     ua.NodeId(ObjectIds.Int64): ua.Int64,     ua.NodeId(ObjectIds.UInt64): ua.UInt64,     ua.NodeId(ObjectIds.Float): ua.Float,     ua.NodeId(ObjectIds.Double): ua.Double,     ua.NodeId(ObjectIds.String): ua.String,     ua.NodeId(ObjectIds.DateTime): ua.DateTime,     ua.NodeId(ObjectIds.Guid): ua.Guid,     ua.NodeId(ObjectIds.ByteString): ua.ByteString,     ua.NodeId(ObjectIds.XmlElement): ua.XmlElement,     ua.NodeId(ObjectIds.NodeId): ua.NodeId,     ua.NodeId(ObjectIds.ExpandedNodeId): ua.ExpandedNodeId,     ua.NodeId(ObjectIds.StatusCode): ua.StatusCode,     ua.NodeId(ObjectIds.QualifiedName): ua.QualifiedName,     ua.NodeId(ObjectIds.LocalizedText): ua.LocalizedText,     ua.NodeId(ObjectIds.Structure): ua.ExtensionObject,     ua.NodeId(ObjectIds.DataValue): ua.DataValue,     ua.NodeId(ObjectIds.BaseVariableType): ua.Variant,     ua.NodeId(ObjectIds.DiagnosticInfo): ua.DiagnosticInfo, }   def get_class_from_nodeid(datatype_nodeid):     # check all type dicts       sources = [base_type_list, ua.basetype_by_datatype, ua.extension_objects_by_datatype, ua.enums_by_datatype]    cls = None    for src in sources:        cls = src.get(datatype_nodeid, None)        if cls is not None:           return cls           return cls

get_class_from_nodeid 函數首先查找 base_type_list,然后依次查找 ua.basetype_by_datatype、ua.extension_objects_by_datatype 和 ua.enums_by_datatype。這些字典包含了 NodeId 到 Python 類的映射關系。

動態構建參數并調用方法

有了參數類型信息和類型映射,我們可以動態構建參數并調用方法。

async def call_method_with_dynamic_args(client, method_node, arg_values):     """     使用動態參數調用 OPC UA 方法。      Args:         client: asyncua.Client 實例。         method_node: 方法節點的 Node 對象。         arg_values: 參數值的列表。      Returns:         方法調用的結果。     """     arguments = await get_method_input_arguments(client, method_node)     args = []     for i, arg in enumerate(arguments):         data_type_class = get_class_from_nodeid(arg.DataType)         if data_type_class is None:             raise ValueError(f"不支持的數據類型: {arg.DataType}")          # 將參數值轉換為正確的類型         try:             converted_value = data_type_class(arg_values[i]) if data_type_class != bool else bool(arg_values[i]) # bool類型轉換需要特殊處理         except Exception as e:             raise ValueError(f"參數轉換失敗: {e}")          args.append(converted_value)      # 調用方法     result = await method_node.call_method(*args)     return result  # 示例用法 async def main():     async with Client("opc.tcp://your_opcua_server:4840") as client:         method_node = await client.get_node("ns=2;i=1234")  # 替換為你的方法節點 ID         arg_values = [99, 100, 199]  # 替換為你的參數值         try:             result = await call_method_with_dynamic_args(client, method_node, arg_values)             print(f"方法調用結果: {result}")         except Exception as e:             print(f"方法調用失敗: {e}")  if __name__ == "__main__":     import asyncio     asyncio.run(main())

在這個例子中,call_method_with_dynamic_args 函數首先獲取方法的參數信息,然后根據參數類型將傳入的參數值轉換為正確的類型,最后調用方法并返回結果。

注意事項

  • 錯誤處理: 在實際應用中,需要添加更完善的錯誤處理機制,例如處理不支持的數據類型、參數轉換失敗等情況。
  • 自定義數據類型: 對于復雜的自定義數據類型,可能需要更復雜的轉換邏輯。
  • 性能優化 頻繁調用 get_method_input_arguments 可能會影響性能,可以考慮緩存參數信息。

總結

通過動態檢測 OPC UA 方法的輸入參數類型,并進行相應的類型轉換,我們可以編寫更加靈活和通用的 OPC UA 客戶端程序。這對于需要與各種不同的 OPC UA 服務端交互的應用程序來說尤其重要。 本文提供了一種實現動態參數類型轉換的基本方法,開發者可以根據自己的實際需求進行擴展和優化。

? 版權聲明
THE END
喜歡就支持一下吧
點贊10 分享