dojot.module package
Subpackages
Submodules
dojot.module.auth module
Authentication module
-
class
dojot.module.auth.
Auth
(config) Bases:
object
Class responsible for authentication mechanisms in dojot
-
get_access_token
(tenant) Retrieves a token for normal operations associated with a particular tenant.
Return type: str
Returns: The token
-
dojot.module.config module
Configuration data module
-
class
dojot.module.config.
Config
(config=None) Bases:
object
Main configuration class
This class contains all needed configuration for this library
-
__init__
(config=None) Config constructor
Parameters: config (dict or None) – A configuration dictionary. If set, all of its attributes are set on this object. Any top-level key overwrites the corresponding default configuration as a whole; for example, passing a kafka object in the config parameter overwrites all Kafka configuration. An example of such a dictionary is:
config = {
    "kafka": {
        "producer": {
            "client.id": "kafka",
            "bootstrap_servers": ["kafka:9092"],
            "compression.codec": "gzip",
            "retry.backoff.ms": 200,
            "message.send.max.retries": 10,
            "socket.keepalive.enable": True,
            "queue.buffering.max.messages": 100000,
            "queue.buffering.max.ms": 1000,
            "batch.num.messages": 1000000,
            "dr_cb": True
        },
        "consumer": {
            "group_id": "my-module",
            "bootstrap_servers": ["kafka:9092"]
        },
        "dojot": {
            "poll_timeout": 2000,
            "subscription_holdoff": 2.5
        }
    },
    "data_broker": {
        "url": "http://data-broker",
        "timeout_sleep": 5,
        "connection_retries": 3
    },
    "device_manager": {
        "url": "http://device-manager:5000",
        "timeout_sleep": 5,
        "connection_retries": 3
    },
    "keycloak": {
        "timeout_sleep": 5,
        "connection_retries": 3,
        "base_path": "http://keycloak:8080/auth/",
        "ignore_realm": "master",
        "credentials": {
            "username": "admin",
            "password": "admin",
            "client_id": "admin-cli",
            "grant_type": "password"
        }
    },
    "dojot": {
        "management": {
            "user": "dojot-management",
            "tenant": "dojot-management"
        },
        "subjects": {
            "tenancy": "dojot.tenancy",
            "devices": "dojot.device-manager.device",
            "device_data": "device-data"
        }
    }
}
Warning
If set, the dojot section should be in sync with all other modules. Otherwise this module won’t work properly.
Note
The Kafka object is straight from librdkafka configuration, separated into producer and consumer subobjects. For more information about this configuration, you should check its documentation.
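The top-level overwrite rule described for the constructor can be illustrated with a small standalone sketch. It does not import the library: DEFAULTS is a trimmed, hypothetical stand-in for the real defaults, and merge_top_level is an illustrative helper, not part of dojot.module. It assumes the behavior is exactly as described above, that is, a supplied top-level key replaces the default for that key wholesale.

```python
import copy

# Hypothetical, trimmed-down defaults used only for this illustration.
DEFAULTS = {
    "kafka": {
        "producer": {"client.id": "kafka", "bootstrap_servers": ["kafka:9092"]},
        "consumer": {"group_id": "my-module", "bootstrap_servers": ["kafka:9092"]},
    },
    "data_broker": {"url": "http://data-broker", "timeout_sleep": 5, "connection_retries": 3},
}


def merge_top_level(defaults, overrides=None):
    """Return a copy of defaults in which every top-level key found in
    overrides replaces the default value entirely (no deep merge)."""
    merged = copy.deepcopy(defaults)
    for key, value in (overrides or {}).items():
        merged[key] = copy.deepcopy(value)
    return merged


# Only a consumer section is supplied under "kafka".
custom = {
    "kafka": {
        "consumer": {"group_id": "snoop", "bootstrap_servers": ["localhost:9092"]}
    }
}
merged = merge_top_level(DEFAULTS, custom)
```

Under this assumption, a partial kafka object drops the default producer section, so a caller who overrides kafka would need to supply every Kafka setting it depends on.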
-
load_defaults
() Load default configuration, which is:
kafka:
  producer:
    client.id: "kafka"
    bootstrap_servers:
      - "kafka:9092"
    compression.codec: "gzip"
    retry.backoff.ms: 200
    message.send.max.retries: 10
    socket.keepalive.enable: True
    queue.buffering.max.messages: 100000
    queue.buffering.max.ms: 1000
    batch.num.messages: 1000000
    dr_cb: true
  consumer:
    group_id: "my-module"
    bootstrap_servers:
      - "kafka:9092"
  dojot:
    poll_timeout: 2000
    subscription_holdoff: 2.5
data_broker:
  url: "http://data-broker"
  timeout_sleep: 5
  connection_retries: 3
device_manager:
  url: "http://device-manager:5000"
  timeout_sleep: 5
  connection_retries: 3
keycloak:
  base_path: "http://keycloak:8080/auth/"
  timeout_sleep: 5
  connection_retries: 3
  ignore_realm: "master"
  credentials:
    username: "admin"
    password: "admin"
    client_id: "admin-cli"
    grant_type: "password"
dojot:
  management:
    user: "dojot-management"
    tenant: "dojot-management"
  subjects:
    tenancy: "dojot.tenancy"
    devices: "dojot.device-manager.device"
    device_data: "device-data"
Warning
Calling this function will overwrite any previously set configuration in the created object. Also, any configuration set after Kafka has started or after a Messenger object has been created will have no effect on them.
Warning
If set, the dojot section should be in sync with all other modules. Otherwise this module won’t work properly.
-
load_env
() Load configuration from environment variables.
Any environment variable that is set overwrites the corresponding default configuration; see the load_defaults() function.
The list of environment variables is:
- KAFKA_HOSTS: A comma-separated list of hosts where an instance of Kafka is running. This affects the bootstrap_servers parameter for both the Kafka consumer and the producer.
- KAFKA_GROUP_ID: The Kafka consumer group ID to be used.
- DOJOT_KAFKA_SUBSCRIPTION_HOLDOFF: Time to wait before performing any subscription.
- DOJOT_KAFKA_POLL_TIMEOUT: Time to wait for new messages in Kafka.
- DATA_BROKER_URL: Where the DataBroker service can be reached.
- DEVICE_MANAGER_URL: URL to reach the device-manager service.
- KEYCLOAK_URL: Where the Keycloak service can be reached.
- KEYCLOAK_USER: Keycloak user (this user must have permission to list realms).
- KEYCLOAK_PASSWORD: Keycloak user password.
- KEYCLOAK_CLIENT_ID: Keycloak client ID.
- DOJOT_MANAGEMENT_TENANT: Tenant to be used when asking DataBroker for management topics (such as tenancy-related topics).
- DOJOT_MANAGEMENT_USER: User to be used when asking DataBroker for management topics (such as tenancy-related topics).
- DOJOT_SUBJECT_TENANCY: Subject to be used when asking DataBroker for tenancy topics.
- DOJOT_SUBJECT_DEVICES: Subject to be used when asking DataBroker for device topics.
- DOJOT_SUBJECT_DEVICE_DATA: Subject to be used when asking DataBroker for device data topics.
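As a rough illustration of how two of these variables could map onto the Kafka configuration, the sketch below parses KAFKA_HOSTS and KAFKA_GROUP_ID from a mapping. The function kafka_overrides_from_env is hypothetical and is not the library's load_env(); it only mirrors the description above, where KAFKA_HOSTS feeds bootstrap_servers for both consumer and producer. Trimming whitespace around host names is an extra assumption of this sketch.

```python
import os


def kafka_overrides_from_env(environ=os.environ):
    """Build Kafka overrides from environment-style variables.

    Hypothetical helper for illustration; variables that are unset or
    empty leave the corresponding defaults untouched.
    """
    overrides = {}

    hosts = environ.get("KAFKA_HOSTS")
    if hosts:
        # Comma-separated list of hosts, shared by producer and consumer.
        servers = [host.strip() for host in hosts.split(",") if host.strip()]
        overrides["producer"] = {"bootstrap_servers": servers}
        overrides["consumer"] = {"bootstrap_servers": list(servers)}

    group_id = environ.get("KAFKA_GROUP_ID")
    if group_id:
        overrides.setdefault("consumer", {})["group_id"] = group_id

    return overrides


# A plain dict stands in for the process environment here.
example = kafka_overrides_from_env(
    {"KAFKA_HOSTS": "kafka-1:9092, kafka-2:9092", "KAFKA_GROUP_ID": "snoop"}
)
```

Passing a plain dictionary instead of os.environ keeps the sketch easy to try without touching the real process environment.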
-
dojot.module.logger module
Logging module
dojot.module.messenger module
dojot messenger module
-
class
dojot.module.messenger.
Messenger
(name, config) Bases:
object
Class responsible for sending and receiving messages through Kafka using dojot subjects and tenants.
Using this class should be as easy as:
from dojot.module import Messenger, Config
from dojot.module.logger import Log

LOGGER = Log().color_log()


def rcv_msg(tenant, data):
    LOGGER.critical("rcvd msg from tenant: %s -> %s" % (tenant, data))


config = Config()
messenger = Messenger("Dojot-Snoop", config)
messenger.init()

# Create a channel using the default subject ``device-data``.
messenger.create_channel(config.dojot['subjects']['device_data'], "rw")

# Create a second channel using a particular subject ``service-status``.
messenger.create_channel("service-status", "w")

# Register a callback to process incoming device data.
messenger.on(config.dojot['subjects']['device_data'], "message", rcv_msg)

# Publish a message on the ``service-status`` subject using the ``dojot-management`` tenant.
messenger.publish("service-status", config.dojot['management']['tenant'], "service X is up")
And that’s all.
You can also use the internal event publish/subscribe mechanism (the messenger.on() and messenger.emit() functions) to send events to other parts of the code without actually sending or receiving any messages to/from Kafka. An example:
messenger.on("extra-subject", "subject-event", lambda tenant, data: print("Message received ({}): {}".format(tenant, data)))
messenger.emit("extra-subject", "management-tenant", "subject-event", "message data")
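To make the idea of an in-process publish/subscribe mechanism concrete, here is a minimal standalone sketch. EventRegistry is a hypothetical class written for this illustration and is not the Messenger implementation; it only assumes what the text states, namely that callbacks are registered per subject and event and are invoked with the tenant and the data.

```python
from collections import defaultdict


class EventRegistry:
    """Hypothetical in-process event registry (illustration only)."""

    def __init__(self):
        # Maps (subject, event) to the list of registered callbacks.
        self._callbacks = defaultdict(list)

    def on(self, subject, event, callback):
        """Register a callback for a subject/event pair."""
        self._callbacks[(subject, event)].append(callback)

    def emit(self, subject, tenant, event, data):
        """Invoke every callback registered for this subject/event pair."""
        for callback in list(self._callbacks.get((subject, event), [])):
            callback(tenant, data)


received = []
registry = EventRegistry()
registry.on(
    "extra-subject",
    "subject-event",
    lambda tenant, data: received.append((tenant, data)),
)

# Delivered: a callback is registered for this subject/event pair.
registry.emit("extra-subject", "management-tenant", "subject-event", "message data")
# Not delivered: nothing is registered for "other-event".
registry.emit("extra-subject", "management-tenant", "other-event", "ignored")
```

Nothing in this sketch touches Kafka, which is the point of the internal mechanism described above: events stay inside the process.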
-
create_channel
(subject, mode='r', is_global=False) Creates a new channel that is related to tenants, subjects, and Kafka topics.
Parameters:
-
emit
(subject, tenant, event, data) Executes all callbacks related to that subject:event
Parameters: - subject (str) – The subject to be used when emitting this new event
- tenant (str) – The tenant to be used when emitting this new event
- event (str) – The event to be emitted. This is an arbitrary string. The module itself emits only message events (and, seldom, new-tenant events).
- data (dict) – The data to be emitted
-
generate_device_create_event_for_active_devices
() Generates device creation messages for existing devices
-
init
() Initializes the messenger and sets it up with all tenants.
This library uses its own mechanism to discover new tenants and subscribe to topics related to all configured subjects. That way, the user only needs to call messenger.on() and will receive all messages for that subject across the different tenants.
Raises: UserWarning – When the list of tenants cannot be retrieved.
-
on
(subject, event, callback) Registers a new callback to be invoked when something happens to a subject. The callback should take two parameters: tenant and data.
Parameters:
-
process_new_tenant
(tenant, msg) Process new tenant: bootstrap it for all subjects registered and emit an event
Parameters:
-
publish
(subject, tenant, message) Publishes a message to Kafka
Parameters:
-
request_device
(tenant) Get devices from the device-manager module
Parameters: - tenant (str) – tenant name for the devices
- page_num (integer) – the number of the page that will be retrieved
-
shutdown
() Shutdown this consumer.
This function will indicate to the consumer threads that they must stop. Remember, though, that this might not occur right away.
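The "might not occur right away" behavior is typical of cooperative thread shutdown, which can be sketched with the standard library alone. The names stop_requested and consume_loop are hypothetical, and this is a general pattern, not the library's consumer code: the stop request only sets a flag, and the worker notices it the next time it checks.

```python
import threading
import time

# Flag shared between the controlling code and the worker thread.
stop_requested = threading.Event()


def consume_loop():
    """Stand-in for a consumer thread that polls until asked to stop."""
    while not stop_requested.is_set():
        # Stand-in for a blocking poll with a timeout; the flag is only
        # re-checked once this call returns.
        time.sleep(0.01)


worker = threading.Thread(target=consume_loop)
worker.start()

time.sleep(0.05)         # let the worker run for a moment
stop_requested.set()     # request shutdown; this call returns immediately
worker.join(timeout=2)   # wait for the worker to notice the flag and exit
```

Between set() and the end of the current poll the thread is still running, which is why callers that need a clean exit usually join the thread afterwards.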
-
dojot.module contents
dojot module classes