
Essential Spark Catalog Functions (Python & SQL)

Quick, practical reference for listing catalogs, databases, tables, views, columns, and performing advanced catalog operations in Apache Spark.

1) Catalog Management Functions

Note: For full catalog features (e.g., persisting databases, tables, and views across sessions), enable Hive support or use an external metastore (e.g., AWS Glue, Unity Catalog).

Python: Initialize SparkSession with Hive support
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CatalogFunctions") \
    .enableHiveSupport() \
    .getOrCreate()

# Get the catalog object
catalog = spark.catalog
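
If you want to work with an external or custom catalog instead, Spark 3+ also lets you register additional catalogs through spark.sql.catalog.* settings. The sketch below is one possible setup, not the only option: it assumes the Apache Iceberg Spark runtime package is on the classpath, and the catalog name my_iceberg and the warehouse path are placeholders.

# Hypothetical example: register an Iceberg-backed catalog named "my_iceberg"
# (requires the iceberg-spark-runtime package matching your Spark version)
spark = SparkSession.builder \
    .appName("ExternalCatalogExample") \
    .config("spark.sql.catalog.my_iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.my_iceberg.type", "hadoop") \
    .config("spark.sql.catalog.my_iceberg.warehouse", "/tmp/iceberg-warehouse") \
    .getOrCreate()

# The extra catalog appears alongside spark_catalog once Spark has loaded it
spark.sql("SHOW CATALOGS").show()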

2) List Catalogs

Python & SQL
# List all available catalogs (Catalog.listCatalogs requires Spark 3.4+)
catalogs = catalog.listCatalogs()
print(f"List of Catalogs using spark.catalog\n{catalogs}\n")

# Alternative: Using SQL
spark.sql("SHOW CATALOGS").show()

3) List Databases

Python & SQL
# List databases in current catalog
databases = catalog.listDatabases()
print(f"List of Databases using spark.catalog\n{databases}\n")

# List databases matching a pattern (the pattern argument requires Spark 3.5+)
databases = catalog.listDatabases("default*")
print(f"List of Databases using spark.catalog with pattern\n{databases}\n")

# Using SQL
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW DATABASES LIKE 'default*'").show()

4) List Tables

Python & SQL
# List tables in current database
tables = catalog.listTables()
print(f"List of tables using spark.catalog in current database\n{tables}\n")

# List tables in specific database
tables = catalog.listTables("default")
print(f"List of tables using spark.catalog with specific database\n{tables}\n")

# List tables matching a pattern (the pattern argument requires Spark 3.5+)
tables = catalog.listTables("default", "order*")
print(f"List of tables using spark.catalog with pattern\n{tables}\n")

# Using SQL
spark.sql("SHOW TABLES").show()
spark.sql("SHOW TABLES IN default LIKE 'order*'").show()

5) List Views

Python workaround (reliable) & SQL
Note: Spark’s Python Catalog API doesn’t provide a direct “list views” method. The snippet below enumerates tables and filters to views safely.
# List views in a given database by enumerating tables
database_name = "learn_spark_db"

# Get list of tables (CatalogTable objects)
all_tables = spark.catalog.listTables(database_name)

# Normalize to a list of dictionaries for DataFrame creation
tables_dict_list = []
for t in all_tables:
    # Newer Spark versions expose the database via "namespace"; older ones via "database"
    if getattr(t, "namespace", None):
        db = t.namespace[0]
    else:
        db = getattr(t, "database", database_name)
    tables_dict_list.append({
        "name": t.name,
        "catalog": getattr(t, "catalog", None),
        "database": db,
        "tableType": t.tableType,
        "isTemporary": t.isTemporary,
        "description": (t.description or "")
    })

# Create a DataFrame and filter to views
all_tables_df = spark.createDataFrame(tables_dict_list)
views_df = all_tables_df.filter(all_tables_df.tableType == "VIEW")

views_df.show(truncate=False)
# SQL equivalent
spark.sql("SHOW VIEWS").show()
spark.sql("SHOW VIEWS IN learn_spark_db").show()

6) List Columns

Python & SQL
# List columns of a table (returns a list of Column objects, not a DataFrame)
columns = catalog.listColumns("table_name")
for col in columns:
    print(col.name, col.dataType)

# List columns of a table in a specific database
for col in catalog.listColumns("table_name", "database_name"):
    print(col.name, col.dataType)

# Using SQL
spark.sql("DESCRIBE TABLE EXTENDED database_name.table_name").show()

Complete Practical Example

End-to-end demo: DB, table, view, describe
from pyspark.sql import SparkSession

def demonstrate_catalog_functions():
    spark = SparkSession.builder \
        .appName("CatalogDemo") \\
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
        .enableHiveSupport() \
        .getOrCreate()
    
    catalog = spark.catalog
    
    print("=== CATALOGS ===")
    catalogs = catalog.listCatalogs()   # returns a list, not a DataFrame
    for cat in catalogs:
        print(cat)
    
    print("=== DATABASES ===")
    databases = catalog.listDatabases()
    for db in databases:
        print(db)
    
    # Create some test data
    spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
    spark.sql("USE test_db")
    
    # Create sample tables
    sample_data = [(1, "Alice"), (2, "Bob")]
    df = spark.createDataFrame(sample_data, ["id", "name"])
    df.write.mode("overwrite").saveAsTable("test_db.users")
    
    # Create a view
    spark.sql(""" 
    CREATE OR REPLACE VIEW test_db.user_view AS
    SELECT id, name FROM test_db.users WHERE id > 1
    """)
    
    print("=== TABLES IN test_db ===")
    tables = catalog.listTables("test_db")
    tables.show(truncate=False)
    
    print("=== COLUMNS IN users TABLE ===")
    columns = catalog.listColumns("users", "test_db")
    for column in columns:
        print(column)
    
    print("=== TABLE DETAILS ===")
    spark.sql("DESCRIBE EXTENDED test_db.users").show(truncate=False)
    
    return spark

# Run the demonstration
spark = demonstrate_catalog_functions()

Advanced Catalog Operations

1) Set Current Catalog/Database

Python & SQL
# Set current catalog (USE CATALOG requires Spark 3.4+)
spark.sql("USE CATALOG spark_catalog")

# Set current database
spark.sql("USE default")
catalog.setCurrentDatabase("default")

# Get current database
current_db = spark.sql("SELECT current_database()")
current_db.show()
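
# Catalog API equivalents (currentCatalog and setCurrentCatalog require Spark 3.4+)
print(catalog.currentCatalog())
catalog.setCurrentCatalog("spark_catalog")
print(catalog.currentDatabase())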

2) Check if Database/Table Exists

Python helpers
def database_exists(db_name):
    return any(db.name == db_name for db in catalog.listDatabases())

def table_exists(table_name, db_name=None):
    if db_name:
        return any(t.name == table_name for t in catalog.listTables(db_name))
    else:
        return any(t.name == table_name for t in catalog.listTables())

# Usage
print(f"Default database exists: {database_exists('default')}")
print(f"Users table exists: {table_exists('users', 'test_db')}")

3) Get Table Metadata

Python helper
def get_table_metadata(table_name, db_name=None):
    """Get detailed table metadata"""
    tables = catalog.listTables(db_name) if db_name else catalog.listTables()
    table_info = [t for t in tables if t.name == table_name]
    if table_info:
        return table_info[0]
    return None

# Usage
metadata = get_table_metadata("users", "test_db")
if metadata:
    print(f"Table type: {metadata.tableType}")
    print(f"Is temporary: {metadata.isTemporary}")

4) Cache and Uncache Operations

Python
# Cache a table
catalog.cacheTable("test_db.users")

# Check whether a table is cached
print(catalog.isCached("test_db.users"))

# Uncache table
catalog.uncacheTable("test_db.users")

# Clear all cache
catalog.clearCache()
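
# SQL equivalents of the cache operations above
spark.sql("CACHE TABLE test_db.users")
spark.sql("UNCACHE TABLE test_db.users")
spark.sql("CLEAR CACHE")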

5) Refresh Metadata

Python
# Refresh table metadata (useful when external data changes)
catalog.refreshTable("test_db.users")

# Refresh by data path (for unmanaged/external tables)
catalog.refreshByPath("/path/to/data")
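
# SQL equivalents of the refresh operations above
spark.sql("REFRESH TABLE test_db.users")
spark.sql('REFRESH "/path/to/data"')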

SQL Equivalent Commands

Pure SQL versions of the same ops
-- Show catalogs
SHOW CATALOGS;

-- Show databases with pattern
SHOW DATABASES LIKE 'test*';

-- Show tables in a specific database
SHOW TABLES FROM test_db;

-- Describe table
DESCRIBE EXTENDED test_db.users;

-- Get table properties
SHOW TBLPROPERTIES test_db.users;

Practical Use Case: Catalog Explorer

Python: iterate catalogs → DBs → tables → columns
def explore_catalog():
    """Comprehensive catalog exploration function"""
    catalog = spark.catalog
    
    print("🎯 SPARK CATALOG EXPLORER")
    print("=" * 50)
    
    # List catalogs
    print("\\n📚 Available Catalogs:")
    for cat in catalog.listCatalogs().collect():
        print(f"  - {cat.name} (description: {cat.description})")
    
    # List databases
    print("\\n🏢 Databases:")
    for db in catalog.listDatabases().collect():
        print(f"  - {db.name} (location: {db.locationUri})")
    
    # For each database, list tables
    for db in catalog.listDatabases():
        db_name = db.name
        print(f"\n📊 Tables in database '{db_name}':")

        tables = catalog.listTables(db_name)
        if not tables:
            print("  No tables found")
            continue
            
        for table in tables:
            table_type = "VIEW" if table.tableType == "VIEW" else "TABLE"
            print(f"  - {table.name} ({table_type})")
            
            # Show columns for each table
            columns = catalog.listColumns(table.name, db_name)
            col_names = [col.name for col in columns]
            print(f"    Columns: {', '.join(col_names)}")

# Run the catalog explorer
explore_catalog()

Key Points to Remember

  • listCatalogs(): shows available catalogs (often just spark_catalog by default).
  • listDatabases(): lists databases in the current catalog.
  • listTables(): lists tables in the current or a specified database.
  • listColumns(): lists columns for a specific table.
  • Full catalog operations usually require Hive support or an external metastore.
  • SQL equivalents: SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, DESCRIBE, etc.
  • Metadata may be cached; use refreshTable() or refreshByPath() when underlying data sources change.

These catalog functions are essential for:

  • Data discovery and exploration
  • Metadata-driven applications
  • Data governance and quality checks
  • Dynamic query generation (see the sketch below)
  • Database/table management utilities
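
For example, here is a minimal sketch of metadata-driven query generation; it assumes the test_db.users table created earlier in this guide.

# Build a SELECT statement dynamically from catalog metadata
db_name, table_name = "test_db", "users"
col_names = [c.name for c in spark.catalog.listColumns(table_name, db_name)]
query = f"SELECT {', '.join(col_names)} FROM {db_name}.{table_name}"
print(query)
spark.sql(query).show()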