Utilizing Python for Data Curation and Quality
Preliminary Steps¶
This section covers the preliminary steps: installing and importing the required libraries and setting the base URL. Replace the value of IST_SOS_ENDPOINT in the following script with your istSOS base URL (http://localhost:8018/istsos4/v1.1 or https://istsos.org/v4/v1.1).
!pip install saqc
!pip install folium
import json
import os
import re
import random
from datetime import datetime
import requests
from IPython.display import Markdown, display
IST_SOS_ENDPOINT = "http://localhost:8018/istsos4/v1.1"
Login as editor¶
username = input("Enter your username: ")
password = input("Enter your password: ")
if not username or not password:
    print("Username or password is empty")
else:
    data = {
        "username": username,
        "password": password,
        "grant_type": "password",
    }
    response = requests.post(IST_SOS_ENDPOINT + "/Login", data=data)
    if response.status_code == 200:
        token = response.json()["access_token"]
        print(
            f"Token expires at: {datetime.fromtimestamp(response.json()['expires_in'])}"
        )
        prefix = username + "-"
        print("Your station name will be prefixed with: " + prefix)
    else:
        result = json.dumps(response.json(), indent=2)
        display(Markdown(f"```json\n{result}\n```"))
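As a quick sanity check, the freshly obtained token can be used right away on any authenticated endpoint, for example listing the available Things (a minimal sketch):
response = requests.get(
    IST_SOS_ENDPOINT + "/Things",
    headers={"Authorization": f"Bearer {token}"},
)
print(response.status_code)  # 200 means the token is accepted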
Creating multiple related entities with deep insert¶
# Randomly pick a longitude and a latitude from two predefined values
coords = [
    random.choice([8.509085, 8.513438]),
    random.choice([46.504235, 46.507165]),
]
body = {
    "description": "Meteo station recording temperature, humidity, and rainfall",
    "name": prefix + "BED",
    "properties": {
        "keywords": "weather,station,temperature,humidity,rainfall",
        "description": "Meteo station recording temperature, humidity, and rainfall",
    },
    "Locations": [
        {
            "description": "Location of the BED meteo station",
            "name": prefix + "BED",
            "location": {
                "type": "Point",
                "coordinates": coords,
            },
            "encodingType": "application/vnd.geo+json",
        }
    ],
    "Datastreams": [
        {
            "unitOfMeasurement": {
                "name": prefix + "Celsius degree",
                "symbol": "°C",
                "definition": "",
            },
            "description": "Temperature measurement",
            "name": prefix + "T_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:temperature",
                "definition": "",
                "description": "Air temperature",
            },
            "Sensor": {
                "description": "Temperature sensor",
                "name": prefix + "TempSensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Temperature sensor"}',
            },
        },
        {
            "unitOfMeasurement": {
                "name": prefix + "Percentage",
                "symbol": "%",
                "definition": "",
            },
            "description": "Humidity measurement",
            "name": prefix + "H_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:humidity",
                "definition": "",
                "description": "Air humidity",
            },
            "Sensor": {
                "description": "Humidity sensor",
                "name": prefix + "HumiditySensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Humidity sensor"}',
            },
        },
        {
            "unitOfMeasurement": {
                "name": prefix + "Millimeters",
                "symbol": "mm",
                "definition": "",
            },
            "description": "Rainfall measurement",
            "name": prefix + "P_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:rain",
                "definition": "",
                "description": "Rain quantity",
            },
            "Sensor": {
                "description": "Pluviometer sensor",
                "name": prefix + "PluviometerSensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Pluviometer sensor"}',
            },
        },
    ],
}
response = requests.post(
    IST_SOS_ENDPOINT + "/Things",
    data=json.dumps(body),
    headers={
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
        "Commit-message": "Create the BED station",
    },
)

if response.status_code == 201:
    print(f"Thing created successfully ({response.headers['location']})")
    # Extract the numeric ID of the new Thing from the location header, e.g. ".../Things(42)"
    match = re.search(r"\((\d+)\)", response.headers["location"])
    if match:
        thing_id = int(match.group(1))
    else:
        print("No number found in parentheses.")
else:
    result = json.dumps(response.json(), indent=2)
    display(Markdown(f"```json\n{result}\n```"))
response = requests.get(
    IST_SOS_ENDPOINT + f"/Things({thing_id})/Datastreams",
    headers={
        "Authorization": f"Bearer {token}",
    },
)
datastreams = response.json()["value"]
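As a quick check, list the Datastreams that were just created through the deep insert:
for ds in datastreams:
    print(ds["@iot.id"], ds["name"])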
Insert Observations from CSV¶
To complete this step, first create a data directory on your machine, then download the CSV files from GitHub and save them into it.
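If you prefer to do this programmatically, the following sketch creates the directory and downloads the files; the base URL and file names below are placeholders, so replace them with the actual locations of the tutorial files on GitHub:
import os

import requests

os.makedirs("data", exist_ok=True)

# Hypothetical location of the raw CSV files: adjust to the real repository path
CSV_BASE_URL = "https://raw.githubusercontent.com/<org>/<repo>/main/data"
for fname in ["T_BED.csv", "H_BED.csv", "P_BED.csv"]:  # assumed file names
    r = requests.get(f"{CSV_BASE_URL}/{fname}")
    r.raise_for_status()
    with open(os.path.join("data", fname), "wb") as f:
        f.write(r.content)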
Procedure to insert Observations from CSV files¶
import csv
import json

import requests


def csv2sta(
    csv_file: str,
    datastream_id: int,
    token: str,
    step=10000,
    head=True,
    commit_message="Insert observations",
    max_rows=None,
):
    """
    Insert observations from a CSV file into a datastream.

    Args:
        csv_file (str): The path to the CSV file
        datastream_id (int): The ID of the datastream
        token (str): The token to authenticate the request
        step (int): The number of rows to insert in a single request
        head (bool): Whether to skip the first row of the CSV file
        commit_message (str): The commit message for the request
        max_rows (Optional[int]): The maximum number of rows to insert
    """
    tmp_rows = 0
    tmp_headers = {
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
        "Commit-message": commit_message,
    }
    with open(csv_file, "r") as f:
        data = csv.reader(f, delimiter=",")
        i = 0
        post_data = [
            {
                "Datastream": {"@iot.id": datastream_id},
                "components": [
                    "result",
                    "phenomenonTime",
                    "resultTime",
                    "resultQuality",
                ],
                "dataArray": [],
            }
        ]
        for r in data:
            # Skip the header row without counting it as an observation
            if head:
                head = False
                continue
            # CSV columns: 0 = timestamp, 2 = result, 3 = quality flag
            ob = [
                float(r[2]),
                r[0],
                r[0],
                r[3],
            ]
            post_data[0]["dataArray"].append(ob)
            i += 1
            tmp_rows += 1
            # Send a full batch of `step` observations
            if i == step:
                req = requests.post(
                    f"{IST_SOS_ENDPOINT}/BulkObservations",
                    data=json.dumps(post_data),
                    headers=tmp_headers,
                )
                if req.status_code == 201:
                    print(f"Observations created successfully ({i})")
                else:
                    print(req.text)
                    i = 0  # avoid re-posting the failed batch below
                    break
                i = 0
                post_data = [
                    {
                        "Datastream": {"@iot.id": datastream_id},
                        "components": [
                            "result",
                            "phenomenonTime",
                            "resultTime",
                            "resultQuality",
                        ],
                        "dataArray": [],
                    }
                ]
            # Stop once the requested maximum number of rows has been sent
            if max_rows and tmp_rows >= max_rows:
                if i > 0:
                    req = requests.post(
                        f"{IST_SOS_ENDPOINT}/BulkObservations",
                        data=json.dumps(post_data),
                        headers=tmp_headers,
                    )
                    if req.status_code == 201:
                        print(f"Observations created successfully ({i})")
                    else:
                        print(req.text)
                    i = 0
                break
        # Send the final partial batch, if any rows remain
        if i > 0:
            req = requests.post(
                f"{IST_SOS_ENDPOINT}/BulkObservations",
                data=json.dumps(post_data),
                headers=tmp_headers,
            )
            if req.status_code == 201:
                print(f"Observations created successfully ({i})")
            else:
                print(req.text)
files = os.listdir("data")

for _file in files:
    if "sta_" not in _file:
        print("data" + os.sep + f"{_file}")
        # Match the file name (without extension) against the datastream names
        dt = None
        for ds in datastreams:
            if _file.split(".")[0] in ds["name"]:
                print(ds["name"], _file)
                dt = ds["@iot.id"]
                break
        if dt is not None:
            csv2sta("data" + os.sep + f"{_file}", dt, token, max_rows=30000)
From SensorThings API to DataFrames¶
Procedure for obtaining DataFrames¶
import pandas as pd
from saqc import SaQC


def get_dfs_by_datastreams(datastreams, top=15000, orderby="phenomenonTime asc"):
    """
    Build a DataFrame and an SaQC object for each datastream.

    Args:
        datastreams (List[dict]): The datastreams to fetch observations for
        top (int): The maximum number of observations to return per datastream
        orderby (str): The ordering of the results

    Returns:
        Tuple[Dict[str, pd.DataFrame], Dict[str, SaQC]]: The DataFrames and QC objects, keyed by datastream name
    """
    dfs = {}
    qcs = {}
    for datastream in datastreams:
        print(f"Datastream: {datastream['name']}")
        print(f"Description: {datastream['description']}")
        print(
            f"Unit of measurement: {datastream['unitOfMeasurement']['name']} ({datastream['unitOfMeasurement']['symbol']})"
        )
        response = requests.get(
            f"{IST_SOS_ENDPOINT}/Datastreams({datastream['@iot.id']})/Observations?$top={top}&$orderby={orderby}",
            headers={
                "Authorization": f"Bearer {token}",
            },
        )
        observations = response.json()
        print(f"Number of observations: {len(observations['value'])}")
        if len(observations["value"]) == 0:
            print("\n")
            print("--------------------")
            continue
        print(f"As of: {observations['@iot.as_of']}")
        # Create a DataFrame indexed by phenomenonTime
        df = pd.DataFrame(observations["value"])
        df.index = pd.to_datetime(df["phenomenonTime"])
        df["result"] = pd.to_numeric(df["result"])
        dfs[datastream["name"]] = df
        # Create a QC object
        qcs[datastream["name"]] = SaQC(data=df, scheme="float")
        df["ylabel"] = f"{datastream['unitOfMeasurement']['symbol']}"
        print("\n")
        print("--------------------")
    return dfs, qcs
dfs, qcs = get_dfs_by_datastreams(datastreams)
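A quick look at the head of each returned DataFrame helps confirm that timestamps and results were parsed correctly:
for name, df in dfs.items():
    print(name)
    print(df[["result", "resultQuality"]].head())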
Plot Observations¶
for df in dfs:
    dfs[df].plot(
        y="result",
        title=df,
        figsize=(10, 4),
        grid=True,
        ylabel=f"{dfs[df]['ylabel'].iloc[0]}",
    )
Some quality check examples¶
To run the quality controls we use the SaQC (System for automated Quality Control) library, which provides all the building blocks to comfortably assemble data quality checks.
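Before applying SaQC to the station data, the minimal, self-contained sketch below illustrates the mechanics on synthetic data: with the "float" scheme, each test writes the numeric flag you pass (here 93) into qc.flags, which is exactly what the next cell relies on. The series and threshold values are made up for the demonstration.
import numpy as np
import pandas as pd
from saqc import SaQC

# Synthetic hourly series with one injected outlier
idx = pd.date_range("2024-01-01", periods=48, freq="h")
demo = pd.DataFrame({"result": np.sin(np.arange(48) / 4)}, index=idx)
demo.iloc[10, 0] = 50.0

qc_demo = SaQC(data=demo, scheme="float").flagRange("result", min=-2, max=2, flag=93)
print((qc_demo.flags["result"] == 93).sum())  # -> 1 flagged value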
for _qc in qcs:
    if "-T_" in _qc:
        print(f"QC for {_qc} datastream temperature")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)  # flag missing values
            .flagConstants("result", thresh=0.1, window="1D", flag=91)  # flag runs varying by less than 0.1 within a 1-day window
            .flagZScore("result", window="1D", thresh=3, flag=92)  # flag values whose z-score exceeds 3 sigma within a 1-day window
            .flagRange("result", min=-20, max=50, flag=93)  # flag values outside the plausible range
        )
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        dfs[_qc].loc[qc.flags["result"] == 92, "resultQuality"] = 92
        dfs[_qc].loc[qc.flags["result"] == 93, "resultQuality"] = 93
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": "Temperature (°C)",
            },
        )
    elif "-H_" in _qc:
        print(f"QC for {_qc} datastream humidity")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)
            .flagConstants("result", thresh=0.1, window="1D", flag=91)
            .flagZScore("result", window="1D", flag=92)
            .flagRange("result", min=0, max=100, flag=93)
        )
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        dfs[_qc].loc[qc.flags["result"] == 92, "resultQuality"] = 92
        dfs[_qc].loc[qc.flags["result"] == 93, "resultQuality"] = 93
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": "Humidity (%)",
            },
        )
    else:
        print(f"QC for {_qc} datastream rain")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)
            .flagRange("result", min=0, max=0.2, flag=91)  # flag negative rain and values above 0.2 mm per interval
        )
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": "Rain (mm)",
            },
        )
Plot quality flags for each Datastream¶
for _df in dfs:
    dfs[_df].plot(y="resultQuality", title=_df, figsize=(10, 4), grid=True)
Update Observation outliers¶
datetime_obs_update = datetime.now()

for df in dfs:
    # Patch every observation the QC step flagged (resultQuality below 100);
    # row[1] is the @iot.id column and row[8] the resultQuality column (positions follow the DataFrame column order)
    for row in dfs[df].loc[dfs[df]["resultQuality"] < 100].itertuples():
        body = {
            "resultQuality": str(row[8]),
        }
        response = requests.patch(
            f"{IST_SOS_ENDPOINT}/Observations({row[1]})",
            data=json.dumps(body),
            headers={
                "Content-type": "application/json",
                "Authorization": f"Bearer {token}",
                "Commit-message": "Quality flagging",
            },
        )
    print(f"Observations of Datastream {df} updated")
Plot Observations after update¶
for df in dfs:
    # Accepted observations as a line plot
    dfs[df].loc[dfs[df]["resultQuality"] >= 100].plot(
        y="result",
        title=df,
        figsize=(10, 4),
        grid=True,
        ylabel=f"{dfs[df]['ylabel'].iloc[0]}",
    )
    # Flagged observations as a scatter plot
    dfs[df].loc[dfs[df]["resultQuality"] < 100].plot.scatter(
        x="phenomenonTime",
        y="result",
        grid=True,
    )
Plot the Things on a Map¶
response = requests.get(
    IST_SOS_ENDPOINT + "/Things?$expand=Locations",
    headers={"Authorization": f"Bearer {token}"},
)
things = response.json()['value']
import folium

map_center = [46.505978, 8.511378]
mymap = folium.Map(location=map_center, zoom_start=16)

if things:
    for thing in things:
        if thing["Locations"]:
            for location in thing["Locations"]:
                # GeoJSON stores coordinates as [longitude, latitude]
                coords = location["location"]["coordinates"]
                lat, lon = coords[1], coords[0]
                folium.Marker(
                    location=[lat, lon],
                    tooltip=thing["name"],
                ).add_to(mymap)

mymap
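Outside Jupyter the map is not rendered automatically as the cell output; in that case save it to an HTML file and open it in a browser:
mymap.save("things_map.html")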