管理 Python 环境

我们需要确保在 Yarn 集群上使用的库与您在本地使用的库相同。有几种方法可以指定这一点

  • 存档环境的路径(可以是 conda虚拟 环境)
  • 一个 Conda 环境的路径(例如 conda:///...
  • 一个 虚拟环境 的路径(例如 venv:///...
  • Python 可执行文件的路径(例如 python:///...

请注意,当不使用存档时,提供的路径必须在集群中的所有节点上有效。

使用存档的 Python 环境

使用 dask-yarn 最常见的方法是将存档的 Python 环境作为应用程序的一部分分发到整个 YARN 集群。打包环境以进行分发通常使用以下工具处理

这些环境可以包含您可能需要的任何 Python 包,但至少需要 dask-yarn(及其依赖项)。

使用 Conda-Pack 存档 Conda 环境

您可以使用 conda-pack 打包 conda 环境。

$ conda create -n my-env dask-yarn scikit-learn          # Create an environment

$ conda activate my-env                                  # Activate the environment

$ conda-pack                                             # Package environment
Collecting packages...
Packing environment at '/home/username/miniconda/envs/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed |  12.2s

使用 Venv-Pack 存档虚拟环境

您可以使用 venv-pack 打包虚拟环境。虚拟环境可以使用 venvvirtualenv 创建。请注意,虚拟环境中链接的 python 必须存在,并且在 YARN 集群中的每个节点上都可访问。如果环境是用不同的 Python 创建的,您可以使用 --python-prefix 标志更改链接路径。更多信息请参阅 venv-pack 文档

$ python -m venv my_env                     # Create an environment using venv
$ python -m virtualenv my_env               # Or create an environment using virtualenv

$ source my_env/bin/activate                # Activate the environment

$ pip install dask-yarn scikit-learn        # Install some packages

$ venv-pack                                 # Package environment
Collecting packages...
Packing environment at '/home/username/my-env' to 'my-env.tar.gz'
[########################################] | 100% Completed |  8.3s

指定存档的环境

现在,您可以通过将打包环境的路径传递给构造函数来启动集群,例如 YarnCluster(environment='my-env.tar.gz', ...)

请注意,如果环境是本地文件,则在启动应用程序之前,存档将自动上传到 HDFS 上的临时目录。如果您发现自己多次重用同一个环境,您可能希望事先将环境上传到 HDFS 一次,以避免为每个集群重复此过程(然后环境指定为 hdfs:///path/to/my-env.tar.gz)。

启动后,您可能希望通过以下方式验证您的版本是否匹配

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='my-env.tar.gz')
client = Client(cluster)
client.get_versions(check=True)  # check that versions match between all nodes

使用每个节点本地的 Python 环境

或者,您可以指定已经在每个节点上找到的 conda 环境虚拟环境 或 Python 可执行文件的路径

from dask_yarn import YarnCluster

# Use a conda environment at /path/to/my/conda/env
cluster = YarnCluster(environment='conda:///path/to/my/conda/env')

# Use a virtual environment at /path/to/my/virtual/env
cluster = YarnCluster(environment='venv:///path/to/my/virtual/env')

# Use a Python executable at /path/to/my/python
cluster = YarnCluster(environment='python:///path/to/my/python')

如前所述,这些环境可以包含任何 Python 包,但至少必须包含 dask-yarn(及其依赖项)。同样非常重要的是,这些环境在所有节点上必须一致;不匹配的环境可能导致难以诊断的问题。要检查这一点,您可以使用 Client.get_versions 方法

from dask.distributed import Client

client = Client(cluster)
client.get_versions(check=True)  # check that versions match between all nodes