Accessing the EHR data from the TRE

pixl
tre
Author

Tom Young

Published

March 4, 2023

1 Accessing EHR data from the TRE

This notebook describes the process of accessing EHR data stored within a TRE-accessible Azure Data Lake Storage Gen2 (ADLS) instance.

Note that PII has been masked from reports stored in the TRE and as a consequence structural changes may have appeared with respect to the original data.

Features intended as inputs to machine learning models should not include, or be derived from, structural properties of the report text (e.g. line breaks, sentence length) as stored in the storage instance detailed below, since PII masking may have altered that structure.

1.0.1 Authenticate with Azure

Open the link shown in a browser outside of the TRE, enter the code and log in with your user account

!az login --use-device-code

1.0.2 Set some key variables - the storage account name, input data filesystem name and the directory containing files

# ADLS Gen2 storage account that hosts the TRE-accessible data lake.
storage_account_name = "stpixldflowehrprod"
# File system (container) within the account holding the input EHR data.
input_data_fs_name = "data-lake-storage-pixld-flowehr-prod"
# Directory inside the file system to list/download from ("/" = root).
data_directory_path = "/"

Import dependencies and define functions to query data

#Function definitions inspired by MS docs
#at https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-directory-file-acl-python
import os, uuid, sys
from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azure.storage.filedatalake._models import ContentSettings
from azure.identity import DefaultAzureCredential, AzureCliCredential

class StorageClient:
    """Thin wrapper around ``DataLakeServiceClient`` for basic ADLS Gen2
    operations: list, download, upload and directory creation.

    Errors are printed and swallowed (methods return ``None``) so a single
    failing call does not abort an interactive notebook session.
    """

    def __init__(self, storage_account_name):
        # Name of the ADLS Gen2 storage account, e.g. "stpixldflowehrprod".
        self.storage_account_name = storage_account_name
        # May be None if authentication failed; see initialize_storage_account_ad.
        self.service_client = self.initialize_storage_account_ad()

    def initialize_storage_account_ad(self):
        """Build a ``DataLakeServiceClient`` authenticated via the Azure CLI.

        Requires a prior ``az login``. Returns the client, or ``None``
        (after printing the error) if construction fails.
        """
        try:
            credential = AzureCliCredential()
            return DataLakeServiceClient(
                account_url=f"https://{self.storage_account_name}.dfs.core.windows.net",
                credential=credential,
            )
        except Exception as e:
            print(e)
            return None

    def download_file_from_directory(self, file_syst, directory, file_name):
        """Download ``file_name`` from ``directory`` in file system
        ``file_syst`` into the local ``downloaded_data/`` folder.

        Errors are printed and swallowed.
        """
        try:
            file_system_client = self.service_client.get_file_system_client(file_system=file_syst)
            directory_client = file_system_client.get_directory_client(directory)
            # exist_ok avoids the racy exists()-then-makedirs() check.
            os.makedirs("downloaded_data", exist_ok=True)
            file_client = directory_client.get_file_client(file_name)
            # Context manager guarantees the local file is closed even if the
            # download itself raises partway through.
            with open(f"downloaded_data/{file_name}", 'wb') as local_file:
                local_file.write(file_client.download_file().readall())
        except Exception as e:
            print(e)

    def list_directory_contents(self, file_syst, directory):
        """Return a list of path names under ``directory``, or ``None`` on error."""
        try:
            file_system_client = self.service_client.get_file_system_client(file_system=file_syst)
            return [path.name for path in file_system_client.get_paths(path=directory)]
        except Exception as e:
            print(e)
            return None

    def create_directory(self, file_syst, directory):
        """Create ``directory`` in file system ``file_syst``; errors are printed."""
        try:
            file_system_client = self.service_client.get_file_system_client(file_system=file_syst)
            file_system_client.create_directory(directory)
        except Exception as e:
            print(e)

    def upload_file_to_directory(self, file_syst, directory, uploaded_file_name, file_to_upload):
        """Upload local file ``file_to_upload`` as ``uploaded_file_name``
        under ``directory`` in file system ``file_syst``.

        The local file is read in binary mode so the length passed to the
        service is a byte count; text mode would miscount any multi-byte
        (non-ASCII) characters and corrupt the upload.
        """
        try:
            file_system_client = self.service_client.get_file_system_client(file_system=file_syst)
            directory_client = file_system_client.get_directory_client(directory)
            file_client = directory_client.create_file(uploaded_file_name)
            with open(file_to_upload, 'rb') as local_file:
                file_contents = local_file.read()
            file_client.append_data(data=file_contents, offset=0, length=len(file_contents))
            file_client.flush_data(len(file_contents))
        except Exception as e:
            print(e)

1.0.3 Create an instance of our StorageClient object

# Instantiate the storage client; authentication happens in __init__ via the
# Azure CLI credential obtained from the earlier `az login`.
client = StorageClient(storage_account_name)

1.0.4 List the contents of the specified directory within the ADLS file system

# Enumerate every path under the configured directory.
# NOTE: list_directory_contents prints the error and returns None on failure,
# so later cells that index available_files assume this call succeeded.
available_files = client.list_directory_contents(input_data_fs_name, data_directory_path)
print(available_files)

1.0.5 Download all files from a directory

# Download every listed file. A plain loop replaces the original list
# comprehension: the call is executed for its side effect (writing a local
# file) and returns None, so building a throwaway list of Nones was wasteful
# and misleading. rsplit strips any directory prefix from the path.
for datafile in available_files:
    client.download_file_from_directory(input_data_fs_name, data_directory_path, datafile.rsplit("/", 1)[-1])

1.0.6 Download individual files from a directory

# Download just the first listed file; rsplit strips any directory prefix so
# only the bare file name is passed to download_file_from_directory.
client.download_file_from_directory(input_data_fs_name, data_directory_path, available_files[0].rsplit("/", 1)[-1])

1.1 Reading downloaded files with pandas

import re
import pandas as pd

# Partition the discovered paths by file extension so each format can be read
# with the matching pandas reader below. str.endswith replaces the original
# re.match("^.*\\.parquet", ...) approach, which (a) used non-raw strings with
# a "\." escape and (b) was unanchored, so group(0) silently truncated any
# name containing the extension mid-string (e.g. "a.parquet.bak" -> "a.parquet").
# The list names are kept for compatibility with later cells, although `csv`
# shadows the stdlib module of the same name.
parquet = []
csv = []
for path_name in available_files:
    if path_name.endswith(".parquet"):
        parquet.append(path_name)
    if path_name.endswith(".csv"):
        csv.append(path_name)

1.1.1 Parquet

# Load the first downloaded parquet file into a DataFrame and preview it.
parquet_file_name = parquet[0].rsplit('/', 1)[-1]
local_df = pd.read_parquet(f"downloaded_data/{parquet_file_name}")
local_df.head()

1.1.2 CSV

# Load the first downloaded CSV file into a DataFrame and preview it.
csv_file_name = csv[0].rsplit('/', 1)[-1]
local_df = pd.read_csv(f"downloaded_data/{csv_file_name}")
local_df.head()