Utilizing the SensorThings API¶
Preliminary Steps¶
Login as editor¶
Replace IST_SOS_ENDPOINT
in the following script with your istSOS base URL (http://localhost:8018/istsos4/v1.1 or https://istsos.org/v4/v1.1).
import json
import re
from datetime import datetime
import requests
from IPython.display import Markdown, display
IST_SOS_ENDPOINT = "http://localhost:8018/istsos4/v1.1"
username = input("Enter your username: ")
password = input("Enter your password: ")
if not username or not password:
print("Username or password is empty")
else:
data = {
"username": username,
"password": password,
"grant_type": "password",
}
response = requests.post(IST_SOS_ENDPOINT + "/Login", data=data)
if response.status_code == 200:
token = response.json()["access_token"]
print(
f"Token expires at: { datetime.fromtimestamp(response.json()['expires_in'])}"
)
prefix = username + "-"
print("Your station name will be prefixed with: " + prefix)
else:
result = json.dumps(response.json(), indent=2)
display(Markdown(f"```json\n{result}\n```"))
Create Datastream¶
body = {
"unitOfMeasurement": {"name": "", "symbol": "RSSI", "definition": ""},
"description": "",
"name": f"{prefix}RSSI_FIU_VALL",
"observationType": "",
"ObservedProperty": {
"name": "ground:water:signal_strength",
"definition": "{}",
"description": "Ground water signal_strength",
},
"Sensor": {
"name": f"{prefix}Ecolog 10000",
"description": "",
"properties": {},
"encodingType": "application/json",
"metadata": '{"brand": "OTT", "type": "Pressure, temperature, electrical conductivity sensor"}',
},
"Thing": {
"name": f"{prefix}FIU_VALL",
"description": "Water level, water temperature and water electrical conductivity recorder Ticino river",
"properties": {
"keywords": "water,river,height,temperature,conductivity,ACSOT",
"description": "River level, water temperature and water electrical conductivity fiume Ticino valle",
},
"Locations": [
{
"name": f"{prefix}fiume Ticino",
"description": "",
"encodingType": "application/vnd.geo+json",
"location": {
"type": "Point",
"coordinates": [8.956099, 46.172245],
},
}
],
},
}
response = requests.post(
IST_SOS_ENDPOINT + "/Datastreams",
data=json.dumps(body),
headers={
"Content-type": "application/json",
"Authorization": f"Bearer {token}",
"Commit-message": "Create Datastrem and related Observations",
},
)
if response.status_code == 201:
print(f"Datastream created successfully ({response.headers['location']})")
match = re.search(r"\((\d+)\)", response.headers["location"])
if match:
datastream_id = int(match.group(1))
else:
print("No number found in parentheses.")
else:
result = json.dumps(response.json(), indent=2)
display(Markdown(f"```json\n{result}\n```"))
To complete this step, download the fakedata2sta.py
file from GitHub.
This script collects weather data and sends fake observations to an istSOS endpoint.
To start the service, navigate to the directory containing fakedata2sta.py
file and execute the following command to run it in the background and begin generating data::
python3 fakedata2sta.py
The script ask you these parameters:
istsos_endpoint
: The istSOS base URL (e.g.IST_SOS_ENDPOINT
).istsos_username
: The username of an istSOS user with privileges to send data (typically an editor-level account).istsos_password
: The password of the istsos user.datastream_id
: The identifier of the datastream to which observations will be linked (e.g. ID of the previously created Datastream).frequency
: The frequency of the data to be sent.latitude
: The latitude of the position.longitude
: The longitude of the position.
Jupyter notebook¶
You can now use the Jupyter Notebook to visualize the data.
In the following script, replace IST_SOS_ENDPOINT
variable with your istSOS base URL (http://localhost:8018/istsos4/v1.1 or https://istsos.org/v4/v1.1).
import requests
from matplotlib import pyplot as plt
from dateutil import parser
import requests
import time
import matplotlib.pyplot as plt
from dateutil import parser
from IPython.display import clear_output
IST_SOS_ENDPOINT = "http://localhost:8018/istsos4/v1.1"
Login as editor¶
The script ask you these parameters:
istsos_username
: The username of an istSOS user with privileges to send data (typically an editor-level account).istsos_password
: The password of the istsos user.datastream_id
: The identifier of the datastream to which observations will be linked (e.g. ID of the previously created Datastream).frequency
: The frequency of the data to be sent.
# Set variables from user input
istsos_username = input("Enter your istsos username: ")
if istsos_username.strip() == "":
print("You must enter a username")
exit()
istsos_password = input("Enter your istsos password: ")
if istsos_password.strip() == "":
print("You must enter a password")
exit()
datastream_id = input("Enter the datastream ID: ")
if datastream_id.strip() == "":
print("You must enter a datastream ID")
exit()
else:
try:
datastream_id = int(datastream_id)
except ValueError:
print("Datastream ID must be an integer")
exit()
frequency = input(
"Enter the frequency of the stream in seconds (default: 5): "
)
if frequency.strip() == "":
frequency = 5
else:
try:
frequency = int(frequency)
except ValueError:
print("Frequency must be an integer")
exit()
# Login to istsos and get token
req = requests.post(
f"{IST_SOS_ENDPOINT}/Login",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data={"username": istsos_username, "password": istsos_password},
)
if req.status_code != 200:
print("Login failed")
raise Exception(req.text)
token_obj = req.json()
Retrieve data¶
# Initial data fetch
req = requests.get(
f"{IST_SOS_ENDPOINT}/Datastreams({datastream_id})/Observations?$orderby=phenomenonTime desc",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token_obj['access_token']}",
},
)
if req.status_code != 200:
print("Failed to get datastream")
raise Exception(req.text)
else:
data = req.json()["value"]
dt = [parser.parse(i["phenomenonTime"]) for i in data]
values = [i["result"] for i in data]
# Reverse the order for chronological plotting
dt.reverse()
values.reverse()
while True:
# Fetch the latest observation
req = requests.get(
f"{IST_SOS_ENDPOINT}/Datastreams({datastream_id})/Observations?$orderby=phenomenonTime desc",
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token_obj['access_token']}",
},
)
if req.status_code == 200:
clear_output(wait=True)
fig, ax = plt.subplots(figsize=(20, 10))
ax.set_title("Real-Time Sensor Data")
ax.set_xlabel("Time")
ax.set_ylabel("Values")
ax.grid(True)
new_data = req.json()["value"]
for obs in new_data:
obs_time = parser.parse(obs["phenomenonTime"])
if obs_time not in dt:
dt.append(obs_time)
values.append(obs["result"])
ax.plot(dt, values, label="Sensor Data")
plt.pause(0.1)
else:
print("Failed to fetch the latest observation")
print(req.text)
time.sleep(frequency)