Python连接opcua的两个测试,python,两种

发表时间:2021-05-11

自写脚本测试(代码Copy的, 侵删)

服务端代码如下:

import sys
sys.path.insert(0, "..")
import time


from opcua import ua, Server


if __name__ == "__main__":

    # setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # setup our own namespace, not really necessary but should as spec
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # get Objects node, this is where we should put our nodes
    objects = server.get_objects_node()
    print(objects)
    # populating our address space
    myobj = objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    myvar.set_writable()    # Set MyVariable to be writable by clients

    # starting!
    server.start()

    try:
        count = 0
        while True:
            time.sleep(1)
            count += 0.1
            myvar.set_value(count)
    finally:
        #close connection, remove subcsriptions, etc
        server.stop()

客户端脚本如下:

import sys
sys.path.insert(0, "..")


from opcua import Client


if __name__ == "__main__":

    client = Client("opc.tcp://localhost:4840/freeopcua/server/")
    # client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
    try:
        client.connect()

        # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
        root = client.get_root_node()
        print("Objects node is: ", root)

        # Node objects have methods to read and write node attributes as well as browse or populate address space
        print("Children of root are: ", root.get_children())

        # get a specific node knowing its node id
        #var = client.get_node(ua.NodeId(1002, 2))
        #var = client.get_node("ns=3;i=2002")
        #print(var)
        #var.get_data_value() # get value of node as a DataValue object
        #var.get_value() # get value of node as a python builtin
        #var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
        #var.set_value(3.9) # set node value using implicit data type

        # Now getting a variable node using its browse path
        myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
        obj = root.get_child(["0:Objects", "2:MyObject"])
        print("myvar is: ", myvar)
        print("myobj is: ", obj)

        # Stacked myvar access
        # print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())

    finally:
        client.disconnect()

基于KEPServerEX的测试

首先得去安装相关的软件, 这个资源问问度娘还是有很多的。这是我下的版本 http://www.opdown.com/soft/108809.html , 中文汉化版

安装完成后, 软件自身有demo配置在里面, 可以基于里面的模拟器(Simulator)进行调试

首先对OPC-UA进行设置, 找到右下角的图标, 右键点击,选择OPCUA设置(截不了图, 已放弃治疗)

然后可以再弹出界面中可以找到服务端的URL, 以及它的安全策略

因为是测试, 所以我关闭了安全性, 有需要的可以自行开启, 并进行对接

然后点击编辑, 选择属性, OPC UA, 选择允许匿名登录

然后在软件界面上选择运行时, 然后开启服务器连接, 需要注意每次修改OPC-UA都需要重新建立服务端连接

然后就是开始撸代码, 代码比较简单, 主要是要找到你测试的节点以及参数

import time

from opcua import Client, ua

class SubHandler(object):
    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val, data)

client = Client('opc.tcp://127.0.0.1:49320/', timeout=10)
# client.set_user('will')
# client.set_password('123456') # 需要在连接前调用
# client.set_security_string(r"Basic256,SignAndEncrypt,C:\Users\42082\Desktop\certificate-example.der,C:\Users\42082\Desktop\private-key-example.pem") # 只支持Basic128Rsa15, Basic256 or Basic256Sha256三种生成证书的方法
client.connect()
objects = client.get_objects_node()
print(objects)
root = client.get_root_node()
print(root.get_children()[0].get_children())
namespaces = client.get_namespace_array()
print(namespaces)
endpoints = client.get_endpoints()
print(endpoints)
snode = client.get_server_node()
print(snode)
try:
    node = client.get_node('ns=2;s=通道 1.设备 1.test')
    print(node.get_value())

    handler = SubHandler()
    sub = client.create_subscription(500, handler)
    sub.subscribe_data_change(node)
    time.sleep(1000)
except Exception as e:
    print(e)
finally:
    client.disconnect()

根据目前了解的, 可以通过root节点, 获取所有子节点, 然后找到自己想要找的节点, 再进行处理

附get_node的参数解释, ns为第几个namespace, 后面的s为你寻找参数的路径

Tips, 这个代码是在windows进行测试的, 安装opcua时有些依赖包不会自动安装, 比如cryptography,  安装方法如下:

pip install cryptography==2.4.2 --only-binary=:all:

文章来源互联网,如有侵权,请联系管理员删除。邮箱:417803890@qq.com / QQ:417803890

微配音

Python Free

邮箱:417803890@qq.com
QQ:417803890

皖ICP备19001818号-4
© 2019 copyright www.pythonf.cn - All rights reserved

微信扫一扫关注公众号:

联系方式

Python Free