Contents
Cirrus Link Resources
Chariot MQTT Server v1 (previous version)
Cirrus Link Modules for Ignition
Contact Us (Sales/Support)
Forum
...
...
The script requires the Python requests package to be installed.
We have included a download link to third party distribution here which should be unzipped into your <IgnitionInstallationDirectory>user-lib/pylib/site-packages folder.
Code Block | ||||
---|---|---|---|---|
| ||||
import requests import base64 #HTTP call method def httpCall(method,url,headers,data): if method == "POST": resp = requests.post(url, headers=headers, data=data) if method == "GET": resp = requests.get(url, headers=headers, data=data) if method == "PUT": resp = requests.put(url, headers=headers, data=data) #check for failures on httpCall if resp.status_code != 200: print "HTTPCall error code: " + str(resp.json()) print "HTTPCall error detail: " + data return resp.json() #Set URL for you Chariot installation #chariotUrl = "http://localhost:8080" #Create Chariot username:password dictionary for your MQTT Distributor users #userpassword = {"admin":"password","test2":"password2","test3":"password3"} ############################################################################# headers = {} #Accept Chariot EULA url = chariotUrl + "/eula" headers["accept"] = "application/json;api-version=1.0" headers["content-type"] = "application/json" data = "{\"isAccepted\": true}" http_response = httpCall("POST",url,headers,data) #Chariot login using admin user to get bearer token valid for 90 minutes url = chariotUrl + "/login" headers["accept"] = "application/json;api-version=1.0" headers["Authorization"] = "Basic " + base64.b64encode("admin:password") data = {} http_response = httpCall("POST",url,headers,data) accessToken = http_response["access_token"] #Read Distributor users users = system.cirruslink.distributor.readConfig("Users") #Create new format Access Control Lists (ACLs) for i in range(len(users)): subscribeList = "" publishList = "" userName = users[i]["Username"] password = list(userpassword.values())[list(userpassword.keys()).index(userName)] acls = users[i]["ACLs"] acl_list = acls.split(",") for i in range(len(acl_list)): #check for leading spaces if acl_list[i][:1] == " ": acl_list[i] = acl_list[i][1:len(acl_list[i])] #check for R subscriptions if acl_list[i][:2] == "R ": subscription = acl_list[i][2:len(acl_list[i])] subscription = "\"" + subscription + "\"" subscribeList = subscribeList + subscription + "," #check for W subscriptions if acl_list[i][:2] == "W ": publish = acl_list[i][2:len(acl_list[i])] publish = "\"" + publish + "\"" publishList = publishList + publish + "," #check for RW subscription if acl_list[i][:2] == "RW": subscription = acl_list[i][3:len(acl_list[i])] subscription = "\"" + subscription + "\"" subscribeList = subscribeList + subscription + "," publish = acl_list[i][3:len(acl_list[i])] publish = "\"" + publish + "\"" publishList = publishList + publish + "," #create Chariot user url = chariotUrl + "/mqttusers" headers["accept"] = "application/json;api-version=1.0" headers["content-type"] = "application/json" headers["Authorization"] = "Bearer " + accessToken data = "[{\"username\":\"" + userName + "\"," + "\"password\":\"" + password + "\"," + "\"acl\":{\"publishTopics\":[" + publishList[:-1] +"],\"subscribeTopics\":[" + subscribeList[:-1] + "]}}]" http_response = httpCall("POST",url,headers,data) #Read Chariot MQtt Users url = chariotUrl + "/mqttusers" headers["accept"] = "application/json;api-version=1.0" headers["Authorization"] = "Bearer " + accessToken data = {} chariotMqttUsers = httpCall("GET",url,headers,data) print "Chariot MQTT Credentials: " + str(chariotMqttUsers) ############################################################################# |
...