In this tutorial we will explain:
- How to write bulk data using Wellbore DDMS chunking APIs
- How to read and write a given version of a WellLog
- How to read bulk data with filtering options as columns, offset and limit
- How meta (record) and bulk data consistency is ensured for WellLogs
- How meta (record) and bulk data consistency is ensured for Wellbore Trajectories
Before you start writing bulk data through the Wellbore DDMS APIs, you need to install the Python packages below:
- The pandas module, whose pandas.DataFrame (and its JSON representation) is used to structure the log bulk data written to the Wellbore DDMS.
- The pyarrow module, used as the engine that converts a pandas.DataFrame to a Parquet file.
- The httpx module, used to send HTTP requests to the Wellbore DDMS.
# Prerequisite to run this notebook
!python -m pip install pip --upgrade
!pip install pandas numpy httpx pyarrow
For any call to the Wellbore DDMS APIs you need to pass a valid bearer token in the header of the request. This token can be obtained from any API catalog on the developer portal. You first need to request a developer base subscription. Then, from the developer base subscription, pick any API and execute it. A valid bearer token is returned in the Curl section of the response. Copy this token value and assign it to the TOKEN variable below.
TOKEN = '' # Paste here the token without the bearer prefix
Helper functions used in the different sample scripts of this tutorial.
from typing import List
import httpx
import pandas as pd
import numpy as np
import io
from IPython.display import display_html, display, HTML
from itertools import chain, cycle
def generate_df_typed(columns, index):
    """Build a DataFrame of random values whose type is driven by each column name.

    The prefix of a column name selects the kind of data generated:
    'float' -> random floats in [0, 1), 'str' -> 'string_value_<i>' labels,
    'bool' -> random booleans, 'date' -> consecutive days starting at
    2021-01-01. Any other name gets random integers in [-100, 1000).

    Input:
        columns: iterable of column names
        index: index labels of the dataframe; its length sets the row count
    Output:
        dataframe: a pandas.DataFrame with one generated column per name
    """
    def gen_values(col_name, size):
        if col_name.startswith('float'):
            return np.random.random_sample(size=size)
        if col_name.startswith('str'):
            return [f'string_value_{i}' for i in range(size)]
        if col_name.startswith('bool'):
            return np.random.choice(a=[False, True], size=size)
        if col_name.startswith('date'):
            # Day-precision start date shifted by 0, 1, 2, ... days; an array is
            # returned (not a one-shot generator) so the values can be reused.
            day_offsets = np.arange(size).astype('timedelta64[D]')
            return np.datetime64('2021-01-01') + day_offsets
        return np.random.randint(-100, 1000, size=size)

    nb_rows = len(index)
    return pd.DataFrame({c: gen_values(c, nb_rows) for c in columns}, index=index)
def multi_table(table_list):
    '''Lay out several tables side by side in one HTML row.

    Input:
        table_list: list of objects exposing _repr_html_() (e.g. styled dataframes)
    Output:
        an HTML display object holding one table cell per input table
    '''
    cells = ''.join(f'<td>{table._repr_html_()}</td>' for table in table_list)
    return HTML(f'<table><tr style="background-color:white;">{cells}</tr></table>')
def gen_color(color):
    """Create a cell-styling callback that always yields the CSS rule 'color: <color>'.

    Input:
        color: CSS color name or value
    Output:
        a function taking an optional (ignored) cell value and returning the CSS text
    """
    css_rule = f'color: {color}'

    def _colorize(val=None):
        # The cell value is irrelevant: every cell gets the same color.
        return css_rule

    return _colorize
def display_operation(before, sent, after):
    """Display the data before a session, each chunk sent and the committed result.

    Each chunk gets its own text color, and the cells of the final table that
    come from a chunk are drawn in that chunk's color, so the origin of every
    value is visible at a glance.

    Input:
        before: pandas.DataFrame holding the data before the session
        sent: list of pandas.DataFrame, one per chunk sent during the session
        after: pandas.DataFrame holding the data after the session commit
    """
    palette = ['blue', 'green', 'orange', 'red']
    # Reuse the palette cyclically so that sending more chunks than there are
    # colors does not raise an IndexError.
    chunk_css = [gen_color(palette[i % len(palette)])() for i in range(len(sent))]

    def uniform_color(css):
        # Column-wise styler painting every cell with the same CSS rule
        # (Styler.apply works across pandas versions, unlike Styler.applymap).
        return lambda col: [css] * len(col)

    sent_st = [
        chunk.style.set_caption(f'chunk {i + 1} sent').apply(uniform_color(css))
        for i, (chunk, css) in enumerate(zip(sent, chunk_css))
    ]

    def color_output(s):
        res = []
        for r in s.index:
            c = ''
            for chunk, css in zip(sent, chunk_css):
                # 'in' on a DataFrame tests column names, on a Series index labels.
                if s.name in chunk and int(r) in chunk[s.name]:
                    c = css  # no break: the last chunk providing the cell wins
            res.append(c)
        return res

    margin = '65'
    before_st = (
        before.style
        .set_table_attributes(f"style='margin-right:{margin}px'")
        .set_caption('Initial data - Before session')
    )
    after_st = (
        after.style
        .set_table_attributes(f"style='margin-left:{margin}px'")
        .apply(color_output)
        # Color passed positionally: the keyword was renamed from 'null_color'
        # to 'color' in recent pandas versions.
        .highlight_null('lightyellow')
        .set_caption('Final data - After session commit')
    )
    display(multi_table([before_st, *sent_st, after_st]))
def display_side_by_side(dfs:list, captions:list):
    """Display tables side by side to save vertical space
    Input:
        dfs: list of pandas.DataFrame
        captions: list of table captions
    """
    # Pair captions and tables directly: going through a dict keyed by caption
    # would silently drop tables that share a caption.
    rendered = [
        df.style.set_table_attributes("style='display:inline'").set_caption(caption)._repr_html_()
        for caption, df in zip(captions, dfs)
    ]
    # Three non-breaking spaces follow every table to keep them visually apart.
    separator = "\xa0\xa0\xa0"
    display(HTML("".join(table_html + separator for table_html in rendered)))
def generate_df(columns: List[str], index):
    """Build a DataFrame filled with random integers in [-100, 1000).

    Input:
        columns: list of column names
        index: index labels; its length sets the number of rows
    Output:
        dataframe: a pandas.DataFrame with len(index) rows and one column per name
    """
    shape = (len(index), len(columns))
    random_values = np.random.randint(-100, 1000, size=shape)
    frame = pd.DataFrame(random_values, index=index)
    frame.columns = columns
    return frame
def print_response(resp):
    """Print a one-line summary of an HTTP response and show its body on failure.

    Input:
        resp: a response object exposing request.method, url, status_code and content
    """
    method = resp.request.method
    print(f'{method} : {resp.url} -> {resp.status_code}')
    if resp.status_code == httpx.codes.OK:
        return
    # Anything other than 200 OK: show the raw body to help diagnose the failure.
    display(resp.content)
def create_df_from_response(response):
    """Returns a dataframe created from the WellLog bulk data response
    Input:
        response: a httpx.response object
    Output:
        dataframe: a pandas.dataframe object
    Raises:
        ValueError: when the content type is neither JSON nor Parquet
    """
    content_type = response.headers.get('content-type')
    # Compare the media type only: the header may carry parameters such as
    # '; charset=utf-8', may differ in case, or may be missing altogether.
    media_type = (content_type or '').split(';', 1)[0].strip().lower()
    if media_type == 'application/json':
        return pd.DataFrame.from_dict(response.json())
    if media_type == 'application/x-parquet':
        # read_parquet needs a file-like object, so wrap the raw bytes.
        return pd.read_parquet(io.BytesIO(response.content))
    raise ValueError(f"Unknown content-type: '{content_type}'")
def display_previous_and_current_well_log_data_versions(record_id):
    """Display the previous and current WellLog data versions for a given record id and highlight differences between them.

    Cells of the current version whose column and row label already exist in
    the previous version are drawn in blue, the others in red. A version whose
    bulk data request does not return 200 is not displayed.

    Input:
        record_id: a WellLog record id
    """
    def fetch_version_data(version_id):
        # Bulk data of one version as a DataFrame, or None when the request fails.
        data_response = client.get(f'{welllog_dms_url}/{record_id}/versions/{version_id}/data')
        if data_response.status_code != 200:
            return None
        return create_df_from_response(data_response)

    # list record version
    versions_response = client.get(f'{welllog_dms_url}/{record_id}/versions')
    if versions_response.status_code != 200:
        # Without the version list there is nothing to compare: report and stop.
        print_response(versions_response)
        return
    versions = versions_response.json()['versions']

    # The last entry is taken as the current version and the one before it as
    # the previous version; either may be absent.
    previous_results = fetch_version_data(versions[-2]) if len(versions) >= 2 else None
    current_results = fetch_version_data(versions[-1]) if versions else None

    blue = gen_color('blue')()
    red = gen_color('red')()

    def color_output(s):
        # 'in' on a DataFrame tests column names, on a Series index labels.
        known = previous_results[s.name] if s.name in previous_results else None
        return [blue if known is not None and int(r) in known else red for r in s.index]

    margin = '65'
    table_attributes = f"style='margin-left:{margin}px'"
    tables = []
    if previous_results is not None:
        tables.append(
            previous_results.style
            .set_table_attributes(table_attributes)
            # Color passed positionally: the keyword was renamed from
            # 'null_color' to 'color' in recent pandas versions.
            .highlight_null('lightyellow')
            .set_caption('Previous WellLog data version')
            .apply(lambda col: [blue] * len(col))
        )
    if current_results is not None:
        current_st = current_results.style.set_table_attributes(table_attributes)
        if previous_results is not None:
            current_st = current_st.apply(color_output)
            caption = 'Current WellLog data version with data chunks added in red'
        else:
            caption = 'Current WellLog data version'
        tables.append(current_st.highlight_null('lightyellow').set_caption(caption))
    display(multi_table(tables))


# Several settings, such as the base URL endpoint and the data partition id, are needed to create a WellLog in the Wellbore DDMS. Please change these settings according to the environment that you want to target.
# Environment settings: fill these in before running the cells below.
base_url = "" # set a base URL value
data_partition_id = "" # set a data partition id
legal_tag = "" # set a valid legal tag in the data partition
acl_domain = "" # set an Access Control Lists (ACL) domain
# Base URL of the WellLog endpoints; the WellLog requests below are built on it.
welllog_dms_url = f'{base_url}/api/os-wellbore-ddms/ddms/v3/welllogs'
# Shared HTTP client: sends the data partition id and the bearer token with every request.
# NOTE(review): verify=False turns off TLS certificate verification; confirm this is
# acceptable for the targeted environment before using real credentials.
client = httpx.Client(verify=False,
headers={
"data-partition-id": f"{data_partition_id}",
"Authorization": f"Bearer {TOKEN}",
},
timeout=120
)
# Create a new WellLog. Here is a fake body just to illustrate the API use
record = {
"kind": "osdu:wks:work-product-component--WellLog:1.0.0",
"acl": {
"viewers": [f"data.default.viewers@{data_partition_id}.{acl_domain}"],
"owners": [f"data.default.owners@{data_partition_id}.{acl_domain}"]
},
"legal": {
"legaltags": [f"{legal_tag}"],
"otherRelevantDataCountries": ["US"],
},
"data": {""
"WellboreID": "namespace:master-data--Wellbore:SomeUniqueWellboreID:",
"Curves": [
{
"CurveID": "MD",
},
{
"CurveID": "X",
}
]
},
"version" : 0
}
The script below creates a WellLog record that is used in this tutorial to demonstrate how to write WellLog bulk data to the Wellbore DDMS.
response = client.post(welllog_dms_url, json=[record])
print_response(response)
record_id = response.json()["recordIds"][0]
record_id
Each time data is written to the WellLog, a new version is created in the Wellbore DDMS. This is true when writing the entire bulk data at once or by chunks (covered in a later section of this tutorial). So when writing all bulk data at once, the payload is expected to contain the entire bulk data, which replaces the previous bulk version by creating a new version. This new bulk version becomes the latest one, i.e. the current version returned by the GET WellLog bulk data API for the given record id.
The Wellbore DDMS bulk data API supports both Parquet and JSON formats. To target one of these formats, the 'Content-Type' must be set accordingly in the headers of the HTTP POST request. The Wellbore DDMS API supports HTTP chunked encoding as well.
First of all let's generate a Pandas.Dataframe through the code below with 2 columns and 5 rows.
generated_dataframe = generate_df(['COLUMN_MD', 'COLUMN_X'], range(5))
generated_dataframe| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 0 | 986 | 712 |
| 1 | 311 | 348 |
| 2 | -27 | 339 |
| 3 | 230 | 191 |
| 4 | 162 | 740 |
Sending the whole dataframe to the WellLog bulk data.
data_to_send_parquet = generated_dataframe.to_parquet(path=None, engine="pyarrow")
headers = { 'content-type': 'application/x-parquet'}
print_response(client.post(f'{welllog_dms_url}/{record_id}/data', data=data_to_send_parquet, headers=headers))
With the JSON format the orient parameter has to be set according to the Pandas.Dataframe orientation. This orient value can be passed through the params argument of the HTTP POST request. Supported orient values are split and columns. The default orient value is split.
Here are examples of the same Pandas.Dataframe (5 rows and 2 columns) with different orientation:
split: {"columns":["COLUMN_MD","COLUMN_X"],"index":[0,1,2,3,4],"data":[[0.0,1001],[0.5,1002],[1.0,1003],[1.5,1004],[2.0,1005]]}
columns: {"COLUMN_MD":{"0":0.0,"1":0.5,"2":1.0,"3":1.5,"4":2.0},"COLUMN_X":{"0":1001,"1":1002,"2":1003,"3":1004,"4":1005}}
data_to_send_json = {
'index': [0, 1, 2, 3, 4],
'columns': ['COLUMN_MD', 'COLUMN_X'],
'data': [[265, 845], [92, 246], [804, 268], [645, 877], [-20, -28]]
}
params = {'orient':'split'}
print_response(client.post(f'{welllog_dms_url}/{record_id}/data', params=params, json=data_to_send_json))
In order to write WellLog bulk data by chunks to the Wellbore DDMS you have to follow these 3 steps:
- Create a WellLog session - POST /ddms/v3/welllogs/{record_id}/sessions
- Send data by chunk in the session - POST /ddms/v3/welllogs/{record_id}/sessions/{session_id}/data
- Commit the session once all chunks are sent - PATCH /ddms/v3/welllogs/{record_id}/sessions/{session_id}
In step 3 you can also update or abandon the session. This is controlled by the state attribute that is passed in the JSON body of the PATCH HTTP session API.
{ "state": "commit" | "abandon" | "update" }
A session can be created with two different modes:
- update: existing data in previous WellLog version is merged with the data sent during the session when the session is committed.
- overwrite: existing data in previous WellLog version is ignored, the final result only contains data sent during the session when the session is committed. In this case the only way to retrieve the previous data is querying the previous WellLog version.
SESSION_MODE = 'update' # 'update' | 'overwrite'
In the sample script below the WellLog data is ingested by chunks of row data. In the same session it is possible to send WellLog data in both JSON and Parquet formats, as shown below:
# Create a session
# The session mode ('update' or 'overwrite') comes from SESSION_MODE set above.
create_session_response = client.post(f'{welllog_dms_url}/{record_id}/sessions', json={'mode': SESSION_MODE})
print_response(create_session_response)
session_data = create_session_response.json()
# The session id is needed by every later call made within this session.
session_id = session_data['id']
print(f"Session created: {session_data['state']} with id {session_id}\n")
# append first chunk - JSON
# Rows labelled 5 to 9, serialized with the 'split' orientation.
chunk_1 = generate_df(['COLUMN_MD', 'COLUMN_X'], range(5,10))
response_chunk_1 = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', json=chunk_1.to_dict(orient='split'))
print_response(response_chunk_1)
# append second chunk - JSON
# Rows labelled 10 to 14, sent to the same session.
chunk_2 = generate_df(['COLUMN_MD', 'COLUMN_X'], range(10,15))
response_chunk_2 = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', json=chunk_2.to_dict(orient='split'))
print_response(response_chunk_2)
Once the whole WellLog data has been sent through the session, the session needs to be committed using a session PATCH API call with the 'state' attribute set to 'commit'.
# Commit session
commit_session_response = client.patch(f'{welllog_dms_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
print_response(commit_session_response)
session = commit_session_response.json()
print('Session after commit =', session['state'])
Or the session can be abandoned by calling the session PATCH API with the 'state' attribute set to 'abandon'.
# OR else, ABANDON session
abandon_session_response = client.patch(f'{welllog_dms_url}/{record_id}/sessions/{session_id}', json={'state': 'abandon'})
print_response(abandon_session_response)
# Only read the session state from a successful response.
if abandon_session_response.status_code == httpx.codes.OK:
    print('Session after abandon =', abandon_session_response.json()['state'])

SESSION_MODE = 'update'

# Create a session to send parquet
# All session calls target the WellLog endpoint (welllog_dms_url), since
# record_id identifies a WellLog record.
create_session_response = client.post(f'{welllog_dms_url}/{record_id}/sessions', json={'mode': SESSION_MODE})
print_response(create_session_response)
session_data = create_session_response.json()
session_id = session_data['id']
print(f"Session created: {session_data['state']} with id {session_id}\n")

# append first chunk - PARQUET
chunk_3 = generate_df(['COLUMN_MD', 'COLUMN_X'], range(15,20))
headers = {'content-type': 'application/x-parquet'}
response_chunk_3 = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', data=chunk_3.to_parquet(engine="pyarrow"), headers=headers)
print_response(response_chunk_3)

# append second chunk - PARQUET
chunk_4 = generate_df(['COLUMN_MD', 'COLUMN_X'], range(20,25))
headers = {'content-type': 'application/x-parquet'}
response_chunk_4 = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', data=chunk_4.to_parquet(engine="pyarrow"), headers=headers)
print_response(response_chunk_4)

# commit session for parquet
print_response(client.patch(f'{welllog_dms_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'}))

# The code below shows the initial WellLog data before the session and the chunks of rows inserted into the final WellLog data version after the session has been committed.
# Display result
results_response = client.get(f'{welllog_dms_url}/{record_id}/data')
results_cols_md_x = create_df_from_response(results_response)
display_operation(generated_dataframe, [chunk_1, chunk_2, chunk_3], results_cols_md_x)| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 0 | 957 | 190 |
| 1 | 649 | 907 |
| 2 | 598 | 697 |
| 3 | 396 | 8 |
| 4 | 57 | 297 |
| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 5 | 462 | 95 |
| 6 | 275 | 946 |
| 7 | -79 | 965 |
| 8 | 174 | 5 |
| 9 | 848 | 344 |
| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 10 | 252 | 929 |
| 11 | 390 | 629 |
| 12 | 449 | 986 |
| 13 | -34 | 400 |
| 14 | 607 | 272 |
| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 15 | 390 | 915 |
| 16 | -73 | 368 |
| 17 | 277 | -21 |
| 18 | 543 | -78 |
| 19 | 754 | 94 |
| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 20 | -82 | 27 |
| 21 | 431 | 933 |
| 22 | 318 | 465 |
| 23 | -3 | 593 |
| 24 | 256 | 130 |
| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 0 | 265 | 845 |
| 1 | 92 | 246 |
| 2 | 804 | 268 |
| 3 | 645 | 877 |
| 4 | -20 | -28 |
| 5 | 462 | 95 |
| 6 | 275 | 946 |
| 7 | -79 | 965 |
| 8 | 174 | 5 |
| 9 | 848 | 344 |
| 10 | 252 | 929 |
| 11 | 390 | 629 |
| 12 | 449 | 986 |
| 13 | -34 | 400 |
| 14 | 607 | 272 |
| 15 | 390 | 915 |
| 16 | -73 | 368 |
| 17 | 277 | -21 |
| 18 | 543 | -78 |
| 19 | 754 | 94 |
| 20 | -82 | 27 |
| 21 | 431 | 933 |
| 22 | 318 | 465 |
| 23 | -3 | 593 |
| 24 | 256 | 130 |
It is possible to get access to the exhaustive list of versions created for a given WellLog id (GET /ddms/v3/welllogs/{welllogid}/versions). And then access the WellLog data for a given version (GET /ddms/v3/welllogs/{welllogid}/versions/{version}/data). This is what the function below is doing reading WellLog data of the previous and current version and highlighting differences between them. Differences when sending WellLog data in a session with update or overwrite mode is clearly illustrated through WellLog data previous and current versions returned by the function.
display_previous_and_current_well_log_data_versions(record_id)| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 0 | 265 | 845 |
| 1 | 92 | 246 |
| 2 | 804 | 268 |
| 3 | 645 | 877 |
| 4 | -20 | -28 |
| 5 | -29 | 832 |
| 6 | -15 | 107 |
| 7 | 339 | 212 |
| 8 | 823 | 240 |
| 9 | -97 | 349 |
| 10 | 183 | 89 |
| 11 | 194 | 276 |
| 12 | -7 | -7 |
| 13 | 446 | 829 |
| 14 | 32 | 706 |
| 15 | 914 | 740 |
| 16 | 593 | 279 |
| 17 | 304 | -57 |
| 18 | 697 | 145 |
| 19 | 775 | 247 |
| COLUMN_Y | |
|---|---|
| 5 | 192 |
| 6 | 816 |
| 7 | 61 |
| 8 | 658 |
| 9 | 104 |
| 10 | 704 |
| 11 | 681 |
| 12 | 393 |
| 13 | 329 |
| 14 | 402 |
| 15 | 418 |
| 16 | -9 |
| 17 | 857 |
| 18 | 845 |
| 19 | 78 |
| 20 | 484 |
| 21 | 384 |
| 22 | 658 |
| 23 | 622 |
| 24 | 459 |
| COLUMN_Z | |
|---|---|
| 10 | 141 |
| 11 | 478 |
| 12 | 72 |
| 13 | 476 |
| 14 | 434 |
| COLUMN_MD | COLUMN_X | COLUMN_Y | COLUMN_Z | |
|---|---|---|---|---|
| 0 | 265.000000 | 845.000000 | nan | nan |
| 1 | 92.000000 | 246.000000 | nan | nan |
| 2 | 804.000000 | 268.000000 | nan | nan |
| 3 | 645.000000 | 877.000000 | nan | nan |
| 4 | -20.000000 | -28.000000 | nan | nan |
| 5 | -29.000000 | 832.000000 | 192.000000 | nan |
| 6 | -15.000000 | 107.000000 | 816.000000 | nan |
| 7 | 339.000000 | 212.000000 | 61.000000 | nan |
| 8 | 823.000000 | 240.000000 | 658.000000 | nan |
| 9 | -97.000000 | 349.000000 | 104.000000 | nan |
| 10 | 183.000000 | 89.000000 | 704.000000 | 141.000000 |
| 11 | 194.000000 | 276.000000 | 681.000000 | 478.000000 |
| 12 | -7.000000 | -7.000000 | 393.000000 | 72.000000 |
| 13 | 446.000000 | 829.000000 | 329.000000 | 476.000000 |
| 14 | 32.000000 | 706.000000 | 402.000000 | 434.000000 |
| 15 | 914.000000 | 740.000000 | 418.000000 | nan |
| 16 | 593.000000 | 279.000000 | -9.000000 | nan |
| 17 | 304.000000 | -57.000000 | 857.000000 | nan |
| 18 | 697.000000 | 145.000000 | 845.000000 | nan |
| 19 | 775.000000 | 247.000000 | 78.000000 | nan |
| 20 | nan | nan | 484.000000 | nan |
| 21 | nan | nan | 384.000000 | nan |
| 22 | nan | nan | 658.000000 | nan |
| 23 | nan | nan | 622.000000 | nan |
| 24 | nan | nan | 459.000000 | nan |
The function below shows the differences between the current WellLog data version with new columns added by chunk and the previous version of the WellLog data.
display_previous_and_current_well_log_data_versions(record_id)| COLUMN_MD | COLUMN_X | COLUMN_Y | COLUMN_Z | |
|---|---|---|---|---|
| 0 | 265.000000 | 845.000000 | nan | nan |
| 1 | 92.000000 | 246.000000 | nan | nan |
| 2 | 804.000000 | 268.000000 | nan | nan |
| 3 | 645.000000 | 877.000000 | nan | nan |
| 4 | -20.000000 | -28.000000 | nan | nan |
| 5 | -29.000000 | 832.000000 | 192.000000 | nan |
| 6 | -15.000000 | 107.000000 | 816.000000 | nan |
| 7 | 339.000000 | 212.000000 | 61.000000 | nan |
| 8 | 823.000000 | 240.000000 | 658.000000 | nan |
| 9 | -97.000000 | 349.000000 | 104.000000 | nan |
| 10 | 183.000000 | 89.000000 | 704.000000 | 141.000000 |
| 11 | 194.000000 | 276.000000 | 681.000000 | 478.000000 |
| 12 | -7.000000 | -7.000000 | 393.000000 | 72.000000 |
| 13 | 446.000000 | 829.000000 | 329.000000 | 476.000000 |
| 14 | 32.000000 | 706.000000 | 402.000000 | 434.000000 |
| 15 | 914.000000 | 740.000000 | 418.000000 | nan |
| 16 | 593.000000 | 279.000000 | -9.000000 | nan |
| 17 | 304.000000 | -57.000000 | 857.000000 | nan |
| 18 | 697.000000 | 145.000000 | 845.000000 | nan |
| 19 | 775.000000 | 247.000000 | 78.000000 | nan |
| 20 | nan | nan | 484.000000 | nan |
| 21 | nan | nan | 384.000000 | nan |
| 22 | nan | nan | 658.000000 | nan |
| 23 | nan | nan | 622.000000 | nan |
| 24 | nan | nan | 459.000000 | nan |
| COLUMN_MD | COLUMN_X | COLUMN_Y | COLUMN_Z | |
|---|---|---|---|---|
| 0 | 614 | 964 | 108.000000 | nan |
| 1 | 887 | 155 | 979.000000 | nan |
| 2 | 865 | 179 | 533.000000 | nan |
| 3 | 343 | 167 | 235.000000 | nan |
| 4 | 212 | 100 | 497.000000 | nan |
| 5 | -52 | -98 | 608.000000 | nan |
| 6 | 738 | 573 | 781.000000 | nan |
| 7 | 151 | 138 | 646.000000 | nan |
| 8 | -21 | 378 | 157.000000 | nan |
| 9 | 178 | 266 | 895.000000 | nan |
| 10 | 172 | 596 | 705.000000 | 141.000000 |
| 11 | 521 | 618 | 873.000000 | 478.000000 |
| 12 | 592 | 832 | 298.000000 | 72.000000 |
| 13 | 560 | 831 | -82.000000 | 476.000000 |
| 14 | 926 | 179 | 484.000000 | 434.000000 |
| 15 | 901 | 486 | 446.000000 | nan |
| 16 | 610 | 472 | 456.000000 | nan |
| 17 | 587 | 325 | 776.000000 | nan |
| 18 | 463 | 653 | 208.000000 | nan |
| 19 | 9 | 923 | 236.000000 | nan |
| 20 | 138 | 460 | 795.000000 | nan |
| 21 | 715 | 362 | 760.000000 | nan |
| 22 | 590 | -91 | 160.000000 | nan |
| 23 | 642 | -18 | 667.000000 | nan |
| 24 | 679 | 54 | 447.000000 | nan |
As prerequisite a new WellLog record is created below to store array data. The WellLog is created with a MD column storing reference values and single WellLog values stored in a column X.
# Create new record for 2D curves
record_2d_response = client.post(welllog_dms_url, json=[record])
print_response(record_2d_response)
record_2d_id = record_2d_response.json()["recordIds"][0]
print(f"2D record created '{record_2d_id}'")
initial_df = generate_df(['COLUMN_MD', 'COLUMN_X'], range(10))
headers = { 'content-type': 'application/x-parquet'}
print_response(client.post(f'{welllog_dms_url}/{record_2d_id}/data', data=initial_df.to_parquet(engine="pyarrow"), headers=headers))
By convention, array data is added to the WellLog record through a pandas DataFrame whose column names contain the name of the array followed by the column number between square brackets. The orient value has to be set to columns.
# Create a session
create_2d_session_response = client.post(f'{welllog_dms_url}/{record_2d_id}/sessions', json={'mode': 'update'})
print_response(create_2d_session_response)
session_id_2d = create_2d_session_response.json()['id']
# Send chunk data for 2D
arr_data_dataframe = generate_df(['2D[0]', '2D[1]'], range(15))
print_response(client.post(f'{welllog_dms_url}/{record_2d_id}/sessions/{session_id_2d}/data',
params={"orient": 'columns'},
headers={ 'content-type': 'application/json'},
data=arr_data_dataframe.to_json(orient='columns')))
# Commit session
print_response(client.patch(f'{welllog_dms_url}/{record_2d_id}/sessions/{session_id_2d}', json={'state': 'commit'}))The script below shows initial WellLog data before the session and array data added to the final WellLog data version after the session has been committed.
# Display result
bulk_2d_data_response = client.get(f'{welllog_dms_url}/{record_2d_id}/data')
bulk_2d_data = create_df_from_response(bulk_2d_data_response)
display_operation(initial_df, [arr_data_dataframe], bulk_2d_data)| COLUMN_MD | COLUMN_X | |
|---|---|---|
| 0 | 752 | 700 |
| 1 | -36 | 241 |
| 2 | 883 | 107 |
| 3 | 177 | 159 |
| 4 | 156 | 801 |
| 5 | 277 | 597 |
| 6 | -1 | 202 |
| 7 | -21 | 669 |
| 8 | 334 | 291 |
| 9 | 771 | -56 |
| 2D[0] | 2D[1] | COLUMN_MD | COLUMN_X | |
|---|---|---|---|---|
| 0 | 676 | 702 | 752.000000 | 700.000000 |
| 1 | 983 | 588 | -36.000000 | 241.000000 |
| 2 | 948 | 422 | 883.000000 | 107.000000 |
| 3 | 272 | -59 | 177.000000 | 159.000000 |
| 4 | 986 | 869 | 156.000000 | 801.000000 |
| 5 | 563 | 131 | 277.000000 | 597.000000 |
| 6 | 703 | 31 | -1.000000 | 202.000000 |
| 7 | 375 | 538 | -21.000000 | 669.000000 |
| 8 | 244 | 416 | 334.000000 | 291.000000 |
| 9 | 761 | 580 | 771.000000 | -56.000000 |
| 10 | 825 | 222 | nan | nan |
| 11 | 174 | 644 | nan | nan |
| 12 | 871 | 857 | nan | nan |
| 13 | 880 | 780 | nan | nan |
| 14 | 783 | 883 | nan | nan |
This section explains how to replace values for specific curves in a specific range for a given WellLog record id. First let's create through the sample script below a new WellLog record with some bulk data posted as a JSON dataframe to the WellLog record.
# Create new record
response = client.post(welllog_dms_url, json=[record])
print_response(response)
record_id = response.json()["recordIds"][0]
record_id
# Add first bulk data to the record
df_cols_md_x_y_z = generate_df(['COLUMN_MD', 'COLUMN_X', 'COLUMN_Y', 'COLUMN_Z'], range(5))
print_response(client.post(f'{welllog_dms_url}/{record_id}/data', json=df_cols_md_x_y_z.to_dict(orient='split')))
check_data_response = client.get(f'{welllog_dms_url}/{record_id}/data')
print_response(check_data_response)
df_cols_md_x_y_z = create_df_from_response(check_data_response)
df_cols_md_x_y_z| COLUMN_MD | COLUMN_X | COLUMN_Y | COLUMN_Z | |
|---|---|---|---|---|
| 0 | -15 | -21 | 283 | 768 |
| 1 | 643 | 659 | -3 | 437 |
| 2 | 674 | 988 | 739 | 530 |
| 3 | -40 | 244 | 311 | 171 |
| 4 | 989 | 989 | 710 | 541 |
| COLUMN_MD | COLUMN_Y | |
|---|---|---|
| 0 | -91 | 877 |
| 1 | -28 | 336 |
| 2 | 971 | 648 |
| 3 | 458 | -50 |
| 4 | 569 | 89 |
| COLUMN_Z | |
|---|---|
| 3 | 964 |
| 4 | 991 |
| COLUMN_X | |
|---|---|
| 5 | 587 |
| 6 | 818 |
| 7 | 768 |
| COLUMN_MD | COLUMN_X | COLUMN_Y | COLUMN_Z | |
|---|---|---|---|---|
| 0 | -91.000000 | -21 | 877.000000 | 768.000000 |
| 1 | -28.000000 | 659 | 336.000000 | 437.000000 |
| 2 | 971.000000 | 988 | 648.000000 | 530.000000 |
| 3 | 458.000000 | 244 | -50.000000 | 964.000000 |
| 4 | 569.000000 | 989 | 89.000000 | 991.000000 |
| 5 | nan | 587 | nan | nan |
| 6 | nan | 818 | nan | nan |
| 7 | nan | 768 | nan | nan |
Each time the WellLog record metadata or its associated bulk data is updated, a new version of the WellLog record is created. As a consequence, the first version of a given WellLog record never has bulk data associated with it, as demonstrated by the script below:
# creating a new record
response = client.post(welllog_dms_url, json=[record])
print_response(response)
record_id = response.json()["recordIds"][0]
record_id
# posting bulk data to the WellLog record
initial_df = generate_df(['COLUMN_MD', 'COLUMN_X'], range(10))
headers = { 'content-type': 'application/x-parquet'}
print_response(client.post(f'{welllog_dms_url}/{record_id}/data', data=initial_df.to_parquet(engine="pyarrow"), headers=headers))
# Check the versions of the WellLog record: 2 versions are expected, and only
# the last one has bulk data associated with it.
results_response = client.get(f'{welllog_dms_url}/{record_id}/versions')
wellLog_versions_response = results_response.json()
versions = wellLog_versions_response['versions']
for index, version in enumerate(versions):
    print(f'{index}. version number: {version}')
    version_data_response = client.get(f'{welllog_dms_url}/{record_id}/versions/{version}/data')
    #print_response(version_data_response)
    if version_data_response.status_code == 200:
        version_df = create_df_from_response(version_data_response)
        # The highlight colour is passed positionally: the keyword was renamed
        # from `null_color` to `color` across pandas releases, and the
        # positional form works with both.
        version_df_st = (
            version_df.style
            .set_table_attributes("style='margin-left:65px'")
            .highlight_null('lightyellow')
            .set_caption(f'WellLog data version {version}')
        )
        display(multi_table([version_df_st]))
    else:
        # Any non-200 answer is reported as "no bulk data" for that version.
        print(f'\tNo bulk data associated to version {version}')
# Output: - version number: 1627640423310341 No bulk data associated to version 1627640423310341
- version number: 1627640424041113
| | COLUMN_MD | COLUMN_X |
|---|---|---|
| 0 | 265 | 970 |
| 1 | 643 | -22 |
| 2 | -87 | 926 |
| 3 | 710 | 432 |
| 4 | 977 | 225 |
| 5 | 997 | 880 |
| 6 | 997 | 806 |
| 7 | 33 | 80 |
| 8 | 517 | 650 |
| 9 | 514 | 792 |
Through the Wellbore DDMS API it is possible to write bulk data starting from a given version of the WellLog record. The example below shows a WellLog record with two different versions of the bulk data.
- First version contains only a column X
- Second version contains columns X and Y
If a column Z is written from the first version, only columns X and Z remain in the final version of the WellLog bulk data.
# Create a new WellLog record and keep its id for the calls below.
response = client.post(welllog_dms_url, json=[record])
print_response(response)
record_id = response.json()["recordIds"][0]
record_id  # bare expression: the notebook cell displays the new record id

# First bulk write: COLUMN_MD and COLUMN_X, sent in one call as parquet.
generated_A_dataframe = generate_df(['COLUMN_MD','COLUMN_X'], range(10))
headers = {'content-type': 'application/x-parquet'}
bulk_data_url = f'{welllog_dms_url}/{record_id}/data'
parquet_payload = generated_A_dataframe.to_parquet(engine="pyarrow")
print_response(client.post(bulk_data_url, data=parquet_payload, headers=headers))

SESSION_MODE = 'update'  # 'update' | 'overwrite'

# Second bulk write: add COLUMN_Y by chunk through a session.
create_session_response = client.post(f'{welllog_dms_url}/{record_id}/sessions', json={'mode': SESSION_MODE})
print_response(create_session_response)
session_id = create_session_response.json()['id']
generated_B_dataframe = generate_df(['COLUMN_Y'], range(10))
chunk_b_payload = generated_B_dataframe.to_dict(orient='split')
chunk_b_response = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', json=chunk_b_payload)
print_response(chunk_b_response)
# Commit the session so that the chunk becomes part of a new record version.
commit_b_response = client.patch(f'{welllog_dms_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
print_response(commit_b_response)

# Pick the version to start from. Index 1 is expected to be the version
# created by the first bulk write (COLUMN_MD and COLUMN_X only).
results_response = client.get(f'{welllog_dms_url}/{record_id}/versions')
wellLog_versions_response = results_response.json()
version = wellLog_versions_response['versions'][1]

# Open a new session based on that earlier version.
session_json = {'mode': SESSION_MODE, 'fromVersion': version}
create_session_response = client.post(f'{welllog_dms_url}/{record_id}/sessions', json=session_json)
print_response(create_session_response)
session_id = create_session_response.json()['id']

# Third bulk write: add COLUMN_Z by chunk through the session opened from the
# earlier version.
generated_C_dataframe = generate_df(['COLUMN_Z'], range(10))
chunk_c_payload = generated_C_dataframe.to_dict(orient='split')
chunk_c_response = client.post(f'{welllog_dms_url}/{record_id}/sessions/{session_id}/data', json=chunk_c_payload)
print_response(chunk_c_response)
# Commit the session.
commit_c_response = client.patch(f'{welllog_dms_url}/{record_id}/sessions/{session_id}', json={'state': 'commit'})
print_response(commit_c_response)
# Display result: fetch the bulk data of every version of the record and show
# the versions that have bulk data side by side.
results_response = client.get(f'{welllog_dms_url}/{record_id}/versions')
wellLog_versions_response = results_response.json()
versions = wellLog_versions_response['versions']
titles = []
dataframes = []
for index, version in enumerate(versions):
    version_data_response = client.get(f'{welllog_dms_url}/{record_id}/versions/{version}/data')
    # Versions whose data request does not answer 200 are skipped, so that
    # titles and dataframes stay aligned.
    if version_data_response.status_code == 200:
        if index == 3:
            # The version at index 3 is the one committed by the session that
            # was opened with 'fromVersion' set to versions[1].
            titles.append(f'{index}. version number {version} created from version {versions[1]}')
        else:
            titles.append(f'{index}. version number {version}')
        version_df = create_df_from_response(version_data_response)
        dataframes.append(version_df)
display_side_by_side(dataframes, titles)
| | COLUMN_MD | COLUMN_X |
|---|---|---|
| 0 | 345 | 18 |
| 1 | 845 | 863 |
| 2 | 290 | -62 |
| 3 | 947 | 698 |
| 4 | 562 | 825 |
| 5 | 79 | 450 |
| 6 | 809 | 153 |
| 7 | 53 | 450 |
| 8 | 121 | 793 |
| 9 | 352 | -97 |
| | COLUMN_MD | COLUMN_X | COLUMN_Y |
|---|---|---|---|
| 0 | 345 | 18 | 750 |
| 1 | 845 | 863 | 499 |
| 2 | 290 | -62 | 114 |
| 3 | 947 | 698 | 637 |
| 4 | 562 | 825 | 368 |
| 5 | 79 | 450 | 219 |
| 6 | 809 | 153 | 46 |
| 7 | 53 | 450 | 628 |
| 8 | 121 | 793 | 267 |
| 9 | 352 | -97 | 990 |
| | COLUMN_MD | COLUMN_X | COLUMN_Z |
|---|---|---|---|
| 0 | 345 | 18 | -31 |
| 1 | 845 | 863 | 431 |
| 2 | 290 | -62 | 322 |
| 3 | 947 | 698 | 5 |
| 4 | 562 | 825 | -53 |
| 5 | 79 | 450 | 949 |
| 6 | 809 | 153 | -47 |
| 7 | 53 | 450 | 195 |
| 8 | 121 | 793 | 291 |
| 9 | 352 | -97 | -95 |
The Wellbore DDMS provides an API to list the sessions used to write data for a given WellLog record id. For each session, the response contains information such as the version from which the WellLog data was written in the session.
# List the sessions that were used to write data for this WellLog record.
sessions_response = client.get(f'{welllog_dms_url}/{record_id}/sessions')
sessions_response.json()  # bare expression: the notebook cell displays the session list
# Output: [{'id': '23854a8c-9051-48c2-b3f0-2a3c632f85fc', 'recordId': 'data-partition-id:work-product-component--WellLog:30f8f5173cc444cca28582ee7814cc0d', 'fromVersion': 1627640429377696, 'mode': 'update', 'expiry': '2021-07-31T10:20:32.187305', 'createdTime': '2021-07-30T10:20:32.187305', 'updatedTime': '2021-07-30T10:20:34.001277', 'state': 'committed', 'meta': None}, {'id': 'd28ad3ff-30e2-40e1-ac96-a4efedd6b15e', 'recordId': 'data-partition-id:work-product-component--WellLog:30f8f5173cc444cca28582ee7814cc0d', 'fromVersion': 1627640429377696, 'mode': 'update', 'expiry': '2021-07-31T10:20:29.915170', 'createdTime': '2021-07-30T10:20:29.915170', 'updatedTime': '2021-07-30T10:20:31.832840', 'state': 'committed', 'meta': None}]
As for writing, it is possible to specify the format to be returned when reading WellLog bulk data. This is done through the `Accept` header passed to the GET request.
# Header selecting the format in which the bulk data is returned by the
# read calls below.
headers = {
    'Accept': 'application/parquet'  # 'application/parquet' | 'application/json'
}
# The whole WellLog bulk data can be read in one API call as below:
response = client.get(f'{welllog_dms_url}/{record_id}/data', headers=headers)
print_response(response)
create_df_from_response(response)
| | COLUMN_MD | COLUMN_X | COLUMN_Z |
|---|---|---|---|
| 0 | 345 | 18 | -31 |
| 1 | 845 | 863 | 431 |
| 2 | 290 | -62 | 322 |
| 3 | 947 | 698 | 5 |
| 4 | 562 | 825 | -53 |
| 5 | 79 | 450 | 949 |
| 6 | 809 | 153 | -47 |
| 7 | 53 | 450 | 195 |
| 8 | 121 | 793 | 291 |
| 9 | 352 | -97 | -95 |
The GET WellLog data API allows you to pass the list of curves (WellLog data column names) to be returned in the response, as follows:
response = client.get(f'{welllog_dms_url}/{record_id}/data', params={'curves': 'COLUMN_MD,COLUMN_Z'}, headers=headers)
print_response(response)
create_df_from_response(response)
| | COLUMN_MD | COLUMN_Z |
|---|---|---|
| 0 | 345 | -31 |
| 1 | 845 | 431 |
| 2 | 290 | 322 |
| 3 | 947 | 5 |
| 4 | 562 | -53 |
| 5 | 79 | 949 |
| 6 | 809 | -47 |
| 7 | 53 | 195 |
| 8 | 121 | 291 |
| 9 | 352 | -95 |
For array data, you can pass to the GET WellLog data API the name of the array followed by the column number between square brackets, to specify which array columns you want returned in the response.
response = client.get(f'{welllog_dms_url}/{record_2d_id}/data', params={'curves': '2D[0],2D[1]'}, headers=headers)
print_response(response)
create_df_from_response(response)
| | 2D[0] | 2D[1] |
|---|---|---|
| 0 | 676 | 702 |
| 1 | 983 | 588 |
| 2 | 948 | 422 |
| 3 | 272 | -59 |
| 4 | 986 | 869 |
| 5 | 563 | 131 |
| 6 | 703 | 31 |
| 7 | 375 | 538 |
| 8 | 244 | 416 |
| 9 | 761 | 580 |
| 10 | 825 | 222 |
| 11 | 174 | 644 |
| 12 | 871 | 857 |
| 13 | 880 | 780 |
| 14 | 783 | 883 |
Some additional filtering options are available when reading WellLog bulk data, such as:
- offset: starting index from which the data have to be read from the WellLog bulk data
- limit: the maximum number of rows to be returned.
response = client.get(f'{welllog_dms_url}/{record_id}/data',
params={'limit': 4, 'offset': 4, 'curves': 'COLUMN_MD,COLUMN_Z'},
headers=headers)
print_response(response)
create_df_from_response(response)
| | COLUMN_MD | COLUMN_Z |
|---|---|---|
| 4 | 562 | -53 |
| 5 | 79 | 949 |
| 6 | 809 | -47 |
| 7 | 53 | 195 |
See the WellLog schema.
rule 1: Each `CurveID` listed in `data.Curves.CurveID` must be unique.
rule 2: Ensure `data.ReferenceCurveID` exists in the `data.Curves.CurveID` list.
Example
WellLog record:
{
"id": "...",
"data": {
"ReferenceCurveID": "MD",
"SamplingStart": 7627.0,
"SamplingStop": 7627.6,
"Curves": [
{
"CurveID": "CSHG",
"Mnemonic": "CSHG",
"LogCurveFamilyID": "data-partition-id:reference-data--LogCurveFamily:Core%20Mercury%20Saturation:",
"NumberOfColumns": 4
},
{
"CurveID": "MD",
"CurveUnit": "data-partition-id:reference-data--UnitOfMeasure:ft:",
"Mnemonic": "MD",
"LogCurveFamilyID": "data-partition-id:reference-data--LogCurveFamily:Measured%20Depth:",
"NumberOfColumns": 1
}
]
}
}
rule 1: Each `Curves.CurveID` is unique, here `MD` and `CSHG`.
rule 2: `ReferenceCurveID` is set to `MD` and `MD` exists in the `Curves.CurveID` list.
A WellLog record can exist without bulk data.
When bulk data is added or edited, the following checks are done:
rule 3: Ensure the `Curves.CurveID` values listed in the record match the column names in the bulk.
rule 4: For each curve, ensure that `NumberOfColumns` matches the column count in the bulk for this curve.
Example
WellLog bulk data:
| DEPTH | CSHG[0] | CSHG[1] | CSHG[2] | CSHG[3] |
|---|---|---|---|---|
| 7627.0 | 0.573 | 0.573 | 0.573 | 0.573 |
| 7627.1 | 0.531 | 0.531 | 0.531 | 0.531 |
| 7627.2 | 0.653 | 0.653 | 0.653 | 0.653 |
| 7627.3 | 0.788 | 0.788 | 0.788 | 0.788 |
| 7627.4 | 0.034 | 0.034 | 0.034 | 0.034 |
| 7627.5 | 0.035 | 0.035 | 0.035 | 0.035 |
| 7627.6 | 0.607 | 0.607 | 0.607 | 0.607 |
Using the WellLog record from the previous section:
rule 3: The `Curves.CurveID` list, `DEPTH` and `CSHG`, matches the column names in the bulk. Here `CSHG` is an array with 4 columns: CSHG[0], CSHG[1], CSHG[2], CSHG[3].
rule 4: `DEPTH.NumberOfColumns` matches the column count in the bulk ==> 1. `CSHG.NumberOfColumns` matches the column count in the bulk ==> 4, CSHG[0], CSHG[1], CSHG[2], CSHG[3].
The following rules are only applied if the reference is of type "Measured Depth".
rule 5: The values associated to the `ReferenceCurveID` in the record are monotonic.
rule 6: The top and bottom bulk values associated to the `ReferenceCurveID` should match the values `data.SamplingStart` and `data.SamplingStop` in the record.
Example
from previous record and bulk data:
record:
{
"id": "...",
"data": {
"ReferenceCurveID": "MD",
"SamplingStart": 7627.0,
"SamplingStop": 7627.6,
bulk:
| DEPTH | ... |
|---|---|
| 7627.0 | ... |
| 7627.1 | ... |
| 7627.2 | ... |
| 7627.3 | ... |
| 7627.4 | ... |
| 7627.5 | ... |
| 7627.6 | ... |
- rule 5: The values associated to the `ReferenceCurveID`, `DEPTH`, are monotonic: no duplicates, strictly increasing, no missing values.
- rule 6: `data.SamplingStart` matches the bulk `DEPTH` top value ==> 7627.0. `data.SamplingStop` matches the bulk `DEPTH` bottom value ==> 7627.6.
See the Wellbore trajectory schema.
- rule 1: Each `Name` listed in `data.AvailableTrajectoryStationProperties.Name` must be unique.
Example
Wellbore trajectory record:
{
"id": "...",
"data": {
"Name": "Index",
"WellboreID": "data-partition-id:master-data--Wellbore:71612d776:",
"TopDepthMeasuredDepth": 0.0,
"AzimuthReferenceType": "data-partition-id:reference-data--AzimuthReferenceType:truenorth:",
"BaseDepthMeasuredDepth": 7628.0,
"AvailableTrajectoryStationProperties": [
{
"TrajectoryStationPropertyTypeID": "data-partition-id:reference-data--TrajectoryStationPropertyType:BOREHOLE_AZIMUTH:",
"StationPropertyUnitID": "data-partition-id:reference-data--UnitOfMeasure:dega:",
"Name": "BOREHOLE_AZIMUTH"
},
{
"TrajectoryStationPropertyTypeID": "data-partition-id:reference-data--TrajectoryStationPropertyType:BOREHOLE_DEVIATION:",
"StationPropertyUnitID": "data-partition-id:reference-data--UnitOfMeasure:dega:",
"Name": "BOREHOLE_DEVIATION"
},
{
"TrajectoryStationPropertyTypeID": "data-partition-id:reference-data--TrajectoryStationPropertyType:MD:",
"StationPropertyUnitID": "data-partition-id:reference-data--UnitOfMeasure:ft:",
"Name": "MD"
}
]
}
}
- rule 1: `AvailableTrajectoryStationProperties.Name` is unique, here `BOREHOLE_AZIMUTH`, `BOREHOLE_DEVIATION` and `MD`.
A Wellbore trajectory record can exist without bulk data.
When bulk data is added or edited, the following checks are done:
- rule 2: Ensure the `AvailableTrajectoryStationProperties.Name` values listed in the record match the column names in the bulk.
Example
Wellbore trajectory bulk data:
| MD | BOREHOLE_AZIMUTH | BOREHOLE_DEVIATION |
|---|---|---|
| 0.0 | 360.573 | 0.573 |
| 0.5 | 360.531 | 0.531 |
| 1.0 | 360.653 | 0.653 |
| ... | ... | ... |
| 7627.5 | 360.035 | 0.035 |
| 7628.0 | 360.607 | 0.607 |
Using the Wellbore trajectory record from the previous section:
- rule 2: The `AvailableTrajectoryStationProperties.Name` values listed in the record match the column names in the bulk, here `BOREHOLE_AZIMUTH`, `BOREHOLE_DEVIATION` and `MD`.
The following rules are only applied for `TrajectoryStationPropertyType:MD`.
rule 3: The values associated to the reference in the record must be monotonic.
rule 4: The top and bottom bulk values associated to the reference should match the values `data.TopDepthMeasuredDepth` and `data.BaseDepthMeasuredDepth` in the record.
Example
from previous record and bulk data:
record:
{
"id": "...",
"data": {
"WellboreID": "data-partition-id:master-data--Wellbore:71612d776:",
"TopDepthMeasuredDepth": 0.0,
"BaseDepthMeasuredDepth": 7628.0,
bulk:
| MD | ... |
|---|---|
| 0.0 | ... |
| 0.5 | ... |
| ... | ... |
| 7627.5 | ... |
| 7628.0 | ... |
rule 3: The values of `MD` are monotonic: no duplicates, strictly increasing, no missing values.
rule 4: `data.TopDepthMeasuredDepth` matches the bulk `MD` top value ==> 0.0. `data.BaseDepthMeasuredDepth` matches the bulk `MD` bottom value ==> 7628.0.