Configuration

Specifying all parameters to the YarnCluster constructor every time can be error prone, especially when sharing this workflow with new users. Alternatively, you can provide defaults in a configuration file, traditionally held in ~/.config/dask/yarn.yaml or /etc/dask/yarn.yaml. Note that this configuration is optional and only changes the defaults when the corresponding parameters are not specified in the constructor. You only need to set the fields you care about; unset fields will fall back to the default configuration.

Example

# ~/.config/dask/yarn.yaml
yarn:
  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to

  environment: /path/to/my-env.tar.gz

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 4GiB

  worker:                   # Specifications of worker containers
    vcores: 2
    memory: 8GiB

Users can now create a YarnCluster without specifying any additional information.

from dask_yarn import YarnCluster

cluster = YarnCluster()
cluster.scale(20)

For more information on Dask configuration, see the Dask configuration documentation.
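
To check which values will actually be used, one option (a minimal sketch, assuming dask and dask-yarn are installed) is to query the merged configuration with dask.config.get:

import dask
import dask_yarn  # importing dask-yarn registers its configuration defaults

# Values come from ~/.config/dask/yarn.yaml when set there,
# otherwise from the built-in defaults
print(dask.config.get('yarn.name'))
print(dask.config.get('yarn.worker.memory'))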

Providing a Custom Skein Specification

Sometimes you'll need more control over the deployment than the configuration fields above provide. In this case you can provide the path to a custom Skein specification in the yarn.specification field. If this field is present in the configuration, the specification will be used as long as no parameters are passed to the YarnCluster constructor. Note that this is equivalent to calling YarnCluster.from_specification() programmatically.

# /home/username/.config/dask/yarn.yaml
yarn:
  specification: /path/to/spec.yaml
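
With this field set, calling YarnCluster() with no arguments deploys from that specification. As a rough sketch of the equivalence described above (using the same illustrative path):

from dask_yarn import YarnCluster

# Picks up the specification from the yarn.specification config field
cluster = YarnCluster()

# The explicit equivalent would be:
# cluster = YarnCluster.from_specification('/path/to/spec.yaml')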

The specification requires at least one Service named dask.worker, which describes how to start a single worker. If an additional dask.scheduler service is provided, it is assumed to start the scheduler. If dask.scheduler isn't present, a scheduler will be started locally instead.

In the script section of each service, the appropriate dask-yarn CLI command should be used:

  • dask-yarn services worker to start the worker
  • dask-yarn services scheduler to start the scheduler

Beyond that, you have complete flexibility in how the specification is defined. See the Skein documentation for more information. A few examples are provided below:

Example: local deploy mode with node_label restrictions

This specification is similar to the one created automatically when deploy_mode='local' is specified (the scheduler runs locally and only a worker service is defined), except that it adds a node_label restriction for the workers. Here we restrict the workers to run only on nodes labeled GPU.

# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.worker:
    # Restrict workers to GPU nodes only
    node_label: GPU
    # Don't start any workers initially
    instances: 0
    # Workers can be restarted an infinite number of times
    max_restarts: -1
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # Distribute this python environment to every worker node
    files:
      environment: /path/to/my/environment.tar.gz
    # The bash script to start the worker
    # Here we activate the environment, then start the worker
    script: |
      source environment/bin/activate
      dask-yarn services worker
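
Since instances is 0 in this specification, no workers are started until the cluster is scaled. A minimal usage sketch (the spec path is the illustrative one above):

from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster.from_specification('/path/to/spec.yaml')
cluster.scale(4)          # request 4 workers on GPU-labeled nodes
client = Client(cluster)  # connect a client to submit work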

Example: remote deploy mode with custom setup

This specification is similar to the one created automatically when deploy_mode='remote' is specified (both the scheduler and the workers run inside YARN containers), except that it runs an initialization script before starting each service.

# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.scheduler:
    # Restrict scheduler to 2 GiB and 1 core
    resources:
      memory: 2 GiB
      vcores: 1
    # The bash script to start the scheduler.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services scheduler

  dask.worker:
    # Don't start any workers initially
    instances: 0
    # Workers can be restarted an infinite number of times
    max_restarts: -1
    # Workers should only be started after the scheduler starts
    depends:
      - dask.scheduler
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # The bash script to start the worker.
    # Here we have dask-yarn already installed on the node,
    # and also run a custom script before starting the service
    script: |
      some-custom-initialization-script
      dask-yarn services worker
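
Here as well the workers start at zero instances. Besides calling cluster.scale(), a sketch using adaptive scaling (assuming the adapt method available on YarnCluster in recent dask-yarn versions) might look like:

from dask_yarn import YarnCluster

cluster = YarnCluster.from_specification('/path/to/spec.yaml')
# Scale between 0 and 10 workers based on load
cluster.adapt(minimum=0, maximum=10)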

Default Configuration

The default configuration file is as follows:

yarn:
  specification: null        # A path to a skein specification yaml file.
                             # Overrides the following configuration if given.

  name: dask                 # Application name
  queue: default             # Yarn queue to deploy to
  deploy-mode: remote        # The deploy mode to use (either remote or local)
  environment: null          # The Python environment to use
  tags: []                   # List of strings to tag applications
  user: ''                   # The user to submit the application on behalf of,
                             # leave as empty string for current user.

  host: "0.0.0.0"            # The scheduler host, when in deploy-mode=local
  port: 0                    # The scheduler port, when in deploy-mode=local
  dashboard-address: ":0"    # The dashboard address, when in deploy-mode=local

  scheduler:                 # Specifications of scheduler container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested

  worker:                   # Specifications of worker containers
    vcores: 1
    memory: 2GiB
    count: 0                # Number of workers to start on initialization
    restarts: -1            # Allowed number of restarts, -1 for unlimited
    env: {}                 # A map of environment variables to set on the worker
    gpus: 0                 # Number of GPUs requested
    worker_class: "dask.distributed.Nanny" # The kind of worker to launch
    worker_options: {}      # A map of options to pass to the worker

  client:                   # Specification of client container
    vcores: 1
    memory: 2GiB
    gpus: 0                 # Number of GPUs requested
    env: {}                 # A map of environment variables to set on the client
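
Any of these defaults can also be overridden programmatically for a single block of code; a short sketch using dask.config.set (the values here are illustrative):

import dask
from dask_yarn import YarnCluster

# Temporarily override the worker defaults for this cluster only
with dask.config.set({'yarn.worker.memory': '8GiB', 'yarn.worker.vcores': 2}):
    cluster = YarnCluster()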