Introduction

When the shards in ECK reach capacity, there are two options. One is to bring shard usage down by cleaning up the indices with zero documents or deleting the data that is no longer needed. The other is to increase the shard capacity, which has cost implications.
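
As a rough illustration of finding indices with zero documents, the sketch below filters a list of index rows by document count. It assumes the rows come from Elasticsearch's `GET _cat/indices?format=json`, where each row carries `index` and `docs.count` as strings. The helper name `empty_indices` and the sample index names are hypothetical.

```python
def empty_indices(cat_rows):
    """Return the names of indices whose document count is zero.

    cat_rows: parsed JSON from `GET _cat/indices?format=json` (assumed shape).
    """
    empty = []
    for row in cat_rows:
        count = row.get("docs.count")
        # Closed indices may report no count at all; skip them rather than guess.
        if count is None or count == "":
            continue
        if int(count) == 0:
            empty.append(row["index"])
    return sorted(empty)


# Hypothetical sample rows, not real cluster output.
sample = [
    {"index": "opendes-test-facet-1.0.5", "docs.count": "0"},
    {"index": "opendes-test-well-1.0.0", "docs.count": "42"},
    {"index": "opendes-closed", "docs.count": None},
]
print(empty_indices(sample))
```

An index listed here is only a candidate. As described in the steps that follow, the records belonging to the schema have to be purged before the index itself is cleaned up.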

Steps to clean up data

Before the indices can be cleaned up, the data associated with the schema must be deleted. The steps below remove the storage records that belong to that schema.

  1. Query for all the records in a kind/schema using the Search Service.
    a. If the number of records is less than 10k, it is best to use the Search Query API.
    b. If the number of records exceeds 10k, use the Search Query with Cursor API. This needs to be done by a script that iterates through the records until the cursor returns null.
    Note: records that failed to index correctly are not returned when querying via the Search Service. In this case, please reach out to an OSDU SRE for help querying for all the records belonging to a kind from the Storage Service (this is a privileged API).

  2. Delete all the records retrieved in Step 1 using the Storage Purge API from the Storage Service. This operation should also delete the associated index record information.
    Since only one record can be purged at a time, this needs to be done by a script that iterates through all the records.

  3. Verify that all the records have been purged properly by searching for them again using the Search Query API.
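
The flow of the three steps can be tried without a live environment. The sketch below is a minimal illustration, not the service's own client: `collect_record_ids` and `purge_records` are hypothetical helpers, and the HTTP calls are replaced by caller-supplied functions (`fetch_page`, `delete_record`) backed here by an in-memory set. It assumes a successful purge returns status 204 and that the cursor comes back empty after the last page.

```python
def collect_record_ids(fetch_page, kind, limit=1000):
    """Page through a cursor-based search until the cursor comes back empty.

    fetch_page(kind, cursor, limit) is a caller-supplied function (hypothetical)
    returning a dict with "results" and "cursor", like query_with_cursor.
    """
    ids, cursor = [], None
    while True:
        page = fetch_page(kind, cursor, limit)
        ids.extend(record["id"] for record in page.get("results", []))
        cursor = page.get("cursor")
        if not cursor:
            return ids


def purge_records(delete_record, record_ids):
    """Purge records one at a time; return (id, status) for every non-204 result."""
    failed = []
    for record_id in record_ids:
        status = delete_record(record_id)
        if status != 204:
            failed.append((record_id, status))
    return failed


# In-memory stand-ins for the Search and Storage calls, for a dry run only.
_store = {f"opendes:test:well{i}" for i in range(1, 6)}


def fake_fetch_page(kind, cursor, limit):
    ordered = sorted(_store)
    start = int(cursor or 0)
    next_start = start + limit
    return {
        "results": [{"id": rid} for rid in ordered[start:next_start]],
        "cursor": str(next_start) if next_start < len(ordered) else None,
    }


def fake_delete(record_id):
    if record_id in _store:
        _store.remove(record_id)
        return 204
    return 404


kind = "opendes:test:facet:1.0.5"
found = collect_record_ids(fake_fetch_page, kind, limit=2)      # Step 1
failures = purge_records(fake_delete, found)                    # Step 2
remaining = collect_record_ids(fake_fetch_page, kind, limit=2)  # Step 3
print(len(found), failures, remaining)
```

Returning the failed ids from Step 2 makes it possible to retry or report them, instead of only finding out in Step 3 that some records remain.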

Required roles for using API

Step  API                             Required roles
1     POST search/query_with_cursor   users.datalake.viewers or users.datalake.editors or users.datalake.admins
2     DELETE storage/records/{id}     users.datalake.admins
3     POST search/query               users.datalake.viewers or users.datalake.editors or users.datalake.admins
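
The role requirements can also be written as a lookup, to check before running which steps a caller would be blocked on. This is only a sketch: `REQUIRED_ROLES` and `blocked_steps` are hypothetical names, and how the caller's roles are obtained is left out.

```python
# Any one of the listed roles is sufficient for the step.
REQUIRED_ROLES = {
    1: {"users.datalake.viewers", "users.datalake.editors", "users.datalake.admins"},
    2: {"users.datalake.admins"},
    3: {"users.datalake.viewers", "users.datalake.editors", "users.datalake.admins"},
}


def blocked_steps(caller_roles):
    """Return the step numbers the caller cannot run with the given roles."""
    roles = set(caller_roles)
    return [step for step, allowed in sorted(REQUIRED_ROLES.items())
            if not roles & allowed]


# A viewer can search (Steps 1 and 3) but cannot purge (Step 2).
print(blocked_steps(["users.datalake.viewers"]))
```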

Example code

The following Python script implements the steps above.

import requests
import time

tenant_id = ""
resource_id = ""
client_id = ""
client_secret = ""

data_partition_id="opendes"
schema_id="opendes:test:facet:1.0.5"

BASE_URL="https://evd.managed-osdu.cloud.slb-ds.com/api"

# Seconds to wait before verification. The delay needed can vary from a minute
# up to half an hour, depending on the number of records to be deleted.
delay_before_verification_in_seconds=60
# Number of record ids returned per request by the search/query_with_cursor API
limit=1000

STORAGE_URL=f"{BASE_URL}/storage/v2"
SEARCH_URL=f"{BASE_URL}/search/v2"

data = {
    'grant_type': 'client_credentials',
    'client_id': client_id,
    'client_secret': client_secret,
    'resource': resource_id
}

auth_response = requests.post(
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    headers={'Content-Type': 'application/x-www-form-urlencoded'},
    data=data
)
# Stop here with a clear HTTP error if the credentials were rejected
auth_response.raise_for_status()

auth_token = auth_response.json()['access_token']

headers = {
    "Content-Type": "application/json",
    "data-partition-id": data_partition_id,
    "Authorization": f"Bearer {auth_token}"
}

# Step 1: collect the ids of all the records of the kind
print("Search all the records related to the kind/schema")
cursor = ""
records_to_delete = []
while True:
    data = {
        "cursor": cursor,
        "limit": limit,
        "kind": schema_id,
        "returnedFields": ["id"]
    }

    # Note: verify=False disables TLS certificate verification.
    # Remove it unless your environment requires it.
    response = requests.post(f"{SEARCH_URL}/query_with_cursor", json=data, headers=headers, verify=False)
    response.raise_for_status()
    records_by_kind = response.json()

    records_to_delete.extend(obj["id"] for obj in records_by_kind.get("results", []))

    # The cursor is null once the last page has been returned
    cursor = records_by_kind.get("cursor")
    if not cursor:
        break
print("Search completed. Number of records to delete: ", len(records_to_delete))

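# Step 2: purge the records one at a time
# (record_id is used instead of id to avoid shadowing the Python builtin)
for record_id in records_to_delete:
    response = requests.delete(f"{STORAGE_URL}/records/{record_id}", headers=headers, verify=False)
    print(f"Deleting record with id {record_id}. Response status: ", response.status_code)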

# Step 3: verify that no records of the kind remain
print(f"Waiting for {delay_before_verification_in_seconds} seconds...")
time.sleep(delay_before_verification_in_seconds)

data = {
    "kind": schema_id,
    "returnedFields": ["id"]
}
response = requests.post(f"{SEARCH_URL}/query", headers=headers, json=data, verify=False)
print(f"Searching records with kind {schema_id}. Total count: ", response.json().get("totalCount"))

To run the script:

  1. Save the script as a file with a .py extension (for example, filename.py).
  2. Fill in the appropriate values for tenant_id, resource_id, client_id, client_secret, data_partition_id, schema_id and BASE_URL.
  3. Open a terminal or command prompt, navigate to the directory containing the Python file, and run the command python filename.py.

If everything went well, the output ends with Total count: 0, similar to this:

Search all the records related to the kind/schema
Search completed. Number of records to delete:  4
Deleting record with id opendes:test:well1... Response status:  204
Deleting record with id opendes:test:well2... Response status:  204
Deleting record with id opendes:test:well3... Response status:  204
Deleting record with id opendes:test:well4... Response status:  204
Waiting for 60 seconds...
Searching records with kind slb:test:well:1.0.0. Total count:  0
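
Because the delay needed before verification can range from a minute to half an hour, polling may be more convenient than a single fixed wait. The sketch below is one possible approach, not part of the original script: `wait_until_purged` is a hypothetical helper, and `count_records` stands for any caller-supplied function that returns the `totalCount` of a Search query for the kind. The sleep and clock functions are injectable so the logic can be tried without actually waiting.

```python
import time


def wait_until_purged(count_records, timeout_seconds=1800, poll_seconds=60,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll until count_records() returns 0 or the timeout elapses.

    Returns True if the count reached 0, False if the timeout was hit first.
    The 1800 s default mirrors the half-hour upper bound mentioned in the script.
    """
    deadline = clock() + timeout_seconds
    while True:
        if count_records() == 0:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)


# Dry run: a fake counter that drains over three polls, with a no-op sleep.
_counts = iter([4, 1, 0])
print(wait_until_purged(lambda: next(_counts), sleep=lambda seconds: None))
```

In the script above, `count_records` could wrap the Step 3 request and return `response.json().get("totalCount")`.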