Spaces:
Runtime error
Runtime error
| from numpy import newaxis | |
| import requests | |
| import json | |
| import time | |
| import pandas as pd | |
| import sys | |
| import datetime | |
| import pathlib | |
| import os | |
| import numpy as np | |
| from Date_Manipulation import convertStrToDateTime | |
class NpEncoder(json.JSONEncoder):
    """
    JSON encoder that converts NumPy values into plain Python values.
    Pass it as ``cls=NpEncoder`` to ``json.dumps`` when a payload may contain
    NumPy scalars or arrays.
    """

    def default(self, obj):
        """
        Convert one object the stock encoder could not serialise.
        obj : value handed over by ``json.dumps``
        Returns an int / float / bool / list for NumPy inputs; anything else is
        delegated to the base class, which raises TypeError.
        """
        # np.bool_ is not a np.integer, so it needs its own branch.
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)
| class Eversense4_API: | |
| """ | |
| Main class for Eversense4 Thingsboard REST API Driver. | |
| """ | |
| def __init__(self, host = 'https://eversense.forbesmarshall.com') -> None: | |
| """host = eversense4 thingsboard host url ... (format : http[s]://host) | |
| """ | |
| self.host = host | |
def login(self, username, password):
    """
    Authenticate against <host>/api/auth/login and remember the returned token.
    username : account user name
    password : account password
    Returns the value of the 'token' field of the JSON response and also stores
    it on the object as self.token, which the other methods read.
    Raises KeyError when the response carries no 'token' field.
    """
    credentials = {'username' : username,
                   'password' : password}
    request_headers = {'Content-Type' : 'application/json',
                       'Accept': 'application/json'}
    reply = requests.post(self.host + '/api/auth/login',
                          data = json.dumps(credentials),
                          headers = request_headers)
    self.token = json.loads(reply.text)['token']
    return self.token
def searchDeviceByName(self, devName : str):
    """
    Look up a device through <host>/api/tenant/devices?deviceName=<devName>.
    devName : device name to look up
    Returns False when the server answers HTTP 404, otherwise the decoded
    JSON body of the response.
    Requires login() to have been called so that self.token is set.
    """
    Url = self.host + "/api/tenant/devices"
    Headers = {'Content-Type' : 'application/json',
               'X-Authorization': 'Bearer ' + self.token}
    # Let requests encode the name: spaces, '&', '#' or '+' in a device name
    # would otherwise corrupt the query string.
    resp = requests.get(Url, params={'deviceName': devName}, headers=Headers)
    if resp.status_code == 404:
        return False
    return resp.json()
def getDevicesOfType(self, deviceType):
    """
    Query <host>/api/user/devices filtered by device profile (first page, page size 1000).
    deviceType : str : device profile name
    Returns the decoded JSON body of the response.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        '/api/user/devices?type=',
                        deviceType,
                        '&textSearch&sortProperty&sortOrder&pageSize=1000&page=0'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getCustomerInfo(self, customerId):
    """
    Fetch <host>/api/customer/<customerId>/ and return the decoded JSON body.
    customerId : customer id (string)
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/customer/', customerId, '/'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceKeys(self, deviceId):
    """
    Fetch the time-series keys endpoint of a device and return the decoded JSON body.
    deviceId : device id (string)
    Endpoint: <host>/api/plugins/telemetry/DEVICE/<deviceId>/keys/timeseries
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/plugins/telemetry/DEVICE/', deviceId, '/keys/timeseries'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceAttrKeys(self, deviceId):
    """
    Fetch the attribute keys endpoint of a device and return the decoded JSON body.
    deviceId : device id (string)
    Endpoint: <host>/api/plugins/telemetry/DEVICE/<deviceId>/keys/attributes
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/plugins/telemetry/DEVICE/', deviceId, '/keys/attributes'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getEntityGroupCustomers(self, entityGroupId):
    """
    Fetch the customers of an entity group (first page, page size 500).
    entityGroupId : entity group (package) id (string)
    Returns the decoded JSON body of
    <host>/api/entityGroup/<entityGroupId>/customers?pageSize=500&page=0.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/entityGroup/', entityGroupId, '/customers?pageSize=500&page=0'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getCustomerDevices(self, customerId):
    """
    Fetch the devices of a customer (first page, page size 500).
    customerId : customer id (string)
    Returns the decoded JSON body of
    <host>/api/customer/<customerId>/devices?pageSize=500&page=0.
    NOTE: the class defines getCustomerDevices a second time further down with
    the same request; in Python the later definition replaces this one.
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/customer/', customerId, '/devices?pageSize=500&page=0'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceLatestTelemetary(self, deviceId, keys):
    """
    Request the time-series values endpoint of a device without a time range.
    deviceId : device id (string)
    keys : list of key names; sent comma-joined in the 'keys' query parameter
    Returns the decoded JSON body of
    <host>/api/plugins/telemetry/DEVICE/<deviceId>/values/timeseries?keys=<keys>.
    Requires self.token (set by login()).
    """
    joined_keys = ",".join(keys)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        '/api/plugins/telemetry/DEVICE/',
                        deviceId,
                        '/values/timeseries?keys=',
                        joined_keys))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceDataValues(self, deviceId, keys, startTS, endTS, interval, limit,cnt=0, halfData = None):
    """
    Download raw (agg=NONE) time-series values of a device for a time window.
    deviceId : device id (string)
    keys : list of key names; sent comma-joined in the 'keys' query parameter
    startTS : window start, sent as 'startTs' (millisecond epoch per the original docs)
    endTS : window end, sent as 'endTs'
    interval : sent as 'interval'
    limit : sent as 'limit'
    cnt : recursion depth counter; the window is no longer split once it reaches 3
    halfData : unused, kept only for backward compatibility of the signature
    Returns the decoded JSON body. When the server answers with an empty body
    the window is split at its midpoint, both halves are fetched recursively and
    the per-key lists are concatenated (first half, then second half).
    Raises Exception when the body cannot be decoded as JSON.
    """
    keys_param = ",".join(keys)
    Url = (self.host + '/api/plugins/telemetry/DEVICE/' + deviceId + '/values/timeseries'
           + '?keys=' + keys_param
           + '&startTs=' + str(startTS)
           + '&endTs=' + str(endTS)
           + '&interval=' + str(interval)
           + '&agg=NONE'
           + '&limit=' + str(limit))
    Headers = {'Content-Type' : 'application/json',
               'X-Authorization': 'Bearer ' + self.token}
    data = requests.get(Url, headers=Headers)
    if len(data.text) == 0 and cnt < 3:
        # An empty body is treated as "request too heavy": retry on two half windows.
        # NOTE(review): both halves include midTS; confirm whether the server can
        # return the boundary sample twice.
        midTS = int((startTS + endTS) / 2)
        firstHalf = self.getDeviceDataValues(deviceId, keys, startTS, midTS, interval, limit, cnt + 1)
        secondHalf = self.getDeviceDataValues(deviceId, keys, midTS, endTS, interval, limit, cnt + 1)
        return {key: firstHalf.get(key, []) + secondHalf.get(key, [])
                for key in set(firstHalf).union(secondHalf)}
    try:
        return json.loads(data.text)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError; keep the original exception type
        # and message for callers, but preserve the cause.
        raise Exception("Unable to get data from {} to {} for device {}".format(startTS, endTS, deviceId)) from err
def getDeviceDataValuesNew(self, deviceId, keys, startTS, endTS, interval, limit, NUM_KEYS = 5):
    """
    Download time-series values of a device in groups of keys and merge them into one DataFrame.
    deviceId : device id (string)
    keys : list of key names
    startTS, endTS, interval, limit : forwarded unchanged to getDeviceDataValues
    NUM_KEYS : maximum number of keys requested per call
    Each group of at most NUM_KEYS keys is fetched with getDeviceDataValues,
    converted with processData and outer-merged on the timestamp columns.
    Returns the merged DataFrame indexed by (and still containing) 'Timestamp'.
    """
    merged = pd.DataFrame({'ts' : []})
    # Step through the keys in NUM_KEYS-sized slices: no empty trailing request
    # and no keys lost for large NUM_KEYS.
    for start in range(0, len(keys), NUM_KEYS):
        chunk = keys[start:start + NUM_KEYS]
        values = self.getDeviceDataValues(deviceId, chunk, startTS, endTS, interval, limit)
        # processData returns 'Timestamp' both as index and as column; drop the
        # index so 'Timestamp' can be used as an unambiguous merge key.
        chunk_df = self.processData(values).reset_index(drop=True)
        # Merging on 'Timestamp' as well keeps it filled for rows that only
        # exist in a later chunk.
        join_cols = [col for col in ('ts', 'Timestamp')
                     if col in merged.columns and col in chunk_df.columns]
        merged = pd.merge(merged, chunk_df, how='outer', on=join_cols,
                          suffixes=('', '_###_DeleteThis'))
    duplicate_cols = [col for col in merged.columns if '###_DeleteThis' in col]
    merged = merged.drop(columns=duplicate_cols)
    if 'Timestamp' not in merged.columns:
        # Only reachable when no chunk was merged (empty key list): frame is empty.
        merged['Timestamp'] = pd.to_datetime([])
    merged.set_index('Timestamp', drop = False, inplace = True)
    return merged
def getAssetDataValues(self, deviceId, keys, startTS, endTS, interval, limit,cnt=0, halfData = None):
    """
    Download raw (agg=NONE) time-series values of an asset for a time window.
    deviceId : id of the asset (the parameter name is kept for compatibility)
    keys : list of key names; sent comma-joined in the 'keys' query parameter
    startTS : window start, sent as 'startTs' (millisecond epoch per the original docs)
    endTS : window end, sent as 'endTs'
    interval : sent as 'interval'
    limit : sent as 'limit'
    cnt : recursion depth counter; the window is no longer split once it reaches 3
    halfData : unused, kept only for backward compatibility of the signature
    Returns the decoded JSON body. When the server answers with an empty body
    the window is split at its midpoint, both halves are fetched recursively and
    the per-key lists are concatenated (first half, then second half).
    Raises Exception when the body cannot be decoded as JSON.
    """
    keys_param = ",".join(keys)
    Url = (self.host + '/api/plugins/telemetry/ASSET/' + deviceId + '/values/timeseries'
           + '?keys=' + keys_param
           + '&startTs=' + str(startTS)
           + '&endTs=' + str(endTS)
           + '&interval=' + str(interval)
           + '&agg=NONE'
           + '&limit=' + str(limit))
    Headers = {'Content-Type' : 'application/json',
               'X-Authorization': 'Bearer ' + self.token}
    data = requests.get(Url, headers=Headers)
    if len(data.text) == 0 and cnt < 3:
        # An empty body is treated as "request too heavy": retry on two half windows.
        # NOTE(review): both halves include midTS; confirm whether the server can
        # return the boundary sample twice.
        midTS = int((startTS + endTS) / 2)
        firstHalf = self.getAssetDataValues(deviceId, keys, startTS, midTS, interval, limit, cnt + 1)
        secondHalf = self.getAssetDataValues(deviceId, keys, midTS, endTS, interval, limit, cnt + 1)
        return {key: firstHalf.get(key, []) + secondHalf.get(key, [])
                for key in set(firstHalf).union(secondHalf)}
    try:
        return json.loads(data.text)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError; keep the original exception type
        # and message for callers, but preserve the cause.
        raise Exception("Unable to get data from {} to {} for device {}".format(startTS, endTS, deviceId)) from err
def getDeviceAttrValues(self, deviceId, keys):
    """
    Read attribute values of a device and return them as a {key: value} dict.
    deviceId : device id (string)
    keys : list of attribute names; sent comma-joined in the 'keys' query parameter
    The response is expected to be a list of objects carrying 'key' and 'value'.
    Requires self.token (set by login()).
    """
    joined_keys = ",".join(keys)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        '/api/plugins/telemetry/DEVICE/',
                        deviceId,
                        '/values/attributes?keys=',
                        joined_keys))
    reply = requests.get(endpoint, headers=auth_headers)
    entries = json.loads(reply.text)
    return dict((entry['key'], entry['value']) for entry in entries)
def getAssetAttrValues(self, assetId, keys):
    """
    Read attribute values of an asset and return them as a {key: value} dict.
    assetId : asset id (string)
    keys : list of attribute names; sent comma-joined in the 'keys' query parameter
    The response is expected to be a list of objects carrying 'key' and 'value'.
    Requires self.token (set by login()).
    """
    joined_keys = ",".join(keys)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        '/api/plugins/telemetry/ASSET/',
                        assetId,
                        '/values/attributes?keys=',
                        joined_keys))
    reply = requests.get(endpoint, headers=auth_headers)
    entries = json.loads(reply.text)
    return dict((entry['key'], entry['value']) for entry in entries)
def getCustomerAttrValues(self, custId, keys):
    """
    Read attribute values of a customer and return them as a {key: value} dict.
    custId : customer id (string)
    keys : list of attribute names; sent comma-joined in the 'keys' query parameter
    Endpoint: <host>/api/plugins/telemetry/CUSTOMER/<custId>/values/attributes?keys=<keys>
    The response is expected to be a list of objects carrying 'key' and 'value'.
    Requires self.token (set by login()).
    """
    joined_keys = ",".join(keys)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        '/api/plugins/telemetry/CUSTOMER/',
                        custId,
                        '/values/attributes?keys=',
                        joined_keys))
    reply = requests.get(endpoint, headers=auth_headers)
    entries = json.loads(reply.text)
    return dict((entry['key'], entry['value']) for entry in entries)
def processData(self, values):
    """
    Turn the output of getDeviceDataValues / getAssetDataValues into a DataFrame.
    values : dict mapping each key name to a list of {'ts': ..., 'value': ...} records
    Returns a DataFrame with a 'ts' column, one column per key (the records'
    'value' field) and a 'Timestamp' column that is also the sorted index.
    'Timestamp' is datetime.datetime.fromtimestamp(ts / 1000), i.e. naive local time.
    Keys with an empty record list are skipped; an empty dict yields an empty frame.
    """
    dfFinal = pd.DataFrame({'ts' : []})
    for key, records in values.items():
        if not records:
            # A frame built from [] has no 'ts' column and cannot be merged.
            continue
        df2 = pd.DataFrame(records)
        # No 'on': the only shared column is 'ts', because 'value' is renamed
        # to the key name right after every merge.
        dfFinal = pd.merge(dfFinal, df2, how='outer')
        dfFinal.rename({'value' : key}, axis=1, inplace=True)
    # Built from a list so that an empty frame still gets a datetime column.
    dfFinal['Timestamp'] = pd.to_datetime(
        [datetime.datetime.fromtimestamp(ts / 1000) for ts in dfFinal['ts']])
    dfFinal.set_index('Timestamp', inplace=True, drop = False)
    dfFinal.sort_index(ascending=True, inplace=True)
    return dfFinal
def dumpDataToFile(self, deviceDataFrame, deviceName, customerName, fromTS, toTS,orignal_path):
    """
    Write a DataFrame to <deviceName>/<dd.Mon.YYYY>/<deviceName>_(<fromTS> to <toTS>).csv.
    deviceDataFrame : DataFrame to write (';' separated, without the index)
    deviceName : used for the folder name and the file name
    customerName : unused
    fromTS, toTS : only used to build the file name
    orignal_path : directory the process changes into after the file is written
    The folder is created relative to the current working directory.
    (Marked "Not used" in the original source.)
    """
    # 'datetime' is imported as a module in this file, so the class must be
    # reached through it.
    date = datetime.datetime.now().strftime("%d.%b.%Y")
    new_path = deviceName + "/" + date
    os.makedirs(new_path, exist_ok=True)
    fileName = deviceName + "_" + "(" + str(fromTS) + " to " + str(toTS) + ")" + ".csv"
    # Write by path instead of chdir-ing into the folder, so a failed write
    # cannot leave the process in the wrong directory.
    deviceDataFrame.to_csv(os.path.join(new_path, fileName), sep=';', index=False)
    os.chdir(orignal_path)
def getCustomerDetails(self, customerId):
    """
    Fetch <host>/api/customer/<customerId> and return the decoded JSON body.
    customerId : customer id (string)
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/customer/', customerId))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getCustomerHierarchy(self):
    """
    Fetch <host>/api/entityGroups/CUSTOMER and return the decoded JSON body.
    Other methods of this class treat the result as the list of customer
    entity groups ("packages"), each with 'name' and 'id'.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    reply = requests.get(self.host + '/api/entityGroups/CUSTOMER', headers=auth_headers)
    return json.loads(reply.text)
def getCustomerDevices(self, customerId):
    """
    Fetch the devices of a customer (first page, page size 500).
    customerId : customer id (string)
    Returns the decoded JSON body of
    <host>/api/customer/<customerId>/devices?pageSize=500&page=0.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/customer/', customerId, '/devices?pageSize=500&page=0'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getCustomerAssets(self, customerId):
    """
    Fetch the assets of a customer (first page, page size 500).
    customerId : customer id (string)
    Returns the decoded JSON body of
    <host>/api/customer/<customerId>/assets?pageSize=500&page=0.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/customer/', customerId, '/assets?pageSize=500&page=0'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceDetails(self, deviceId):
    """
    Fetch <host>/api/device/<deviceId> and return the decoded JSON body.
    deviceId : device id (string)
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/device/', deviceId))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getDeviceProfiles(self):
    """
    Fetch <host>/api/device/types and return the decoded JSON body.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    reply = requests.get(self.host + '/api/device/types', headers=auth_headers)
    return json.loads(reply.text)
def getDeviceCredentials(self, deviceId):
    """
    Fetch <host>/api/device/<deviceId>/credentials and return the decoded JSON body.
    deviceId : device id (string)
    sendDeviceData reads the 'credentialsId' field of the result as the device access token.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, '/api/device/', deviceId, '/credentials'))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def sendDeviceData(self, deviceId, dictToSend):
    """
    Post dictToSend as JSON telemetry for the device with the given id.
    deviceId : device id (string); its credentials are looked up first
    dictToSend : payload; NumPy values are converted by NpEncoder
    The request goes to <host>/api/v1/<credentialsId>/telemetry and carries no
    user token. Returns None; the response is not inspected.
    """
    access_token = self.getDeviceCredentials(deviceId)['credentialsId']
    endpoint = "".join((self.host, "/api/v1/", access_token, "/telemetry"))
    body = json.dumps(dictToSend, cls = NpEncoder)
    requests.post(endpoint, data = body, headers = {'Content-Type' : 'application/json'})
def sendDeviceDataWithAccessToken(self, deviceAccessToken, dictToSend):
    """
    Post dictToSend as JSON telemetry using a device access token directly.
    deviceAccessToken : access token of the device (string)
    dictToSend : payload; NumPy values are converted by NpEncoder
    The request goes to <host>/api/v1/<deviceAccessToken>/telemetry and carries
    no user token. Returns None; the response is not inspected.
    """
    endpoint = "".join((self.host, "/api/v1/", deviceAccessToken, "/telemetry"))
    body = json.dumps(dictToSend, cls = NpEncoder)
    requests.post(endpoint, data = body, headers = {'Content-Type' : 'application/json'})
def sendAssetData(self, assetId, dictToSend):
    """
    Post dictToSend as JSON telemetry for the asset with the given id.
    assetId : asset id (string)
    dictToSend : payload; NumPy values are converted by NpEncoder
    The request goes to <host>/api/plugins/telemetry/ASSET/<assetId>/timeseries/timeseries
    with the user token. The response text is printed; nothing is returned.
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, "/api/plugins/telemetry/ASSET/", assetId, "/timeseries/timeseries"))
    body = json.dumps(dictToSend, cls = NpEncoder)
    reply = requests.post(endpoint, data = body, headers = auth_headers)
    print(reply.text)
def getRelatedAssetToDevice(self, deviceId):
    """
    Query relations that point from an ASSET to the given DEVICE.
    deviceId : device id (string), sent as 'toId'
    Returns the decoded JSON body of <host>/api/relations (relationTypeGroup=COMMON).
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        "/api/relations?relationTypeGroup=COMMON&toId=",
                        deviceId,
                        "&toType=DEVICE&fromType=ASSET"))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getRelatedDevicesToAsset(self, assetId):
    """
    Query relations that point from the given ASSET to DEVICEs.
    assetId : asset id (string), sent as 'fromId'
    Returns the decoded JSON body of <host>/api/relations (relationTypeGroup=COMMON).
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        "/api/relations?relationTypeGroup=COMMON&fromId=",
                        assetId,
                        "&toType=DEVICE&fromType=ASSET"))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getRelatedDevicesToDevice(self, deviceId):
    """
    Query relations that point from the given DEVICE to other DEVICEs.
    deviceId : device id (string), sent as 'fromId'
    Returns the decoded JSON body of <host>/api/relations (relationTypeGroup=COMMON).
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        "/api/relations?relationTypeGroup=COMMON&fromId=",
                        deviceId,
                        "&toType=DEVICE&fromType=DEVICE"))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getRelatedAssetsToAsset(self, assetId):
    """
    Query relations that point from the given ASSET to other ASSETs.
    assetId : asset id (string), sent as 'fromId'
    Returns the decoded JSON body of <host>/api/relations (relationTypeGroup=COMMON).
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host,
                        "/api/relations?relationTypeGroup=COMMON&fromId=",
                        assetId,
                        "&toType=ASSET&fromType=ASSET"))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getAssetInfo(self, assetId):
    """
    Fetch <host>/api/asset/<assetId> and return the decoded JSON body.
    assetId : asset id (string)
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, "/api/asset/", assetId))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getUserDevices(self, numberOfDevices):
    """
    Fetch the first page of <host>/api/user/devices with the given page size.
    numberOfDevices : page size; converted with str() before it is put in the URL
    Returns the decoded JSON body of the response.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    endpoint = "".join((self.host, "/api/user/devices?pageSize=", str(numberOfDevices), "&page=0"))
    reply = requests.get(endpoint, headers=auth_headers)
    return json.loads(reply.text)
def getGatewayDevices(self, n):
    """
    Return the gateway devices among the first n devices of the current user.
    n : page size passed on to getUserDevices
    A device counts as a gateway when its 'additionalInfo' holds a truthy
    'gateway' entry. Devices whose 'additionalInfo' is missing, None, or has no
    'gateway' entry are skipped rather than raising KeyError.
    Returns a list of device objects taken from the 'data' field of getUserDevices.
    """
    devices = self.getUserDevices(str(n))['data']
    # "or {}" covers both a missing key and an explicit None.
    return [device for device in devices
            if (device.get('additionalInfo') or {}).get('gateway')]
def deleteEntityTimeseriesTelemetry(self, entityType, entityId, keys, startTS, endTS):
    """
    Send a DELETE request for time-series data of an entity between startTS and endTS.
    entityType : entity type segment of the URL
    entityId : entity id
    keys : list of key names; sent comma-joined in the 'keys' query parameter
    startTS, endTS : sent as 'startTs' and 'endTs'
    The request always sets deleteAllDataForKeys=false and rewriteLatestIfDeleted=false.
    Returns the requests response object unchanged.
    """
    route = '/api/plugins/telemetry/{entityType}/{entityId}/timeseries/delete?keys={keysJoined}&deleteAllDataForKeys=false&startTs={startTS}&endTs={endTS}&rewriteLatestIfDeleted=false'.format(
        entityType=entityType,
        entityId=entityId,
        keysJoined=','.join(keys),
        startTS=startTS,
        endTS=endTS,
    )
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    return requests.delete(self.host + route, headers=auth_headers)
def pullData(self, fromTime, toTime, packageName = 'All', customerName = None, deviceName = None, dateTimeFormat = '%Y-%m-%d %H:%M:%S', keys = None, deviceProfile = None):
    """
    Generic pipeline function for pulling data.
    fromTime : start of the window, a string in `dateTimeFormat`
    toTime : end of the window, a string in `dateTimeFormat`
    packageName : package name in the customer hierarchy; only used when none of
                  customerName, deviceName and deviceProfile is given
    customerName : customer title or list of titles to download data for
    deviceName : device name or list of names (used as search text)
    dateTimeFormat : format of fromTime / toTime, passed to convertStrToDateTime
    keys : list of keys to download; when None, each device's own key list is
           fetched with getDeviceKeys
    deviceProfile : device profile whose devices should be downloaded
    Returns a dict mapping device name to the DataFrame from getDeviceDataValuesNew.
    Raises IndexError when the package-wide branch is used and no package is
    named `packageName`.
    """
    retDict = {}
    startTs = convertStrToDateTime(fromTime, formatStr=dateTimeFormat)
    endTs = convertStrToDateTime(toTime, formatStr=dateTimeFormat)

    if customerName is None and deviceName is None and deviceProfile is None:
        custHier = self.getCustomerHierarchy()
        packageObj = next((group for group in custHier if group['name'] == packageName), None)
        if packageObj is None:
            # Same exception type as the former "[0]" lookup, with a readable message.
            raise IndexError("No customer package named {!r}".format(packageName))
        packageCustomers = self.getEntityGroupCustomers(entityGroupId=packageObj['id']['id'])['data']
        for packageCustomer in packageCustomers:
            customerDevices = self.getCustomerDevices(customerId=packageCustomer['id']['id'])['data']
            self._pullDevicesInto(customerDevices, keys, startTs, endTs, retDict)

    if deviceName is not None:
        names = deviceName if isinstance(deviceName, list) else [deviceName]
        deviceList = []
        for name in names:
            deviceList.extend(self.getDevicesByName(deviceName=name)['data'])
        self._pullDevicesInto(deviceList, keys, startTs, endTs, retDict)

    if deviceProfile is not None:
        devicesOfProfile = self.getDevicesOfType(deviceProfile)['data']
        self._pullDevicesInto(devicesOfProfile, keys, startTs, endTs, retDict)

    if customerName is not None:
        titles = customerName if isinstance(customerName, list) else [customerName]
        for title in titles:
            customer = self.getCustomersByTitle(customerTitle = title)
            customerDevices = self.getCustomerDevices(customerId=customer['id']['id'])['data']
            self._pullDevicesInto(customerDevices, keys, startTs, endTs, retDict)

    return retDict

def _pullDevicesInto(self, devices, keys, startTs, endTs, retDict):
    """Download telemetry for each device object and store its DataFrame in retDict under the device name."""
    for device in devices:
        deviceId = device['id']['id']
        # Resolve keys per device: reusing the first device's key list for all
        # later devices would miss their own keys.
        deviceKeys = keys if keys is not None else self.getDeviceKeys(deviceId=deviceId)
        retDict[device['name']] = self.getDeviceDataValuesNew(
            deviceId = deviceId, keys = deviceKeys, interval=60000, limit=500000,
            startTS = startTs, endTS = endTs)
def getDevicesByName(self, deviceName):
    """
    Search the current user's devices by text (first page, page size 200).
    deviceName : search text, sent as the 'textSearch' query parameter
    Returns the decoded JSON body of <host>/api/user/devices; pullData reads
    the matching devices from its 'data' field.
    Requires self.token (set by login()).
    """
    Url = self.host + "/api/user/devices"
    Headers = {'Content-Type' : 'application/json',
               'X-Authorization': 'Bearer ' + self.token}
    # Let requests encode the search text: '&', '#' or '+' in a name would
    # otherwise corrupt the query string.
    query = {'pageSize': 200, 'page': 0, 'textSearch': deviceName}
    data = requests.get(Url, params=query, headers=Headers)
    return json.loads(data.text)
def getCustomersByTitle(self, customerTitle):
    """
    Look up a customer through <host>/api/tenant/customers?customerTitle=<title>.
    customerTitle : customer title, sent as the 'customerTitle' query parameter
    Returns the decoded JSON body; pullData treats it as a single customer
    object and reads its ['id']['id'].
    Requires self.token (set by login()).
    """
    Url = self.host + "/api/tenant/customers"
    Headers = {'Content-Type' : 'application/json',
               'X-Authorization': 'Bearer ' + self.token}
    # Let requests encode the title: titles containing '&', '#' or '+' would
    # otherwise corrupt the query string.
    data = requests.get(Url, params={'customerTitle': customerTitle}, headers=Headers)
    return json.loads(data.text)
def createAlarm(self, alarmDict):
    """
    Post alarmDict as JSON to <host>/api/alarm.
    alarmDict : alarm description; NumPy values are converted by NpEncoder
    Returns None; the response is not inspected.
    Requires self.token (set by login()).
    """
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    body = json.dumps(alarmDict, cls=NpEncoder)
    requests.post(self.host + "/api/alarm", data = body, headers=auth_headers)
def getDevicesOf(self, packageName):
    """
    Collect the device objects of every customer in the named package.
    packageName : str : package name in the customer hierarchy
    The first entity group whose 'name' equals packageName is used; IndexError
    is raised when there is none.
    Returns a flat list of the device objects (not just ids) found in the
    'data' field of getCustomerDevices for each customer of the package.
    """
    hierarchy = self.getCustomerHierarchy()
    matching_groups = [group for group in hierarchy if group['name'] == packageName]
    package_id = matching_groups[0]['id']['id']
    package_customers = self.getEntityGroupCustomers(package_id)
    # Reduce every customer to id and title before any device request is made.
    customers = [{"id" : entry['id']['id'], "title" : entry['title']}
                 for entry in package_customers['data']]
    collected = []
    for customer in customers:
        found = self.getCustomerDevices(customer['id'])
        collected.extend(found['data'])
    return collected
def setDeviceAttributes(self, deviceId, attributesDict, attrScope):
    """
    Post attributesDict as JSON to <host>/api/plugins/telemetry/<deviceId>/<attrScope>.
    deviceId : device id
    attributesDict : attributes to set; NumPy values are converted by NpEncoder
    attrScope : attribute scope segment of the URL
    Returns None; the response is not inspected.
    Requires self.token (set by login()).
    """
    route = "/api/plugins/telemetry/{deviceId}/{attrScope}".format(deviceId=deviceId, attrScope=attrScope)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    body = json.dumps(attributesDict, cls = NpEncoder)
    requests.post(self.host + route, data = body, headers=auth_headers)
def deleteAlarms(self, alarmId):
    """
    Send a DELETE request to <host>/api/alarm/<alarmId>.
    alarmId : id of the alarm to delete
    Returns the requests response object unchanged.
    Requires self.token (set by login()).
    """
    route = '/api/alarm/{alarmId}'.format(alarmId=alarmId)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    return requests.delete(self.host + route, headers=auth_headers)
def getEntityAlarms(self, entityType, entityId):
    """
    Fetch the alarms of an entity (first page, page size 100000).
    entityType : entity type segment of the URL
    entityId : entity id
    Returns the decoded JSON body of
    <host>/api/alarm/<entityType>/<entityId>?pageSize=100000&page=0.
    Requires self.token (set by login()).
    """
    route = "/api/alarm/{entityType}/{entityId}?pageSize=100000&page=0".format(
        entityType=entityType, entityId=entityId)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    reply = requests.get(self.host + route, headers=auth_headers)
    return json.loads(reply.text)
def getEntityGroupEntities(self, entityGroupId):
    """
    Fetch the entities of an entity group (first page, page size 200).
    entityGroupId : entity group id
    Returns the JSON body of
    <host>/api/entityGroup/<entityGroupId>/entities?pageSize=200&page=0.
    Requires self.token (set by login()).
    """
    route = '/api/entityGroup/{entityGroupId}/entities?pageSize=200&page=0'.format(
        entityGroupId=entityGroupId)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    reply = requests.get(self.host + route, headers=auth_headers)
    return reply.json()
def getAllCustomers(self, pageSize = 5000):
    """
    Fetch the first page of <host>/api/customerInfos/all.
    pageSize : page size put into the query string (default 5000)
    Returns the JSON body of the response.
    Requires self.token (set by login()).
    """
    route = '/api/customerInfos/all?pageSize={pageSize}&page=0'.format(pageSize=pageSize)
    auth_headers = {'Content-Type' : 'application/json',
                    'X-Authorization': 'Bearer ' + self.token}
    reply = requests.get(self.host + route, headers=auth_headers)
    return reply.json()