Spark S3 Connection
Bases: SparkFileDFConnection
Spark connection to S3 filesystem.
Based on the Hadoop-AWS module and Spark integration with Cloud Infrastructures.
See also
Before using this connector, take into account the Spark S3 prerequisites.
Note
Supports only reading files as a Spark DataFrame and writing a DataFrame to files.
Does NOT support file operations such as create, delete, rename, etc. For those operations, use the S3 connection.
Added in 0.9.0
Parameters:

- `host` (str) – Host of S3 source. For example: `domain.com`
- `port` (int) – Port of S3 source
- `bucket` (str) – Bucket name in the S3 file source
- `protocol` (str, default: `https`) – Connection protocol. Allowed values: `https` or `http`
- `access_key` (str) – Access key (aka user ID) of an account in the S3 service
- `secret_key` (str) – Secret key (aka password) of an account in the S3 service
- `region` (str) – Region name of bucket in S3 service. Optional for some S3 implementations (MinIO, Ozone), but could be mandatory for others.
- `path_style_access` (bool) – `True` to connect to bucket as `protocol://host/bucket`, `False` to use `protocol://bucket.host` instead. This depends on the S3 implementation.
- `session_token` (str) – Session token generated by the S3 STS service, if used.
- `extra` (dict) – A dictionary of additional properties to be used when connecting to S3.
  These are Hadoop AWS specific properties; see links below.
  Options are passed without the prefixes `spark.hadoop.`, `fs.s3a.` and `fs.s3a.bucket.$BUCKET.`, for example:

  ```python
  extra = {
      "committer.magic.enabled": True,
      "committer.name": "magic",
      "connection.timeout": 300000,
  }
  ```

  Warning: options populated from connection attributes (like `endpoint`, `access.key`) cannot be overridden. But you may override `aws.credentials.provider` and pass custom credential options.
- `spark` (SparkSession) – Spark session
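To make the prefix rule above concrete, here is a minimal sketch of how short `extra` option names could be expanded into bucket-scoped Hadoop properties. The helper name and exact value handling are assumptions for illustration, not onETL's actual implementation:

```python
def expand_extra_options(extra: dict, bucket: str) -> dict:
    """Prefix short option names with the bucket-scoped fs.s3a key.

    Hypothetical helper: onETL's real property handling may differ.
    """
    prefix = f"spark.hadoop.fs.s3a.bucket.{bucket}."
    return {
        # Hadoop configuration values are strings; booleans become "true"/"false"
        prefix + key: str(value).lower() if isinstance(value, bool) else str(value)
        for key, value in extra.items()
    }


extra = {
    "committer.magic.enabled": True,
    "committer.name": "magic",
    "connection.timeout": 300000,
}
props = expand_extra_options(extra, bucket="my-bucket")
# e.g. "spark.hadoop.fs.s3a.bucket.my-bucket.committer.name" -> "magic"
```

Bucket-scoped keys (`fs.s3a.bucket.$BUCKET.*`) let one Spark session talk to several buckets with different settings.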
Examples:

```python
from onetl.connection import SparkS3
from pyspark.sql import SparkSession

# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.8")
# Some packages are not used, but downloading takes a lot of time. Skipping them.
excluded_packages = SparkS3.get_exclude_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(excluded_packages))
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config(
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    )
    .config(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    )
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    )
    .getOrCreate()
)

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    path_style_access=False,
    region="us-east-1",
    spark=spark,
).check()
```

Connect with path-style access enabled:

```python
# Create Spark session with Hadoop AWS libraries loaded
...

# Create connection
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    path_style_access=True,
    region="us-east-1",
    spark=spark,
).check()
```
check()

close()
Close all connections created to S3.
Also resets all `fs.s3a.bucket.$BUCKET.*` properties in the Hadoop configuration.
Note
The connection can be used again after it has been closed.
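Conceptually, `close()` drops every bucket-scoped property it previously set. A toy sketch of that prefix-based reset, using a plain dict to stand in for the Hadoop configuration (assumed behavior for illustration, not the actual implementation):

```python
def reset_bucket_properties(hadoop_conf: dict, bucket: str) -> dict:
    """Remove all fs.s3a.bucket.$BUCKET.* entries, keeping everything else."""
    prefix = f"fs.s3a.bucket.{bucket}."
    return {k: v for k, v in hadoop_conf.items() if not k.startswith(prefix)}


conf = {
    "fs.s3a.bucket.my-bucket.endpoint": "http://domain.com",
    "fs.s3a.bucket.my-bucket.access.key": "ACCESS_KEY",
    "fs.s3a.connection.maximum": "100",  # not bucket-scoped, must survive
}
conf = reset_bucket_properties(conf, bucket="my-bucket")
# only the non-bucket-scoped property remains
```

Because only the `fs.s3a.bucket.my-bucket.*` keys are removed, reconnecting simply repopulates them, which is why the connection can be reused after closing.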
Returns:

- `self` – Connection itself.
Examples:

Close connection automatically:

```python
with connection:
    ...
```

Or close it manually:

```python
connection.close()
```
get_packages(spark_version, scala_version=None)

classmethod
Get package names to be downloaded by Spark.
Added in 0.9.0
Parameters:

- `spark_version` (str) – Spark version in format `major.minor.patch`.
- `scala_version` (str, default: `None`) – Scala version in format `major.minor`. If `None`, `spark_version` is used to determine the Scala version.
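When `scala_version` is omitted, a default is inferred from the Spark version. A rough sketch of that inference, based on upstream Spark defaults (Scala 2.12 for Spark 3.x pre-built distributions, 2.13 for Spark 4.x); this is an assumption about onETL's internal logic, not its actual code:

```python
def infer_scala_version(spark_version: str) -> str:
    """Guess the default Scala version bundled with a given Spark release."""
    major = int(spark_version.split(".")[0])
    if major >= 4:  # Spark 4.x ships with Scala 2.13
        return "2.13"
    if major == 3:  # Spark 3.x pre-built distributions default to Scala 2.12
        return "2.12"
    return "2.11"   # Spark 2.x defaulted to Scala 2.11


print(infer_scala_version("3.5.8"))  # -> 2.12
```

Passing `scala_version` explicitly matters when your cluster runs a non-default Scala build (e.g. Spark 3.x compiled for Scala 2.13).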
Examples:

```python
from onetl.connection import SparkS3

SparkS3.get_packages(spark_version="3.5.8")
SparkS3.get_packages(spark_version="3.5.8", scala_version="2.12")
```
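The returned package names are Maven coordinates that Spark downloads when listed in `spark.jars.packages`. A sketch of how such a coordinate is assembled; the `spark-hadoop-cloud` artifact name and `group:artifact_scala:version` format reflect Spark ecosystem conventions and are assumptions here, not a guarantee of what `get_packages` returns:

```python
def cloud_package_coordinate(spark_version: str, scala_version: str) -> str:
    """Build a Maven coordinate of the form group:artifact_scalaVersion:version."""
    return f"org.apache.spark:spark-hadoop-cloud_{scala_version}:{spark_version}"


coord = cloud_package_coordinate("3.5.8", "2.12")
# "org.apache.spark:spark-hadoop-cloud_2.12:3.5.8"
```

The Scala suffix is part of the artifact name, which is why mismatching `scala_version` against the cluster's Scala build leads to unresolvable or incompatible jars.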