Essential Spark Catalog Functions (Python & SQL)
Quick, practical reference for listing catalogs, databases, tables, views, columns, and performing advanced catalog operations in Apache Spark.
1) Catalog Management Functions
Note: For full catalog features (SHOW/CREATE VIEW persisted, etc.), enable Hive support or use an external metastore (e.g., AWS Glue, Unity Catalog).
Python: Initialize SparkSession with Hive support
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("CatalogFunctions") \
    .enableHiveSupport() \
    .getOrCreate()
# Get the catalog object
catalog = spark.catalog
2) List Catalogs
Python & SQL
# List all available catalogs (Spark 3.4+; returns a list of CatalogMetadata objects)
catalogs = catalog.listCatalogs()
print(f"List of Catalogs using spark.catalog\n{catalogs}\n")
# Alternative: Using SQL
spark.sql("SHOW CATALOGS").show()
3) List Databases
Python & SQL
# List databases in current catalog
databases = catalog.listDatabases()
print(f"List of Databases using spark.catalog\n{databases}\n")
# List databases matching a pattern (pattern argument requires Spark 3.5+)
databases = catalog.listDatabases("default*")
print(f"List of Databases using spark.catalog with pattern\n{databases}\n")
# Using SQL
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW DATABASES LIKE 'default*'").show()
4) List Tables
Python & SQL
# List tables in current database
tables = catalog.listTables()
print(f"List of tables using spark.catalog in current database\n{tables}\n")
# List tables in specific database
tables = catalog.listTables("default")
print(f"List of tables using spark.catalog with specific database\n{tables}\n")
# List tables matching a pattern (pattern argument requires Spark 3.5+)
tables = catalog.listTables("default", "order*")
print(f"List of tables using spark.catalog with pattern\n{tables}\n")
# Using SQL
spark.sql("SHOW TABLES").show()
spark.sql("SHOW TABLES IN default LIKE 'order*'").show()
5) List Views
Python workaround (reliable) & SQL
Note: Spark’s Python Catalog API doesn’t provide a direct “list views” method. The snippet below enumerates tables and filters to views safely.
# List views in a given database by enumerating tables
database_name = "learn_spark_db"
# Get the list of tables (pyspark.sql.catalog.Table objects)
all_tables = spark.catalog.listTables(database_name)
# Normalize to a list of dictionaries for DataFrame creation
tables_dict_list = []
for t in all_tables:
    # Spark 3.4+ exposes `namespace`; older versions expose `database`
    if getattr(t, "namespace", None):
        db = t.namespace[0]
    else:
        db = getattr(t, "database", database_name)
    tables_dict_list.append({
        "name": t.name,
        "catalog": getattr(t, "catalog", None),
        "database": db,
        "tableType": t.tableType,
        "isTemporary": t.isTemporary,
        "description": t.description or "",
    })
# Create a DataFrame and filter to views
all_tables_df = spark.createDataFrame(tables_dict_list)
views_df = all_tables_df.filter(all_tables_df.tableType == "VIEW")
views_df.show(truncate=False)
# Using SQL
spark.sql("SHOW VIEWS").show()
spark.sql("SHOW VIEWS IN learn_spark_db").show()
6) List Columns
Python & SQL
# List columns of a table (returns a list of Column objects, not a DataFrame)
columns = catalog.listColumns("table_name")
print(columns)
# List columns in a specific database
columns = catalog.listColumns("table_name", "database_name")
print(columns)
# Using SQL
spark.sql("DESCRIBE TABLE extended database_name.table_name").show()
Complete Practical Example
End-to-end demo: DB, table, view, describe
from pyspark.sql import SparkSession

def demonstrate_catalog_functions():
    spark = SparkSession.builder \
        .appName("CatalogDemo") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
        .enableHiveSupport() \
        .getOrCreate()
    catalog = spark.catalog
    print("=== CATALOGS ===")
    # listCatalogs/listDatabases return Python lists, so print them directly
    for cat in catalog.listCatalogs():
        print(cat)
    print("=== DATABASES ===")
    for db in catalog.listDatabases():
        print(db)
    # Create some test data
    spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
    spark.sql("USE test_db")
    # Create a sample table
    sample_data = [(1, "Alice"), (2, "Bob")]
    df = spark.createDataFrame(sample_data, ["id", "name"])
    df.write.mode("overwrite").saveAsTable("test_db.users")
    # Create a view
    spark.sql("""
        CREATE OR REPLACE VIEW test_db.user_view AS
        SELECT id, name FROM test_db.users WHERE id > 1
    """)
    print("=== TABLES IN test_db ===")
    for t in catalog.listTables("test_db"):
        print(t)
    print("=== COLUMNS IN users TABLE ===")
    for c in catalog.listColumns("users", "test_db"):
        print(c)
    print("=== TABLE DETAILS ===")
    spark.sql("DESCRIBE EXTENDED test_db.users").show(truncate=False)
    return spark

# Run the demonstration
spark = demonstrate_catalog_functions()
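When you're done experimenting, you can drop the demo objects; a minimal cleanup sketch:
# Clean up the demo view, table, and database
spark.sql("DROP VIEW IF EXISTS test_db.user_view")
spark.sql("DROP TABLE IF EXISTS test_db.users")
spark.sql("DROP DATABASE IF EXISTS test_db")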
Advanced Catalog Operations
1) Set Current Catalog/Database
Python & SQL
# Set current catalog
spark.sql("USE CATALOG spark_catalog")
# Set current database
spark.sql("USE default")
catalog.setCurrentDatabase("default")
# Get current database
current_db = spark.sql("SELECT current_database()")
current_db.show()
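In newer Spark versions, the Python Catalog API also exposes the current catalog and database directly; a minimal sketch (currentCatalog/setCurrentCatalog require Spark 3.4+):
# Python equivalents
print(catalog.currentDatabase())            # current database name
print(catalog.currentCatalog())             # current catalog name (Spark 3.4+)
catalog.setCurrentCatalog("spark_catalog")  # switch catalogs (Spark 3.4+)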
2) Check if Database/Table Exists
Python helpers
def database_exists(db_name):
    return any(db.name == db_name for db in catalog.listDatabases())

def table_exists(table_name, db_name=None):
    if db_name:
        return any(t.name == table_name for t in catalog.listTables(db_name))
    return any(t.name == table_name for t in catalog.listTables())
# Usage
print(f"Default database exists: {database_exists('default')}")
print(f"Users table exists: {table_exists('users', 'test_db')}")
3) Get Table Metadata
Python helper
def get_table_metadata(table_name, db_name=None):
    """Get detailed table metadata"""
    tables = catalog.listTables(db_name) if db_name else catalog.listTables()
    table_info = [t for t in tables if t.name == table_name]
    if table_info:
        return table_info[0]
    return None
# Usage
metadata = get_table_metadata("users", "test_db")
if metadata:
print(f"Table type: {metadata.tableType}")
print(f"Is temporary: {metadata.isTemporary}")
4) Cache and Uncache Operations
Python
# Cache a table
catalog.cacheTable("test_db.users")
# Check whether a specific table is cached
print(catalog.isCached("test_db.users"))
# Uncache table
catalog.uncacheTable("test_db.users")
# Clear all cache
catalog.clearCache()
5) Refresh Metadata
Python
# Refresh table metadata (useful when external data changes)
catalog.refreshTable("test_db.users")
# Refresh by data path (for unmanaged/external tables)
catalog.refreshByPath("/path/to/data")
SQL Equivalent Commands
Pure SQL versions of the same ops
-- Show catalogs
SHOW CATALOGS;
-- Show databases with pattern
SHOW DATABASES LIKE 'test*';
-- Show tables with details
SHOW TABLES FROM test_db;
-- Describe table
DESCRIBE EXTENDED test_db.users;
-- Get table properties
SHOW TBLPROPERTIES test_db.users;
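A few more standard Spark SQL statements round out the set:
-- Show views in a database
SHOW VIEWS IN test_db;
-- Show columns of a table
SHOW COLUMNS IN test_db.users;
-- Refresh a table's metadata
REFRESH TABLE test_db.users;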
Practical Use Case: Catalog Explorer
Python: iterate catalogs → DBs → tables → columns
def explore_catalog():
    """Comprehensive catalog exploration function"""
    catalog = spark.catalog
    print("🎯 SPARK CATALOG EXPLORER")
    print("=" * 50)
    # List catalogs (listCatalogs/listDatabases return Python lists, not DataFrames)
    print("\n📚 Available Catalogs:")
    for cat in catalog.listCatalogs():
        print(f" - {cat.name} (description: {cat.description})")
    # List databases
    print("\n🏢 Databases:")
    for db in catalog.listDatabases():
        print(f" - {db.name} (location: {db.locationUri})")
    # For each database, list tables
    for db in catalog.listDatabases():
        db_name = db.name
        print(f"\n📊 Tables in database '{db_name}':")
        tables = catalog.listTables(db_name)
        if not tables:
            print(" No tables found")
            continue
        for table in tables:
            print(f" - {table.name} ({table.tableType})")
            # Show columns for each table
            columns = catalog.listColumns(table.name, db_name)
            col_names = [col.name for col in columns]
            print(f"   Columns: {', '.join(col_names)}")
# Run the catalog explorer
explore_catalog()
Key Points to Remember
| Function / Command | Description |
|---|---|
| listCatalogs() | Shows available catalogs (often spark_catalog by default). |
| listDatabases() | Lists databases in the current catalog. |
| listTables() | Lists tables in the current/specified database. |
| listColumns() | Lists columns for a specific table. |
| Note | Full catalog operations usually require Hive support or an external metastore. |
| SQL Equivalents | SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, DESCRIBE, etc. |
| Metadata Refresh | Metadata may be cached; use refreshTable() or refreshByPath() when sources change. |
These catalog functions are essential for:
- Data discovery and exploration
- Metadata-driven applications
- Data governance and quality checks
- Dynamic query generation (see the sketch below)
- Database/table management utilities
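For instance, a minimal metadata-driven sketch for the dynamic-query item (build_select is a hypothetical helper, not a Spark API):
def build_select(table_name, db_name):
    """Build a SELECT over all columns found in the catalog (hypothetical)."""
    cols = [c.name for c in spark.catalog.listColumns(table_name, db_name)]
    return f"SELECT {', '.join(cols)} FROM {db_name}.{table_name}"

# Usage
spark.sql(build_select("users", "test_db")).show()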
🔗 Try It Yourself
You can explore all of these catalog functions hands-on in my Jupyter notebook.