Starting to move vitalsd out of the early prototyping phase

This commit is contained in:
Innovation 2024-04-12 10:33:19 +01:00
parent 0f500a97c5
commit 2a004cdb93
2 changed files with 194 additions and 66 deletions

View file

@ -0,0 +1,177 @@
import subprocess
import requests
import sys
from flask import jsonify
class Infinitime:
heartrateCmd = ['itctl', 'get', 'heart']
stepsCmd = ['itctl', 'get', 'steps']
batteryCmd = ['itctl', 'get', 'battery']
def getHeartrate(self):
proc = subprocess.Popen(self.heartrateCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate()
if(proc.returncode != 0):
return None # Inform that cyberware is inoperative
o = o.decode("utf-8")
bpm = o.split(' ')
return bpm[0]
def getSteps(self):
proc = subprocess.Popen(self.stepsCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate()
if(proc.returncode != 0):
return None # Inform that cyberware is inoperative
o = o.decode("utf-8")
steps = o.split(' ')
return steps[0]
def getBattery(self):
proc = subprocess.Popen(self.batteryCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate()
if(proc.returncode != 0):
return None # Inform that cyberware is inoperative
o = o.decode("utf-8")
battery = o.split('%')
return battery[0]
#heartrate = 0
#steps = 0
#battery = 0
urlBase = 'http://localhost:5000'
urlHeartrate = urlBase + '/api/vitals/heartrate'
urlSteps = urlBase + '/api/fitness/steps'
urlAddCyberware = urlBase + '/api/cyberware/add'
urlRemoveCyberware = urlBase + '/api/cyberware/remove'
urlBattery = urlBase + '/api/cyberware/battery'
uuid = None
def __init__(self):
print("Created object")
itdFailMsg = "Couldn't get value. Is itd running?"
apiFailMsg = "Couldn't send value. Is Nightcall running?"
def sendHeartrate(self):
if(self.uuid==None):
print("Cyberware not connected to Nightcall.")
return
try:
heartrate = self.getHeartrate()
except:
print(self.itdFailMsg)
return 1 # 1 = itd fail
try:
heartrateRequest = requests.post(self.urlHeartrate, json={ 'heartrate': heartrate, 'uuid': self.uuid })
except:
print(self.apiFailMsg)
return 2 # 2 = No API contact.
if heartrateRequest.status_code > 399:
return 3 # 3 = API refused request.
if heartrateRequest.status_code > 299:
return 4 # 4 = API encountered an internal error.
return 0 # 0 = Success
def sendSteps(self):
if(self.uuid==None):
print("Cyberware not connected to Nightcall.")
return
try:
steps = self.getSteps()
except:
print(self.itdFailMsg)
return 1 # 1 = itd fail
try:
stepsRequest = requests.post(self.urlSteps, json={ 'steps': steps, 'uuid': self.uuid})
except:
print(self.apiFailMsg)
return 2 # 2 = No API contact.
if stepsRequest.status_code > 399:
return 3 # 3 = API refused request.
if stepsRequest.status_code > 299:
return 4 # 4 = API encountered an internal error.
return 0 # 0 = Success
def sendBattery(self):
if(self.uuid==None):
print("Cyberware not connected to Nightcall.")
return
try:
battery = self.getBattery()
except:
print(itdFailMsg)
return 1 # 1 = itd fail
try:
batteryRequest = requests.post(self.urlSteps, json={ 'battery': battery, 'uuid': self.uuid})
except:
print(self.apiFailMsg)
return 2 # 2 = No API contact.
if batteryRequest.status_code > 399:
return 3 # 3 = API refused request.
if batteryRequest.status_code > 299:
return 4 # 4 = API encountered an internal error.
return 0 # 0 = Success
def connectCyberware(self):
global uuid
# Add to Cyberware and get UUID
try:
uuidRequest = requests.post(self.urlAddCyberware, json={ 'name': "PineTime", 'hotpluggable': True, 'canSet': [ '/api/vitals/heartrate', '/api/fitness/steps' ] })
except:
return 2 # 2 = No API contact.
if(uuidRequest.status_code > 399):
print("Nightcall refused the request to add Cyberware.")
return 3 # 3 = API refused request.
if(uuidRequest.status_code > 299):
print("Nightcall has encountered an internal error.")
return 2 # 4 = API encountered an internal error.
self.uuid = uuidRequest.json()[0]['uuid']
print("Successfully added cyberware. UUID: " + self.uuid)
def disconnectCyberware(self):
global uuid
try:
disconnectRequest = requests.post(self.urlRemoveCyberware, json={ 'uuid': self.uuid })
except:
print(self.apiFailMsg)
self.uuid = None
# while True:
# try:
# heartrate = getHeartrate()
# steps = getSteps()
#battery = getBattery()
# heartrateRequest = requests.post(urlHeartrate, json={ 'heartrate': heartrate, 'uuid': uuid } )
# stepsRequest = requests.post(urlSteps, json={ 'steps': steps, 'uuid': uuid })
# except:
# print("An exception occured. TODO: Exception report to frontend.")
# time.sleep(1)
# Disconnect Cyberware
# requests.post(urlRemoveCyberware, json={ 'uuid': uuid })

View file

@ -1,74 +1,25 @@
import subprocess from infinitime import Infinitime
import time import time
import requests
import sys
from flask import jsonify
heartrateCmd = ['itctl', 'get', 'heart'] pinetimeEnabled = True
stepsCmd = ['itctl', 'get', 'steps']
batteryCmd = ['itctl', 'get', 'battery']
def getHeartrate(): if(pinetimeEnabled):
proc = subprocess.Popen(heartrateCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) pinetime = Infinitime()
o, e = proc.communicate() pinetime.connectCyberware()
if(proc.returncode != 0): continueFlag = True
return -1 # Inform that cyberware is inoperative
o = o.decode("utf-8")
bpm = o.split(' ')
return bpm[0]
def getSteps(): while continueFlag:
proc = subprocess.Popen(stepsCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate()
if(proc.returncode != 0):
return -1 # Inform that cyberware is inoperative
o = o.decode("utf-8")
steps = o.split(' ')
return steps[0]
def getBattery():
proc = subprocess.Popen(batteryCmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
o, e = proc.communicate()
if(proc.returncode != 0):
return -1 # Inform that cyberware is inoperative
o = o.decode("utf-8")
battery = o.split('%')
return battery[0]
heartrate = 0
steps = 0
battery = 0
urlBase = 'http://localhost:5000'
urlHeartrate = urlBase + '/api/vitals/heartrate'
urlSteps = urlBase + '/api/fitness/steps'
urlAddCyberware = urlBase + '/api/cyberware/add'
#urlBattery = urlBase + # Cyberware management not yet implemented
# Add to Cyberware and get UUID
uuidRequest = requests.post(urlAddCyberware, json={ 'name': "PineTime", 'hotpluggable': True, 'canSet': [ '/api/vitals/heartrate', '/api/fitness/steps' ] })
if(uuidRequest.status_code == 200):
uuid = uuidRequest.json()[0]['uuid']
print(uuid)
else:
sys.exit("Failed to get UUID")
while True:
try: try:
heartrate = getHeartrate() if(pinetimeEnabled):
steps = getSteps() pinetime.sendHeartrate()
#battery = getBattery() pinetime.sendSteps()
pinetime.sendHeartrate()
heartrateRequest = requests.post(urlHeartrate, json={ 'heartrate': heartrate, 'uuid': uuid } ) time.sleep(1)
stepsRequest = requests.post(urlSteps, json={ 'steps': steps, 'uuid': uuid }) except KeyboardInterrupt:
except: print("Stopping vitalsd")
print("An exception occured. TODO: Exception report to frontend.") continueFlag = False
time.sleep(1) if(pinetimeEnabled):
pinetime.disconnectCyberware()
# Disconnect Cyberware
requests.post(urlRemoveCyberware, json={ 'uuid': uuid })