API 文档

dask_yarn.YarnCluster(environment=None, n_workers=None, worker_vcores=None, worker_memory=None, worker_restarts=None, worker_env=None, worker_class=None, worker_options=None, worker_gpus=None, scheduler_vcores=None, scheduler_gpus=None, scheduler_memory=None, deploy_mode=None, name=None, queue=None, tags=None, user=None, host=None, port=None, dashboard_address=None, skein_client=None, asynchronous=False, loop=None)

在 YARN 上启动 Dask 集群。

您可以在 Dask 的 yarn.yaml 配置文件中为此定义默认值。有关更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html

参数
environment : str, 可选

要使用的 Python 环境。可以是以下之一:

  • 已归档 Python 环境的路径
  • conda 环境的路径,指定为 conda:///…
  • 虚拟环境的路径,指定为 venv:///…
  • python 可执行文件的路径,指定为 python:///…

请注意,如果不是归档文件,指定的路径必须在集群中的所有节点上有效。

n_workers : int, 可选

初始启动的 worker 数量。

worker_vcores : int, 可选

为每个 worker 分配的虚拟核心数量。

worker_memory : str, 可选

为每个 worker 分配的内存量。接受单位后缀(例如 '2 GiB' 或 '4096 MiB')。将向上取整到最接近的 MiB。

worker_restarts : int, 可选

在应用程序失败之前允许的最大 worker 重启次数。默认为无限制。

worker_env : dict, 可选

环境变量及其值的映射。这些变量将在启动 dask worker 之前在 worker 容器中设置。

worker_gpus : int, 可选

为每个 worker 分配的 gpu 数量

scheduler_vcores : int, 可选

为每个 scheduler 分配的虚拟核心数量。

scheduler_gpus : int, 可选

为每个 scheduler 分配的 gpu 数量

scheduler_memory : str, 可选

为 scheduler 分配的内存量。接受单位后缀(例如 '2 GiB' 或 '4096 MiB')。将向上取整到最接近的 MiB。

deploy_mode : {‘remote’, ‘local’}, 可选

要使用的部署模式。如果为 'remote',scheduler 将部署在 YARN 容器中。如果为 'local',scheduler 将在本地运行,这对于调试很有用。默认为 'remote'

name : str, 可选

应用程序名称。

queue : str, 可选

要部署到的队列。

tags : 序列, 可选

一组用作此应用程序标签的字符串。

user : str, 可选

代表提交应用程序的用户。默认为当前用户 - 以不同用户身份提交需要用户权限,请参阅 YARN 文档了解更多信息。

host : str, 可选

scheduler 监听的主机地址。仅在 deploy_mode='local' 时使用。默认为 '0.0.0.0'

port : int, 可选

scheduler 监听的端口。仅在 deploy_mode='local' 时使用。默认为 0 表示随机端口。

dashboard_address : str

dashboard 服务器将监听的地址。仅在 deploy_mode='local' 时使用。默认为 ':0' 表示随机端口。

skein_client : skein.Client, 可选

要使用的 skein.Client。如果未提供,将启动一个。

asynchronous : bool, 可选

如果为 True,则在异步模式下启动集群,以便在其他异步代码中使用。

loop : IOLoop, 可选

要使用的 IOLoop 实例。在异步模式下默认为当前循环,否则启动一个后台循环。

示例

>>> cluster = YarnCluster(environment='my-env.tar.gz', ...)
>>> cluster.scale(10)
adapt(minimum=0, maximum=inf, interval='1s', wait_count=3, target_duration='5s', **kwargs)

开启自适应功能

这将根据 scheduler 的活动自动扩缩 Dask 集群。

参数
minimum : int, 可选

最少 worker 数量。默认为 0

maximum : int, 可选

最多 worker 数量。默认为 inf

interval : timedelta 或 str, 可选

worker 添加/删除建议之间的时间间隔。

wait_count : int, 可选

在实际移除 worker 之前,连续建议移除该 worker 的次数。

target_duration : timedelta 或 str, 可选

我们希望计算花费的时间。这会影响我们扩缩的积极程度。

**kwargs

传递给 distributed.Scheduler.workers_to_close 的额外参数。

示例

>>> cluster.adapt(minimum=0, maximum=10)
close(**kwargs)

关闭此集群。shutdown 的别名。

另请参阅

shutdown

dask dashboard 的链接。如果 dashboard 未运行,则为 None

类方法 from_application_id(app_id, skein_client=None, asynchronous=False, loop=None)

连接到具有给定应用程序 ID 的现有 YarnCluster

参数
app_id : str

现有集群的应用程序 ID。

skein_client : skein.Client

要使用的 skein.Client。如果未提供,将启动一个。

asynchronous : bool, 可选

如果为 True,则在异步模式下启动集群,以便在其他异步代码中使用。

loop : IOLoop, 可选

要使用的 IOLoop 实例。在异步模式下默认为当前循环,否则启动一个后台循环。

返回
YarnCluster
类方法 from_current(asynchronous=False, loop=None)

从集群内部连接到现有 YarnCluster

参数
asynchronous : bool, 可选

如果为 True,则在异步模式下启动集群,以便在其他异步代码中使用。

loop : IOLoop, 可选

要使用的 IOLoop 实例。在异步模式下默认为当前循环,否则启动一个后台循环。

返回
YarnCluster
类方法 from_specification(spec, skein_client=None, asynchronous=False, loop=None)

从 skein 规范启动 dask 集群。

参数
spec : skein.ApplicationSpec, dict, 或文件名

要使用的应用程序规范。必须至少定义一个服务:'dask.worker'。如果未定义 'dask.scheduler' 服务,将在本地启动一个 scheduler。

skein_client : skein.Client, 可选

要使用的 skein.Client。如果未提供,将启动一个。

asynchronous : bool, 可选

如果为 True,则在异步模式下启动集群,以便在其他异步代码中使用。

loop : IOLoop, 可选

要使用的 IOLoop 实例。在异步模式下默认为当前循环,否则启动一个后台循环。

logs(scheduler=True, workers=True)

返回 scheduler 和/或 worker 的日志

参数
scheduler : boolean, 可选

是否收集 scheduler 的日志

workers : boolean 或 iterable, 可选

要选择的 worker 地址列表。如果为 True,默认为所有 worker;如果为 False,则不选择 worker。

返回
logs : dict

名称 -> 日志 的字典。

scale(n)

将集群扩缩到 n 个 worker。

参数
n : int

目标 worker 数量

示例

>>> cluster.scale(10)  # scale cluster to ten workers
shutdown(status='SUCCEEDED', diagnostics=None)

关闭应用程序。

参数
status : {‘SUCCEEDED’, ‘FAILED’, ‘KILLED’}, 可选

yarn 应用程序的退出状态。

diagnostics : str, 可选

应用程序退出消息,通常用于诊断故障。可以在 YARN Web UI 中已完成应用程序的“诊断”下看到。如果未提供,将使用默认值。

workers()

所有当前正在运行的 worker 容器列表。