We needed a background script, running against our SharePoint directory, that reads through a list of files and organizes them according to their ids: creating a parent folder named after the id, or moving the file into an existing id folder.
Script:
from pyspark.sql import SparkSession
import requests
import json
import os
import time
import re
# Initialize Spark
spark = SparkSession.builder.appName("SharePointFileOrganizer").getOrCreate()
# Key Vault Access
# NOTE(review): `mssparkutils` is injected by the Synapse/Fabric notebook
# runtime and is not imported above — this script assumes it is in scope.
# Confirm before running outside a notebook session.
# NOTE(review): all three lookups use the same placeholder vault path and
# secret name ('scret'); replace with the real Key Vault entries per value.
KeyVaultSecret = mssparkutils.credentials.getSecret('[keyvaultpath]','scret')
KeyVaultTenantId = mssparkutils.credentials.getSecret('[keyvaultpath]','scret')
KeyVaultClientId = mssparkutils.credentials.getSecret('[keyvaultpath]','scret')
# CONFIG - Need to be properly handled by Azure key vault
# Azure AD app registration credentials used for the client-credentials flow.
tenant_id = KeyVaultTenantId
client_id = KeyVaultClientId
client_secret = KeyVaultSecret
# SharePoint site coordinates (placeholders — fill in for your tenant).
site_name = "[Sharepoint site name]"
site_domain = "[Site domain name]"
source_drive_name = "Files" # Drive to read files from
target_drive_name = "Training Files" # Drive to create folders and move files to
folder_path = "Test" # Folder within source drive to organize
# AUTHENTICATION
def get_access_token():
    """Acquire an app-only OAuth2 bearer token for Microsoft Graph.

    Uses the client-credentials flow against the tenant's v2.0 token
    endpoint, reading the module-level ``tenant_id``, ``client_id`` and
    ``client_secret`` values loaded from Key Vault.

    Returns:
        str: The access token to place in the ``Authorization`` header.

    Raises:
        requests.HTTPError: If the token endpoint returns a non-2xx status.
        requests.Timeout: If the endpoint does not respond within 30s.
    """
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    token_data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }
    # Robustness fix: add a timeout (the original call could hang forever)
    # and fail loudly on auth errors instead of surfacing a confusing
    # KeyError when the response body is an OAuth error payload.
    token_r = requests.post(token_url, data=token_data, timeout=30)
    token_r.raise_for_status()
    return token_r.json()["access_token"]
# --- Authenticate and resolve the SharePoint site and drive IDs ---------
access_token = get_access_token()
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

# Resolve the site ID from its domain + site name.
site_url = f"https://graph.microsoft.com/v1.0/sites/{site_domain}:/sites/{site_name}"
site_r = requests.get(site_url, headers=headers)
site_id = site_r.json()["id"]

# List every document library (drive) on the site, then pick out the
# source and target drives by display name (case-insensitive).
drive_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
drive_r = requests.get(drive_url, headers=headers)
drives = drive_r.json()["value"]

source_drive_id = None
target_drive_id = None
for entry in drives:
    entry_name = entry["name"].lower()
    if entry_name == source_drive_name.lower():
        source_drive_id = entry["id"]
    elif entry_name == target_drive_name.lower():
        target_drive_id = entry["id"]

# Abort early if either drive is missing — nothing downstream can work.
if not source_drive_id:
    raise Exception(f"Source drive '{source_drive_name}' not found.")
if not target_drive_id:
    raise Exception(f"Target drive '{target_drive_name}' not found.")
# HELPER FUNCTIONS
def extract_number_from_filename(filename):
    """Extract the leading numeric id from a ``<number>_<name>.<ext>`` filename.

    Args:
        filename: File name to parse, e.g. ``"123_report.pdf"``.

    Returns:
        The id as a string (``"123"``), or ``None`` when the name does not
        start with one or more digits followed by an underscore.
    """
    match = re.match(r'^(\d+)_', filename)
    if match:
        return match.group(1)
    # Bug fix: the warning previously printed a literal placeholder instead
    # of interpolating the offending filename, making it useless for triage.
    print(f"Warning: Filename '{filename}' doesn't match expected pattern [number]_[file_name].ext")
    return None
def get_file_title(filename):
    """Return the numeric id used to group *filename* into a folder.

    Thin wrapper kept for backward compatibility; delegates to
    extract_number_from_filename.
    """
    number = extract_number_from_filename(filename)
    return number
def get_folder_contents(path="", drive_id=None):
    """Return every child item (file or folder) at *path* in a drive.

    Args:
        path: Drive-relative folder path; empty string means the drive root.
        drive_id: Drive to query; defaults to the source drive.

    Returns:
        list: Graph driveItem dicts, or an empty list on any HTTP error.
    """
    if not drive_id:
        drive_id = source_drive_id
    base = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}"
    # Root listing and path-addressed listing use different URL shapes.
    folder_url = f"{base}/root:/{path}:/children" if path else f"{base}/root/children"
    response = requests.get(folder_url, headers=headers)
    if response.status_code != 200:
        print(f"Error getting folder contents: {response.status_code}")
        print(response.text)
        return []
    return response.json().get("value", [])
def folder_exists(folder_name, parent_path="", drive_id=None):
    """Report whether a folder named *folder_name* exists under *parent_path*.

    Comparison is case-insensitive and only folder items are considered.
    Searches the target drive unless *drive_id* is given.

    Returns:
        tuple: ``(True, folder_id)`` when found, ``(False, None)`` otherwise.
    """
    if not drive_id:
        drive_id = target_drive_id  # Check in target drive by default
    wanted = folder_name.lower()
    for entry in get_folder_contents(parent_path, drive_id):
        if entry.get("folder") and entry["name"].lower() == wanted:
            return True, entry["id"]
    return False, None
def create_folder(folder_name, parent_path="", drive_id=None):
    """Create a folder named *folder_name* under *parent_path*.

    Creates in the target drive unless *drive_id* is given.

    Returns:
        The new folder's item id on success, ``None`` on failure.
    """
    if not drive_id:
        drive_id = target_drive_id  # Create in target drive by default
    base = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}"
    create_url = f"{base}/root:/{parent_path}:/children" if parent_path else f"{base}/root/children"
    payload = {"name": folder_name, "folder": {}}
    response = requests.post(create_url, headers=headers, json=payload)
    # Graph returns 201 Created for a successful folder creation.
    if response.status_code != 201:
        print(f"Error creating folder {folder_name}: {response.status_code}")
        print(response.text)
        return None
    print(f"Created folder: {folder_name} in {target_drive_name}")
    return response.json()["id"]
def move_file(file_id, target_folder_id, original_filename, new_filename=None):
    """Move a file from the source drive into *target_folder_id* on the
    target drive, optionally renaming it as part of the same PATCH.

    Args:
        file_id: Graph item id of the file in the source drive.
        target_folder_id: Item id of the destination folder.
        original_filename: Current name, used for log messages.
        new_filename: If given and different, the file is renamed on move.

    Returns:
        bool: ``True`` on success, ``False`` otherwise.
    """
    move_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{source_drive_id}/items/{file_id}"
    payload = {
        "parentReference": {
            "driveId": target_drive_id,
            "id": target_folder_id
        }
    }
    renaming = bool(new_filename) and new_filename != original_filename
    if renaming:
        payload["name"] = new_filename
    display_name = new_filename if renaming else original_filename
    response = requests.patch(move_url, headers=headers, json=payload)
    if response.status_code != 200:
        print(f"✗ Error moving file {original_filename} -> {display_name}: {response.status_code}")
        print(response.text)
        return False
    if renaming:
        print(f"Moved and renamed file: {original_filename} -> {display_name} to {target_drive_name}")
    else:
        print(f"Moved file: {display_name} to {target_drive_name}")
    return True
def file_exists_in_folder(filename, folder_path_full, drive_id=None):
    """Return whether a file named *filename* (case-insensitive) already
    exists at *folder_path_full* (target drive unless *drive_id* given)."""
    if not drive_id:
        drive_id = target_drive_id
    wanted = filename.lower()
    return any(
        item.get("file") and item["name"].lower() == wanted
        for item in get_folder_contents(folder_path_full, drive_id)
    )
def generate_unique_filename(original_filename, folder_path_full, drive_id=None):
    """Return a filename that does not collide with any file already in
    *folder_path_full*.

    If *original_filename* is free it is returned unchanged; otherwise a
    numeric suffix is inserted before the extension (``name_1.ext``,
    ``name_2.ext``, ...). Comparison is case-insensitive, matching
    SharePoint's name handling. Checks the target drive by default.

    Performance fix: the folder is listed once and candidate names are
    checked against an in-memory set, instead of issuing one Graph request
    per candidate as the previous implementation did.
    """
    if not drive_id:
        drive_id = target_drive_id
    existing = {
        item["name"].lower()
        for item in get_folder_contents(folder_path_full, drive_id)
        if item.get("file")
    }
    if original_filename.lower() not in existing:
        return original_filename
    name, ext = os.path.splitext(original_filename)
    counter = 1
    while f"{name}_{counter}{ext}".lower() in existing:
        counter += 1
    return f"{name}_{counter}{ext}"
# MAIN PROCESSING
def organize_files():
    """Organize loose files in the source folder into per-id folders.

    Lists the items directly under ``folder_path`` in the source drive,
    extracts the leading numeric id from each file name, ensures a folder
    with that id exists in the target drive (creating it if needed), and
    moves the file into it, renaming on name collision.

    Returns:
        list: ``(filename, extracted_number, status)`` tuples, one per file;
        empty list when there was nothing to organize.
    """
    print("Starting file organization...")
    # Get all items in the Files directory
    contents = get_folder_contents(folder_path)

    # Separate files from existing folders (only in root directory)
    files_to_organize = []
    existing_folders = {}
    for item in contents:
        if item.get("file"):  # It's a file in the root directory
            files_to_organize.append(item)
        elif item.get("folder"):  # It's a folder in the root directory
            existing_folders[item["name"].lower()] = item["id"]

    print("Found files in root Files directory to organize:", len(files_to_organize))
    print("Found existing folders in root Files directory:", len(existing_folders))

    # Debug: Show what files were found
    if files_to_organize:
        print("\nFiles found in root directory:")
        for file_item in files_to_organize:
            print(f" - {file_item['name']}")
    if existing_folders:
        print("\nExisting folders in root directory:")
        for folder_name in existing_folders.keys():
            print(f" - {folder_name}")
    if not files_to_organize:
        print("No files found in root directory to organize.")
        return []

    # Process each file
    results = []
    for file_item in files_to_organize:
        filename = file_item["name"]
        file_id = file_item["id"]
        file_number = get_file_title(filename)  # This now extracts the number
        # Bug fix: the messages below previously printed a literal
        # placeholder instead of interpolating the actual filename.
        print(f"\nProcessing: {filename}")
        print(f"Extracted Number: {file_number}")

        # Skip files that don't match the expected pattern
        if file_number is None:
            print(f"Skipping file '{filename}' - doesn't match [number]_[file_name] pattern")
            results.append((filename, "N/A", "SKIPPED - Invalid filename pattern"))
            continue

        # Check if folder with this number already exists
        folder_exists_flag, folder_id = folder_exists(file_number, folder_path)
        if folder_exists_flag:
            print(f"Folder '{file_number}' already exists")
            target_folder_id = folder_id
            target_folder_path = f"{folder_path}/{file_number}" if folder_path else file_number
            # Check if file already exists and generate unique name if needed
            new_filename = generate_unique_filename(filename, target_folder_path)
            if new_filename != filename:
                print(f"File already exists, renaming to: {new_filename}")
        else:
            print(f"Creating new folder: {file_number}")
            target_folder_id = create_folder(file_number, folder_path)
            if not target_folder_id:
                print(f"Failed to create folder for {filename}")
                results.append((filename, file_number, "FAILED - Could not create folder"))
                continue
            new_filename = filename

        # Move the file with the determined filename
        success = move_file(file_id, target_folder_id, filename, new_filename)
        if success:
            results.append((filename, file_number, "SUCCESS"))
        else:
            results.append((filename, file_number, "FAILED - Could not move file"))
        # Small delay to avoid rate limiting
        time.sleep(0.5)
    return results
# EXECUTE ORGANIZATION
# EXECUTE ORGANIZATION
try:
    results = organize_files()
    if not results:
        # Bug fix: spark.createDataFrame([]) with only column names raises
        # (schema cannot be inferred from an empty list), so skip the
        # summary entirely when nothing was processed.
        print("Nothing to summarize - no files were processed.")
    else:
        # Create summary DataFrame
        df = spark.createDataFrame(results, ["FileName", "ExtractedNumber", "Status"])
        print("\n" + "="*60)
        print("ORGANIZATION SUMMARY")
        print("="*60)
        df.show(truncate=False)
        # Show statistics
        success_count = len([r for r in results if r[2] == "SUCCESS"])
        total_count = len(results)
        skipped_count = len([r for r in results if r[2].startswith("SKIPPED")])
        print(f"\nSUMMARY STATISTICS:")
        print(f"Total files processed: {total_count}")
        print(f"Successfully organized: {success_count}")
        print(f"Skipped (invalid pattern): {skipped_count}")
        print(f"Failed: {total_count - success_count - skipped_count}")
        if success_count == total_count - skipped_count:
            print("All valid files successfully organized!")
        elif skipped_count > 0:
            print("Some files were skipped due to invalid filename patterns. Check the details above.")
        else:
            print("Some files could not be organized. Check the details above.")
except Exception as e:
    # Top-level boundary: report the failure with a traceback rather than
    # letting the notebook die silently.
    print(f"An error occurred: {str(e)}")
    import traceback
    traceback.print_exc()
finally:
    # Always release the Spark session, success or failure.
    spark.stop()