Table of Contents

Introduction

The Data Platform (DP) notification system allows you to subscribe to data and metadata changes using a publish/subscriber pattern.

Here is a typical notification workflow:

  • Find a "topic" for which you want to keep up to date with any changes in the Data Platform.
  • Create a push endpoint that is used to receive notifications about the interested topic.
  • Create a subscription in the Data Platform Notification system and prove the ownership of the push endpoint.
  • The consuming application, the push endpoint registered for the subscription, starts receiving notifications for that topic and processes the message to synchronize with the Data Platform state.
  • Periodically rotate the "secret" used for the subscription.

The topics below describe these steps/APIs in detail that allow you to create integrated workflows using the DP Notification system.

Back to Table of Contents

Steps

Get topics available to subscribe

Use the Data Notification "topics" API to view the list of supported notification topics and corresponding sample messages.

POST {root-url}/api/register/v1/topics
curl
curl --request GET \
  --url '{root-url}/api/register/v1/topics' \
  --header 'Authorization:  Bearer <JWT>' \
  --header 'Content-Type: application/json' \
  --header 'data-partition-id: opendes' \

A sample output is shown below. Note the "name" of the topic. The "name" is required to create a subscription for the topic you are interested in.

Sample response
[
  {
	"name": "recordstopic",
    "description": "This notification is sent whenever a new record or record version is created, updated or deleted in storage. 'previousVersionKind' is noted upon 'kind' update. Record deletion is noted as a soft 'deletionType'. Record purge is noted as a hard 'deletionType'.",
    "state": "ACTIVE",
    "example": [
      {
        "id": "opendes:abc:123",
        "kind": "opendes:petrel:regularheightfieldsurface:1.0.0",
        "op": "create"
      },
      {
          "id": "opendes:abc:345",
          "kind": "opendes:petrel:regularheightfieldsurface:1.0.1",
          "op": "update",
          "previousVersionKind": "opendes:petrel:regularheightfieldsurface:1.0.0"
      },
      {
          "id": "opendes:abc:567",
          "kind": "opendes:petrel:regularheightfieldsurface:1.0.0",
          "op": "delete",
          "deletionType": "soft"
      },
      {
          "id": "opendes:abc:789",
          "kind": "opendes:petrel:regularheightfieldsurface:1.0.0",
          "op": "delete",
          "deletionType": "hard"
      }
    ]
  },
  {
    "name": "schemachangedtopic",
    "description": "This notification is sent whenever a new schema is created or updated via schema-service.",
    "state": "ACTIVE",
    "example": [
      {
        "kind": "opendes:wks:wellbore:1.0.0",
        "op": "update"
      },
      {
        "kind": "opendes:wks:wellbore:2.0.0",
        "op": "create"
      }
    ]
  },
  {
    "name": "statuschangedtopic",
    "description": "Every Service/Stage would publish their respective status changed information in this topic.",
    "state": "ACTIVE",
    "example": [
      {
        "kind": "status",
        "properties": {
          "correlationId": "12345",
          "recordId": "opendes:file:3479d828-a47d-4e13-a1f8-9791a19e1a7e",
          "recordIdVersion": "1610537924768407",
          "stage": "STORAGE_SYNC",
          "status": "FAILED",
          "message": "acl is not valid",        
          "errorCode ": 400,
          "timestamp ": 1622118996000
        }
      },
      {
        "kind": "dataSetDetails",
        "properties": {
          "correlationId": "12345",
          "dataSetId": "12345",
          "dataSetIdVersion": "1",
          "dataSetType": "FILE",
          "recordCount": 10,
           "timestamp ": 1622118996000
        }
      }
    ]
  },
  {
    "name": "recordstopic-v2",
    "description": "This notification is sent whenever a new record or record version is created, updated or deleted in storage for all collaboration and non-collaboration context changes. The collaboration context will be provided as part of the message properties if exist. 'previousVersionKind' is noted upon 'kind' update. Record deletion is noted as a soft 'deletionType'. Record purge is noted as a hard 'deletionType'.",
    "state": "ACTIVE",
    "example": {
      "message": {
        "data": [
          {
            "id": "opendes:abc:123",
            "kind": "opendes:petrel:regularheightfieldsurface:1.0.0",
            "op": "create"
          }
        ],
        "account-id": "opendes",
        "data-partition-id": "opendes",
        "correlation-id": "4f1982a2-cbdf-438a-b5a1-e0c6239d46fc",
        "x-collaboration": "id=1e1c4e74-3b9b-4b17-a0d5-67766558ec65,application=Test App"
      }
    }
  },
  {
    "name": "entitlements-changed",
    "description": "This notification is sent whenever a group is updated or deleted.",
    "state": "ACTIVE",
    "example": [
      {
        "kind": "groupChanged",
        "group": "users.testgroup.12345@dp1.slb.com",
        "user": "member1@slb.com",
        "action": "add",
        "modifiedBy": "requester1@slb.com",
        "modifiedOn": 123456789
      },
      {
        "kind": "groupChanged",
        "group": "users.testgroup.12345@dp1.slb.com",
        "user": "member1@slb.com",
        "updatedGroupEmail": "",
        "action": "remove",
        "modifiedBy": "requester1@slb.com",
        "modifiedOn": 123456789
      },
      {
        "kind": "groupChanged",
        "group": "users.testgroup.12345@dp1.slb.com",
        "updatedGroupEmail": "users.testgroup_new.12345@dp1.slb.com",
        "action": "replace",
        "modifiedBy": "requester1@slb.com",
        "modifiedOn": 123456789
      },
      {
        "kind": "groupDeleted",
        "group": "users.testgroup.12345@dp1.slb.com",
        "modifiedBy": "requester1@slb.com",
        "modifiedOn": 123456789
      }
    ]
  }
]

Back to Table of Contents

Subscribing to a topic

Use the Data Notification Subscription API to create a subscription for the topic of interest. A subscription ID is returned in the response that can be used to get the subscription details again or to delete the subscription later.

POST {root-url}/api/register/v1/subscription

To subscribe to a topic, you must have an "https" endpoint that supports both "GET" and "POST" methods. "GET" is used as a challenge endpoint when creating or updating a subscription to validate that the consuming application owns this endpoint. "POST" is used to push the notifications to consuming application.

The challenge is performed only when creating a subscription or when you Update the secret for a subscription.

The details of the two types of subscriptions and the challenge process follows:

Hash-based Message Authentication Code (HMAC) subscription using a "secret" string

curl
  curl --request POST \
    --url '{root-url}/api/register/v1/subscription \
    -header 'Authorization: Bearer <JWT>' \
    -header 'Content-Type: application/json' \
    -header 'data-partition-id: opendes' \
    -data '{
  	"name": "testSubscription",
  	"description": "Description",
  	"topic": "records-changed",
  	"pushEndpoint": "<DomainEndpoint>",
  	"secret": {
  		"secretType": "HMAC",
  		"value": "4A6F686E446F65"
  	}
  }'

Before creating an HMAC subscription, make sure that "GET" is supported on the endpoint being registered with the DP Notification system and that the endpoint accepts the query parameters named "crc" and "HMAC". The DP Notification system sends a "GET" request to this endpoint with a random crc, and it expects a response hash generated using the crc and the secret value, provided as an xsd:hexBinary string (for example: 4A6F686E446F65).

In addition, you may also want to validate the HMAC field, which is the signature that is used when a message is pushed to this endpoint. The signature verification must be used in the push endpoint implementation, before processing the messages, to ensure that the message is coming from the DP Notification system.

Note: The secret must conform to the hexBinary format (defined in XML Schema) which is pairs of hexadecimal characters (digits 0–9 and letters A–F in either lower or upper case); any characters outside this range are not allowed. The number of characters must be even.

Sample API definition for setting up the challange end point
...
paths:
   ...
   ...
  '/consumer':
    get:
      summary: Notification "get" API
      consumes:
        - application/json
      produces:
        - application/json
      parameters:
        - in: query
          name: crc
          type: string
          required: true
        - in: query
          name: hmac
          type: xsd:hexBinary string
          required: true
      responses:
        '200':
          description: OK
          schema:
            $ref: '#/definitions/challengeResponse'

    ...
    ...

definitions:
  challengeResponse:
    type: object
    required:
      - schema
    properties:
      responseHash:
        type: string    
Sample Java code to generate the HMAC signature, validate it, and send a response with hash.

...
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
....

@GET
@Path(<DomainEndpoint>)
public Response challenge(@QueryParam("crc") @NotBlank String crc, 
                          @QueryParam("hmac") @NotBlank String hmac) {
    // Use the secret you send to the subscriber registration create request.
    // Hint: The secret string can be stored as configuration for the service.    
    String secret = <getSecretString()>
    verifyHmacSignature(hmac, secret);
    
    String response = getResponseHash( secret + crc);
    return Response.status(HttpStatus.SC_OK).entity(Collections.singletonMap("responseHash", response)).build();
}

private String getResponseHash(String input) {
    String response = Hashing.sha256()
            .hashString(input, StandardCharsets.UTF_8)
            .toString();
    return Base64.getEncoder().encodeToString(response.getBytes());
}

private static final String HMAC_SHA_256 = "HmacSHA256";
private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}";
private static final String HMAC_SHA_256 = "HmacSHA256";
private static final String DATA_FORMAT = "{\"expireMillisecond\": \"%s\",\"hashMechanism\": \"hmacSHA256\",\"endpointUrl\": \"%s\",\"nonce\": \"%s\"}";
private static final String NOTIFICATION_SERVICE = "de-notification-service";
private static final long EXPIRE_DURATION = 30000L;

private void verifyHmacSignature(String hmac, String secret) throws Exception {
    if (Strings.isNullOrEmpty(hmac)) {
        throw new Exception(MISSING_HMAC_SIGNATURE);
    }
    if (Strings.isNullOrEmpty(secret)) {
        throw new Exception(MISSING_SECRET_VALUE);
    }
    String[] tokens = hmac.split("\\.");
    if (tokens.length != 2) {
        throw new Exception(INVALID_SIGNATURE);
    }
    byte[] dataBytes = Base64.getDecoder().decode(tokens[0]);
    String requestSignature = tokens[1];

    String data = new String(dataBytes, StandardCharsets.UTF_8);
    HmacData hmacData = new Gson().fromJson(data, HmacData.class);
    String url = hmacData.getEndpointUrl();
    String nonce = hmacData.getNonce();
    String expireTime = hmacData.getExpireMillisecond();
    if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(nonce) || Strings.isNullOrEmpty(expireTime)) {
        throw new Exception(MISSING_ATTRIBUTES_IN_SIGNATURE);
    }
    String newSignature = getSignedSignature(url, secret, expireTime, nonce);
    if (!requestSignature.equalsIgnoreCase(newSignature)) {
        throw new Exception(INVALID_SIGNATURE);
    }
}

private String getSignedSignature(String url, String secret, String expireTime, String nonce) throws Exception {
    if (Strings.isNullOrEmpty(url) || Strings.isNullOrEmpty(secret) || !StringUtils.isNumeric(expireTime)) {
        throw new Exception(ERROR_GENERATING_SIGNATURE);
    }
    final long expiry = Long.parseLong(expireTime);
    if (System.currentTimeMillis() > expiry) {
        throw new Exception(SIGNATURE_EXPIRED);
    }
    String timeStamp = String.valueOf(expiry - EXPIRE_DURATION);
    String data = String.format(DATA_FORMAT, expireTime, url, nonce);
    try {
        final byte[] signature = getSignature(secret, nonce, timeStamp, data);
        return DatatypeConverter.printHexBinary(signature).toLowerCase();
    } catch (Exception ex) {
        throw new Exception(ERROR_GENERATING_SIGNATURE, ex);
    }
}

private byte[] getSignature(String secret, String nonce, String timeStamp, String data) throws Exception {
    final byte[] secretBytes = DatatypeConverter.parseHexBinary(secret);
    final byte[] nonceBytes = DatatypeConverter.parseHexBinary(nonce);
    final byte[] encryptedNonce = computeHmacSha256(nonceBytes, secretBytes);
    final byte[] encryptedTimestamp = computeHmacSha256(timeStamp, encryptedNonce);
    final byte[] signedKey = computeHmacSha256(NOTIFICATION_SERVICE, encryptedTimestamp);
    final byte[] signature = computeHmacSha256(data, signedKey);
    return signature;
}

private byte[] computeHmacSha256(final String data, final byte[] key) throws Exception {
    final Mac mac = Mac.getInstance(HMAC_SHA_256);
    mac.init(new SecretKeySpec(key, HMAC_SHA_256));
    return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
}

private byte[] computeHmacSha256(final byte[] data, final byte[] key) throws Exception {
    final Mac mac = Mac.getInstance(HMAC_SHA_256);
    mac.init(new SecretKeySpec(key, HMAC_SHA_256));
    return mac.doFinal(data);
}

Back to Table of Contents

Get a subscription by ID

Use this API to get the subscription details for the subscription with the given ID.

GET {root-url}/api/register/v1/subscription/{id}
curl
curl --request GET \
   --url '{root-url}/api/register/v1/subscription/{id} \
  --header 'Authorization: Bearer <JWT>' \
  --header 'data-partition-id: opendes'

Back to Table of Contents

Delete a subscription by ID

Use this API to delete a subscription with the given subscription ID.

DELETE {root-url}/api/register/v1/subscription/{id}
curl
curl --request DELETE\
  --url '{root-url}/api/register/v1/subscription/{id}' \
  --header 'authorization: Bearer <JWT>' \
  --header 'content-type: application/json' \
  --header 'data-partition-id: opendes'

Back to Table of Contents

Handling notifications

The consuming application starts receiving messages for the topics to which it has subscribed.

Notification requests to registered endpoints

The Data Platform supports notification for these topics: recordstopic, schemachangedtopic, and statuschangedtopic. Every record or permission changed message sent from the notification service will be a POST request with a JSON body. All messages sent will include the following headers:

authorizationThe JASON Web Token (JWT) of the service account sent from the service bus. You can secure the endpoint by validating this.
data-partition-idThe partition from where the event was triggered.
correlation-idThe correlation ID for the request generated by the notification service.

Records change message content

The sample message for record change notification looks like this:

Note that the example with previousVersionKind is where the record kind is updated.

[ 
  {"id":"record_id_1","kind":"opendes:osdu:wks:1.0.0","op":"create"},
  {"id":"record_id_2","kind":"opendes:osdu:wks:1.0.1","op":"update","previousVersionKind":"opendes:osdu:wks:1.0.0"},
  {"id":"record_id_3","kind":"opendes:osdu:wks:1.0.0","op":"update"},
  {"id":"record_id_4","kind":"opendes:osdu:wks:1.0.0","op":"delete"}
  ...
]

Possible values of operation types, the "op" field in above example, are as follows:

  • create
  • update
  • delete

Schema change message content

The sample message for a schema change notification looks like this:

[ 
  { "kind": "osdu:wks:wellbore:1.0.0", "op": "update" },
  { "kind": "osdu:wks:wellbore:2.0.0", "op": "create" }
  ...
]

Status change message content

The sample message for a status change notification looks like this:

[
  {
    "kind": "status",
    "properties": {
      "correlationId": "12345",
      "recordId": "opendes:file:3479d828-a47d-4e13-a1f8-9791a19e1a7e",
      "recordIdVersion": "1610537924768407",
      "stage": "STORAGE_SYNC",
      "status": "FAILED",
      "message": "acl is not valid",
      "errorCode ": 400,
      "timestamp ": 1622118996000
    }
  },
  {
    "kind": "dataSetDetails",
    "properties": {
      "correlationId": "12345",
      "dataSetId": "12345",
      "dataSetIdVersion": "1",
      "dataSetType": "FILE",
      "recordCount": 10,
      "timestamp ": 1622118996000
    }
  }
  ....
]

Note that each message can contain a maximum of 50 record updates and can have updates for multiple kinds and operations. If there are more than 50 record updates, subscribers will receive multiple messages.

Notification handler endpoint - HMAC Secret type

Endpoints with HMAC subscriptions must accept a parameter named "hmac", which has a signature, as described above. Make sure to validate this signature before processing messages in the request body.

This example is a simple API definition to receive notifications.

...
  '/consumer':
    post:
      summary: Receive notification
      description: "Receives push notification from DP"
      consumes:
        - application/json
      produces:
        - application/json
      parameters:
        - in: query
          name: hmac
          type: xsd:hexBinary string
          required: true
      responses:
        '200':
          description: OK
...
This example is a simple Java implementation of the endpoint.
@POST
@Path(<ConsumerEndpoint>)
public Response processMessage(@NotBlank(message = "Request body can not be null") String data,
                               @NotBlank(message = "'hmac signature' can not be null or empty") @QueryParam("hmac") String hmac,
                               @HeaderParam("data-partition-id") String partitionId,
                               @HeaderParam("correlation-id") String correlationId) throws Exception {
    // Exception is thrown if not possible to verify Hmac signature
    verifyHmacSignature(hmac)

    ...
    // Get message from body
    
    // Process message
    
    ...
    
    return response
}

Responding to notifications

The Notification service expects a response with the code in the 200-299 range for successfully acknowledged messages. It expects such a response from the consumer endpoint within 30 seconds. If the acknowledgement is not received in this time, the Notification service will continue to call the endpoint for 5 days. The frequency of the message slows down if it consistently fails to receive a successful acknowledgement.

Back to Table of Contents

Update the secret for a subscription

Update your secret regularly for the subscriptions to avoid security issues.

You can do this using the DP Notification Update Subscription API. Update the "GET" endpoint first to point to the new secret. The same verification will be performed again with new secret value.

The secret change takes effect immediately.

PUT {root-url}/api/register/v1/subscription/{id}/secret
curl
curl --request PUT \
  --url '{root-url}/api/register/v1/subscription/{id}/secret' \
  --header 'authorization: Bearer <JWT>' \
  --header 'content-type: application/json' \
  --header 'data-partition-id: opendes' \
  --data '{
            "secretType": "HMAC",
            "value": <newValue>
         }'

Back to Table of Contents

Current limitations

  • No filtering is applied to messages, such as based on the kind, etc., in the Data Platform. All the messages are pushed to the consuming application.
  • Existing record update events are also notified as a create event.

Back to Table of Contents