External Systems Notifications
This document describes the structure and operation of the external notification configuration using the history-processor service, which is based on stream events and performs HTTP requests when specified conditions are met.
General Structure
In the history-processor configuration file:
ALERTING:
ALLOWED: true
RULES:
- ALLOWED: true
CONDITION: <condition>
CONSUMER: <data source settings>
DESTINATIONS:
HTTP: <list of HTTP actions>
EXPIRATION: <expiration parameters>
This can also be added via environment variables, for example:
<ENV PREFIX>.ALERTING.RULES.1=json:{"ALERTING":{"ALLOWED":true,"RULES":[{"ALLOWED":true,"CONDITION":"<condition>","CONSUMER":"<data source settings>","DESTINATIONS":{"HTTP":"<list of HTTP actions>"},"EXPIRATION":"<expiration parameters>"}]}}
ALERTING.ALLOWED
Enables or disables the entire alerting system. Values:
- true: alerting is active;
- false: alerting is completely disabled.
RULES
A list of rules. Each rule describes what to do when a certain condition is met.
Rule Parameters:
- ALLOWED: enables or disables the specific rule.
- REDIS: Redis connection parameters (optional, see below).
- CONDITION: logical condition for triggering the rule (e.g.,
dbservice_data.action == 'DELETE'). - CONSUMER: stream connection settings.
- EXPIRATION: event expiration parameters.
- DESTINATIONS.HTTP: list of HTTP requests to execute upon activation.
REDIS
Optional Redis connection parameters, defined per rule. If not defined, will be inherited from main Redis block configuration.
REDIS:
HOST: 127.0.0.1
PORT: 6379
USERNAME: ""
PASSWORD: ""
DB: 0
IS_SENTINEL: false
SENTINEL_MASTER_NAME: ""
SENTINEL_USERNAME: ""
SENTINEL_PASSWORD: ""
SENTINEL_ADDRESSES: ""
CONDITION
A logical condition that defines when the rule should trigger. Example:
CONDITION: dbservice_data.action == 'DELETE'
The syntax
dbservice_data.actionassumes the data format is{"dbservice_data": {"action": "..."}}. Subconditions may be grouped with parentheses and combined using logicalORorAND, e.g.(dbservice_data.action == 'CREATE') OR (dbservice_data.action == 'DELETE')
CONSUMER
Defines the source of data for analysis:
CONSUMER:
BLOCK_TIMEOUT: 10
GROUP: dmserver_historyprocessor
STREAM_NAME: dbservice_events
- STREAM_NAME: stream name.
- GROUP: consumer group.
- BLOCK_TIMEOUT: wait time for new events (in seconds).
EXPIRATION
Specifies how long the alert remains valid:
EXPIRATION:
SECONDS: 3600
TS: dbservice_data.ts
- TS: path to the timestamp field.
- SECONDS: validity period in seconds.
DESTINATIONS.HTTP
List of HTTP requests executed when the rule activates. Each request has these parameters:
- METHOD: HTTP method (
GET,POST, etc.). - URL: request URL.
- HEADERS: HTTP headers.
- ARGS: query parameters.
- BODY: request body (for
POSTetc.). - RETRIES: number of retry attempts on failure.
Example POST request:
- METHOD: POST
URL: http://request-printer:8080/post
HEADERS:
Content-Type: application/json
ARGS:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
BODY:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
RETRIES: 1
Example GET request with dynamic URL:
- METHOD: GET
URL: __stream__:dbservice_data.URL
HEADERS:
Content-Type: application/json
ARGS:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
RETRIES: 1
__stream__ Syntax
Used to substitute values from incoming JSON.
Format: __stream__:<field_path>
Example input event:
{
"dbservice_data": {
"action": "DELETE",
"id": "CPE1234",
"ts": 1720886400,
"URL": "http://external-service.local/notify"
}
}
Substitution examples:
__stream__:dbservice_data.action→"DELETE"__stream__:dbservice_data.id→"CPE1234"__stream__:dbservice_data.URL→"http://external-service.local/notify"
Rule Examples
Rule 1: Delete
CONDITION: dbservice_data.action == 'DELETE'
- Executes GET and POST requests based on event content.
- One request uses a dynamic URL.
- Valid until the expiration time calculated from
dbservice_data.ts.
Rule 2: Create
CONDITION: dbservice_data.action == 'CREATE'
- Behavior is analogous to the
DELETErule.
Activation Behavior
When a condition is met:
- An event is read from the
dbservice_eventsstream. - The
CONDITIONis checked. - If matched, all HTTP requests in
DESTINATIONS.HTTPare executed. - If the event is expired (
EXPIRATION), it is ignored.
Example of configuration of the source of events in DMS
The dumping can be natively configured on the storages in DMS, e.g. cpedbservice (the adapter for CPEs table in RBMS) configuration file:
STREAM_DUMPING:
DELETE: true # CRUD related action name and boolean value
UPDATE: false # CRUD related action name and boolean value
CREATE: true # CRUD related action name and boolean value
STREAM_NAME: dbservice_results
STREAM_LENGTH: 1000
RESULT_KEY: dbservice_data
TIMESTAMP_KEY: ts
This configuration can also be provided via environment variable:
<ENV PREFIX>.STREAM_DUMPING=json:{"DELETE":true,"UPDATE":false,"CREATE":true,"STREAM_NAME":"dbservice_results","STREAM_LENGTH":1000,"RESULT_KEY":"dbservice_data","TIMESTAMP_KEY":"ts"}}
Campaigns.SendNotification scenario usage
scenario allows you to send customized notifications to external systems.
Configuring history-processor rule
First you need to define a rule in history-processor.
Set RX.ALERTING.RULES index and your <DESTINATION_URL> accordingly.
historyprocessor:
name: historyprocessor
env:
- name: RX.ALERTING.RULES.0
value: |
json:{
"ALLOWED": true,
"CONDITION": "",
"CONSUMER": {
"BLOCK_TIMEOUT": 10,
"GROUP": "dmserver_device_notifications",
"STREAM_NAME": "notifications"
},
"DESTINATIONS": {
"HTTP": [
{
"BODY": {
"SerialNumber": "__stream__:notification.SerialNumber",
"NotificationType": "__stream__:notification.NotificationType",
"Timestamp": "__stream__:notification.Timestamp",
"Data": "__stream__:notification.Data"
},
"HEADERS": {
"Content-Type": "application/json"
},
"METHOD": "POST",
"RETRIES": 1,
"URL": "<DESTINATION_URL>"
}
]
},
"EXPIRATION": {
"SECONDS": 86400,
"TS": "notification.Timestamp"
}
}
Scenario input
| Key | Possible values | Required | Description |
|---|---|---|---|
| notification_type | DeviceRegistration, DeviceInform, Custom | yes | Type of notification |
| custom_fields | customparams.SYSTEM.BEHIND_NAT, customparams.SYSTEM.CNR_MODE, customparams.SYSTEM.LAST_INFORM, customparams.SYSTEM.LAST_SESSION_METHODS, customparams.SYSTEM.LAST_RETRY_COUNT, customparams.SYSTEM.STUN_SUPPORTED, customparams.SYSTEM.XMPP_SUPPORTED, datamodel, events, firstseen, hw, ip, lastseen, location, mac, model, sw, vendor | for {"notification_type": "Custom"} only | CPE fields to request |
Scenario input examples:
{"notification_type": "DeviceInform"}
{"notification_type": "DeviceRegistration"}
{"notification_type": "Custom", "custom_fields": ["lastseen", "events"]}
Scenario output
Scenario will publish notification to the Redis stream notifications.
Each notification is then asynchronously delivered in the background as an HTTP POST request.
Request characteristics:
- Headers:
- Content-Type: application/json
- Accept-Encoding: gzip
- Body: JSON-encoded message
SON body structure:
| Key | Description |
|---|---|
| Data | NotificationType specific mapping |
| NotificationType | Same as notification_type from input |
| SerialNumber | CPE identificator (cpe.cpeid) |
| Timestamp | Timestamp of notification creation |
Data examples
DeviceRegistration
{
"Data": {
"DeviceRegistrationTime": 1755089375
},
"NotificationType": "DeviceRegistration",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792791.0146933
}
- DeviceRegistrationTime is
cpe.firstseenfield
DeviceInform
{
"Data": {
"LastInform": {
"InternetGatewayDevice.DeviceInfo.HardwareVersion": "ALPHA 001",
"InternetGatewayDevice.DeviceInfo.ProvisioningCode": "",
"InternetGatewayDevice.DeviceInfo.SoftwareVersion": "01.12.20",
"InternetGatewayDevice.DeviceInfo.SpecVersion": "1.0",
"InternetGatewayDevice.DeviceSummary": "InternetGatewayDevice:1.4[](Baseline:2, EthernetLAN:1, ADSLWAN:1,ADSL2WAN:1, Time:2, IPPing:1, WiFiLAN:2, DeviceAssociation:1), VoiceService:1.0[1](SIPEndpoint:1, Endpoint:1, TAEndpoint:1), StorageService:1.0[1](Baseline:1, FTPServer:1, NetServer:1, HTTPServer:1, UserAccess:1, VolumeConfig:1)",
"InternetGatewayDevice.ManagementServer.ConnectionRequestURL": "http://cpe-emulator:8082/CPE/dms/ACSURL/http%3A//edgeproxy%3A7547/TR69/DMS_DEVICE",
"InternetGatewayDevice.ManagementServer.ParameterKey": "parameter update",
"InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANPPPConnection.1.ExternalIPAddress": "127.0.0.1"
}
},
"NotificationType": "DeviceInform",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792842.081471
}
- LastInform is last CPE Inform message
Custom
{
"Data": {
"customparams.SYSTEM.LAST_RETRY_COUNT": 0
},
"NotificationType": "Custom",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792960.1382487
}
- Data contains CPE fields requested in
custom_fields
You can use this scenario, for example, in Campaigns — to automatically send notifications to external systems when specified conditions are met. Learn more about working with Campaigns here.
Campaign Scenario Result Notifications
The Scenario Service executes all scenarios and publishes their results into a dedicated Redis stream.
These results can be processed in the same way as notifications from the notifications stream described above.
For CWMP campaigns, results are published into the cwmp_results stream, where they can be consumed by history-processor and forwarded to external systems.
Example Rule for Sending Campaign Results to a Specified URL
Below is an example of a history-processor rule that:
- consumes events from the
cwmp_resultsstream, - filters only records where the scenario type is
campaign, - sends the scenario execution results to a specified HTTP endpoint.
{
"ALLOWED": true,
"CONDITION": "scenario_type == \"campaign\"",
"CONSUMER": {
"BLOCK_TIMEOUT": 10,
"GROUP": "dmserver_historyprocessor",
"STREAM_NAME": "cwmp_results"
},
"DESTINATIONS": {
"HTTP": [
{
"BODY": {
"SerialNumber": "__stream__:scenario.cpeid",
"Result": "__stream__:result",
"Envid": "__stream__:scenario.envID"
},
"HEADERS": {
"Content-Type": "application/json"
},
"METHOD": "POST",
"RETRIES": 1,
"URL": "http://request-printer:8080/post"
}
]
},
"EXPIRATION": {
"SECONDS": 86400,
"TS": "scenario.end_ts"
}
}
Example of an event from the cwmp_results stream for a campaign:
{
"CAMPAIGN:7ccd7537-0787-45db-afa8-c01462fde970:f363adc9-53f1-4cd6-8c64-03805f9e03b8:emulator": [
"GetRPCMethods",
"SetParameterValues",
"GetParameterValues",
"GetParameterNames",
"SetParameterAttributes",
"GetParameterAttributes",
"AddObject",
"DeleteObject",
"Reboot",
"Download"
],
"scenario": {
"status": "Finished",
"scenario": "RPC.CWMP.GetRPCMethods",
"cpeid": "emulator",
"initargs": {},
"cache": {},
"step": "1",
"envID": "CAMPAIGN:7ccd7537-0787-45db-afa8-c01462fde970:f363adc9-53f1-4cd6-8c64-03805f9e03b8:emulator",
"author": "DMServer Campaign",
"start_ts": 1763755336.801468,
"end_ts": 1763755336.82063,
"exit_code": 0,
"callback": "",
"silent": true,
"storage": null
},
"scenario_type": "campaign",
"result": [
"GetRPCMethods",
"SetParameterValues",
"GetParameterValues",
"GetParameterNames",
"SetParameterAttributes",
"GetParameterAttributes",
"AddObject",
"DeleteObject",
"Reboot",
"Download"
],
"storage_address": "http://campaignresultstorage:20101/"
}