Communication Interfaces

Encodapy includes examples for several communication interfaces. See examples/03_interfaces for more details.

The following sections describe the configuration required for each interface.

Configuration of the FIWARE Interface

The FIWARE Interface allows communication with FIWARE-based systems using the NGSI standard. For details about using FIWARE, see N5GEH or the FIWARE OpenAPI documentation. Currently, only NGSI v2 is supported.

General settings are configured via environment variables. The FIWARE interface uses the following settings:

pydantic settings encodapy.config.env_values.FiwareEnvVariables

Environment variables for FIWARE communication. They are automatically loaded from the environment or a .env file.

Environment variables always have the prefix FIWARE_.

JSON schema:
{
   "title": "FiwareEnvVariables",
   "description": "Environment variables for FIWARE communication.\nThey are automatically loaded from the environment or a .env file.\n\nEnvironment variables always have the prefix `FIWARE_`.",
   "type": "object",
   "properties": {
      "auth": {
         "default": false,
         "description": "Enables authentication for FIWARE requests",
         "title": "Auth",
         "type": "boolean"
      },
      "client_id": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Client ID for FIWARE authentication",
         "title": "Client Id"
      },
      "client_pw": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Client password for FIWARE authentication",
         "title": "Client Pw"
      },
      "token_url": {
         "anyOf": [
            {
               "format": "uri",
               "minLength": 1,
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Token URL for FIWARE authentication",
         "title": "Token Url"
      },
      "bearer_token": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Bearer token for FIWARE authentication",
         "title": "Bearer Token"
      },
      "cb_url": {
         "anyOf": [
            {
               "format": "uri",
               "minLength": 1,
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": "http://localhost:1026/",
         "description": "URL of the Context Broker (e.g., Orion-LD)",
         "title": "Cb Url"
      },
      "service": {
         "description": "FIWARE Service header for tenant isolation",
         "title": "Service",
         "type": "string"
      },
      "service_path": {
         "default": "/",
         "description": "FIWARE Service Path for sub-tenant isolation",
         "title": "Service Path",
         "type": "string"
      },
      "crate_db_url": {
         "default": "http://localhost:4200/",
         "description": "URL of the CrateDB instance",
         "format": "uri",
         "minLength": 1,
         "title": "Crate Db Url",
         "type": "string"
      },
      "crate_db_user": {
         "default": "crate",
         "description": "Username for CrateDB",
         "title": "Crate Db User",
         "type": "string"
      },
      "crate_db_pw": {
         "default": "",
         "description": "Password for CrateDB (empty = no authentication)",
         "title": "Crate Db Pw",
         "type": "string"
      },
      "crate_db_ssl": {
         "default": false,
         "description": "Enables SSL for the connection to CrateDB",
         "title": "Crate Db Ssl",
         "type": "boolean"
      }
   },
   "required": [
      "service"
   ]
}

Config:
  • extra: str = ignore

  • env_prefix: str = FIWARE_

  • env_file: str = .env

field auth: bool = False

Enables authentication for FIWARE requests

field client_id: Optional[str] = None

Client ID for FIWARE authentication

field client_pw: Optional[str] = None

Client password for FIWARE authentication

field token_url: Optional[Annotated[Url]] = None

Token URL for FIWARE authentication

field bearer_token: Optional[str] = None

Bearer token for FIWARE authentication

field cb_url: Optional[Annotated[Url]] = Url('http://localhost:1026/')

URL of the Context Broker (e.g., Orion-LD)

field service: str [Required]

FIWARE Service header for tenant isolation

field service_path: str = '/'

FIWARE Service Path for sub-tenant isolation

field crate_db_url: Annotated[Url] = Url('http://localhost:4200/')

URL of the CrateDB instance

Constraints:
  • allowed_schemes = ['http', 'https']

field crate_db_user: str = 'crate'

Username for CrateDB

field crate_db_pw: str = ''

Password for CrateDB (empty = no authentication)

field crate_db_ssl: bool = False

Enables SSL for the connection to CrateDB
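
As a rough illustration of how the field names above map to environment variables, the following sketch builds the variable names by adding the FIWARE_ prefix and renders them as lines of a .env file. It uses only the standard library and does not call Encodapy. The helper to_env, the service name my_service, and the upper-case spelling (which relies on pydantic-settings matching variable names case-insensitively by default) are assumptions made for this example.

```python
import os

# Field names are taken from the FiwareEnvVariables listing above.
# The values are placeholders for illustration only.
fiware_settings = {
    "auth": "false",
    "cb_url": "http://localhost:1026/",
    "service": "my_service",  # the only field without a default
    "service_path": "/",
    "crate_db_url": "http://localhost:4200/",
}


def to_env(settings: dict[str, str], prefix: str) -> dict[str, str]:
    """Map field names to environment variable names by adding the prefix."""
    return {f"{prefix}{name.upper()}": value for name, value in settings.items()}


env = to_env(fiware_settings, "FIWARE_")

# Option 1: export the variables for the current process.
os.environ.update(env)

# Option 2: render the same pairs as the content of a .env file.
dotenv_text = "\n".join(f"{key}={value}" for key, value in env.items())
print(dotenv_text)
```

According to the schema above, only service is required; every other field falls back to its default when the variable is not set.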

Configuration of the MQTT Interface

The MQTT Interface allows communication with MQTT brokers for publishing and subscribing to topics. General settings are configured via environment variables. The MQTT interface uses the following settings:

pydantic settings encodapy.config.env_values.MQTTEnvVariables

MQTT environment variables for the service. They are automatically loaded from the environment or a .env file.

Environment variables always have the prefix MQTT_.

JSON schema:
{
   "title": "MQTTEnvVariables",
   "description": "MQTT environment variables for the service.\nThey are automatically loaded from the environment or a .env file.\n\nEnvironment variables always have the prefix `MQTT_`.",
   "type": "object",
   "properties": {
      "host": {
         "default": "localhost",
         "description": "Hostname or IP address of the MQTT broker",
         "title": "Host",
         "type": "string"
      },
      "port": {
         "default": 1883,
         "description": "Port number of the MQTT broker",
         "title": "Port",
         "type": "integer"
      },
      "username": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": "",
         "description": "Username for MQTT broker authentication",
         "title": "Username"
      },
      "password": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": "",
         "description": "Password for MQTT broker authentication",
         "title": "Password"
      },
      "topic_prefix": {
         "default": "",
         "description": "Prefix for all MQTT topics used by the service",
         "title": "Topic Prefix",
         "type": "string"
      },
      "timestamp_key": {
         "default": "TimeInstant",
         "description": "Key name in the MQTT message payload that contains the timestamp. Examples: 'TimeInstant' (default), 'timestamp', 'time'. If the key is not present in the payload, the timestamp of the MQTT message receipt will be used.",
         "title": "Timestamp Key",
         "type": "string"
      },
      "tls_enabled": {
         "default": false,
         "description": "Enable TLS/SSL encryption for MQTT connection",
         "title": "Tls Enabled",
         "type": "boolean"
      },
      "tls_ca_cert": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Path to the CA certificate file for TLS/SSL verification. Optional - if not provided, system default certificates are used. Only needed for self-signed or custom CA certificates.",
         "title": "Tls Ca Cert"
      },
      "tls_insecure": {
         "default": false,
         "description": "Set to True to disable certificate verification (insecure). Only use for testing purposes.",
         "title": "Tls Insecure",
         "type": "boolean"
      }
   }
}

Config:
  • extra: str = ignore

  • env_prefix: str = MQTT_

  • env_file: str = .env

field host: str = 'localhost'

Hostname or IP address of the MQTT broker

field port: int = 1883

Port number of the MQTT broker

field username: Optional[str] = ''

Username for MQTT broker authentication

field password: Optional[str] = ''

Password for MQTT broker authentication

field topic_prefix: str = ''

Prefix for all MQTT topics used by the service

field timestamp_key: str = 'TimeInstant'

Key name in the MQTT message payload that contains the timestamp. Examples: ‘TimeInstant’ (default), ‘timestamp’, ‘time’. If the key is not present in the payload, the timestamp of the MQTT message receipt will be used.

field tls_enabled: bool = False

Enable TLS/SSL encryption for MQTT connection

field tls_ca_cert: Optional[str] = None

Path to the CA certificate file for TLS/SSL verification. Optional - if not provided, system default certificates are used. Only needed for self-signed or custom CA certificates.

field tls_insecure: bool = False

Set to True to disable certificate verification (insecure). Only use for testing purposes.
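
The fallback described for timestamp_key can be pictured with a small sketch. It is not Encodapy's implementation: the function extract_timestamp, the JSON payload layout, and the ISO 8601 timestamp format are assumptions chosen for illustration.

```python
import json
from datetime import datetime, timezone


def extract_timestamp(raw_payload: str, timestamp_key: str, received_at: datetime) -> datetime:
    """Return the timestamp stored under timestamp_key, else the receipt time."""
    payload = json.loads(raw_payload)
    if isinstance(payload, dict) and timestamp_key in payload:
        # Assumed format: ISO 8601 with an explicit UTC offset.
        return datetime.fromisoformat(payload[timestamp_key])
    # Key missing: fall back to the time the message was received.
    return received_at


received = datetime(2024, 5, 1, 9, 0, tzinfo=timezone.utc)
with_key = extract_timestamp(
    '{"value": 21.5, "TimeInstant": "2024-05-01T08:30:00+00:00"}', "TimeInstant", received
)
without_key = extract_timestamp('{"value": 21.5}', "TimeInstant", received)

print(with_key.isoformat())     # timestamp taken from the payload
print(without_key.isoformat())  # receipt time used as fallback
```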

You can use different topics and payload templates for publishing and subscribing to MQTT messages. See the MQTT interface example for more details.

For a custom template, the following model is used:

pydantic model encodapy.config.mqtt_messages_template.MQTTTemplateConfig

Model for MQTT template configuration.

Loads and validates MQTT topic and payload templates, given as a dictionary, from environment variables or a file. Templates support the following placeholders:

  • __OUTPUT_ENTITY__: The entity ID of the output.

  • __OUTPUT_ATTRIBUTE__: The attribute ID of the output.

  • __OUTPUT_VALUE__: The value of the output.

  • __OUTPUT_UNIT__: The unit of the output.

  • __OUTPUT_TIME__: The timestamp of the output.

  • __MQTT_TOPIC_PREFIX__: The MQTT topic prefix from environment variables.

The dictionary to build the templates can be provided in three ways:

  1. As a dictionary directly to the model.

  2. Via an environment variable MQTT_TEMPLATE_<NAME> as a string.

  3. Via a file path stored in an environment variable MQTT_TEMPLATE_<NAME>.

The environment variable MQTT_TEMPLATE_<NAME> (or several such variables for multiple templates) must be set, where <NAME> is the name of the template to load. It is not loaded automatically from a .env file. The dictionary must contain the keys topic and payload, and could look like this:

{
    "topic": "sensors/__OUTPUT_ENTITY__/__OUTPUT_ATTRIBUTE__",
    "payload": {
        "value": "__OUTPUT_VALUE__",
        "info_dict": {
            "output_entity": "__OUTPUT_ENTITY__",
            "output_attribute": "__OUTPUT_ATTRIBUTE__",
            "output_time": "__OUTPUT_TIME__"
        }
    },
    "time_format": "%Y-%m-%dT%H:%M%z"
}

The key time_format is optional. If it is omitted, the default '%Y-%m-%dT%H:%M:%S%z' is used.

Please see the examples for this.
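
To make the placeholders more concrete, the sketch below fills the example template by hand. It is only an illustration under stated assumptions: plain string replacement stands in for whatever Encodapy does internally with its Jinja2 templates, and the helper fill as well as the entity, attribute, and value used here are hypothetical.

```python
import json
from datetime import datetime, timezone

# The example template from above, written as a Python dictionary.
template = {
    "topic": "sensors/__OUTPUT_ENTITY__/__OUTPUT_ATTRIBUTE__",
    "payload": {
        "value": "__OUTPUT_VALUE__",
        "info_dict": {
            "output_entity": "__OUTPUT_ENTITY__",
            "output_attribute": "__OUTPUT_ATTRIBUTE__",
            "output_time": "__OUTPUT_TIME__",
        },
    },
    "time_format": "%Y-%m-%dT%H:%M%z",
}


def fill(text: str, values: dict[str, str]) -> str:
    """Replace every placeholder in text with its value (stand-in for rendering)."""
    for placeholder, value in values.items():
        text = text.replace(placeholder, value)
    return text


timestamp = datetime(2024, 5, 1, 8, 30, tzinfo=timezone.utc)
values = {
    "__OUTPUT_ENTITY__": "boiler",
    "__OUTPUT_ATTRIBUTE__": "temperature",
    "__OUTPUT_VALUE__": "42.5",
    # time_format controls how the timestamp is written into the payload.
    "__OUTPUT_TIME__": timestamp.strftime(template["time_format"]),
}

topic = fill(template["topic"], values)
payload = json.loads(fill(json.dumps(template["payload"]), values))

print(topic)
print(json.dumps(payload, indent=2))
```

Note that in this sketch the value ends up as a string in the payload, because the placeholder sits inside a quoted JSON string.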

Parameters:
  • topic (Template) – The Jinja2 template for the MQTT topic.

  • payload (Template) – The Jinja2 template for the MQTT payload.

  • time_format (str) – The format string for the timestamp in the payload. Refer to Python’s datetime.strptime documentation for more details.

Raises:

ValueError – If the input is invalid or templates cannot be loaded.

field topic: Template [Required]

field payload: Template [Required]

field time_format: str = '%Y-%m-%dT%H:%M:%S%z'

classmethod load_mqtt_template(template_raw: dict, part: str) → Template

Process a raw template string or dictionary into a jinja2.Template.

Parameters:
  • template_raw – Raw template data (str or dict).

  • part – Either “topic” or “payload”.

Returns:

The rendered Jinja2 template.

Return type:

Template

Raises:

ValueError – If the template format is invalid.

Configuration of the File Interface

The File Interface allows reading from and writing to data files. You also need to configure some general settings via environment variables. The file interface uses the following settings:

pydantic settings encodapy.config.env_values.FileEnvVariables

File environment variables for the service. They are automatically loaded from the environment or a .env file.

Environment variables always have the prefix FILE_.

JSON schema:
{
   "title": "FileEnvVariables",
   "description": "File environment variables for the service.\nThey are automatically loaded from the environment or a .env file.\n\nEnvironment variables always have the prefix `FILE_`.",
   "type": "object",
   "properties": {
      "storage_method": {
         "allOf": [
            {
               "$ref": "#/$defs/FileStorageMethod"
            }
         ],
         "default": "append",
         "description": "Type of file storage: 'overwrite', 'append', or 'new_file',\n        see FileStorageMethod enum for details"
      },
      "path_of_input_file": {
         "default": "./input/input_file.csv",
         "description": "Path to the input CSV file",
         "title": "Path Of Input File",
         "type": "string"
      },
      "path_of_static_data": {
         "default": "./input/static_data.json",
         "description": "Path to the static data JSON file",
         "title": "Path Of Static Data",
         "type": "string"
      },
      "path_of_results": {
         "default": "./results",
         "description": "Directory path to store the results",
         "title": "Path Of Results",
         "type": "string"
      }
   },
   "$defs": {
      "FileStorageMethod": {
         "description": "Enum for the file storage method.",
         "enum": [
            "overwrite",
            "append",
            "new_file"
         ],
         "title": "FileStorageMethod",
         "type": "string"
      }
   }
}

Config:
  • extra: str = ignore

  • env_prefix: str = FILE_

  • env_file: str = .env

field storage_method: FileStorageMethod = FileStorageMethod.APPEND

Type of file storage: ‘overwrite’, ‘append’, or ‘new_file’, see FileStorageMethod enum for details

field path_of_input_file: str = './input/input_file.csv'

Path to the input CSV file

field path_of_static_data: str = './input/static_data.json'

Path to the static data JSON file

field path_of_results: str = './results'

Directory path to store the results
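
The following sketch shows how the FILE_ variables and their documented defaults fit together. It reads from a plain mapping instead of using Encodapy's settings class, so it runs with the standard library alone. The class StorageMethod is a hypothetical stand-in that only lists the three values of FileStorageMethod, and read_file_settings is an illustrative helper, not part of Encodapy.

```python
from enum import Enum
from typing import Mapping


class StorageMethod(Enum):
    """Hypothetical stand-in listing the values of FileStorageMethod."""

    OVERWRITE = "overwrite"
    APPEND = "append"
    NEW_FILE = "new_file"


def read_file_settings(environ: Mapping[str, str]) -> dict:
    """Read the FILE_ variables, falling back to the documented defaults."""
    return {
        # Raises ValueError for anything other than the three allowed values.
        "storage_method": StorageMethod(environ.get("FILE_STORAGE_METHOD", "append")),
        "path_of_input_file": environ.get("FILE_PATH_OF_INPUT_FILE", "./input/input_file.csv"),
        "path_of_static_data": environ.get("FILE_PATH_OF_STATIC_DATA", "./input/static_data.json"),
        "path_of_results": environ.get("FILE_PATH_OF_RESULTS", "./results"),
    }


defaults = read_file_settings({})
custom = read_file_settings(
    {"FILE_STORAGE_METHOD": "new_file", "FILE_PATH_OF_RESULTS": "./my_results"}
)

print(defaults["storage_method"].value, defaults["path_of_results"])
print(custom["storage_method"].value, custom["path_of_results"])
```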

The structure of the data files is described by the following models:

pydantic model encodapy.config.models.DataFile

Model for static data files.

Variables:

data (list[DataFileEntity]) – The data entities.

JSON schema:
{
   "title": "DataFile",
   "description": "Model for static data files.\n\nAttributes:\n    data (list[DataFileEntity]): The data entities.",
   "type": "object",
   "properties": {
      "data": {
         "items": {
            "$ref": "#/$defs/DataFileEntity"
         },
         "title": "Data",
         "type": "array"
      }
   },
   "$defs": {
      "DataFileAttribute": {
         "description": "Model for data file attributes.\n\nAttributes:\n    id (str): The unique identifier for the attribute.\n    value (Union[str, float, int, bool, Dict, List, DataFrame, None]):             The value of the attribute.\n    unit (Optional[DataUnits]): The unit of the attribute.\n    time (Optional[str]): The timestamp of the attribute as a string.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "value": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "number"
                  },
                  {
                     "type": "integer"
                  },
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "object"
                  },
                  {
                     "items": {},
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Value"
            },
            "unit": {
               "anyOf": [
                  {
                     "$ref": "#/$defs/DataUnits"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null
            },
            "time": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Time"
            }
         },
         "required": [
            "id",
            "value"
         ],
         "title": "DataFileAttribute",
         "type": "object"
      },
      "DataFileEntity": {
         "description": "Model for data file entities.\n\nAttributes:\n    id (str): The unique identifier for the entity.\n    attributes (list[DataFileAttribute]): The attributes of the entity.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "attributes": {
               "items": {
                  "$ref": "#/$defs/DataFileAttribute"
               },
               "title": "Attributes",
               "type": "array"
            }
         },
         "required": [
            "id",
            "attributes"
         ],
         "title": "DataFileEntity",
         "type": "object"
      },
      "DataUnits": {
         "description": "Possible units for the data\nUnits which are defined by Unit Code (https://unece.org/trade/cefact/UNLOCODE-Download\nor https://github.com/RWTH-EBC/FiLiP/blob/master/filip/data/unece-units/units_of_measure.csv)\nor here: https://unece.org/fileadmin/DAM/cefact/recommendations/rec20/rec20_rev3_Annex3e.pdf\nTODO:\n    - Is there a better way to handle the units?\n    - Add more units?",
         "enum": [
            "SEC",
            "HUR",
            "MIN",
            "CEL",
            "KEL",
            "LTR",
            "MTQ",
            "MQH",
            "MQS",
            "E32",
            "L2",
            "WTT",
            "WHR",
            "KWH",
            "CMT",
            "MTR",
            "MTK",
            "MTS",
            "P1",
            "OHM",
            "VLT"
         ],
         "title": "DataUnits",
         "type": "string"
      }
   },
   "required": [
      "data"
   ]
}

field data: list[DataFileEntity] [Required]

pydantic model encodapy.config.models.DataFileEntity

Model for data file entities.

Variables:
  • id (str) – The unique identifier for the entity.

  • attributes (list[DataFileAttribute]) – The attributes of the entity.

JSON schema:
{
   "title": "DataFileEntity",
   "description": "Model for data file entities.\n\nAttributes:\n    id (str): The unique identifier for the entity.\n    attributes (list[DataFileAttribute]): The attributes of the entity.",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "type": "string"
      },
      "attributes": {
         "items": {
            "$ref": "#/$defs/DataFileAttribute"
         },
         "title": "Attributes",
         "type": "array"
      }
   },
   "$defs": {
      "DataFileAttribute": {
         "description": "Model for data file attributes.\n\nAttributes:\n    id (str): The unique identifier for the attribute.\n    value (Union[str, float, int, bool, Dict, List, DataFrame, None]):             The value of the attribute.\n    unit (Optional[DataUnits]): The unit of the attribute.\n    time (Optional[str]): The timestamp of the attribute as a string.",
         "properties": {
            "id": {
               "title": "Id",
               "type": "string"
            },
            "value": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "number"
                  },
                  {
                     "type": "integer"
                  },
                  {
                     "type": "boolean"
                  },
                  {
                     "type": "object"
                  },
                  {
                     "items": {},
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "title": "Value"
            },
            "unit": {
               "anyOf": [
                  {
                     "$ref": "#/$defs/DataUnits"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null
            },
            "time": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "title": "Time"
            }
         },
         "required": [
            "id",
            "value"
         ],
         "title": "DataFileAttribute",
         "type": "object"
      },
      "DataUnits": {
         "description": "Possible units for the data\nUnits which are defined by Unit Code (https://unece.org/trade/cefact/UNLOCODE-Download\nor https://github.com/RWTH-EBC/FiLiP/blob/master/filip/data/unece-units/units_of_measure.csv)\nor here: https://unece.org/fileadmin/DAM/cefact/recommendations/rec20/rec20_rev3_Annex3e.pdf\nTODO:\n    - Is there a better way to handle the units?\n    - Add more units?",
         "enum": [
            "SEC",
            "HUR",
            "MIN",
            "CEL",
            "KEL",
            "LTR",
            "MTQ",
            "MQH",
            "MQS",
            "E32",
            "L2",
            "WTT",
            "WHR",
            "KWH",
            "CMT",
            "MTR",
            "MTK",
            "MTS",
            "P1",
            "OHM",
            "VLT"
         ],
         "title": "DataUnits",
         "type": "string"
      }
   },
   "required": [
      "id",
      "attributes"
   ]
}

field id: str [Required]

field attributes: list[DataFileAttribute] [Required]

pydantic model encodapy.config.models.DataFileAttribute

Model for data file attributes.

Variables:
  • id (str) – The unique identifier for the attribute.

  • value (Union[str, float, int, bool, Dict, List, DataFrame, None]) – The value of the attribute.

  • unit (Optional[DataUnits]) – The unit of the attribute.

  • time (Optional[str]) – The timestamp of the attribute as a string.

JSON schema:
{
   "title": "DataFileAttribute",
   "description": "Model for data file attributes.\n\nAttributes:\n    id (str): The unique identifier for the attribute.\n    value (Union[str, float, int, bool, Dict, List, DataFrame, None]):             The value of the attribute.\n    unit (Optional[DataUnits]): The unit of the attribute.\n    time (Optional[str]): The timestamp of the attribute as a string.",
   "type": "object",
   "properties": {
      "id": {
         "title": "Id",
         "type": "string"
      },
      "value": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "number"
            },
            {
               "type": "integer"
            },
            {
               "type": "boolean"
            },
            {
               "type": "object"
            },
            {
               "items": {},
               "type": "array"
            },
            {
               "type": "null"
            }
         ],
         "title": "Value"
      },
      "unit": {
         "anyOf": [
            {
               "$ref": "#/$defs/DataUnits"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "time": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Time"
      }
   },
   "$defs": {
      "DataUnits": {
         "description": "Possible units for the data\nUnits which are defined by Unit Code (https://unece.org/trade/cefact/UNLOCODE-Download\nor https://github.com/RWTH-EBC/FiLiP/blob/master/filip/data/unece-units/units_of_measure.csv)\nor here: https://unece.org/fileadmin/DAM/cefact/recommendations/rec20/rec20_rev3_Annex3e.pdf\nTODO:\n    - Is there a better way to handle the units?\n    - Add more units?",
         "enum": [
            "SEC",
            "HUR",
            "MIN",
            "CEL",
            "KEL",
            "LTR",
            "MTQ",
            "MQH",
            "MQS",
            "E32",
            "L2",
            "WTT",
            "WHR",
            "KWH",
            "CMT",
            "MTR",
            "MTK",
            "MTS",
            "P1",
            "OHM",
            "VLT"
         ],
         "title": "DataUnits",
         "type": "string"
      }
   },
   "required": [
      "id",
      "value"
   ]
}

field id: str [Required]

field value: Optional[Union[str, float, int, bool, Dict, List, DataFrame]] [Required]

field unit: Optional[DataUnits] = None

field time: Optional[str] = None
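
Putting the three models together, a static data file might look like the dictionary in the sketch below. The entity and attribute ids and values are made up for illustration, and check_data_file is a hypothetical helper that only checks the required keys named in the schemas above. It does not replace validation by the pydantic models.

```python
import json

# A document shaped like DataFile -> DataFileEntity -> DataFileAttribute.
static_data = {
    "data": [
        {
            "id": "building_1",
            "attributes": [
                # unit uses one of the DataUnits codes; unit and time are optional.
                {"id": "setpoint", "value": 21.5, "unit": "CEL"},
                {"id": "name", "value": "Main building"},
            ],
        }
    ]
}


def check_data_file(document: dict) -> list[str]:
    """Collect the required keys that are missing from a data file document."""
    if "data" not in document:
        return ["missing key: data"]
    problems = []
    for entity in document["data"]:
        for key in ("id", "attributes"):
            if key not in entity:
                problems.append(f"entity missing key: {key}")
        for attribute in entity.get("attributes", []):
            for key in ("id", "value"):
                if key not in attribute:
                    problems.append(f"attribute missing key: {key}")
    return problems


print(json.dumps(static_data, indent=2))
print(check_data_file(static_data))
```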