Example: Email Analytics

Goals

Enrich your data warehouse and analyze thousands of emails in minutes with Trellis.

This example shows how to ingest data into the Trellis engine, define your own transformation, and monitor Trellis jobs.

Data Ingestion:

Create a new Python file named ingestion.py containing the code below.

In this step, we use the upload API to do the following:

  • Define the folder where the data is stored
  • Put your API key in the Authorization header
  • Define a new project and specify the relevant parameters. Here’s a quick overview of the parameters:
    • file_type: type of document or object being ingested. In this case, it’s a txt file.
    • proj_name: name of the project
    • ext_ids: IDs of each object in the folder or data source
    • ext_file_names: name of each object in the folder or data source
      Refer to the API documentation for the full list of parameters.
  • Make a request to the Trellis server
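import os
import requests

data_folder = "my_emails"  # folder that holds the email .txt files
# Collect every non-hidden file in the data folder
file_paths = [
    os.path.join(data_folder, file)
    for file in os.listdir(data_folder)
    if not file.startswith('.')
]
upload_url = "https://api.usetrellis.co/v1/assets/upload/"

headers = {
    'Authorization': "YOUR_API_KEY"  # replace with your API key
}

data = {
    'file_type': 'txt',  # document type
    'proj_name': 'email_analysis',  # project name
    'ext_ids': file_paths,  # object ids
    'files': file_paths  # object file names
}
# Open each file in binary mode for the multipart upload
files = [('files', (file_path, open(file_path, 'rb'))) for file_path in file_paths]

response = requests.post(upload_url, headers=headers, data=data, files=files)

# Close the file handles once the request has been sent
for _, (_, handle) in files:
    handle.close()

print(response.json())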

Run the module:

python ingestion.py

If the module runs successfully, the output should look similar to the following. This sample shows a PDF asset, so your file names and file type will differ:

{'message': 'Files upload initiated', 'data': {'asset_2dZ1mAJn1RSaAqJjsbBIl8E7D1h': {'created_at': '2024-03-11T23:49:02.664758+00:00', 'file_type': 'pdf', 'cust_id': 'cust_2dKaQIHZuESv1L1fvHRwcwOjtNU', 'ext_file_id': 'data/clinical-summary-input.pdf', 'ext_file_name': 'data/clinical-summary-input.pdf', 'updated_at': '2024-03-11T23:49:02.664758+00:00', 'status': 'processing'}}}
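
The upload response maps each asset ID to its metadata, so it can help to pull out the status of every asset before moving on. The following is a minimal sketch, assuming the response has the same shape as the sample above; summarize_upload is a hypothetical helper, not part of the Trellis API.

```python
def summarize_upload(payload):
    """Return {asset_id: status} from an upload response shaped like the sample above."""
    # Assumption: assets live under the "data" key, keyed by asset ID.
    assets = payload.get("data", {})
    return {asset_id: meta.get("status", "unknown") for asset_id, meta in assets.items()}


# Trimmed copy of the sample response shown above.
sample = {
    "message": "Files upload initiated",
    "data": {
        "asset_2dZ1mAJn1RSaAqJjsbBIl8E7D1h": {
            "file_type": "pdf",
            "ext_file_name": "data/clinical-summary-input.pdf",
            "status": "processing",
        }
    },
}

statuses = summarize_upload(sample)
print(statuses)
```

In ingestion.py, the same helper could be called on response.json() once the upload request returns.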

Define The Transformation:

After the assets are uploaded, define the set of transformations to run on them.

In this example, we want to get the following information from the emails:

  • Is the email from an investor?
  • List of person names mentioned in the email
  • One line summary of the email

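First, pick the proj_name (email_analysis in our case) to run the transformation on. Then define the transformations above, along with the required output, and save them as transformation_param.json, the file loaded in the next step. The parameters below define the name list and the investor classification.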

{
    "model": "trellis-premium",
    "mode": "document",
    "operations": [
        {
            "column_name": "names_list",
            "column_type": "text[]",
            "transform_type": "extraction",
            "task_description": "List of person names mentioned in the email"
        },
        {
            "column_name": "is_investor",
            "column_type": "text",
            "transform_type": "classification",
            "task_description": "Is the email from an investor?",
            "output_values": {
                "yes": "this email is from investor",
                "no": "this email is not from investor"
            }
        }
    ]
}
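
Because the parameters are loaded from a JSON file before being sent, a quick sanity check can catch malformed operations early. The sketch below is illustrative only: check_operations is a hypothetical helper, and the rule that a classification operation carries an output_values mapping is inferred from the example above rather than taken from the API reference.

```python
import json

# Keys that every operation in the example above provides.
REQUIRED_KEYS = {"column_name", "column_type", "transform_type", "task_description"}


def check_operations(params):
    """Return a list of human-readable problems found in the transformation params."""
    problems = []
    for i, op in enumerate(params.get("operations", [])):
        missing = REQUIRED_KEYS - op.keys()
        if missing:
            problems.append(f"operation {i}: missing {sorted(missing)}")
        # Assumption: classification needs a label -> description mapping.
        is_classification = op.get("transform_type") == "classification"
        if is_classification and not isinstance(op.get("output_values"), dict):
            problems.append(f"operation {i}: classification needs an output_values mapping")
    return problems


params = {
    "model": "trellis-premium",
    "mode": "document",
    "operations": [
        {
            "column_name": "is_investor",
            "column_type": "text",
            "transform_type": "classification",
            "task_description": "Is the email from an investor?",
            "output_values": {
                "yes": "this email is from investor",
                "no": "this email is not from investor",
            },
        }
    ],
}

problems = check_operations(params)
print(problems)

# json.dumps always emits valid JSON, which avoids hand-editing mistakes.
serialized = json.dumps(params, indent=4)
```

The serialized string could then be written to transformation_param.json instead of editing the file by hand.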

Create Transformation

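import json
import requests

YOUR_API_KEY = "YOUR_API_KEY"  # replace with your API key
initiate_url = "https://api.usetrellis.co/v1/transform/initiate"

headers = {
    'Authorization': YOUR_API_KEY,
    'Content-Type': 'application/json'
}
# Load the transformation parameters defined above
transformation_file = "transformation_param.json"
with open(transformation_file, "r") as json_file:
    transforms_param = json.load(json_file)

data = {
    "proj_name": "email_analysis",
    "transforms_param": transforms_param
}

# Send the payload as the JSON request body
response = requests.post(initiate_url, headers=headers, json=data)

print(response.text)

# Keep the transform ID for the monitoring step
transform_id = response.json()["data"]["transform_id"]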

Results

{"message":"Transformation initiated","data":{"transform_id":"transform_2dZ1t5uLQDSPMiRn6tyhXupfIz6"}}
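
The monitoring step needs the transform_id from this response. A minimal sketch of pulling it out defensively, assuming the response body has the shape shown above; extract_transform_id is a hypothetical helper name:

```python
import json


def extract_transform_id(response_text):
    """Parse the initiate response and return its transform_id."""
    payload = json.loads(response_text)
    try:
        return payload["data"]["transform_id"]
    except (KeyError, TypeError) as exc:
        # Surface the whole payload so an error message from the server is visible.
        raise ValueError(f"no transform_id in response: {payload!r}") from exc


# The sample response shown above.
sample_text = (
    '{"message":"Transformation initiated",'
    '"data":{"transform_id":"transform_2dZ1t5uLQDSPMiRn6tyhXupfIz6"}}'
)
transform_id = extract_transform_id(sample_text)
print(transform_id)
```

Raising on a missing ID keeps a failed initiation from silently flowing into the polling loop.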

Monitor the transformation runs and get the results back

import time
import requests

PING_INTERVAL = 5  # seconds
# transform_id and YOUR_API_KEY come from the previous step
status_url = "https://api.usetrellis.co/v1/transform/status/"
data = {"ids": [transform_id]}
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "Authorization": YOUR_API_KEY
}

while True:
    response = requests.get(status_url, json=data, headers=headers)
    # Take the first value under "data" as the status of our transformation
    transformation_status = list(response.json()["data"].values())[0]
    print(transformation_status)
    # Stop polling once the job reaches a terminal state
    if transformation_status in ("completed", "failed"):
        break
    time.sleep(PING_INTERVAL)
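
The loop above polls until the job reaches a terminal status, so it would run forever if the job stalled. One way to bound the wait is sketched below. wait_for_status is a hypothetical helper, the "running" status in the demo is made up for illustration, and the status lookup is passed in as a function so that the example runs without network access.

```python
import time

TERMINAL_STATUSES = {"completed", "failed"}  # the two statuses checked in the loop above


def wait_for_status(fetch_status, interval=5.0, timeout=600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Call fetch_status() until it returns a terminal status or the timeout expires."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"status still {status!r} after {timeout} seconds")
        sleep(interval)


# Demo with canned statuses instead of real API calls.
canned = iter(["running", "running", "completed"])
final_status = wait_for_status(lambda: next(canned), interval=0.0)
print(final_status)
```

In practice, fetch_status would wrap the status request from the loop above and return the status string it extracts.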

If the transformation completes successfully, run the following code to get the results:

results_url = f"https://api.usetrellis.co/v1/transform/{transform_id}/results"
headers = {
    "accept": "application/json",
    "Authorization": YOUR_API_KEY
}
response = requests.get(results_url, headers=headers)
print(response.text)

To list all the transformations associated with a project:

PROJ_NAME = "email_analysis"  # project name used during ingestion
list_transform_url = f"https://api.usetrellis.co/v1/transform/{PROJ_NAME}"

headers = {
    "accept": "application/json",
    "Authorization": YOUR_API_KEY
}
response = requests.get(list_transform_url, headers=headers)
print(response.text)