Spark S3 Connection

Bases: SparkFileDFConnection

Spark connection to an S3 filesystem. Supports hooks.

Based on Hadoop-AWS module and Spark integration with Cloud Infrastructures.

See also

Before using this connector, please take into account the Spark S3 prerequisites.

Note

Supports only reading files as Spark DataFrame and writing DataFrame to files.

Does NOT support file operations (create, delete, rename, etc.). For these operations, use the S3 connection.

Added in 0.9.0

Parameters:

  • host (str) –

    Host of S3 source. For example: domain.com

  • port (int) –

    Port of S3 source

  • bucket (str) –

    Bucket name in the S3 file source

  • protocol (str, default: https ) –

    Connection protocol. Allowed values: https or http

  • access_key (str) –

    Access key (aka user ID) of an account in the S3 service

  • secret_key (str) –

    Secret key (aka password) of an account in the S3 service

  • region (str) –

    Region name of the bucket in the S3 service. Optional for some S3 implementations (MinIO, Ozone), but may be mandatory for others.

  • path_style_access (bool) –

    True to connect to the bucket as protocol://host/bucket (path-style), False to use protocol://bucket.host (virtual-hosted-style) instead. The required value depends on the S3 implementation.

  • session_token (str) –

    Session token generated by S3 STS service, if used.

  • extra (dict) –

    A dictionary of additional properties to be used when connecting to S3.

    These are Hadoop AWS specific properties; see the Hadoop-AWS module documentation.

    Options are passed without the prefixes spark.hadoop., fs.s3a. and fs.s3a.bucket.$BUCKET., for example:

    extra = {
        "committer.magic.enabled": True,
        "committer.name": "magic",
        "connection.timeout": 300000,
    }
    

    Warning

    Options populated from connection attributes (like endpoint or access.key) cannot be overridden.

    But you may override aws.credentials.provider and pass custom credential options.

  • spark (SparkSession) –

    Spark session
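The extra options above are expanded into fully-prefixed Hadoop properties before being applied to the Spark session. A minimal sketch of that prefixing, assuming the per-bucket prefix form shown in close() below (the helper `prefix_s3a_options` is hypothetical, not part of onetl):

```python
# Sketch: how short option names from `extra` could map to the
# fully-prefixed Hadoop AWS properties that Spark actually reads.
# `prefix_s3a_options` is a hypothetical helper, not onetl's implementation.

def prefix_s3a_options(extra, bucket=None):
    """Prepend spark.hadoop.fs.s3a. (optionally per-bucket) to each key."""
    if bucket:
        base = f"spark.hadoop.fs.s3a.bucket.{bucket}."
    else:
        base = "spark.hadoop.fs.s3a."
    # Hadoop configuration values are strings, so stringify everything
    return {base + key: str(value) for key, value in extra.items()}


extra = {
    "committer.magic.enabled": True,
    "committer.name": "magic",
    "connection.timeout": 300000,
}

conf = prefix_s3a_options(extra, bucket="my-bucket")
# conf["spark.hadoop.fs.s3a.bucket.my-bucket.committer.name"] == "magic"
```

This illustrates why the prefixes must be omitted from `extra`: the connection knows the bucket and adds them itself.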

Examples:

from onetl.connection import SparkS3
from pyspark.sql import SparkSession

# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.8")
# Some packages are not used, but downloading them takes a lot of time. Skip them.
excluded_packages = SparkS3.get_exclude_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(excluded_packages))
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config(
        "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    )
    .config(
        "spark.sql.parquet.output.committer.class",
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
    )
    .config(
        "spark.sql.sources.commitProtocolClass",
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    )
    .getOrCreate()
)

# Create connection with virtual-hosted-style access
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    path_style_access=False,
    region="us-east-1",
    spark=spark,
).check()
# Create Spark session with Hadoop AWS libraries loaded
...

# Create connection with path-style access
s3 = SparkS3(
    host="domain.com",
    protocol="http",
    bucket="my-bucket",
    access_key="ACCESS_KEY",
    secret_key="SECRET_KEY",
    path_style_access=True,
    region="us-east-1",
    spark=spark,
).check()
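The path_style_access flag only changes the URL form used to reach the bucket. A quick illustration of the two addressing styles with plain string formatting (the `s3_url` helper is illustrative, not onetl code):

```python
def s3_url(protocol, host, bucket, path_style_access):
    # path-style access:          protocol://host/bucket
    # virtual-hosted-style access: protocol://bucket.host
    if path_style_access:
        return f"{protocol}://{host}/{bucket}"
    return f"{protocol}://{bucket}.{host}"


s3_url("http", "domain.com", "my-bucket", path_style_access=True)
# 'http://domain.com/my-bucket'
s3_url("http", "domain.com", "my-bucket", path_style_access=False)
# 'http://my-bucket.domain.com'
```

Virtual-hosted-style access requires DNS to resolve bucket.host, which is why some on-premises S3 implementations only work with path-style access.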

check()

Check source availability.

If the source is not available, an exception is raised.

Returns:

  • self

    Connection itself.

Raises:

  • RuntimeError

    If the connection is not available

Examples:

connection.check()

close()

Close all connections created to S3.

Also resets all fs.s3a.bucket.$BUCKET.* properties of Hadoop configuration.

Note

Connection can be used again after it was closed.

Returns:

  • self

    Connection itself.

Examples:

Close connection automatically:

with connection:
    ...

Close connection manually:

connection.close()

get_packages(spark_version, scala_version=None) classmethod

Get package names to be downloaded by Spark.

Added in 0.9.0

Parameters:

  • spark_version (str) –

    Spark version in format major.minor.patch.

  • scala_version (str, default: None ) –

    Scala version in format major.minor.

    If None, spark_version is used to determine Scala version.

Examples:

from onetl.connection import SparkS3

SparkS3.get_packages(spark_version="3.5.8")
SparkS3.get_packages(spark_version="3.5.8", scala_version="2.12")
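When scala_version is omitted, it is derived from spark_version. A rough sketch of that inference, assuming the default Scala versions bundled with prebuilt Spark distributions (Spark 3.x ships with Scala 2.12, Spark 4.x with Scala 2.13); the helper is illustrative, not onetl's actual logic:

```python
def infer_scala_version(spark_version):
    # Illustrative mapping only: default Scala version bundled with
    # prebuilt Spark distributions (Spark 3.x -> 2.12, Spark 4.x -> 2.13).
    major = int(spark_version.split(".")[0])
    if major >= 4:
        return "2.13"
    return "2.12"


infer_scala_version("3.5.8")  # '2.12'
```

Pass scala_version explicitly if your Spark build was compiled against a non-default Scala version.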

get_exclude_packages() classmethod

Get package names to be excluded by Spark.

Added in 0.13.0

Examples:

from onetl.connection import SparkS3

SparkS3.get_exclude_packages()