dojot.module package

Submodules

dojot.module.auth module

Authentication module

class dojot.module.auth.Auth(config)

Bases: object

Class responsible for authentication mechanisms in dojot

__init__(config)

Object initialization

Parameters:config (Config) – The configuration object.
get_access_token(tenant)

Retrieves a token for normal operations associated to a particular tenant.

Return type:str
Returns:The token
get_management_token()

Retrieves a token for management operations

Return type:str
Returns:The token
get_tenants()

Retrieves all tenants

If there is a problem while retrieving the list of tenants, then None is returned.

Return type:list or None
Returns:List of tenants
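
Because get_tenants() may return None, callers usually want to normalize the result before iterating. The following is a minimal sketch that assumes only the documented return type (list or None). The helper tenants_or_empty and the two stand-in classes are hypothetical and are not part of dojot.module.

```python
def tenants_or_empty(auth):
    """Return the tenant list, or an empty list when retrieval failed (None)."""
    tenants = auth.get_tenants()
    return [] if tenants is None else list(tenants)


class FailingAuth:
    """Hypothetical stand-in that mimics a failed tenant lookup."""

    def get_tenants(self):
        return None


class WorkingAuth:
    """Hypothetical stand-in that mimics a successful tenant lookup."""

    def get_tenants(self):
        return ["admin", "tenant-a"]


failed = tenants_or_empty(FailingAuth())
found = tenants_or_empty(WorkingAuth())
```

With a real Auth object, the same helper would be called as tenants_or_empty(Auth(config)).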

dojot.module.config module

Configuration data module

class dojot.module.config.Config(config=None)

Bases: object

Main configuration class

This class contains all needed configuration for this library

__init__(config=None)

Config constructor

Parameters:config (dict or None) – A configuration dictionary. If set, all its attributes will be set to this object.

Any top level key will overwrite the default configuration, i.e., setting a kafka object to config param will overwrite all Kafka configuration. An example of such dictionary is:

config = {
    "kafka" : {
        "producer": {
            "client.id": "kafka",
            "bootstrap_servers": ["kafka:9092"],
            "compression.codec": "gzip",
            "retry.backoff.ms": 200,
            "message.send.max.retries": 10,
            "socket.keepalive.enable": True,
            "queue.buffering.max.messages": 100000,
            "queue.buffering.max.ms": 1000,
            "batch.num.messages": 1000000,
            "dr_cb": True
        },
        "consumer": {
            "group_id": "my-module",
            "bootstrap_servers": ["kafka:9092"]
        },
        "dojot": {
            "poll_timeout": 2000,
            "subscription_holdoff": 2.5
        }
    },
    "data_broker" : {
        "url": "http://data-broker",
        "timeout_sleep": 5,
        "connection_retries": 3
    },
    "device_manager": {
        "url": "http://device-manager:5000",
        "timeout_sleep": 5,
        "connection_retries": 3
    },
    "keycloak": {
        "timeout_sleep": 5,
        "connection_retries": 3,
        "base_path": "http://keycloak:8080/auth/",
        "ignore_realm": "master",
        "credentials": {
            "username": "admin",
            "password": "admin",
            "client_id": "admin-cli",
            "grant_type": "password",
        }
    },
    "dojot" : {
        "management": {
            "user" : "dojot-management",
            "tenant" : "dojot-management"
        },
        "subjects": {
            "tenancy": "dojot.tenancy",
            "devices": "dojot.device-manager.device",
            "device_data": "device-data",
        }
    }
}
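
The overwrite rule described above is shallow. The sketch below illustrates it under the assumption that each top-level key is replaced wholesale and nested keys are not merged, as the paragraph states. The abbreviated DEFAULTS dictionary and the apply_overrides helper are hypothetical and only model the behavior; they are not the library's code.

```python
import copy

# Abbreviated, hypothetical stand-in for the library defaults.
DEFAULTS = {
    "kafka": {
        "producer": {"client.id": "kafka", "bootstrap_servers": ["kafka:9092"]},
        "consumer": {"group_id": "my-module", "bootstrap_servers": ["kafka:9092"]},
    },
    "data_broker": {"url": "http://data-broker", "timeout_sleep": 5},
}


def apply_overrides(defaults, overrides):
    """Return a new dict where each top-level key in overrides replaces the default."""
    merged = copy.deepcopy(defaults)
    for key, value in (overrides or {}).items():
        merged[key] = copy.deepcopy(value)  # whole section replaced, no deep merge
    return merged


custom = {"kafka": {"consumer": {"group_id": "snoop", "bootstrap_servers": ["broker:9092"]}}}
merged = apply_overrides(DEFAULTS, custom)

# The "producer" section is gone because the whole "kafka" key was replaced.
print("producer" in merged["kafka"])
print(merged["data_broker"]["url"])
```

In practice this means a custom kafka section should carry every producer, consumer, and dojot setting the module needs, not only the values that differ from the defaults.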

Warning

If set, the dojot section should be in sync with all other modules. Otherwise this module won’t work properly.

Note

The Kafka object is straight from librdkafka configuration, separated into producer and consumer subobjects. For more information about this configuration, you should check its documentation.

load_defaults()

Load default configuration, which is:

kafka:
    producer:
        client.id: "kafka"
        bootstrap_servers:
         - "kafka:9092"
        compression.codec: "gzip"
        retry.backoff.ms: 200
        message.send.max.retries: 10
        socket.keepalive.enable: True
        queue.buffering.max.messages: 100000
        queue.buffering.max.ms: 1000
        batch.num.messages: 1000000
        dr_cb: true
    consumer:
        group_id: "my-module"
        bootstrap_servers:
         - "kafka:9092"
    dojot:
        poll_timeout: 2000
        subscription_holdoff: 2.5
data_broker:
    url: "http://data-broker"
    "timeout_sleep": 5
    "connection_retries": 3
device_manager:
    url: "http://device-manager:5000"
    "timeout_sleep": 5
    "connection_retries": 3
keycloak:
    "base_path": "http://keycloak:8080/auth/"
    "timeout_sleep": 5
    "connection_retries": 3
    "ignore_realm": "master"
    "credentials":
        "username": "admin"
        "password": "admin"
        "client_id": "admin-cli"
        "grant_type": "password"
dojot:
    management:
        user: "dojot-management"
        tenant: "dojot-management"
    subjects:
        tenancy: "dojot.tenancy"
        devices: "dojot.device-manager.device"
        device_data: "device-data"

Warning

Calling this function will overwrite any previously set configuration in the created object. Also, setting any configuration after Kafka is started or any Messenger object is created will have no effect on them.

Warning

If set, the dojot section should be in sync with all other modules. Otherwise this module won’t work properly.

load_env()

Load configuration from environment variables.

Any environment variable that is set will overwrite the corresponding default configuration value. See load_defaults() for the defaults.

The list of environment variables is:

  • KAFKA_HOSTS: A comma-separated list of hosts where an instance of Kafka is running. This will affect the bootstrap_servers parameter for both Kafka consumer and producer.
  • KAFKA_GROUP_ID: The Kafka consumer group ID to be used.
  • DOJOT_KAFKA_SUBSCRIPTION_HOLDOFF: Time to wait before performing any subscription.
  • DOJOT_KAFKA_POLL_TIMEOUT: Time to wait for new messages in Kafka.
  • DATA_BROKER_URL: Where DataBroker service can be reached.
  • DEVICE_MANAGER_URL: URL to reach the device-manager service.
  • KEYCLOAK_URL: Where Keycloak service can be reached.
  • KEYCLOAK_USER: Keycloak user (this user must have permission to list realms).
  • KEYCLOAK_PASSWORD: Keycloak user password.
  • KEYCLOAK_CLIENT_ID: Keycloak client id.
  • DOJOT_MANAGEMENT_TENANT: Tenant to be used when asking DataBroker for management topics (such as tenancy-related topics).
  • DOJOT_MANAGEMENT_USER: User to be used when asking DataBroker for management topics (such as tenancy-related topics).
  • DOJOT_SUBJECT_TENANCY: Subject to be used when asking DataBroker for tenancy topics.
  • DOJOT_SUBJECT_DEVICES: Subject to be used when asking DataBroker for device topics.
  • DOJOT_SUBJECT_DEVICE_DATA: Subject to be used when asking DataBroker for device data topics.
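
As an illustration of the KAFKA_HOSTS entry, the sketch below splits a comma-separated value into the list shape used by bootstrap_servers and applies it to both consumer and producer. The helper kafka_hosts_from_env is hypothetical, and trimming whitespace around each host is an assumption; the library's own parsing may differ.

```python
import os


def kafka_hosts_from_env(environ=None, default=("kafka:9092",)):
    """Split a comma-separated KAFKA_HOSTS value into a list of host:port strings."""
    environ = os.environ if environ is None else environ
    raw = environ.get("KAFKA_HOSTS")
    if not raw:
        # Variable unset or empty: fall back to the documented default.
        return list(default)
    return [host.strip() for host in raw.split(",") if host.strip()]


# A plain dict stands in for os.environ so the example is reproducible.
hosts = kafka_hosts_from_env({"KAFKA_HOSTS": "kafka-1:9092, kafka-2:9092"})

# The same list feeds both sections, as described for KAFKA_HOSTS.
consumer = {"group_id": "my-module", "bootstrap_servers": hosts}
producer = {"client.id": "kafka", "bootstrap_servers": hosts}
```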

dojot.module.logger module

Logging module

class dojot.module.logger.Log(log_level=10, log_format='[%(log_color)s%(asctime)-8s%(reset)s] |%(log_color)s%(module)-8s%(reset)s| %(log_color)s%(levelname)s%(reset)s: %(log_color)s%(message)s%(reset)s', is_disabled=False)

Bases: object

Class for logging

color_log()

Returns a logger

dojot.module.messenger module

dojot messenger module

class dojot.module.messenger.Messenger(name, config)

Bases: object

Class responsible for sending and receiving messages through Kafka using dojot subjects and tenants.

Using this class should be as easy as:

from dojot.module import Messenger, Config
from dojot.module.logger import Log

LOGGER = Log().color_log()
def rcv_msg(tenant,data):
    LOGGER.critical("rcvd msg from tenant: %s -> %s" % (tenant,data))

config = Config()
messenger = Messenger("Dojot-Snoop", config)
messenger.init()

# Create a channel using a default subject ``device-data``.
messenger.create_channel(config.dojot['subjects']['device_data'], "rw")

# Create a second channel using a particular subject ``service-status``
messenger.create_channel("service-status", "w")

# Register callback to process incoming device data
messenger.on(config.dojot['subjects']['device_data'], "message", rcv_msg)

# Publish a message on the ``service-status`` subject using the ``dojot-management`` tenant.
messenger.publish("service-status", config.dojot['management']['tenant'], "service X is up")

And that’s all.

You can use an internal event publishing/subscribing mechanism to send events to other parts of the code (using the messenger.on() and messenger.emit() functions) without actually sending or receiving any messages to or from Kafka. An example:

messenger.on("extra-subject", "subject-event", lambda tenant, data: print("Message received ({}): {}".format(tenant, data)))
messenger.emit("extra-subject", "management-tenant", "subject-event", "message data")
create_channel(subject, mode='r', is_global=False)

Creates a new channel that is related to tenants, subjects, and Kafka topics.

Parameters:
  • subject (str) – The subject associated to this channel.
  • mode (str) – Channel type (“r” for only receiving messages, “w” for only sending messages, “rw” for receiving and sending messages)
  • is_global (bool) – flag indicating whether this channel should be associated to a service or be global.
emit(subject, tenant, event, data)

Executes all callbacks related to that subject:event

Parameters:
  • subject (str) – The subject to be used when emitting this new event
  • tenant (str) – The tenant to be used when emitting this new event
  • event (str) – The event to be emitted. This is an arbitrary string. The module itself will emit only message events (and, seldom, new-tenant events)
  • data (dict) – The data to be emitted
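
The on()/emit() pairing can be pictured as a registry of callbacks keyed by subject and event. The sketch below is a simplified, hypothetical model (the class TinyEmitter is not part of dojot.module and is not the Messenger implementation); it only shows that emit() runs the callbacks registered for the matching subject:event pair and passes them tenant and data.

```python
from collections import defaultdict


class TinyEmitter:
    """Hypothetical in-process registry keyed by (subject, event)."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def on(self, subject, event, callback):
        # Remember the callback for this subject:event pair.
        self._callbacks[(subject, event)].append(callback)

    def emit(self, subject, tenant, event, data):
        # Run every callback registered for this subject:event pair.
        for callback in self._callbacks.get((subject, event), []):
            callback(tenant, data)


received = []
emitter = TinyEmitter()
emitter.on("extra-subject", "subject-event", lambda tenant, data: received.append((tenant, data)))

# Delivered: subject and event both match the registration.
emitter.emit("extra-subject", "management-tenant", "subject-event", {"status": "up"})
# Not delivered: no callback is registered for "other-event".
emitter.emit("extra-subject", "management-tenant", "other-event", {"status": "ignored"})
```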
generate_device_create_event_for_active_devices()

Generates device creation message for existing devices

init()

Initializes the messenger and sets it up with all tenants

This library uses its own mechanism to discover new tenants and subscribe to topics related to all configured subjects. That way, the user only needs to call messenger.on() to receive all messages for a subject, across all tenants.

Raises:UserWarning – When the list of tenants cannot be retrieved.
on(subject, event, callback)

Registers a new callback to be invoked when something happens to a subject. The callback should take two parameters: tenant and data.

Parameters:
  • subject (str) – The subject which this subscription is associated to.
  • event (str) – The event of this subscription.
  • callback – The callback function. Its signature should be (tenant: str, message:any) : void
process_new_tenant(tenant, msg)

Process new tenant: bootstrap it for all subjects registered and emit an event

Parameters:
  • tenant (str) – The tenant associated to the message (NOT NEW TENANT)
  • msg (dict) – The message just received with the new tenant.
publish(subject, tenant, message)

Publishes a message in kafka

Parameters:
  • subject (str) – The subject to be used when publishing the data
  • tenant (str) – The tenant associated to that message
  • message – The message to be published.
request_device(tenant)

Get devices from the device-manager module

Parameters:
  • tenant (str) – tenant name for the devices
  • page_num (integer) – the number of the page that will be retrieved
shutdown()

Shutdown this consumer.

This function will indicate to the consumer threads that they must stop. Remember, though, that this might not occur right away.
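
The delay mentioned above is typical of cooperative shutdown: a flag is set, and each thread notices it only when its current wait or poll returns. The sketch below illustrates that general pattern with the standard library. StoppableWorker and its poll_timeout argument are hypothetical and do not describe the actual consumer threads in dojot.module.

```python
import threading


class StoppableWorker(threading.Thread):
    """Hypothetical worker that checks a stop flag between polls."""

    def __init__(self, poll_timeout=0.05):
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self._poll_timeout = poll_timeout
        self.iterations = 0

    def run(self):
        while not self._stop_requested.is_set():
            self.iterations += 1
            # Stands in for a blocking poll; returns early once the flag is set.
            self._stop_requested.wait(self._poll_timeout)

    def shutdown(self):
        # Only signals the thread; it exits after its current poll returns.
        self._stop_requested.set()


worker = StoppableWorker()
worker.start()
worker.shutdown()
worker.join(timeout=2)  # wait for the thread to actually finish
```

Callers that need to be sure the threads have ended should wait for them explicitly, as the join() call does here, instead of assuming shutdown is immediate.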

dojot.module contents

dojot module classes