ROAM Developer Guide

This is the developer guide for the risk, opportunity, analysis and monitoring (ROAM) tool.

ROAM and EFPF

Relation between ROAM and EFPF

The ROAM Tool has an internal MQTT client that can publish and subscribe. Whenever a user supplies data to the EFPF message bus, they can also configure the ROAM Tool to listen to that data. Depending on the type of workflow, the user may receive a notification. Regardless, the workflow will always publish the output to the message bus as well. As soon as they are available, the ROAM Tool will provide compatibility with the secure data store and the pub/sub security tool. Our output will be saved to the SDSS, and, if the user gives access to the tool, it will be able to retrieve their historic data. The tool will use the pub/sub security tool to verify whether the user has access to the topics and vhost they are configuring for their workflows. Lastly, the ROAM Tool uses EFS to authenticate users.

ROAM and Apache Beam

The ROAM tool makes extensive use of Apache Beam, which describes itself as “an advanced unified programming model” that can be used to “implement batch and streaming data processing jobs that run on any execution engine”. It provides SDKs in multiple languages for building data processing pipelines, which take an input collection of data (a PCollection) and run it through one or more data transformations (PTransforms). Some PTransforms can be supplied with user code in the form of a DoFn, which specifies how to transform a single element of the collection. The final PCollection, after all PTransforms in the pipeline have been executed, is the output of running the pipeline. For a more detailed description of Apache Beam objects, please consult the Apache Beam programming guide.
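
The model can be sketched in plain Python without Beam itself; the `pardo` function and `DoFn` class below are illustrative stand-ins for `beam.ParDo` and `beam.DoFn`, not the real API:

```python
# Illustrative sketch of the Beam model in plain Python (not Beam itself):
# a PCollection is modelled as a list, and applying a ParDo means running
# a DoFn's process() on each element and flattening the per-element results.

class DoFn:
    """Stand-in for apache_beam.DoFn: process() returns zero or more
    output elements for a single input element."""
    def process(self, element):
        raise NotImplementedError

class Double(DoFn):
    def process(self, element):
        return [element * 2]

def pardo(pcollection, dofn):
    """Stand-in for beam.ParDo: apply the DoFn to every element and
    concatenate the results into the output collection."""
    return [out for element in pcollection for out in dofn.process(element)]

print(pardo([1, 2, 3], Double()))  # [2, 4, 6]
```

Because `process` returns a list per element, a single input element can produce zero, one, or many output elements, which is what makes ParDo more general than a plain map.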

Beam-ROAM correspondence

The ROAM tool facilitates the creation and execution of EFPF-specific pipelines and transforms, and running them with data from MQTT topics (e.g. from the EFPF dataspine). Specifically, pipelines are managed via Workflow objects, which are composed of one or more Recipe objects (which are EFPF-specific implementations of DoFn’s) and both input and output MQTT topics. When data is published to the input MQTT topic, the corresponding pipeline is run and the transformed data is published to the output MQTT topic.
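
The wiring described above can be pictured roughly as follows. All names here (`Workflow`, `on_message`, the topic strings) are illustrative assumptions, not ROAM's actual classes, and the MQTT client is replaced by a plain callback:

```python
# Rough sketch of the Workflow/Recipe wiring (illustrative names only).
# A workflow binds an input topic, an ordered list of recipes, and an
# output topic; an arriving message is run through every recipe in turn
# and the result is "published" to the output topic.

class Workflow:
    def __init__(self, input_topic, recipes, output_topic, publish):
        self.input_topic = input_topic
        self.recipes = recipes          # applied in order, like chained ParDos
        self.output_topic = output_topic
        self.publish = publish          # stand-in for the MQTT client's publish

    def on_message(self, topic, payload):
        if topic != self.input_topic:
            return
        for recipe in self.recipes:     # each recipe maps element -> list
            payload = [out for element in payload for out in recipe(element)]
        self.publish(self.output_topic, payload)

published = {}
wf = Workflow(
    input_topic="sensors/raw",
    recipes=[lambda e: [sum(e)], lambda e: [e * 2]],
    output_topic="sensors/processed",
    publish=lambda topic, data: published.update({topic: data}),
)
wf.on_message("sensors/raw", [[7, 8], [1, 2, 3]])
print(published)  # {'sensors/processed': [30, 12]}
```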

Recipe development

Recipes are implementations of Beam’s DoFn class. In addition to the basic DoFn functionality, they define:

  • A recipe name, which must be specified via the class constructor
  • previous_transforms (optional, default value = []), which is a list of other recipes which should be executed before this recipe is executed
  • ma_schema, which is a marshmallow schema used for class (de)serialization. By default, RecipeSchema is used.
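
The contract these three points describe might look roughly like the sketch below. This is an assumption for illustration only: the real base class lives in flaskr/domain/recipe/base.py, inherits Beam's DoFn, and uses a marshmallow RecipeSchema, which is stood in here by an empty placeholder:

```python
# Sketch of the Recipe contract (illustrative; not ROAM's actual code).

class RecipeSchema:            # placeholder for the marshmallow schema
    pass

class Recipe:
    ma_schema = RecipeSchema   # default (de)serialization schema

    def __init__(self, name, previous_transforms=None):
        self.name = name                                      # required, via constructor
        self.previous_transforms = previous_transforms or []  # recipes to run first

    def process(self, element):
        """Subclasses transform one input element into a list of outputs."""
        raise NotImplementedError
```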

The example recipes below are available in backend/flaskr/domain/recipe/examples/basic_math.py.

A simple recipe

The following is a minimal example of how to write a recipe. It assumes that the input collection is a list of lists of numbers (for example, [[7, 8], [1, 2, 3]]). For every item in the input collection, the recipe returns the sum of the numbers in that list. So for the example input collection, the output would be [15, 6].

from flaskr.domain.recipe.base import Recipe


class SumOfAllInListRecipe(Recipe):
    """Recipe which returns the sum of all numbers in the input list"""

    def __init__(self):
        super(SumOfAllInListRecipe, self).__init__(self.__module__)

    def process(self, element):
        """Returns the sum of all numbers in the input list as single number within a list.

        Args:
            element (list): A list of numbers

        Returns:
            list: A single-element list which contains the sum of the numbers in the input list
        """
        return [sum(element)]

Things to note in this example:

  • The new recipe inherits from the Recipe base class.
  • The constructor function (i.e. __init__) explicitly defines the recipe name to be self.__module__.
  • The process function defines how to transform a single element of the input collection into the desired output.
  • No ma_schema is explicitly defined, so the default RecipeSchema will be used.

The following can be run in the Python interpreter to see how this class works:

In [1]: import apache_beam as beam

In [2]: from flaskr.domain.recipe.examples import basic_math

In [3]: [[7,8], [1,2,3]] | beam.ParDo(basic_math.SumOfAllInListRecipe())
Out[3]: [15, 6]

A recipe with a parameter

Recipes can define additional parameters. For example, this recipe multiplies an input element by a constant defined in the constructor. So, if we define our constant to be 2.2, then [15, 6] would become [33.0, 13.2].

class MultiplyByConstantRecipeSchema(RecipeBaseSchema):
    constant = fields.Number()


class MultiplyByConstantRecipe(Recipe):
    """Recipe which returns the input number multiplied by a pre-defined constant"""

    ma_schema = MultiplyByConstantRecipeSchema

    def __init__(self, constant):
        self.constant = constant
        super(MultiplyByConstantRecipe, self).__init__(self.__module__)

    def process(self, element):
        """Returns the input number multiplied by the pre-defined constant, as a single number within a list.

        Args:
            element (number): A number

        Returns:
            list: A single-element list which contains the input number multiplied by the constant
        """
        return [element * self.constant]

Things to note in this example:

  • A new schema type, MultiplyByConstantRecipeSchema, was defined. It specifies that the constructor requires a field named constant of type marshmallow.fields.Number.
  • The ma_schema of MultiplyByConstantRecipe is set to MultiplyByConstantRecipeSchema, so that the constant field is included when the recipe is (de)serialized.

The following can be run in the Python interpreter to see how this class works:

In [1]: import apache_beam as beam

In [2]: from flaskr.domain.recipe.examples import basic_math

In [3]: [15, 6] | beam.ParDo(basic_math.MultiplyByConstantRecipe(2.2))
Out[3]: [33.0, 13.200000000000001]

We can also combine two recipes like so:

In [4]: [[7,8], [1,2,3]] | beam.ParDo(basic_math.SumOfAllInListRecipe()) | beam.ParDo(basic_math.MultiplyByConstantRecipe(2.2))
Out[4]: [33.0, 13.200000000000001]

Serialization and deserialization

Serialization and deserialization are used by the ROAM tool in the REST API and the database. The recipe’s ma_schema can be used to serialize the object:

In [5]: from flaskr.domain.recipe import RecipeSchemaRegistry

In [6]: RecipeSchemaRegistry.load_recipe_subclasses(ignore_existing=True)

In [7]: RecipeSchemaRegistry().dump(basic_math.MultiplyByConstantRecipe(2.2))
Out[7]: {'constant': 2.2, 'type': 'MultiplyByConstantRecipe'}

And to deserialize the dictionary into a recipe object:

In [19]: obj = RecipeSchemaRegistry().load({'constant': 2.2, 'type': 'MultiplyByConstantRecipe'})

In [20]: obj
Out[20]: <flaskr.domain.recipe.examples.basic_math.MultiplyByConstantRecipe at 0x7fe9a8ca1370>

In [21]: obj.constant
Out[21]: 2.2
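
The registry's polymorphic behaviour — dispatching on the 'type' key — can be sketched without marshmallow as follows. The registry and helper names here are illustrative, not the actual RecipeSchemaRegistry implementation:

```python
# Minimal sketch of type-keyed (de)serialization, the pattern the
# RecipeSchemaRegistry implements with marshmallow (names illustrative).

REGISTRY = {}

def register(cls):
    """Record the class under its name so load() can find it later."""
    REGISTRY[cls.__name__] = cls
    return cls

@register
class MultiplyByConstantRecipe:
    def __init__(self, constant):
        self.constant = constant

def dump(obj):
    """Serialize an object to a dict carrying its type name."""
    return {**vars(obj), "type": type(obj).__name__}

def load(data):
    """Re-create the object from the dict, dispatching on 'type'."""
    fields = {k: v for k, v in data.items() if k != "type"}
    return REGISTRY[data["type"]](**fields)

d = dump(MultiplyByConstantRecipe(2.2))
print(d)                 # {'constant': 2.2, 'type': 'MultiplyByConstantRecipe'}
print(load(d).constant)  # 2.2
```

Storing the type name alongside the fields is what lets a single registry round-trip any registered recipe subclass, which is why load_recipe_subclasses must be called before deserializing.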

How to submit recipes

In order to contribute recipes to ROAM, they must first be written in Python (as described above) and then submitted to the ROAM repository. Recipes belong in an appropriate subdirectory of backend/flaskr/domain/recipe. Currently, the service must be restarted for new recipes to become visible. Please contact the tool maintainers for assistance.

Contributing to main code-base

If you would like to contribute to the core functionality of the tool, then please contact the tool maintainers to gain write access to the repository and to request further assistance.

Publishing docker images to the repository

This is how one can publish a new docker image to the repository:

  1. Login to the docker repository:

    docker login https://registry.fit.fraunhofer.de

  2. Use the following script to build, tag and push docker images for both the frontend and backend (the script assumes a Linux environment; on other operating systems, open the file and run the equivalent commands manually):

    ./docker_build_tag_and_push.sh

Secrets encryption

Secrets (such as passwords) should never be stored unencrypted in the repository. The repository is currently set up to use OpenSSL encryption on files found in backend/config/encrypted/* and frontend/config/encrypted/*. In order to edit or add encrypted configuration files, developers should obtain the .gitencrypt directory from the repository maintainers and store it in their home directory.

You can test whether decryption is working correctly by verifying that the contents of backend/config/encrypted/test.py locally are "If you can read this, it isn't encrypted." (in the remote repository, it will be encrypted, and thus an unreadable string).
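
As an aside, the effect of OpenSSL symmetric encryption can be seen with a standalone round-trip. This is purely illustrative: the repository's actual encryption is driven by git filters and the key material in the .gitencrypt directory, and the password and file paths below are made-up examples:

```shell
# Illustrative OpenSSL round-trip (password and paths are examples only;
# the repository uses git filters with keys from ~/.gitencrypt instead).
echo "If you can read this, it isn't encrypted." > /tmp/roam_secret.txt
openssl enc -aes-256-cbc -pbkdf2 -salt -pass pass:example-password \
    -in /tmp/roam_secret.txt -out /tmp/roam_secret.enc
openssl enc -d -aes-256-cbc -pbkdf2 -pass pass:example-password \
    -in /tmp/roam_secret.enc -out /tmp/roam_secret.dec
cat /tmp/roam_secret.dec   # prints the original plaintext again
```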

Backend configuration

The backend can be configured by providing an environment file using the ENV_FILE environment variable. The variables below must be set:

SECRET_KEY=in_production_use_something_more_secure
MONGO_URI=mongodb://<mongodbhost>:27017/<flaskrDB>
MQTT_BROKER=broker.yourbrokersdomain.com
MQTT_PORT=broker_port
MQTT_USERNAME_{vhost}=yourusername
MQTT_PW=yourpassword
MQTT_CLIENT_SUFFIX=your_suffix_for_the_mqtt_client_id
USE_TLS=boolean_that_determines_whether_to_use_tls_certificate
SMTP_HOST=smtp_host_to_use_as_default_settings_for_email_notifications
SMTP_USERNAME=smtp_username_to_use_as_default_settings_for_email_notifications
SMTP_PASSWORD=smtp_password_to_use_as_default_settings_for_email_notifications
SMTP_PORT=smtp_port_to_use_as_default_settings_for_email_notifications
MONGO_USERNAME=yourusername
MONGO_PASSWORD=yourpassword
OIDC_PUBLIC_KEY=public_key_for_keycloak_realm
KC_AUTHORITY=keycloak_authority_url
KC_CLIENT_ID=keycloak_client_id
KC_REDIRECT=keycloak_redirect_url
KC_SCOPE=keycloak_scope
BACKEND_API_URL=backend_api_url
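
Mechanically, loading such a file boils down to reading KEY=value lines into the process environment. The sketch below illustrates that idea only; it is not the backend's actual loader, and `load_env_file` is a hypothetical helper name:

```python
import os
import tempfile

def load_env_file(path):
    """Read KEY=value lines into os.environ (minimal sketch; a real
    loader may also handle quoting, comments and variable expansion)."""
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            os.environ[key] = value

# Demonstrate with a throwaway env file:
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as fh:
    fh.write("MQTT_PORT=8883\nUSE_TLS=true\n")
load_env_file(fh.name)
print(os.environ["MQTT_PORT"])  # 8883
```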

Assuming you are running from the project root (roam) and the above configuration is in the file ./local.env, it can be loaded while starting the application using ENV_FILE=./local.env flask run.

Alternatively, you can store ENV_FILE=./local.env in ./.env and run the application using flask run.

Note that if you run the application from the backend directory or one of its subdirectories, the project root will still be used as the current working directory.
