I'm trying to create a PySpark script in Microsoft Fabric that automates Jira API ticket creation (POST calls) from a lakehouse delta table. To get there, I need to teach myself the PySpark syntax and framework, and that's what this blog post works through.
To reference a lakehouse delta table:
import json

lakehouse_path = "abfss://[path]"  # complete abfss:// address of the file or directory in the lakehouse
facilities_tickets_df = spark.read.format('delta').load(lakehouse_path)  # read from the data source and create a DataFrame
first_record = facilities_tickets_df.limit(1).collect()  # collect returns a list of Row objects; limit(1) keeps only the first row

# Check if there's a record
if first_record:
    # Convert the Row object to a dictionary
    record_dict = first_record[0].asDict()
    # Convert the dictionary to JSON format
    record_json = json.dumps(record_dict, indent=4)
    print("First Record in JSON Format:")
    print(record_json)
else:
    print("No records found in the table.")
Result:
First Record in JSON Format:
{
    "No": "****",
    "Date": "9/1/2024 7:01",
    "Updated": "9/1/2024 9:26",
    "Status": "Closed",
    "Priority": "Medium",
    "AlertLevel": "Closed",
    "Tech": " ***** ",
    "Location": " ***** ",
    "RequestType": "3 - Facilities : General Duties",
    "Subject": "Garbage bin",
    "RequestDetail": "We need our garbage emptied, please?",
    "Client": " ***** ",
    "Notes": "\"<note date=\"\"09-01-24 9:25 am\"\" author=\"\"****\"\" type=\"\"tech\"\" time=\"\"0\"\" rate=\"\"None\"\">They are going to try to empty it today",
    "Unnamed": " as they would like to take care of this before the parking lot fills up they may have to wait until tomorrow. </note>\"",
    "TechEmail": " **** ",
    "ClientEmail": " ***** ",
    "NotesContent": "They are going to try to empty it today",
    "NotesAuthor": " ***** ",
    "JiraLocation": " **** ",
    "SimpleRequestType": "General Duties",
    "RTKey": "fc/4386150a-c1df-43ea-b848-1cd3799174a2"
}
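As an aside, Spark can also serialize rows straight to JSON, which skips the manual Row-to-dictionary step. A minimal sketch of that alternative, reusing the same facilities_tickets_df from above:

import json

# DataFrame.toJSON() returns an RDD of JSON strings, one per row
first_json = facilities_tickets_df.limit(1).toJSON().collect()
if first_json:
    # Re-parse and pretty-print so the output matches the indent=4 version above
    print(json.dumps(json.loads(first_json[0]), indent=4))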
Now we have to take this code and wire it up to a Jira POST call. To do this, I used a PySpark notebook and tested ticket and comment creation for a specific record.
import json
import requests
import re
def standardize_date(date_string):
    # First, remove the time part if present
    date_parts = date_string.split(" ")[0]  # Take only the date part

    # Handle different separators ("/" or "-")
    if "/" in date_parts:
        date_split = date_parts.split("/")
    elif "-" in date_parts:
        date_split = date_parts.split("-")
    else:
        return "Invalid date format"

    # Check that the split has 3 parts (month, day, year)
    if len(date_split) != 3:
        return "Invalid date format"

    # The source data uses US-style month/day/year ordering, as in "9/1/2024 7:01"
    month, day, year = date_split[0], date_split[1], date_split[2]

    # Check for a 2-digit year and convert it to a 4-digit year
    if len(year) == 2:
        year = "20" + year

    # Return the standardized date in yyyy-mm-dd format
    return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
def parse_notes(notes):
    """
    Process the notes to extract their content in a readable format.
    Extracts the author and timestamp from the <note> tags and includes them in the formatted output.
    """
    # A basic regex pattern to match the content within <note> tags along with the date and author attributes
    note_pattern = r'<note[^>]*date="([^"]+)"\s+author="([^"]+)"[^>]*>(.*?)</note>'
    notes_content = ""

    # Search for all <note> tags in the input string
    matches = re.findall(note_pattern, notes, re.DOTALL)
    for match in matches:
        date, author, note_content = match

        # Clean the note content (remove stray characters like zero-width spaces)
        note_content = note_content.replace("\u200b", "")
        note_content = note_content.strip()

        # If the note contains images, format them as a list
        if re.search(r'\[image[^\]]*\]', note_content):
            note_content = note_content.replace("]", "]\n")  # Add a new line after each image reference

        # Format the note with the author and timestamp at the beginning
        formatted_note = f"**Author**: {author}\n**Date**: {date}\n\n{note_content}\n\n"
        notes_content += formatted_note

    return notes_content.strip()  # Remove any leading/trailing whitespace
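# Hypothetical example (with single-quoted attributes, i.e. after the CSV quote-doubling is undone):
# parse_notes('<note date="09-01-24 9:25 am" author="Tech" type="tech">Emptied the bin.</note>')
#   -> '**Author**: Tech\n**Date**: 09-01-24 9:25 am\n\nEmptied the bin.'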
# Lakehouse path
lakehouse_path = "abfss://[path]"
# Read the Delta table
facilities_tickets_df = spark.read.format('delta').load(lakehouse_path)
# Filter for the specific ticket No
ticket_no = "175246"
record = facilities_tickets_df.filter(facilities_tickets_df["No"] == ticket_no).first() # Get the first matching record
# Check if a record was found
if record:
    record_dict = record.asDict()

    # Convert the dictionary to JSON format and print it
    record_json = json.dumps(record_dict, indent=4)
    print("Record in JSON Format:")
    print(record_json)

    # Extract the fields required for the Jira API
    tech_email = record_dict.get("TechEmail", "")
    jira_location = record_dict.get("JiraLocation", "")
    request_detail = record_dict.get("RequestDetail", "")
    subject = record_dict.get("Subject", "")
    rt_key = record_dict.get("RTKey", "")
    client_email = record_dict.get("ClientEmail", "")
    notes = record_dict.get("Notes", "")
    solarwinds_creation_date = record_dict.get("Date", "")

    # Default the summary to request_detail if the subject is null or contains a newline
    if not subject or '\n' in subject or '\r' in subject:
        subject = request_detail

    # Clean up the subject: replace newlines and carriage returns with spaces and strip extra whitespace
    if subject:
        subject = subject.replace('\n', ' ').replace('\r', ' ').strip()
    # Get the reporter accountId via the Jira user search API for the client (reporter)
    jira_base_url = "[BaseURL]"
    user_search_endpoint = f"{jira_base_url}/rest/api/3/user/search"
    user_search_params = {"query": client_email}
    auth = ("[Username]", "[APIKey]")  # Replace with your actual Jira username and API key

    user_response = requests.get(user_search_endpoint, params=user_search_params, auth=auth)
    reporter_account_id = None  # Default to None
    if user_response.status_code == 200:
        jira_users = user_response.json()
        if jira_users:
            reporter_account_id = jira_users[0].get("accountId", "")
        else:
            print(f"No Jira user found for email: {client_email}. Excluding 'reporter' field.")
    else:
        raise Exception(f"Failed to fetch Jira user. HTTP Status: {user_response.status_code}, Response: {user_response.text}")
    # Get the assignee accountId via the Jira user search API for the tech_email
    assignee_account_id = None  # Default to None
    if tech_email:
        assignee_response = requests.get(user_search_endpoint, params={"query": tech_email}, auth=auth)
        if assignee_response.status_code == 200:
            assignee_users = assignee_response.json()
            if assignee_users:
                assignee_account_id = assignee_users[0].get("accountId", "")
            else:
                print(f"No Jira user found for tech email: {tech_email}. Excluding 'assignee' field.")
        else:
            raise Exception(f"Failed to fetch assignee user. HTTP Status: {assignee_response.status_code}, Response: {assignee_response.text}")
    # Jira issue creation endpoint and payload
    issue_creation_endpoint = f"{jira_base_url}/rest/api/3/issue"
    issue_payload = {
        "fields": {
            "customfield_10152": {
                "value": jira_location
            },
            "summary": subject,  # Falls back to request_detail if the subject was null
            "description": {
                "type": "doc",
                "version": 1,
                "content": [
                    {
                        "type": "paragraph",
                        "content": [
                            {
                                "text": request_detail,
                                "type": "text"
                            }
                        ]
                    }
                ]
            },
            "project": {
                "id": "10022"
            },
            "customfield_10170": "N/A",  # Room #
            "customfield_10010": rt_key,
            "issuetype": {
                "id": "10085"
            }
        }
    }
    # Conditionally add the 'assignee' and 'reporter' fields
    if assignee_account_id:
        issue_payload["fields"]["assignee"] = {
            "id": assignee_account_id
        }
    if reporter_account_id:
        issue_payload["fields"]["reporter"] = {
            "accountId": reporter_account_id
        }
    if standardize_date(solarwinds_creation_date) != "Invalid date format":
        issue_payload["fields"]["customfield_10205"] = standardize_date(solarwinds_creation_date)

    # Make the POST request to create the Jira issue
    headers = {
        "Content-Type": "application/json"
    }
    response = requests.post(issue_creation_endpoint, auth=auth, headers=headers, data=json.dumps(issue_payload))
    # After creating the Jira issue (success response)
    if response.status_code == 201:
        print("Jira issue created successfully:")
        print(response.json())

        # Get the issue key or ID from the response
        issue_id_or_key = response.json().get("key", "")

        # Format and clean up the notes content, with author and timestamp included
        formatted_notes = parse_notes(notes)

        # Add a comment to the created issue
        comment_endpoint = f"{jira_base_url}/rest/api/3/issue/{issue_id_or_key}/comment"
        comment_payload = {
            "body": {
                "type": "doc",
                "version": 1,
                "content": [{
                    "type": "paragraph",
                    "content": [{
                        "text": formatted_notes,  # Use the formatted notes content
                        "type": "text"
                    }]
                }]
            }
        }

        # Make the POST request to add the comment
        comment_response = requests.post(comment_endpoint, auth=auth, headers=headers, data=json.dumps(comment_payload))
        if comment_response.status_code == 201:
            print("Comment added to the Jira issue successfully.")
        else:
            print(f"Failed to add comment. HTTP Status: {comment_response.status_code}, Response: {comment_response.text}")
    else:
        print(f"Failed to create Jira issue. HTTP Status: {response.status_code}, Response: {response.text}")
This successfully produced a ticket in Jira with an associated comment.
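The natural next step toward full automation is to run the same logic over every row in the delta table instead of one hard-coded ticket number. A minimal sketch of that loop, assuming the issue-and-comment code above is wrapped in a hypothetical create_jira_ticket function:

def create_jira_ticket(record_dict):
    # Hypothetical wrapper around the issue + comment creation logic above;
    # takes the record dictionary and returns the new issue key
    ...

facilities_tickets_df = spark.read.format('delta').load(lakehouse_path)

# toLocalIterator() streams rows to the driver one partition at a time,
# which avoids collecting the whole table into memory at once
for row in facilities_tickets_df.toLocalIterator():
    record_dict = row.asDict()
    try:
        issue_key = create_jira_ticket(record_dict)
        print(f"Created {issue_key} for ticket {record_dict.get('No')}")
    except Exception as e:
        # Log and continue so one bad record doesn't stop the whole run
        print(f"Failed on ticket {record_dict.get('No')}: {e}")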