
Spark HDFS Slots

Spark HDFS slots that could be implemented by third-party plugins.

Added in 0.9.0

normalize_cluster_name(cluster) staticmethod

Normalize cluster name passed into SparkHDFS constructor.

If the hooks didn't return anything, the cluster name is left intact.

Added in 0.9.0

Parameters:

  • cluster (str) –

    Cluster name

Returns:

  • str | None

    Normalized cluster name.

    If hook cannot be applied to a specific cluster, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.normalize_cluster_name.bind
@hook
def normalize_cluster_name(cluster: str) -> str:
    return cluster.lower()
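
The "left intact" semantics above can be sketched in plain Python. `apply_normalizers` below is a hypothetical helper, not part of the onetl API: each hook may return a normalized value, or None to leave the input unchanged.

```python
from typing import Callable, Optional


def apply_normalizers(value: str, hooks: "list[Callable[[str], Optional[str]]]") -> str:
    # Hypothetical sketch: feed the value through each registered hook.
    for hook in hooks:
        result = hook(value)
        if result is not None:
            # the hook produced a normalized value; pass it to the next hook
            value = result
        # if the hook returned None, the value is left intact
    return value


# a normalizer like the one registered above
normalizers = [lambda cluster: cluster.lower().replace("_", "-")]
```

With this sketch, `apply_normalizers("RND_DWH", normalizers)` yields `"rnd-dwh"`, while a hook that always returns None leaves the input unchanged.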

normalize_namenode_host(host, cluster) staticmethod

Normalize namenode host passed into SparkHDFS constructor.

If the hooks didn't return anything, the host is left intact.

Added in 0.9.0

Parameters:

  • host (str) –

    Namenode host (raw)

  • cluster (str) –

    Cluster name (normalized)

Returns:

  • str | None

    Normalized namenode host name.

    If hook cannot be applied to a specific host name, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.normalize_namenode_host.bind
@hook
def normalize_namenode_host(host: str, cluster: str) -> str | None:
    if cluster == "rnd-dwh":
        if not host.endswith(".domain.com"):
            # fix missing domain name
            host += ".domain.com"
        return host

    return None

get_known_clusters() staticmethod

Return collection of known clusters.

The cluster name passed into the SparkHDFS constructor should be present in this collection. If the hooks didn't return anything, no validation is performed.

Added in 0.9.0

Returns:

  • set[str] | None

    Collection of cluster names (in normalized form).

    If hook cannot be applied, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_known_clusters.bind
@hook
def get_known_clusters() -> set[str]:
    return {"rnd-dwh", "rnd-prod"}
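
A minimal sketch of the validation this slot enables. `validate_cluster` is a hypothetical helper, not onetl API; it assumes the cluster name is already normalized:

```python
def validate_cluster(cluster: str, known_clusters: "set[str] | None") -> None:
    # if no hook returned a collection, no validation is performed
    if known_clusters is None:
        return
    if cluster not in known_clusters:
        raise ValueError(
            f"Cluster {cluster!r} is not in the known list: {sorted(known_clusters)}"
        )


validate_cluster("rnd-dwh", {"rnd-dwh", "rnd-prod"})  # known cluster, passes
validate_cluster("anything", None)  # no hooks registered, nothing to check
```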

get_cluster_namenodes(cluster) staticmethod

Return collection of known namenodes for the cluster.

The namenode host passed into the SparkHDFS constructor should be present in this collection. If the hooks didn't return anything, no validation is performed.

Added in 0.9.0

Parameters:

  • cluster (str) –

    Cluster name (normalized)

Returns:

  • set[str] | None

    Collection of host names (in normalized form).

    If hook cannot be applied, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_cluster_namenodes.bind
@hook
def get_cluster_namenodes(cluster: str) -> set[str] | None:
    if cluster == "rnd-dwh":
        return {"namenode1.domain.com", "namenode2.domain.com"}
    return None

get_current_cluster() staticmethod

Get current cluster name.

Used by SparkHDFS.get_current to automatically fill up the cluster attribute of a connection. If the hooks didn't return anything, calling that method will raise an exception.

Added in 0.9.0

Returns:

  • str | None

    Current cluster name (in normalized form).

    If hook cannot be applied, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_current_cluster.bind
@hook
def get_current_cluster() -> str:
    # some magic here
    return "rnd-dwh"

get_ipc_port(cluster) staticmethod

Get IPC port number for a specific cluster.

Used by the constructor to automatically set the port number if it was omitted.

Added in 0.9.0

Parameters:

  • cluster (str) –

    Cluster name (normalized)

Returns:

  • int | None

    IPC port number.

    If hook cannot be applied, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.get_ipc_port.bind
@hook
def get_ipc_port(cluster: str) -> int | None:
    if cluster == "rnd-dwh":
        return 8020  # Cloudera
    return None

is_namenode_active(host, cluster) staticmethod

Check whether a namenode of the specified cluster is active (i.e., not standby).

Used for:

  • If the SparkHDFS connection is created without a host –

    the connector iterates over get_cluster_namenodes of the cluster to find the active namenode, and uses it instead of the host attribute.

  • If the SparkHDFS connection is created with a host –

    this check determines whether that host is active.

Added in 0.9.0

Parameters:

  • host (str) –

    Namenode host (normalized)

  • cluster (str) –

    Cluster name (normalized)

Returns:

  • bool | None

    True if namenode is active, False if not.

    If hook cannot be applied, it should return None.

Examples:

from onetl.connection import SparkHDFS
from onetl.hooks import hook


@SparkHDFS.Slots.is_namenode_active.bind
@hook
def is_namenode_active(host: str, cluster: str) -> bool:
    # some magic here
    return True
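
The first use case above (a connection created without a host) can be sketched as follows. `resolve_active_namenode` and its `is_active` callback are assumptions standing in for the connector's internal logic and the registered is_namenode_active hook:

```python
from typing import Callable


def resolve_active_namenode(
    cluster: str,
    namenodes: "set[str]",
    is_active: "Callable[[str, str], bool]",
) -> str:
    # Hypothetical sketch: probe each known namenode and return the first
    # one reported as active (i.e., not standby).
    for host in sorted(namenodes):
        if is_active(host, cluster):
            return host
    raise RuntimeError(f"No active namenode found for cluster {cluster!r}")


# toy status map instead of a real HA status check
statuses = {"namenode1.domain.com": False, "namenode2.domain.com": True}
active = resolve_active_namenode(
    "rnd-dwh",
    set(statuses),
    lambda host, cluster: statuses[host],
)
```

Here the standby `namenode1.domain.com` is skipped and `namenode2.domain.com` is selected.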