We needed a background script, running against our SharePoint site, that would read through a list of files and organize them according to their IDs: creating a parent folder named after the ID, or moving each file into an existing folder for that ID.

Script:

from pyspark.sql import SparkSession
import requests
import json
import os
import time
import re

# Initialize Spark (runs inside a Synapse/Fabric notebook session, where
# mssparkutils below is provided by the runtime).
spark = SparkSession.builder.appName("SharePointFileOrganizer").getOrCreate()

# Key Vault Access
# NOTE(review): all three lookups use the same secret name 'scret' — verify
# that each should not pull its own distinct secret (tenant id, client id,
# client secret) and that 'scret' is not a typo for 'secret'.
KeyVaultSecret = mssparkutils.credentials.getSecret('[keyvaultpath]','scret') 
KeyVaultTenantId = mssparkutils.credentials.getSecret('[keyvaultpath]','scret') 
KeyVaultClientId = mssparkutils.credentials.getSecret('[keyvaultpath]','scret') 


# CONFIG - Need to be properly handled by Azure key vault
tenant_id = KeyVaultTenantId        # Azure AD (Entra) tenant id
client_id = KeyVaultClientId        # App registration (client) id
client_secret = KeyVaultSecret      # App registration client secret
site_name = "[Sharepoint site name]"       
site_domain = "[Site domain name]"    
source_drive_name = "Files"              # Drive to read files from
target_drive_name = "Training Files"     # Drive to create folders and move files to
folder_path = "Test"                     # Folder within source drive to organize

# AUTHENTICATION
def get_access_token():
    """Acquire an app-only Microsoft Graph access token via the OAuth2
    client-credentials flow.

    Returns:
        str: Bearer token for subsequent Graph API calls.

    Raises:
        requests.HTTPError: if the token endpoint returns an error status.
        KeyError: if a 2xx response unexpectedly lacks "access_token".
    """
    token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    token_data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": "https://graph.microsoft.com/.default"
    }
    # Timeout so a hung endpoint can't stall the whole notebook; fail loudly
    # on a bad status instead of masking it as a KeyError on "access_token".
    token_r = requests.post(token_url, data=token_data, timeout=30)
    token_r.raise_for_status()
    return token_r.json()["access_token"]

access_token = get_access_token()
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

# GET SITE ID - resolve the SharePoint site from its domain + name.
site_url = f"https://graph.microsoft.com/v1.0/sites/{site_domain}:/sites/{site_name}"
site_r = requests.get(site_url, headers=headers)
# Surface HTTP failures directly instead of an opaque KeyError on "id".
site_r.raise_for_status()
site_id = site_r.json()["id"]

# GET DRIVE IDs - list the site's document libraries (drives).
drive_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives"
drive_r = requests.get(drive_url, headers=headers)
drive_r.raise_for_status()
drives = drive_r.json()["value"]

source_drive_id = None
target_drive_id = None

# Drive names are matched case-insensitively against the configured names.
for d in drives:
    if d["name"].lower() == source_drive_name.lower():
        source_drive_id = d["id"]
    elif d["name"].lower() == target_drive_name.lower():
        target_drive_id = d["id"]

if not source_drive_id:
    raise Exception(f"Source drive '{source_drive_name}' not found.")
if not target_drive_id:
    raise Exception(f"Target drive '{target_drive_name}' not found.")

# HELPER FUNCTIONS
def extract_number_from_filename(filename):
    """Extract the leading numeric id from a filename of the form
    [number]_[file_name].ext.

    Args:
        filename: File name to parse.

    Returns:
        str | None: The digits preceding the first underscore, or None when
        the name does not start with digits followed by an underscore.
    """
    # Match pattern: starts with digits, followed by underscore
    match = re.match(r'^(\d+)_', filename)
    if match:
        return match.group(1)
    # Bug fix: the warning previously printed the literal text "(unknown)"
    # instead of interpolating the offending filename.
    print(f"Warning: Filename '{filename}' doesn't match expected pattern [number]_[file_name].ext")
    return None

def get_file_title(filename):
    """Return the numeric id parsed from *filename*.

    Thin alias kept for the number-based organization scheme; delegates
    to extract_number_from_filename.
    """
    return extract_number_from_filename(filename)

def get_folder_contents(path="", drive_id=None):
    """List every child item (file or folder) at *path* in a drive.

    Defaults to the source drive when *drive_id* is omitted, and to the
    drive root when *path* is empty. Returns [] on any non-200 reply.
    """
    if not drive_id:
        drive_id = source_drive_id

    base = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}"
    folder_url = f"{base}/root:/{path}:/children" if path else f"{base}/root/children"

    response = requests.get(folder_url, headers=headers)
    if response.status_code == 200:
        return response.json().get("value", [])

    print(f"Error getting folder contents: {response.status_code}")
    print(response.text)
    return []

def folder_exists(folder_name, parent_path="", drive_id=None):
    """Look for a folder named *folder_name* directly under *parent_path*.

    Searches the target drive by default. Returns (True, folder_id) on a
    case-insensitive name match, otherwise (False, None).
    """
    if not drive_id:
        drive_id = target_drive_id  # Check in target drive by default

    wanted = folder_name.lower()
    for entry in get_folder_contents(parent_path, drive_id):
        if entry.get("folder") and entry["name"].lower() == wanted:
            return True, entry["id"]
    return False, None

def create_folder(folder_name, parent_path="", drive_id=None):
    """Create folder *folder_name* under *parent_path* (target drive by
    default).

    Returns the new folder's item id, or None when the request fails.
    """
    if not drive_id:
        drive_id = target_drive_id  # Create in target drive by default

    base = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{drive_id}"
    create_url = f"{base}/root:/{parent_path}:/children" if parent_path else f"{base}/root/children"

    payload = {"name": folder_name, "folder": {}}
    response = requests.post(create_url, headers=headers, json=payload)

    if response.status_code != 201:
        print(f"Error creating folder {folder_name}: {response.status_code}")
        print(response.text)
        return None

    print(f"Created folder: {folder_name} in {target_drive_name}")
    return response.json()["id"]

def move_file(file_id, target_folder_id, original_filename, new_filename=None):
    """Move a file from the source drive into *target_folder_id* on the
    target drive, optionally renaming it in the same request.

    Returns True on success, False otherwise.
    """
    move_url = f"https://graph.microsoft.com/v1.0/sites/{site_id}/drives/{source_drive_id}/items/{file_id}"

    move_data = {
        "parentReference": {
            "driveId": target_drive_id,
            "id": target_folder_id
        }
    }

    # A rename is requested only when a distinct new name was supplied;
    # Graph applies it as part of the same PATCH.
    renamed = bool(new_filename) and new_filename != original_filename
    display_name = new_filename if renamed else original_filename
    if renamed:
        move_data["name"] = new_filename

    response = requests.patch(move_url, headers=headers, json=move_data)
    if response.status_code != 200:
        print(f"✗ Error moving file {original_filename} -> {display_name}: {response.status_code}")
        print(response.text)
        return False

    if renamed:
        print(f"Moved and renamed file: {original_filename} -> {display_name} to {target_drive_name}")
    else:
        print(f"Moved file: {display_name} to {target_drive_name}")
    return True

def file_exists_in_folder(filename, folder_path_full, drive_id=None):
    """Return True when a file named *filename* (case-insensitive) already
    exists inside *folder_path_full* (target drive by default)."""
    if not drive_id:
        drive_id = target_drive_id

    wanted = filename.lower()
    return any(
        entry.get("file") and entry["name"].lower() == wanted
        for entry in get_folder_contents(folder_path_full, drive_id)
    )

def generate_unique_filename(original_filename, folder_path_full, drive_id=None):
    """Choose a collision-free name for *original_filename* within
    *folder_path_full*.

    Returns the original name when it is unused; otherwise appends _1, _2,
    ... before the extension until an unused candidate is found.
    """
    stem, ext = os.path.splitext(original_filename)
    candidate = original_filename
    suffix = 0
    while file_exists_in_folder(candidate, folder_path_full, drive_id):
        suffix += 1
        candidate = f"{stem}_{suffix}{ext}"
    return candidate


# MAIN PROCESSING
def organize_files():
    """Organize loose files in the source folder into per-id folders on
    the target drive.

    Each file named [number]_[name].ext is moved into a folder called
    [number] on the target drive, creating the folder when missing and
    de-duplicating the filename when it already exists there.

    Returns:
        list[tuple[str, str, str]]: (filename, extracted_number, status)
        rows for the summary report.
    """
    print("Starting file organization...")

    # Get all items in the Files directory
    contents = get_folder_contents(folder_path)

    # Separate files from existing folders (only in root directory)
    files_to_organize = []
    existing_folders = {}

    for item in contents:
        if item.get("file"):  # It's a file in the root directory
            files_to_organize.append(item)
        elif item.get("folder"):  # It's a folder in the root directory
            existing_folders[item["name"].lower()] = item["id"]

    print("Found files in root Files directory to organize:", len(files_to_organize))
    print("Found existing folders in root Files directory:", len(existing_folders))

    # Debug: Show what files were found
    if files_to_organize:
        print("\nFiles found in root directory:")
        for file_item in files_to_organize:
            print(f"  - {file_item['name']}")

    if existing_folders:
        print("\nExisting folders in root directory:")
        for folder_name in existing_folders.keys():
            print(f"  - {folder_name}")

    if not files_to_organize:
        print("No files found in root directory to organize.")
        return []

    # Process each file
    results = []
    for file_item in files_to_organize:
        filename = file_item["name"]
        file_id = file_item["id"]
        file_number = get_file_title(filename)  # This now extracts the number

        # Bug fix: these messages previously printed the literal text
        # "(unknown)" instead of interpolating the filename.
        print(f"\nProcessing: {filename}")
        print(f"Extracted Number: {file_number}")

        # Skip files that don't match the expected pattern
        if file_number is None:
            print(f"Skipping file '{filename}' - doesn't match [number]_[file_name] pattern")
            results.append((filename, "N/A", "SKIPPED - Invalid filename pattern"))
            continue

        # Check if folder with this number already exists (on target drive)
        folder_exists_flag, folder_id = folder_exists(file_number, folder_path)

        if folder_exists_flag:
            print(f"Folder '{file_number}' already exists")
            target_folder_id = folder_id
            target_folder_path = f"{folder_path}/{file_number}" if folder_path else file_number

            # Check if file already exists and generate unique name if needed
            new_filename = generate_unique_filename(filename, target_folder_path)
            if new_filename != filename:
                print(f"File already exists, renaming to: {new_filename}")

        else:
            print(f"Creating new folder: {file_number}")
            target_folder_id = create_folder(file_number, folder_path)
            if not target_folder_id:
                print(f"Failed to create folder for {filename}")
                results.append((filename, file_number, "FAILED - Could not create folder"))
                continue
            new_filename = filename

        # Move the file with the determined filename
        success = move_file(file_id, target_folder_id, filename, new_filename)

        if success:
            results.append((filename, file_number, "SUCCESS"))
        else:
            results.append((filename, file_number, "FAILED - Could not move file"))

        # Small delay to avoid Graph rate limiting (HTTP 429)
        time.sleep(0.5)

    return results

# EXECUTE ORGANIZATION
try:
    results = organize_files()

    if results:
        # Create summary DataFrame. Bug fix: Spark cannot infer a schema
        # from an empty list, so the empty-results case previously crashed
        # into the except handler — only build the frame when rows exist.
        df = spark.createDataFrame(results, ["FileName", "ExtractedNumber", "Status"])

        print("\n" + "="*60)
        print("ORGANIZATION SUMMARY")
        print("="*60)
        df.show(truncate=False)

    # Show statistics
    success_count = len([r for r in results if r[2] == "SUCCESS"])
    total_count = len(results)
    skipped_count = len([r for r in results if r[2].startswith("SKIPPED")])

    print(f"\nSUMMARY STATISTICS:")
    print(f"Total files processed: {total_count}")
    print(f"Successfully organized: {success_count}")
    print(f"Skipped (invalid pattern): {skipped_count}")
    print(f"Failed: {total_count - success_count - skipped_count}")

    if total_count == 0:
        print("Nothing to do - no files were processed.")
    elif success_count == total_count - skipped_count:
        print("All valid files successfully organized!")
    elif skipped_count > 0:
        print("Some files were skipped due to invalid filename patterns. Check the details above.")
    else:
        print("Some files could not be organized. Check the details above.")

except Exception as e:
    # Broad catch is intentional at this top-level boundary: report the
    # failure and fall through to finally so Spark is always stopped.
    print(f"An error occurred: {str(e)}")
    import traceback
    traceback.print_exc()

finally:
    spark.stop()

Leave a Reply