Handlers
Some of the core functionality provided by fourinsight.engineroom.utils relies
on handlers that facilitate downloading and uploading of text content from/to a source.
The source can be a local file, an Azure Storage Blob, or any other suitable storage
location. Two handlers, the LocalFileHandler and the AzureBlobHandler,
are available out-of-the-box. Custom handlers are easily set up by inheriting from
BaseHandler.
The LocalFileHandler is used to store text content in a local file.
from fourinsight.engineroom.utils import LocalFileHandler
handler = LocalFileHandler("<file-path>")
The AzureBlobHandler is used to store text content in Azure Blob Storage.
from fourinsight.engineroom.utils import AzureBlobHandler
# Instantiate from a connection string
handler = AzureBlobHandler(conn_str, container_name, blob_name)
# Instantiate from a container-level SAS URL
handler = AzureBlobHandler.from_container_url(container_url, blob_name)
The handlers behave like text streams and provide the usual stream methods. Downloading
and uploading follow a push/pull strategy: pull() retrieves content from the source
into the stream, and push() uploads the stream content to the source. Reading from and
writing to the handler itself is done with read() and write().
For reading from handlers:
# Pull stream
handler.pull()
# Read stream content
handler.seek(0)
handler.read()
and writing to handlers:
# Write text content to stream
handler.write("Hello, World!")
# Push stream content
handler.push()
Handlers can also be passed directly to pandas.read_csv():
import pandas as pd
# Pull stream w/ CSV content
handler.pull()
# Load stream content as 'pandas.DataFrame'
handler.seek(0)
df = pd.read_csv(handler, index_col=0)
and pandas.DataFrame.to_csv():
df = pd.DataFrame({"Hello": [1, 2], "World!": [3, 4]})
# Write 'pandas.DataFrame' to stream
df.to_csv(handler)
# Push stream content
handler.push()
Important
Remember to perform seek(0) to go to the beginning of the stream before reading.
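The effect of the stream position can be reproduced with a plain io.StringIO from the standard library. It is used here only as a stand-in for a handler, on the assumption that handlers track their position the way ordinary text streams do:

```python
from io import StringIO

# Stand-in for a handler: an ordinary in-memory text stream.
stream = StringIO()
stream.write("a,b\n1,2\n")

# After writing, the position is at the end, so read() returns nothing.
at_end = stream.read()

# Rewind to the beginning before reading the content back.
stream.seek(0)
from_start = stream.read()
```

Here at_end is an empty string, while from_start holds the full CSV text.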
Custom handlers
A custom handler must inherit from BaseHandler and override
the two abstract methods, _push() and _pull(). It is also recommended to
set the class variable _SOURCE_NOT_FOUND_ERROR to the type of exception that
is raised when the source file cannot be read.
The example below shows how to set up a custom handler based on FTP.
from io import BytesIO
from ftplib import FTP, error_perm
from fourinsight.engineroom.utils.core import BaseHandler
class FTPHandler(BaseHandler):
    """
    Handler for push/pull text content to/from an FTP server file.

    Parameters
    ----------
    host : str
        FTP host.
    user : str
        FTP user.
    passwd : str
        FTP password.
    folder : str
        Folder where the file should be stored.
    filename : str
        Filename.
    """

    _SOURCE_NOT_FOUND_ERROR = error_perm

    def __init__(self, host, user, passwd, folder, filename):
        self._folder = folder
        self._filename = filename
        self._ftp = FTP(host=host, user=user, passwd=passwd)
        self._cwd(self._folder)
        super().__init__()

    def _cwd(self, folder):
        """
        Change current working directory, and make it if it does not exist.
        """
        try:
            self._ftp.cwd(folder)
        except error_perm:
            self._ftp.mkd(folder)
            self._ftp.cwd(folder)

    def _pull(self):
        """
        Pull text content from FTP server, and write the string to stream.

        Returns
        -------
        int
            Number of characters written to stream (which is always equal to the
            length of the string).
        """
        with BytesIO() as binary_content:
            self._ftp.retrbinary("RETR " + self._filename, binary_content.write)
            characters_written = self.write(binary_content.getvalue().decode(self.encoding))
        return characters_written

    def _push(self):
        """
        Push the stream content to source.
        """
        self.seek(0)
        self._ftp.storbinary("STOR " + self._filename, self.buffer)
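To try out the _pull()/_push() contract without an FTP server, the sketch below swaps the FTP connection for a plain dict. Both classes are hypothetical: SketchBaseHandler is a simplified stand-in written for this example (a text stream wrapped around a bytes buffer, which is what the use of self.buffer and self.encoding above suggests), not the library's BaseHandler, and DictHandler is an illustrative subclass.

```python
from abc import ABC, abstractmethod
from io import BytesIO, TextIOWrapper


class SketchBaseHandler(ABC, TextIOWrapper):
    """Hypothetical stand-in for BaseHandler (not the library's code)."""

    _SOURCE_NOT_FOUND_ERROR = Exception

    def __init__(self, encoding="utf-8"):
        # Text stream on top of an in-memory bytes buffer.
        super().__init__(BytesIO(), encoding=encoding)

    def pull(self):
        # Clear the stream, then let the subclass refill it from the source.
        self.seek(0)
        self.truncate()
        self._pull()

    def push(self):
        self._push()

    @abstractmethod
    def _pull(self):
        ...

    @abstractmethod
    def _push(self):
        ...


class DictHandler(SketchBaseHandler):
    """Hypothetical handler that stores encoded text in a dict entry."""

    _SOURCE_NOT_FOUND_ERROR = KeyError

    def __init__(self, store, key):
        self._store = store
        self._key = key
        super().__init__()

    def _pull(self):
        # Decode the stored bytes and write the text to the stream.
        return self.write(self._store[self._key].decode(self.encoding))

    def _push(self):
        # Rewind (this also flushes pending text), then copy the raw bytes.
        self.seek(0)
        self._store[self._key] = self.buffer.read()


store = {}

writer = DictHandler(store, "greeting.txt")
writer.write("Hello, World!")
writer.push()

reader = DictHandler(store, "greeting.txt")
reader.pull()
reader.seek(0)
text = reader.read()
```

The subclass has the same shape as FTPHandler: _pull() writes decoded text to the stream, and _push() rewinds and hands the underlying byte buffer to the storage backend.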