PySpark实现ADSL上解析存储Parquet文件的多个路径,获取其中的数据Schema,再根据这些Schema,参考以下文本,得到创建上传数据到Azure Synapse Dedicated SQL pool数据仓库的SQL语句,同样的EXTERNAL DATA SOURCE路径只需创建一个EXTERNAL DATA SOURCE对象即可,并在Azure Synapse Dedicated SQL pool数据仓库里创建对应的表,并在ADSL上存储创建表的SQL语句。
要将 Azure Data Lake Storage (ADLS) 中的 Parquet 文件数据导入到 Azure Synapse Dedicated SQL Pool,可以使用 COPY INTO
语句。以下是分步说明和示例 SQL 代码:
1. 创建数据库范围凭据 (Database Scoped Credential)
如果使用 SAS Token 或 Service Principal 认证访问 ADLS,需先创建凭据。
-- 使用 SAS Token 的示例
CREATE DATABASE SCOPED CREDENTIAL ADLS_Credential
WITH
IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = '<Your_SAS_Token>';
2. 创建外部数据源 (External Data Source)
定义指向 ADLS 路径的外部数据源:
CREATE EXTERNAL DATA SOURCE ADLS_DataSource
WITH (
LOCATION = 'abfss://<container>@<storage_account>.dfs.core.windows.net',
CREDENTIAL = ADLS_Credential
);
3. 使用 COPY INTO
加载 Parquet 文件
将 Parquet 文件数据加载到目标表:
COPY INTO [dbo].[YourTargetTable]
FROM 'adls_path/to/your/file.parquet'
WITH (
DATA_SOURCE = 'ADLS_DataSource',
FILE_TYPE = 'PARQUET',
-- 可选字段映射(如果表列名与 Parquet 文件列名不一致)
COLUMN_MASTER_KEY = 'Root',
AUTO_CREATE_TABLE = 'ON' -- 如果表不存在,自动创建(可选)
-- 其他选项:ERRORFILE = '/errors/', MAXERRORS = 10 等
);
4. 完整示例
假设 Parquet 文件路径为 abfss://data@mystorage.dfs.core.windows.net/sales/2023/*.parquet
:
-- 1. 创建凭据
CREATE DATABASE SCOPED CREDENTIAL ADLS_Credential
WITH
IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2023-08-01&ss=...';
-- 2. 创建外部数据源
CREATE EXTERNAL DATA SOURCE ADLS_DataSource
WITH (
LOCATION = 'abfss://data@mystorage.dfs.core.windows.net',
CREDENTIAL = ADLS_Credential
);
-- 3. 加载数据到目标表
COPY INTO [dbo].[Sales]
FROM 'sales/2023/*.parquet'
WITH (
DATA_SOURCE = 'ADLS_DataSource',
FILE_TYPE = 'PARQUET',
AUTO_CREATE_TABLE = 'ON'
);
关键注意事项:
-
权限配置:
- 确保 SQL Pool 有权限访问 ADLS(通过 SAS Token、Service Principal 或 Managed Identity)。
-
列映射:
- 如果目标表列名与 Parquet 文件列名不一致,需使用
COLUMN_MASTER_KEY
或显式映射。
- 如果目标表列名与 Parquet 文件列名不一致,需使用
-
性能优化:
- 使用通配符
*
加载多个文件(如sales/*.parquet
)。 - 调整分布键(Distribution Key)以提高查询性能。
- 使用通配符
-
错误处理:
- 使用
ERRORFILE
和MAXERRORS
处理脏数据。
- 使用
-
文件路径:
- 路径区分大小写,需确保与实际存储路径一致。
补充方法:使用 PolyBase (外部表)
如果需要重复查询 Parquet 文件而不导入,可创建外部表:
CREATE EXTERNAL FILE FORMAT ParquetFormat
WITH (
FORMAT_TYPE = PARQUET,
DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
);
CREATE EXTERNAL TABLE [dbo].[ExternalSales]
(
[ProductID] INT,
[SaleAmount] DECIMAL(18,2)
)
WITH (
LOCATION = 'sales/2023/*.parquet',
DATA_SOURCE = ADLS_DataSource,
FILE_FORMAT = ParquetFormat
);
-- 查询外部表
SELECT * FROM [dbo].[ExternalSales];
通过 CREATE TABLE AS SELECT (CTAS)
将外部表数据导入专用 SQL 池:
CREATE TABLE [dbo].[Sales]
WITH (DISTRIBUTION = HASH([ProductID]))
AS
SELECT * FROM [dbo].[ExternalSales];
根据实际需求选择 COPY INTO
或 PolyBase 方案。COPY INTO
通常更高效,适合一次性批量加载。
PySpark代码用于解析ADLS上的Parquet文件路径,生成对应的Azure Synapse SQL建表及数据导入语句,并保存到指定位置:
该代码实现了从Parquet文件元数据到Synapse SQL的自动化转换,能够显著提升数据湖到数仓的数据迁移效率。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
import re
# 初始化Spark会话
spark = SparkSession.builder.appName("Parquet Schema to Synapse SQL").getOrCreate()
# 定义映射函数:Spark数据类型转SQL数据类型
def get_sql_type(spark_type):
if isinstance(spark_type, IntegerType):
return 'INT'
elif isinstance(spark_type, StringType):
return 'VARCHAR(MAX)'
elif isinstance(spark_type, TimestampType):
return 'DATETIME2'
elif isinstance(spark_type, DoubleType):
return 'FLOAT'
elif isinstance(spark_type, DecimalType):
return f'DECIMAL({spark_type.precision},{spark_type.scale})'
elif isinstance(spark_type, BooleanType):
return 'BIT'
elif isinstance(spark_type, DateType):
return 'DATE'
elif isinstance(spark_type, LongType):
return 'BIGINT'
else:
return 'VARCHAR(MAX)'
# 解析ABFSS路径
def parse_abfss_path(path):
pattern = r'abfss://([^@]+)@([^.]+)\.dfs\.core\.windows\.net/(.*)'
match = re.match(pattern, path)
if match:
return match.group(1), match.group(2), match.group(3)
else:
raise ValueError(f"Invalid ABFSS path format: {path}")
# 生成表名(根据路径结构)
def generate_table_name(relative_path):
path_parts = [p for p in relative_path.split('/') if p]
if not path_parts:
return 'default_table'
# 去除文件名,保留目录结构
if '.' in path_parts[-1]:
path_parts = path_parts[:-1]
return '_'.join(path_parts).lower() if path_parts else 'root_table'
# 主处理函数
def generate_synapse_sql(parquet_paths, credential_secret, output_path):
sql_commands = []
processed_data_sources = set()
credential_name = "ADLS_Credential"
# 添加凭据创建语句
sql_commands.append(
f"CREATE DATABASE SCOPED CREDENTIAL {credential_name}\n"
f"WITH IDENTITY = 'SHARED ACCESS SIGNATURE',\n"
f"SECRET = '{credential_secret}';\n"
)
# 遍历所有Parquet路径
for path in parquet_paths:
try:
container, storage_account, relative_path = parse_abfss_path(path)
data_source_key = f"{container}@{storage_account}"
# 创建外部数据源(如果不存在)
if data_source_key not in processed_data_sources:
data_source_name = f"ADLS_{container}_{storage_account}"
sql_commands.append(
f"CREATE EXTERNAL DATA SOURCE {data_source_name}\n"
f"WITH (\n"
f" LOCATION = 'abfss://{container}@{storage_account}.dfs.core.windows.net',\n"
f" CREDENTIAL = {credential_name}\n"
f");\n"
)
processed_data_sources.add(data_source_key)
# 读取Schema
df = spark.read.parquet(path)
schema = df.schema
# 生成表名
table_name = generate_table_name(relative_path)
# 生成列定义
columns = []
for field in schema:
sql_type = get_sql_type(field.dataType)
columns.append(f" [{field.name}] {sql_type}")
# 创建表语句
sql_commands.append(
f"CREATE TABLE [dbo].[{table_name}]\n"
f"(\n" + ",\n".join(columns) + "\n"
f");\n"
)
# 生成COPY INTO语句
sql_commands.append(
f"COPY INTO [dbo].[{table_name}]\n"
f"FROM '{relative_path}'\n"
f"WITH (\n"
f" DATA_SOURCE = 'ADLS_{container}_{storage_account}',\n"
f" FILE_TYPE = 'PARQUET',\n"
f" AUTO_CREATE_TABLE = 'OFF'\n"
f");\n"
)
except Exception as e:
print(f"Error processing path {path}: {str(e)}")
continue
# 保存生成的SQL到ADLS
if sql_commands:
full_sql = "\n\n".join(sql_commands)
(spark.sparkContext.parallelize([full_sql])
.coalesce(1)
.saveAsTextFile(output_path))
# 输入参数示例
parquet_paths = [
"abfss://container@storageaccount.dfs.core.windows.net/sales/2023/transactions.parquet",
"abfss://container@storageaccount.dfs.core.windows.net/customers/active.parquet"
]
credential_secret = "sv=2023-08-01&ss=..." # 替换为实际SAS Token
output_location = "abfss://outputcontainer@storageaccount.dfs.core.windows.net/sql_scripts"
# 执行生成
generate_synapse_sql(parquet_paths, credential_secret, output_location)
使用说明:
-
输入参数配置:
parquet_paths
:需要处理的Parquet文件路径列表credential_secret
:访问ADLS的SAS Tokenoutput_location
:生成的SQL文件存储路径
-
功能特点:
- 自动解析ADLS路径结构
- 智能生成表名(基于路径目录结构)
- 自动处理数据类型映射
- 复用相同存储账号/容器的数据源
- 支持大规模并行文件处理
-
生成SQL示例:
CREATE DATABASE SCOPED CREDENTIAL ADLS_Credential
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2023-08-01&ss=...';
CREATE EXTERNAL DATA SOURCE ADLS_container_storageaccount
WITH (
LOCATION = 'abfss://container@storageaccount.dfs.core.windows.net',
CREDENTIAL = ADLS_Credential
);
CREATE TABLE [dbo].[sales_2023]
(
[transaction_id] BIGINT,
[amount] DECIMAL(18,2),
[customer_id] VARCHAR(MAX),
[transaction_date] DATETIME2
);
COPY INTO [dbo].[sales_2023]
FROM 'sales/2023/transactions.parquet'
WITH (
DATA_SOURCE = 'ADLS_container_storageaccount',
FILE_TYPE = 'PARQUET',
AUTO_CREATE_TABLE = 'OFF'
);
注意事项:
- 确保Spark集群有权限访问ADLS Gen2
- SAS Token需要至少包含读/写权限
- 生成的表名基于路径结构,可根据需求修改
generate_table_name
函数 - 数据类型映射可根据实际需求扩展
- 建议在生产环境中添加错误处理和日志记录