Handlers

Some of the core functionality provided by fourinsight.engineroom.utils relies on handlers that download and upload text content from/to a source. The source can be a local file, an Azure Storage blob, or any other suitable storage location. Two handlers, LocalFileHandler and AzureBlobHandler, are available out of the box. Custom handlers can be set up by inheriting from BaseHandler.

The LocalFileHandler is used to store text content in a local file.

from fourinsight.engineroom.utils import LocalFileHandler


handler = LocalFileHandler(<file-path>)

The AzureBlobHandler is used to store text content in Azure Blob Storage.

from fourinsight.engineroom.utils import AzureBlobHandler


# Instantiate from a connection string
handler = AzureBlobHandler(conn_str, container_name, blob_name)

# Instantiate from a container-level SAS URL
handler = AzureBlobHandler.from_container_url(container_url, blob_name)

The handlers behave like streams and provide the normal stream capabilities. Downloading and uploading follow a push/pull strategy: content is retrieved from the source with pull() and uploaded to the source with push(). Correspondingly, the handler itself is read from and written to using read() and write().

For reading from handlers:

# Pull stream
handler.pull()

# Read stream content
handler.seek(0)
handler.read()

and writing to handlers:

# Write text content to stream
handler.write("Hello, World!")

# Push stream content
handler.push()

More interestingly, a handler can also be used with pandas.read_csv():

import pandas as pd

# Pull stream with CSV content
handler.pull()

# Load stream content as 'pandas.DataFrame'
handler.seek(0)
df = pd.read_csv(handler, index_col=0)

and pandas.DataFrame.to_csv():

df = pd.DataFrame({"Hello": [1, 2], "World!": [3, 4]})

# Write 'pandas.DataFrame' to stream
df.to_csv(handler)

# Push stream content
handler.push()

Important

Remember to call seek(0) to return to the beginning of the stream before reading.
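The effect of skipping seek(0) can be illustrated with a plain in-memory stream. The sketch below uses io.StringIO from the standard library as a stand-in for a handler; it assumes only that handlers follow ordinary stream positioning, as described above, and does not use the library itself.

```python
from io import StringIO

# Stand-in for a handler: an ordinary in-memory text stream.
stream = StringIO()
stream.write("Hello, World!")

# After writing, the position is at the end of the stream,
# so reading without seeking returns an empty string.
without_seek = stream.read()

# Rewind to the beginning before reading to get the full content.
stream.seek(0)
with_seek = stream.read()

print(repr(without_seek))
print(repr(with_seek))
```

The same applies after pull(): the content has just been written to the stream, so the position is at the end until seek(0) is called.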

Custom handlers

A custom handler must inherit from BaseHandler and override the two abstract methods, _push() and _pull(). It is also recommended to set the class variable _SOURCE_NOT_FOUND_ERROR to the type of exception that is expected to be raised if the source file cannot be read. The example below shows how to set up a custom handler based on FTP.

from io import BytesIO
from ftplib import FTP, error_perm
from fourinsight.engineroom.utils.core import BaseHandler


class FTPHandler(BaseHandler):
    """
    Handler for pushing/pulling text content to/from a file on an FTP server.

    Parameters
    ----------
    host : str
        FTP host.
    user : str
        FTP user.
    passwd : str
        FTP password.
    folder : str
        Folder where the file should be stored.
    filename : str
        Filename.
    """
    _SOURCE_NOT_FOUND_ERROR = error_perm

    def __init__(self, host, user, passwd, folder, filename):
        self._folder = folder
        self._filename = filename
        self._ftp = FTP(host=host, user=user, passwd=passwd)
        self._cwd(self._folder)
        super().__init__()

    def _cwd(self, folder):
        """
        Change current working directory, and make it if it does not exist.
        """
        try:
            self._ftp.cwd(folder)
        except error_perm:
            self._ftp.mkd(folder)
            self._ftp.cwd(folder)

    def _pull(self):
        """
        Pull text content from FTP server, and write the string to stream.

        Returns
        -------
        int
            Number of characters written to stream (which is always equal to the
            length of the string).
        """
        with BytesIO() as binary_content:
            self._ftp.retrbinary("RETR " + self._filename, binary_content.write)
            characters_written = self.write(binary_content.getvalue().decode(self.encoding))

        return characters_written

    def _push(self):
        """
        Push the stream content to source.
        """
        self.seek(0)
        self._ftp.storbinary("STOR " + self._filename, self.buffer)
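To experiment with the override pattern without an FTP server, the following self-contained sketch may help. Note that SketchBaseHandler is a hypothetical stand-in written for this example, not the library's BaseHandler; it assumes a text stream layered on an in-memory binary buffer whose pull() clears the stream before delegating to _pull(). DictHandler and its store and key parameters are likewise hypothetical, using a plain dict as the "source".

```python
from io import BytesIO, TextIOWrapper


class SketchBaseHandler(TextIOWrapper):
    """Hypothetical stand-in for BaseHandler (NOT the library's implementation)."""

    # Exception type that signals a missing source; subclasses override this.
    _SOURCE_NOT_FOUND_ERROR = Exception

    def __init__(self, encoding="utf-8"):
        # Text layer on top of an in-memory binary buffer.
        super().__init__(BytesIO(), encoding=encoding)

    def pull(self, raise_on_missing=True):
        # Assumption: clear the stream, then let the subclass fill it.
        self.seek(0)
        self.truncate()
        try:
            self._pull()
        except self._SOURCE_NOT_FOUND_ERROR:
            if raise_on_missing:
                raise

    def push(self):
        self._push()


class DictHandler(SketchBaseHandler):
    """Hypothetical handler that stores encoded text in a dict entry."""

    _SOURCE_NOT_FOUND_ERROR = KeyError

    def __init__(self, store, key):
        self._store = store
        self._key = key
        super().__init__()

    def _pull(self):
        # Decode the stored bytes and write the text to the stream.
        return self.write(self._store[self._key].decode(self.encoding))

    def _push(self):
        # Flush pending text to the binary buffer, then copy its bytes out.
        self.flush()
        self._store[self._key] = self.buffer.getvalue()


store = {}

# Write text content to the stream, then push it to the source.
writer = DictHandler(store, "greeting.txt")
writer.write("Hello, World!")
writer.push()

# Pull the content back with a fresh handler and read it from the start.
reader = DictHandler(store, "greeting.txt")
reader.pull()
reader.seek(0)
pulled_text = reader.read()

# A missing source leaves the stream empty when raise_on_missing is False.
missing = DictHandler(store, "missing.txt")
missing.pull(raise_on_missing=False)
missing.seek(0)
missing_text = missing.read()
```

In this sketch, setting _SOURCE_NOT_FOUND_ERROR to KeyError plays the same role as error_perm in the FTP example: it names the exception that means the source does not exist, so the base class can decide whether to re-raise it.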