Example

Here is an example of a run.py script that uses some of the utilities in fourinsight.engineroom.utils.

In this example, two timeseries, ‘A’ and ‘B’, are downloaded from DataReservoir.io. A third variable, ‘C’, is then calculated from ‘A’ and ‘B’. Finally, the 1-hour standard deviation of each variable is calculated, collected, and stored in Azure Blob Storage.

A DrioDataSource object is used to download the data group, ‘A’ and ‘B’, from DataReservoir.io. The data is downloaded in 1-hour chunks using the iter() method and the date_range() convenience function. Caching is enabled by providing a cache folder (and an appropriate cache_size if you don’t want the default 24-hour cache size).
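For instance, iterating over a short window in isolation might look like this minimal sketch (the series identifiers are placeholders, and the 6-hour window is arbitrary):

import pandas as pd
from fourinsight.engineroom.utils import DrioDataSource, iter_index

source = DrioDataSource(
    drio_client,  # an authenticated datareservoirio.Client
    {"A": "<series-id-for-A>", "B": "<series-id-for-B>"},
    cache=".cache",  # enable on-disk caching of downloaded chunks
)

start = pd.to_datetime("2021-09-28 00:00", utc=True)
end = pd.to_datetime("2021-09-28 06:00", utc=True)

# date_range() yields the start/end index pairs that iter() expects
for index_i, data_i in source.iter(*iter_index.date_range(start, end, freq="1h")):
    print(index_i, len(data_i["A"]))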

PersistentDict is used to keep track of the state parameter, ‘TimeOfLastIndex’. This state parameter tells the script which results it has already collected, so that the script can continue where it left off last time it ran. The state is stored persistently in a local file using the LocalFileHandler.
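In isolation, the state round-trip looks like this (a minimal sketch using the same file name and key as the script below):

from fourinsight.engineroom.utils import LocalFileHandler, PersistentDict

state_handler = LocalFileHandler("state.json")
state = PersistentDict(state_handler)
state.pull(raise_on_missing=False)  # tolerate a missing file on the first run

# Read a value with a fallback, update it, and write the state back to file
start = state.get("TimeOfLastIndex", default="2021-09-28 00:00")
state["TimeOfLastIndex"] = "2021-09-28T01:00:00+00:00"
state.push()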

The ResultCollector handles collecting and storing the results. Here, results are stored in Azure Blob Storage using the AzureBlobHandler.
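A minimal round-trip with the ResultCollector could look like the sketch below (the connection string, container, and blob names are placeholders; with indexing_mode="timestamp", each new row is indexed by the timestamp passed to new_row()):

import os
import pandas as pd
from fourinsight.engineroom.utils import AzureBlobHandler, ResultCollector

handler = AzureBlobHandler(
    os.environ["APP_CONNECTION_STRING"],
    "example_container",
    "example_blob_folder/results.csv",
)
results = ResultCollector({"a_std": float}, handler=handler, indexing_mode="timestamp")
results.pull()  # load any previously stored results

# Add a timestamped row, collect a value into it, and store everything back
results.new_row(pd.to_datetime("2021-09-28 00:00", utc=True))
results.collect(a_std=0.5)
results.push()

The complete run.py script follows: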

import os

import numpy as np
import pandas as pd

import datareservoirio as drio
from datareservoirio.authenticate import ClientAuthenticator
from fourinsight.engineroom.utils import (
    DrioDataSource,
    PersistentDict,
    ResultCollector,
    LocalFileHandler,
    AzureBlobHandler,
    iter_index,
)


auth = ClientAuthenticator(os.environ["APP_CLIENT_ID"], os.environ["APP_CLIENT_SECRET"])
drio_client = drio.Client(auth)

# Initialize a data source object with labels 'A' and 'B'
data_labels = {
    "A": "8b1683bb-32a9-4e64-b122-6a0534eff592",
    "B": "4bf4606b-b18e-408d-9d4d-3f1465ed23f2"
}
source = DrioDataSource(drio_client, data_labels, cache=".cache")

# Get the application state
state_handler = LocalFileHandler("state.json")
state = PersistentDict(state_handler)
state.pull(raise_on_missing=False)

# Initialize a ResultCollector
results_handler = AzureBlobHandler(
    os.environ["APP_CONNECTION_STRING"],
    "example_container",
    "example_blob_folder/results.csv"
)
result_headers = {"a_std": float, "b_std": float, "c_std": float}
results = ResultCollector(result_headers, handler=results_handler, indexing_mode="timestamp")
results.pull()   # pull previously collected results from storage

# Start from '2021-09-28 00:00' and end at 'now'.
# If the app has run previously, start from the last collected index.
start = state.get("TimeOfLastIndex", default="2021-09-28 00:00")
start = pd.to_datetime(start, utc=True)
end = pd.to_datetime("now", utc=True)

# Iterate over the data in 1-hour chunks
for index_i, data_i in source.iter(*iter_index.date_range(start, end, freq="1h")):
    results.new_row(index_i)

    series_a = data_i["A"]
    series_b = data_i["B"]

    # Collect the standard deviation of timeseries A and B
    results.collect(a_std=np.std(series_a), b_std=np.std(series_b))

    # Do some calculations with your timeseries data
    series_a = series_a + np.random.random(size=len(series_a))
    series_b = series_b - 1.0
    series_c = (series_a + series_b) / 2.0

    # Collect the standard deviation of the calculated variable C
    results.collect(c_std=np.std(series_c))

# Store the results
results.push()

# Update the application state with the latest collected index
state["TimeOfLastIndex"] = results.dataframe.index[-1].isoformat()
state.push()