Skip to main content

External Systems Notifications

This document describes the structure and operation of the external notification configuration using the history-processor service, which is based on stream events and performs HTTP requests when specified conditions are met.

General Structure

In the history-processor configuration file:

ALERTING:
ALLOWED: true
RULES:
- ALLOWED: true
CONDITION: <condition>
CONSUMER: <data source settings>
DESTINATIONS:
HTTP: <list of HTTP actions>
EXPIRATION: <expiration parameters>

This can also be added via environment variables, for example:

<ENV PREFIX>.ALERTING.RULES.1=json:{"ALERTING":{"ALLOWED":true,"RULES":[{"ALLOWED":true,"CONDITION":"<condition>","CONSUMER":"<data source settings>","DESTINATIONS":{"HTTP":"<list of HTTP actions>"},"EXPIRATION":"<expiration parameters>"}]}}

ALERTING.ALLOWED

Enables or disables the entire alerting system. Values:

  • true: alerting is active;
  • false: alerting is completely disabled.

RULES

A list of rules. Each rule describes what to do when a certain condition is met.

Rule Parameters:

  • ALLOWED: enables or disables the specific rule.
  • REDIS: Redis connection parameters (optional, see below).
  • CONDITION: logical condition for triggering the rule (e.g., dbservice_data.action == 'DELETE').
  • CONSUMER: stream connection settings.
  • EXPIRATION: event expiration parameters.
  • DESTINATIONS.HTTP: list of HTTP requests to execute upon activation.

REDIS

Optional Redis connection parameters, defined per rule. If not defined, will be inherited from main Redis block configuration.

REDIS:
HOST: 127.0.0.1
PORT: 6379
USERNAME: ""
PASSWORD: ""
DB: 0
IS_SENTINEL: false
SENTINEL_MASTER_NAME: ""
SENTINEL_USERNAME: ""
SENTINEL_PASSWORD: ""
SENTINEL_ADDRESSES: ""

CONDITION

A logical condition that defines when the rule should trigger. Example:

CONDITION: dbservice_data.action == 'DELETE'

The syntax dbservice_data.action assumes the data format is {"dbservice_data": {"action": "..."}}. Subconditions may be grouped with parentheses and combined using logical OR or AND, e.g. (dbservice_data.action == 'CREATE') OR (dbservice_data.action == 'DELETE')

CONSUMER

Defines the source of data for analysis:

CONSUMER:
BLOCK_TIMEOUT: 10
GROUP: dmserver_historyprocessor
STREAM_NAME: dbservice_events
  • STREAM_NAME: stream name.
  • GROUP: consumer group.
  • BLOCK_TIMEOUT: wait time for new events (in seconds).

EXPIRATION

Specifies how long the alert remains valid:

EXPIRATION:
SECONDS: 3600
TS: dbservice_data.ts
  • TS: path to the timestamp field.
  • SECONDS: validity period in seconds.

DESTINATIONS.HTTP

List of HTTP requests executed when the rule activates. Each request has these parameters:

  • METHOD: HTTP method (GET, POST, etc.).
  • URL: request URL.
  • HEADERS: HTTP headers.
  • ARGS: query parameters.
  • BODY: request body (for POST etc.).
  • RETRIES: number of retry attempts on failure.

Example POST request:

- METHOD: POST
URL: http://request-printer:8080/post
HEADERS:
Content-Type: application/json
ARGS:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
BODY:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
RETRIES: 1

Example GET request with dynamic URL:

- METHOD: GET
URL: __stream__:dbservice_data.URL
HEADERS:
Content-Type: application/json
ARGS:
action: __stream__:dbservice_data.action
cpe: __stream__:dbservice_data.id
RETRIES: 1

__stream__ Syntax

Used to substitute values from incoming JSON. Format: __stream__:<field_path>

Example input event:

{
"dbservice_data": {
"action": "DELETE",
"id": "CPE1234",
"ts": 1720886400,
"URL": "http://external-service.local/notify"
}
}

Substitution examples:

  • __stream__:dbservice_data.action"DELETE"
  • __stream__:dbservice_data.id"CPE1234"
  • __stream__:dbservice_data.URL"http://external-service.local/notify"

Rule Examples

Rule 1: Delete

CONDITION: dbservice_data.action == 'DELETE'
  • Executes GET and POST requests based on event content.
  • One request uses a dynamic URL.
  • Valid until the expiration time calculated from dbservice_data.ts.

Rule 2: Create

CONDITION: dbservice_data.action == 'CREATE'
  • Behavior is analogous to the DELETE rule.

Activation Behavior

When a condition is met:

  1. An event is read from the dbservice_events stream.
  2. The CONDITION is checked.
  3. If matched, all HTTP requests in DESTINATIONS.HTTP are executed.
  4. If the event is expired (EXPIRATION), it is ignored.

Example of configuration of the source of events in DMS

The dumping can be natively configured on the storages in DMS, e.g. cpedbservice (the adapter for CPEs table in RBMS) configuration file:

STREAM_DUMPING:
DELETE: true # CRUD related action name and boolean value
UPDATE: false # CRUD related action name and boolean value
CREATE: true # CRUD related action name and boolean value
STREAM_NAME: dbservice_results
STREAM_LENGTH: 1000
RESULT_KEY: dbservice_data
TIMESTAMP_KEY: ts

This configuration can also be provided via environment variable:

<ENV PREFIX>.STREAM_DUMPING=json:{"DELETE":true,"UPDATE":false,"CREATE":true,"STREAM_NAME":"dbservice_results","STREAM_LENGTH":1000,"RESULT_KEY":"dbservice_data","TIMESTAMP_KEY":"ts"}}

Campaigns.SendNotification scenario usage

scenario allows you to send customized notifications to external systems.

Configuring history-processor rule

First you need to define a rule in history-processor. Set RX.ALERTING.RULES index and your <DESTINATION_URL> accordingly.

historyprocessor:
name: historyprocessor
env:
- name: RX.ALERTING.RULES.0
value: |
json:{
"ALLOWED": true,
"CONDITION": "",
"CONSUMER": {
"BLOCK_TIMEOUT": 10,
"GROUP": "dmserver_device_notifications",
"STREAM_NAME": "notifications"
},
"DESTINATIONS": {
"HTTP": [
{
"BODY": {
"SerialNumber": "__stream__:notification.SerialNumber",
"NotificationType": "__stream__:notification.NotificationType",
"Timestamp": "__stream__:notification.Timestamp",
"Data": "__stream__:notification.Data"
},
"HEADERS": {
"Content-Type": "application/json"
},
"METHOD": "POST",
"RETRIES": 1,
"URL": "<DESTINATION_URL>"
}
]
},
"EXPIRATION": {
"SECONDS": 86400,
"TS": "notification.Timestamp"
}
}

Scenario input

KeyPossible valuesRequiredDescription
notification_typeDeviceRegistration, DeviceInform, CustomyesType of notification
custom_fieldscustomparams.SYSTEM.BEHIND_NAT, customparams.SYSTEM.CNR_MODE, customparams.SYSTEM.LAST_INFORM, customparams.SYSTEM.LAST_SESSION_METHODS, customparams.SYSTEM.LAST_RETRY_COUNT, customparams.SYSTEM.STUN_SUPPORTED, customparams.SYSTEM.XMPP_SUPPORTED, datamodel, events, firstseen, hw, ip, lastseen, location, mac, model, sw, vendorfor {"notification_type": "Custom"} onlyCPE fields to request

Scenario input examples:

{"notification_type": "DeviceInform"}
{"notification_type": "DeviceRegistration"}
{"notification_type": "Custom", "custom_fields": ["lastseen", "events"]}

Scenario output

Scenario will publish notification to the Redis stream notifications. Each notification is then asynchronously delivered in the background as an HTTP POST request.

Request characteristics:

  • Headers:
    • Content-Type: application/json
    • Accept-Encoding: gzip
  • Body: JSON-encoded message

SON body structure:

KeyDescription
DataNotificationType specific mapping
NotificationTypeSame as notification_type from input
SerialNumberCPE identificator (cpe.cpeid)
TimestampTimestamp of notification creation

Data examples

DeviceRegistration

{
"Data": {
"DeviceRegistrationTime": 1755089375
},
"NotificationType": "DeviceRegistration",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792791.0146933
}
  • DeviceRegistrationTime is cpe.firstseen field

DeviceInform

{
"Data": {
"LastInform": {
"InternetGatewayDevice.DeviceInfo.HardwareVersion": "ALPHA 001",
"InternetGatewayDevice.DeviceInfo.ProvisioningCode": "",
"InternetGatewayDevice.DeviceInfo.SoftwareVersion": "01.12.20",
"InternetGatewayDevice.DeviceInfo.SpecVersion": "1.0",
"InternetGatewayDevice.DeviceSummary": "InternetGatewayDevice:1.4[](Baseline:2, EthernetLAN:1, ADSLWAN:1,ADSL2WAN:1, Time:2, IPPing:1, WiFiLAN:2, DeviceAssociation:1), VoiceService:1.0[1](SIPEndpoint:1, Endpoint:1, TAEndpoint:1), StorageService:1.0[1](Baseline:1, FTPServer:1, NetServer:1, HTTPServer:1, UserAccess:1, VolumeConfig:1)",
"InternetGatewayDevice.ManagementServer.ConnectionRequestURL": "http://cpe-emulator:8082/CPE/dms/ACSURL/http%3A//edgeproxy%3A7547/TR69/DMS_DEVICE",
"InternetGatewayDevice.ManagementServer.ParameterKey": "parameter update",
"InternetGatewayDevice.WANDevice.1.WANConnectionDevice.1.WANPPPConnection.1.ExternalIPAddress": "127.0.0.1"
}
},
"NotificationType": "DeviceInform",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792842.081471
}
  • LastInform is last CPE Inform message

Custom

{
"Data": {
"customparams.SYSTEM.LAST_RETRY_COUNT": 0
},
"NotificationType": "Custom",
"SerialNumber": "DMS_DEVICE",
"Timestamp": 1755792960.1382487
}
  • Data contains CPE fields requested in custom_fields

You can use this scenario, for example, in Campaigns — to automatically send notifications to external systems when specified conditions are met. Learn more about working with Campaigns here.

Campaign Scenario Result Notifications

The Scenario Service executes all scenarios and publishes their results into a dedicated Redis stream.
These results can be processed in the same way as notifications from the notifications stream described above.

For CWMP campaigns, results are published into the cwmp_results stream, where they can be consumed by history-processor and forwarded to external systems.

Example Rule for Sending Campaign Results to a Specified URL

Below is an example of a history-processor rule that:

  • consumes events from the cwmp_results stream,
  • filters only records where the scenario type is campaign,
  • sends the scenario execution results to a specified HTTP endpoint.
{
"ALLOWED": true,
"CONDITION": "scenario_type == \"campaign\"",
"CONSUMER": {
"BLOCK_TIMEOUT": 10,
"GROUP": "dmserver_historyprocessor",
"STREAM_NAME": "cwmp_results"
},
"DESTINATIONS": {
"HTTP": [
{
"BODY": {
"SerialNumber": "__stream__:scenario.cpeid",
"Result": "__stream__:result",
"Envid": "__stream__:scenario.envID"
},
"HEADERS": {
"Content-Type": "application/json"
},
"METHOD": "POST",
"RETRIES": 1,
"URL": "http://request-printer:8080/post"
}
]
},
"EXPIRATION": {
"SECONDS": 86400,
"TS": "scenario.end_ts"
}
}

Example of an event from the cwmp_results stream for a campaign:


{
"CAMPAIGN:7ccd7537-0787-45db-afa8-c01462fde970:f363adc9-53f1-4cd6-8c64-03805f9e03b8:emulator": [
"GetRPCMethods",
"SetParameterValues",
"GetParameterValues",
"GetParameterNames",
"SetParameterAttributes",
"GetParameterAttributes",
"AddObject",
"DeleteObject",
"Reboot",
"Download"
],
"scenario": {
"status": "Finished",
"scenario": "RPC.CWMP.GetRPCMethods",
"cpeid": "emulator",
"initargs": {},
"cache": {},
"step": "1",
"envID": "CAMPAIGN:7ccd7537-0787-45db-afa8-c01462fde970:f363adc9-53f1-4cd6-8c64-03805f9e03b8:emulator",
"author": "DMServer Campaign",
"start_ts": 1763755336.801468,
"end_ts": 1763755336.82063,
"exit_code": 0,
"callback": "",
"silent": true,
"storage": null
},
"scenario_type": "campaign",
"result": [
"GetRPCMethods",
"SetParameterValues",
"GetParameterValues",
"GetParameterNames",
"SetParameterAttributes",
"GetParameterAttributes",
"AddObject",
"DeleteObject",
"Reboot",
"Download"
],
"storage_address": "http://campaignresultstorage:20101/"
}