Utilizing Python for Data Curation and Quality

Preliminary Steps

This section contains the preliminary steps to set up the base URL and import necessary libraries.

Replace IST_SOS_ENDPOINT in the following script with your istSOS base URL (http://localhost:8018/istsos4/v1.1 or https://istsos.org/v4/v1.1).

!pip install saqc
!pip install folium
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: saqc in /home/maxi/.local/lib/python3.10/site-packages (2.6.0)
Requirement already satisfied: outlier-utils in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (0.0.5)
Requirement already satisfied: pandas>=2.0.0 in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (2.2.2)
Requirement already satisfied: docstring-parser in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (0.16)
Requirement already satisfied: pyarrow in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (18.1.0)
Requirement already satisfied: typing-extensions in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (4.12.2)
Requirement already satisfied: matplotlib>=3.4 in /usr/lib/python3/dist-packages (from saqc) (3.5.1)
Requirement already satisfied: fancy-collections in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (0.3.0)
Requirement already satisfied: numpy in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (1.26.4)
Requirement already satisfied: Click in /usr/lib/python3/dist-packages (from saqc) (8.0.3)
Requirement already satisfied: fastdtw in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (0.3.4)
Requirement already satisfied: scipy in /usr/lib/python3/dist-packages (from saqc) (1.8.0)
Requirement already satisfied: scikit-learn in /home/maxi/.local/lib/python3.10/site-packages (from saqc) (1.5.2)
Requirement already satisfied: tzdata>=2022.7 in /home/maxi/.local/lib/python3.10/site-packages (from pandas>=2.0.0->saqc) (2024.1)
Requirement already satisfied: python-dateutil>=2.8.2 in /home/maxi/.local/lib/python3.10/site-packages (from pandas>=2.0.0->saqc) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /usr/lib/python3/dist-packages (from pandas>=2.0.0->saqc) (2022.1)
Requirement already satisfied: sliceable-dict>=0.4.1 in /home/maxi/.local/lib/python3.10/site-packages (from fancy-collections->saqc) (0.4.1)
Requirement already satisfied: threadpoolctl>=3.1.0 in /home/maxi/.local/lib/python3.10/site-packages (from scikit-learn->saqc) (3.5.0)
Requirement already satisfied: joblib>=1.2.0 in /home/maxi/.local/lib/python3.10/site-packages (from scikit-learn->saqc) (1.4.2)
Requirement already satisfied: six>=1.5 in /usr/lib/python3/dist-packages (from python-dateutil>=2.8.2->pandas>=2.0.0->saqc) (1.16.0)
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: folium in /home/maxi/.local/lib/python3.10/site-packages (0.18.0)
Requirement already satisfied: numpy in /home/maxi/.local/lib/python3.10/site-packages (from folium) (1.26.4)
Requirement already satisfied: requests in /home/maxi/.local/lib/python3.10/site-packages (from folium) (2.31.0)
Requirement already satisfied: branca>=0.6.0 in /home/maxi/.local/lib/python3.10/site-packages (from folium) (0.8.0)
Requirement already satisfied: jinja2>=2.9 in /usr/lib/python3/dist-packages (from folium) (3.0.3)
Requirement already satisfied: xyzservices in /home/maxi/.local/lib/python3.10/site-packages (from folium) (2024.9.0)
Requirement already satisfied: urllib3<3,>=1.21.1 in /home/maxi/.local/lib/python3.10/site-packages (from requests->folium) (2.1.0)
Requirement already satisfied: idna<4,>=2.5 in /usr/lib/python3/dist-packages (from requests->folium) (3.3)
Requirement already satisfied: charset-normalizer<4,>=2 in /home/maxi/.local/lib/python3.10/site-packages (from requests->folium) (3.3.2)
Requirement already satisfied: certifi>=2017.4.17 in /usr/lib/python3/dist-packages (from requests->folium) (2020.6.20)

import json
import os
import re
import random
from datetime import datetime

import requests
from IPython.display import Markdown, display

IST_SOS_ENDPOINT = "http://localhost:8018/istsos4/v1.1"

Login as editor

username = input("Enter your username: ")
password = input("Enter your password: ")

if not username or not password:
    print("Username or password is empty")

else:
    data = {
        "username": username,
        "password": password,
        "grant_type": "password",
    }

    response = requests.post(IST_SOS_ENDPOINT + "/Login", data=data)
    if response.status_code == 200:
        token = response.json()["access_token"]
        print(
            f"Token expires at: { datetime.fromtimestamp(response.json()['expires_in'])}"
        )
        prefix = username + "-"
        print("Your station name will be prefixed with: " + prefix)
    else:
        result = json.dumps(response.json(), indent=2)
        display(Markdown(f"```json\n{result}\n```"))
Token expires at: 2024-12-15 01:54:14
Your station name will be prefixed with: editor0-
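The requests below pass the Authorization header explicitly on every call. As an optional convenience that is not part of the original workflow, the token could instead be attached to a requests.Session so it is sent automatically; a minimal sketch:

session = requests.Session()
# Send the Bearer token with every request made through this session
session.headers.update({"Authorization": f"Bearer {token}"})

# Example: equivalent to the explicit-headers GET requests used later in this notebook
# response = session.get(IST_SOS_ENDPOINT + "/Things")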

coords = [
    random.choice([8.509085, 8.513438]),
    random.choice([46.504235, 46.507165]),
]
body = {
    "description": "Meteo station recording temperature, humidity, and pressure",
    "name": prefix + "BED",
    "properties": {
        "keywords": "weather,station,temperature,humidity,pressure",
        "description": "Meteo station recording temperature, humidity, and pressure",
    },
    "Locations": [
        {
            "description": "Location of the BED meteo station",
            "name": prefix + "BED",
            "location": {
                "type": "Point",
                "coordinates": coords,
            },
            "encodingType": "application/vnd.geo+json",
        }
    ],
    "Datastreams": [
        {
            "unitOfMeasurement": {
                "name": prefix + "Celsius degree",
                "symbol": "°C",
                "definition": "",
            },
            "description": "Temperature measurement",
            "name": prefix + "T_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:temperature",
                "definition": "",
                "description": "Air temperature",
            },
            "Sensor": {
                "description": "Temperature sensor",
                "name": prefix + "TempSensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Temperature sensor"}',
            },
        },
        {
            "unitOfMeasurement": {
                "name": prefix + "Percentage",
                "symbol": "%",
                "definition": "",
            },
            "description": "Humidity measurement",
            "name": prefix + "H_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:humidity",
                "definition": "",
                "description": "Air humidity",
            },
            "Sensor": {
                "description": "Humidity sensor",
                "name": prefix + "HumiditySensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Humidity sensor"}',
            },
        },
        {
            "unitOfMeasurement": {
                "name": prefix + "Millimiters",
                "symbol": "mm",
                "definition": "",
            },
            "description": "Pressure measurement",
            "name": prefix + "P_BED",
            "observationType": "",
            "ObservedProperty": {
                "name": prefix + "atmosphere:rain",
                "definition": "",
                "description": "Rain quantity",
            },
            "Sensor": {
                "description": "Pluviometer sensor",
                "name": prefix + "PluviometerSensor",
                "encodingType": "application/json",
                "metadata": '{"brand": "SensorBrand", "type": "Pluviometer sensor"}',
            },
        },
    ],
}

response = requests.post(
    IST_SOS_ENDPOINT + "/Things",
    data=json.dumps(body),
    headers={
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
        "Commit-message": "Create the BED station",
    },
)

if response.status_code == 201:
    print(f"Thing created successfully ({response.headers['location']})")
    match = re.search(r"\((\d+)\)", response.headers["location"])
    if match:
        thing_id = int(match.group(1))
    else:
        print("No number found in parentheses.")
else:
    result = json.dumps(response.json(), indent=2)
    display(Markdown(f"```json\n{result}\n```"))

response = requests.get(
    IST_SOS_ENDPOINT + f"/Things({thing_id})/Datastreams",
    headers={
        "Authorization": f"Bearer {token}",
    },
)
datastreams = response.json()["value"]
Thing created successfully (http://localhost:8018/istsos4/v1.1/Things(4))

Insert Observations from CSV

To complete this step, first create a data/ directory next to this notebook. Then download the CSV files (T_BED.csv, H_BED.csv and P_BED.csv) from GitHub and save them into this directory.
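A minimal sketch of this setup step is shown below; BASE_RAW_URL is a placeholder that you must replace with the actual GitHub raw-file location of the tutorial data. Each CSV is expected to carry the timestamp in column 0, the result in column 2 and the quality flag in column 3, which is the layout the csv2sta function below relies on.

import os
import urllib.request

# Placeholder: replace with the raw-file base URL of the repository hosting the tutorial CSVs
BASE_RAW_URL = "https://raw.githubusercontent.com/<org>/<repo>/main/data"

os.makedirs("data", exist_ok=True)  # create the local data/ directory if it does not exist

for name in ["T_BED.csv", "H_BED.csv", "P_BED.csv"]:
    target = os.path.join("data", name)
    if not os.path.exists(target):
        urllib.request.urlretrieve(f"{BASE_RAW_URL}/{name}", target)
        print(f"Downloaded {target}")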

Procedure to insert Observations from CSV files

import csv
import json

import requests


def csv2sta(
    csv_file: str,
    datastream_id: int,
    token: str,
    step=10000,
    head=True,
    commit_message="Insert observations",
    max_rows=None,
):
    """
    Insert observations from a CSV file to a datastream
    Args:
        csv_file (str): The path to the CSV file
        datastream_id (int): The ID of the datastream
        token (str): The token to authenticate the request
        step (int): The number of rows to insert in a single request
        head (bool): Whether to skip the first row of the CSV file
        commit_message (str): The commit message for the request
        max_rows (Optional[int]): The maximum number of rows to insert
    """
    tmp_rows = 0
    tmp_headers = {
        "Content-type": "application/json",
        "Authorization": f"Bearer {token}",
        "Commit-message": commit_message,
    }
    with open(csv_file, "r") as f:
        data = csv.reader(f, delimiter=",")
        i = 0
        post_data = [
            {
                "Datastream": {"@iot.id": datastream_id},
                "components": [
                    "result",
                    "phenomenonTime",
                    "resultTime",
                    "resultQuality",
                ],
                "dataArray": [],
            }
        ]
        for r in data:
            if head and i == 0:
                i += 1
                continue
            else:
                ob = [
                    float(r[2]),
                    r[0],
                    r[0],
                    r[3],
                ]
                post_data[0]["dataArray"].append(ob)
            i += 1
            tmp_rows += 1
            if i == step:
                req = requests.post(
                    f"{IST_SOS_ENDPOINT}/BulkObservations",
                    data=json.dumps(post_data),
                    headers=tmp_headers,
                )
                if req.status_code == 201:
                    print(f"Observation created successfully ({i})")
                else:
                    print(req.text)
                    break
                i = 0
                post_data = [
                    {
                        "Datastream": {"@iot.id": datastream_id},
                        "components": [
                            "result",
                            "phenomenonTime",
                            "resultTime",
                            "resultQuality",
                        ],
                        "dataArray": [],
                    }
                ]
            if max_rows:
                if tmp_rows >= max_rows:
                    if i > 0:
                        req = requests.post(
                            f"{IST_SOS_ENDPOINT}/BulkObservations",
                            data=json.dumps(post_data),
                            headers=tmp_headers,
                        )
                        if req.status_code == 201:
                            print(f"Observation created successfully ({i})")
                        else:
                            print(req.text)
                            break
                    i = 0
                    tmp_rows = 0
                    break
        if i > 0:
            req = requests.post(
                f"{IST_SOS_ENDPOINT}/BulkObservations",
                data=json.dumps(post_data),
                headers=tmp_headers,
            )
            if req.status_code == 201:
                print(f"Observation created successfully ({i})")
            else:
                print(req.text)
            tmp_rows = 0

files = os.listdir("data")
for _file in files:
    if "sta_" not in _file:
        print("data" + os.sep + f"{_file}")
        for ds in datastreams:
            if _file.split(".")[0] in ds["name"]:
                print(ds["name"], _file)
                dt = ds["@iot.id"]
                break
        csv2sta("data" + os.sep + f"{_file}", dt, token, max_rows=30000)
data/H_BED.csv
editor0-H_BED H_BED.csv
Observation created successfully (10000)
Observation created successfully (10000)
Observation created successfully (10000)
Observation created successfully (4)
data/P_BED.csv
editor0-P_BED P_BED.csv
Observation created successfully (7008)
data/T_BED.csv
editor0-T_BED T_BED.csv
Observation created successfully (10000)
Observation created successfully (10000)
Observation created successfully (10000)
Observation created successfully (4)

From SensorThings API to Dataframe

Procedure for obtaining dataframes

import pandas as pd
from saqc import SaQC

def get_dfs_by_datastreams(
    datastreams, top=15000, orderby="phenomenonTime asc"
):
    """
    Get dataframes by datastreams
    Args:
        filter (str): The filter to apply
        top (int): The number of results to return
        orderby (str): The order of the results
    Returns:
        List[pd.DataFrame]: The dataframes
    """
    dfs = {}
    qcs = {}

    for datastream in datastreams:
        print(f"Datastream: {datastream['name']}")
        print(f"Description: {datastream['description']}")
        print(
            f"Unit of measurement: {datastream['unitOfMeasurement']['name']} ({datastream['unitOfMeasurement']['symbol']})"
        )
        response = requests.get(
            f"{IST_SOS_ENDPOINT}/Datastream({datastream['@iot.id']})/Observations?$top={top}&amp;$orderby={orderby}",
            headers={
                "Authorization": f"Bearer {token}",
            },
        )
        observations = response.json()
        print(f"Number of observations: {len(observations['value'])}")
        if len(observations["value"]) == 0:
            print("\n")
            print("--------------------")
            continue
        print(f"As of: {observations['@iot.as_of']}")
        # Create a DataFrame
        df = pd.DataFrame(observations["value"])
        df.index = pd.to_datetime(df["phenomenonTime"])
        df["result"] = pd.to_numeric(df["result"])
        dfs[datastream["name"]] = df
        # Create a QC object
        qcs[datastream["name"]] = SaQC(data=df, scheme="float")
        df["ylabel"] = f"{datastream['unitOfMeasurement']['symbol']}"
        print("\n")
        print("--------------------")
    return dfs, qcs
/usr/lib/python3/dist-packages/scipy/__init__.py:146: UserWarning: A NumPy version >=1.17.3 and <1.25.0 is required for this version of SciPy (detected version 1.26.4
  warnings.warn(f"A NumPy version >={np_minversion} and <{np_maxversion}"

dfs, qcs = get_dfs_by_datastreams(datastreams)
Datastream: editor0-T_BED
Description: Temperature measurement
Unit of measurement: editor0-Celsius degree (°C)
Number of observations: 15001
As of: 2024-12-14T23:56:49Z


--------------------
Datastream: editor0-H_BED
Description: Humidity measurement
Unit of measurement: editor0-Percentage (%)
Number of observations: 15001
As of: 2024-12-14T23:56:50Z


--------------------
Datastream: editor0-P_BED
Description: Pressure measurement
Unit of measurement: editor0-Millimiters (mm)
Number of observations: 7007
As of: 2024-12-14T23:56:51Z


--------------------

Plot Observations

for df in dfs:
    dfs[df].plot(
        y="result",
        title=df,
        figsize=(10, 4),
        grid=True,
        ylabel=f"{dfs[df]['ylabel'][0]}"
    )

[Time-series plots of the result values for the T_BED, H_BED and P_BED Datastreams]

Some quality check examples

To conduct quality controls we use SaQC (System for automated Quality Control), a library that provides the building blocks to comfortably run data quality checks.
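Before applying SaQC to the station data, here is a minimal, self-contained sketch of the workflow used in the next cell (build a SaQC object from a DataFrame, chain flagging functions, read the flags back); the synthetic series, the injected defects and the flag codes 90/93 are illustrative choices that simply mirror the resultQuality codes used below.

import numpy as np
import pandas as pd
from saqc import SaQC

# Synthetic hourly temperature series with one missing value and one out-of-range value
idx = pd.date_range("2024-01-01", periods=24, freq="h")
demo = pd.DataFrame({"result": np.sin(np.arange(24) / 3) * 10}, index=idx)
demo.loc[idx[5], "result"] = np.nan   # missing value
demo.loc[idx[12], "result"] = 75.0    # outside the plausible range

demo_qc = (
    SaQC(data=demo, scheme="float")
    .flagMissing("result", flag=90)                 # missing values -> 90
    .flagRange("result", min=-20, max=50, flag=93)  # values outside [-20, 50] -> 93
)

# demo_qc.flags["result"] is a float Series aligned with the data index:
# unflagged where no test fired, otherwise the numeric flag that was assigned
print(demo_qc.flags["result"].loc[[idx[5], idx[12]]])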

for _qc in qcs:
    if "-T_" in _qc:
        print(f"QC for {_qc} datastream temperature")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)  # flag missing values
            .flagConstants("result", thresh=0.1, window="1D", flag=91)  # flag plateaus (total change below 0.1 °C within a 1-day window)
            .flagZScore("result", window="1D", thresh=3, flag=92)  # flag values whose z-score within a 1-day window exceeds 3
            .flagRange("result", min=-20, max=50, flag=93)  # flag values outside the valid range [-20, 50] °C
        )
        # Propagate the SaQC flags into the resultQuality column
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        dfs[_qc].loc[qc.flags["result"] == 92, "resultQuality"] = 92
        dfs[_qc].loc[qc.flags["result"] == 93, "resultQuality"] = 93
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": f"Temperature (°C)",
            },
        )
    elif "-H_" in _qc:
        print(f"QC for {_qc} datastream humidity")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)
            .flagConstants("result", thresh=0.1, window="1D", flag=91)
            .flagZScore("result", window="1D", flag=92)
            .flagRange("result", min=0, max=100, flag=93)
        )
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        dfs[_qc].loc[qc.flags["result"] == 92, "resultQuality"] = 92
        dfs[_qc].loc[qc.flags["result"] == 93, "resultQuality"] = 93
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": f"Humidity (%)",
            },
        )
    else:
        print(f"QC for {_qc} datastream rain")
        qc = (
            qcs[_qc]
            .flagMissing("result", flag=90)
            .flagRange("result", min=0.2, max=0.2, flag=91)
        )
        dfs[_qc].loc[qc.flags["result"] == 90, "resultQuality"] = 90
        dfs[_qc].loc[qc.flags["result"] == 91, "resultQuality"] = 91
        qc.plot(
            "result",
            ax_kwargs={
                "ylabel": f"Rain (mm)",
            },
        )
QC for editor0-T_BED datastream temperature

[SaQC plot of the flagged temperature series]
QC for editor0-H_BED datastream humidity

[SaQC plot of the flagged humidity series]
QC for editor0-P_BED datastream rain

[SaQC plot of the flagged rain series]

Plot quality flags for each Datastream

for _df in dfs:
    dfs[_df].plot(y="resultQuality", title=_df, figsize=(10, 4), grid=True)
[Plots of the resultQuality flags for each Datastream]

Update flagged Observations

datetime_obs_update = datetime.now()
for df in dfs:
    for row in dfs[df].loc[dfs[df]["resultQuality"] < 100].itertuples():
        body = {
            "resultQuality": str(row[8])
        }
        response = requests.patch(
            f"{IST_SOS_ENDPOINT}/Observations({row[1]})",
            data=json.dumps(body),
            headers={
                "Content-type": "application/json",
                "Authorization": f"Bearer {token}",
                "Commit-message": "Quality flagging",
            },
        )
    print(f"Observations of Datastream {df} updated")
Observations of Datastream editor0-T_BED updated
Observations of Datastream editor0-H_BED updated
Observations of Datastream editor0-P_BED updated

Plot Observations after update

for df in dfs:
    dfs[df].loc[dfs[df]["resultQuality"] &gt;= 100].plot(
        y="result",
        title=df,
        figsize=(10, 4),
        grid=True,
        ylabel=f"{dfs[df]['ylabel'][0]}",
    )
    dfs[df].loc[dfs[df]["resultQuality"] &lt; 100].plot.scatter(
        x="phenomenonTime",
        y="result",
        grid=True,
    )

[For each Datastream: a time-series plot of the Observations with resultQuality >= 100 and a scatter plot of the flagged Observations]

Plot the things on a Map

# STA.get_token(username, password)

# t = STA.query_api("Things?$expand=Locations")
# STA.map_things(t)

response = requests.get(
    IST_SOS_ENDPOINT + "/Things?$expand=Locations",
    headers={"Authorization": f"Bearer {token}"},
)
# things = json.dumps(response.json(), indent=2)
# display(Markdown(f"```json\n{things}\n```"))
things = response.json()['value']
import folium

map_center = [46.505978, 8.511378]
mymap = folium.Map(location=map_center, zoom_start=16)

if things:
    for thing in things:
        if thing["Locations"]:
            # print(thing["Locations"])
            for location in thing["Locations"]:
                coords = location["location"]["coordinates"]
                lat, lon = (
                    coords[1],
                    coords[0],
                )
                folium.Marker(
                    location=[lat, lon],
                    tooltip=thing["name"],
                ).add_to(mymap)

mymap
[Interactive folium map with a marker at each Thing's Location]
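If the interactive map does not render inline (the notebook has to be trusted for that), one simple workaround is to write it to a standalone HTML file with folium's save method and open that file in a browser; the file name below is arbitrary.

mymap.save("things_map.html")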