Microsoft Azure

AzureVMCluster([location, resource_group, ...])

在 Azure 虚拟机上运行的集群。

概览

身份验证

为了在 Azure 上创建集群,您需要设置身份验证凭据。您可以通过 az 命令行工具 来完成此操作。

$ az login

注意

使用 az configure 将默认输出设置为 table 会使 az 工具更容易使用。

资源组

要在 Azure 上创建资源,必须将其放置在资源组中。Dask Cloudprovider 需要一个组来创建 Dask 组件。

您可以通过 CLI 列出现有组。

$ az group list

如果您没有现有资源组,也可以创建一个新的。

$ az group create --location <location> --name <resource group name> --subscription <subscription>

您可以使用 az account list-locations 获取位置的完整列表,使用 az account list 获取订阅列表。

记下您的资源组名称以备后用。

虚拟网络

Azure 上的计算资源必须放置在虚拟网络 (vnet) 中。Dask Cloudprovider 需要一个现有的 vnet 来连接计算资源。

您可以通过 CLI 列出现有 vnet。

$ az network vnet list

您也可以通过 CLI 创建一个新的 vnet。

$ az network vnet create -g <resource group name> -n <vnet name> --address-prefix 10.0.0.0/16 \
      --subnet-name <subnet name> --subnet-prefix 10.0.0.0/24

此命令将在您的资源组中创建一个新的 vnet,其中包含一个具有 10.0.0.0/24 前缀的子网。对于超过 255 个计算资源,您将需要额外的子网。

记下您的 vnet 名称以备后用。

安全组

为了允许网络流量到达您的 Dask 集群,您需要创建一个安全组,该安全组允许来自您所在位置的端口 8786-8787 上的流量。

您可以通过 CLI 列出现有安全组。

$ az network nsg list

或者您可以创建一个新的安全组。

$ az network nsg create -g <resource group name> --name <security group name>
$ az network nsg rule create -g <resource group name> --nsg-name <security group name> -n MyNsgRuleWithAsg \
      --priority 500 --source-address-prefixes Internet --destination-port-ranges 8786 8787 \
      --destination-address-prefixes '*' --access Allow --protocol Tcp --description "Allow Internet to Dask on ports 8786,8787."

此示例允许来自互联网的所有流量到达 8786-8787 端口。建议您通过将其限制到您的公司网络或特定 IP 来使您的规则更具限制性。

同样,记下此安全组名称以备后用。

额外选项

为了进一步自定义创建的 VM,您可以为 AzureVMCluster 提供 extra_vm_options。例如,要将虚拟机的身份设置为(之前创建的)用户分配的身份,请创建一个 azure.mgmt.compute.models.VirtualMachineIdentity

>>> import os
>>> import azure.identity
>>> import dask_cloudprovider.azure
>>> import azure.mgmt.compute.models

>>> subscription_id = os.environ["DASK_CLOUDPROVIDER__AZURE__SUBSCRIPTION_ID"]
>>> rg_name = os.environ["DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP"]
>>> identity_name = "dask-cloudprovider-identity"
>>> v = azure.mgmt.compute.models.UserAssignedIdentitiesValue()
>>> user_assigned_identities = {
...     f"/subscriptions/{subscription_id}/resourcegroups/{rg_name}/providers/Microsoft.ManagedIdentity/userAssignedIdentities/{identity_name}": v
... }
>>> identity = azure.mgmt.compute.models.VirtualMachineIdentity(
...     type="UserAssigned",
...     user_assigned_identities=user_assigned_identities
... )

然后将其提供给 AzureVMCluster

>>> cluster = dask_cloudprovider.azure.AzureVMCluster(extra_vm_options={"identity": identity.as_dict()})
>>> cluster.scale(1)

Dask 配置

创建 AzureVMCluster 时,您将提供 Azure 资源的名称或 ID。您可以手动指定这些值,或使用 Dask 的配置系统。例如,可以使用环境变量指定 resource_group

$ export DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP="<resource group name>"
$ python

或者您可以在 YAML 配置文件中设置它。

cloudprovider:
  azure:
    resource_group: "<resource group name>"
    azurevm:
     vnet: "<vnet name>"

请注意,控制 VM 的选项位于 cloudprovider.azure.azurevm 键下。

有关更多信息,请参阅配置

AzureVM

class dask_cloudprovider.azure.AzureVMCluster(location: str = None, resource_group: str = None, vnet: str = None, subnet: str = None, security_group: str = None, public_ingress: bool = None, vm_size: str = None, scheduler_vm_size: str = None, vm_image: dict = {}, disk_size: int = None, bootstrap: bool = None, auto_shutdown: bool = None, docker_image=None, debug: bool = False, marketplace_plan: dict = {}, subscription_id: Optional[str] = None, extra_vm_options: Optional[dict] = None, **kwargs)[source]

在 Azure 虚拟机上运行的集群。

此集群管理器构建一个在 Azure 虚拟机上运行的 Dask 集群。

配置集群时,您可能会发现安装 az 工具以查询 Azure API 可用选项非常有用。

https://docs.microsoft.com/en-us/cli/azure/install-azure-cli

参数
location: str

启动集群的 Azure 位置。使用 az account list-locations 列出可用位置。

resource_group: str

创建组件的资源组。使用 az group list 列出您的资源组。

vnet: str

附加 VM 网络接口的 vnet。使用 az network vnet list 列出您的 vnet。

subnet: str(可选)

附加 VM 网络接口的 vnet 子网。如果省略,它将自动使用您的 vnet 中的第一个子网。

security_group: str

应用于您的 VM 的安全组。此安全组必须允许来自您运行此程序的位置的端口 8786-8787。使用 az network nsg list 列出您的安全组。

public_ingress: bool

为调度程序分配一个公共 IP 地址。默认为 True

vm_size: str

调度程序和工作节点使用的 Azure VM 大小。默认为 Standard_DS1_v2。使用 az vm list-sizes --location <location> 列出可用的 VM 大小。

disk_size: int

指定 VM 主机操作系统磁盘的大小(以千兆字节为单位)。默认为 50。此值不能大于 1023

scheduler_vm_size: str

调度程序使用的 Azure VM 大小。如果未设置,将使用 vm_size

vm_image: dict

默认情况下,所有 VM 将使用最新的 Ubuntu LTS 版本,配置如下

{"publisher": "Canonical", "offer": "UbuntuServer","sku": "18.04-LTS", "version": "latest"}

您可以通过在此处传递具有匹配键的字典来覆盖其中任何选项。例如,如果您希望尝试 Ubuntu 19.04,您可以传递 {"sku": "19.04"},并将使用默认的 publisherofferversion

bootstrap: bool(可选)

假定 VHD 未安装 Docker(或 GPU 实例的 NVIDIA 驱动程序)。如果 bootstrap 设置为 True,则在实例启动时将安装这些依赖项。如果您正在使用已安装这些依赖项的自定义 VHD,请将其设置为 False。

auto_shutdown: bool(可选)

如果 Dask 进程退出,则关闭 VM。默认为 True

worker_module: str

要在工作节点 VM 上启动的 Dask worker 模块。

n_workers: int

初始化集群的工作节点数量。默认为 0

worker_module: str

为工作节点运行的 Python 模块。默认为 distributed.cli.dask_worker

worker_options: dict

要传递给 worker 类的参数。有关默认 worker 类,请参阅 distributed.worker.Worker。如果您设置了 worker_module,则请参阅自定义 worker 类的 docstring。

scheduler_options: dict

要传递给 scheduler 类的参数。请参阅 distributed.scheduler.Scheduler

docker_image: string(可选)

要在所有实例上运行的 Docker 镜像。

此镜像必须具有有效的 Python 环境并已安装 dask,以便 dask-schedulerdask-worker 命令可用。建议 Python 环境与您创建 AzureVMCluster 的本地环境匹配。

对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和 dask-cuda

默认情况下将使用 daskdev/dask:latest 镜像。

docker_args: string(可选)

传递给 Docker 的额外命令行参数。

extra_bootstrap: list[str](可选)

在引导阶段运行的额外命令。

silence_logs: bool

在设置集群时是否应静默日志记录。

asynchronous: bool

如果打算在事件循环中直接使用 async/await

securitySecurity 或 bool,可选

配置此集群中的通信安全。可以是 security 对象,或 True。如果为 True,将自动创建临时的自签名凭据。默认为 True

debug: bool,可选

构建集群时将打印更多信息以启用调试。

marketplace_plan: dict(可选)

从 Azure Marketplace 镜像或源自带有计划的 Marketplace 镜像的自定义镜像创建虚拟机所需的计划信息字典。默认为 {}。

如果设置了,字典中必须传递“name”、“publisher”、“product”这三个字段。例如

{"name": "ngc-base-version-21-02-2", "publisher": "nvidia","product": "ngc_azure_17_11"}

subscription_id: str(可选)

创建虚拟机的 Azure 订阅 ID。如果未指定,则 dask-cloudprovider 将尝试使用 Azure CLI 的配置默认值。使用 az account list 列出您的订阅。

extra_vm_options: dict[str, Any]

在创建调度程序和工作节点 VM 时,提供给 Azure 的 VirtualMachinesOperations.begin_create_or_update 的额外参数。

示例

最小示例

创建集群

>>> from dask_cloudprovider.azure import AzureVMCluster
>>> cluster = AzureVMCluster(resource_group="<resource group>",
...                          vnet="<vnet>",
...                          security_group="<security group>",
...                          n_workers=1)
Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-5648cc8b-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Network interface ready
Creating VM
Created VM dask-5648cc8b-worker-e1ebfc0e

连接客户端。

>>> from dask.distributed import Client
>>> client = Client(cluster)

做一些工作。

>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(100, 100))
>>> arr.mean().compute()
0.5004117488368686

关闭集群。

>>> client.close()
>>> cluster.close()
Terminated VM dask-5648cc8b-worker-e1ebfc0e
Removed disks for VM dask-5648cc8b-worker-e1ebfc0e
Deleted network interface
Terminated VM dask-5648cc8b-scheduler
Removed disks for VM dask-5648cc8b-scheduler
Deleted network interface
Unassigned public IP

您也可以使用上下文管理器一次性完成所有操作,以确保集群被创建和清理。

>>> with AzureVMCluster(resource_group="<resource group>",
...                     vnet="<vnet>",
...                     security_group="<security group>",
...                     n_workers=1) as cluster:
...     with Client(cluster) as client:
...             print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute())
Creating scheduler instance
Assigned public IP
Network interface ready
Creating VM
Created VM dask-1e6dac4e-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Network interface ready
Creating VM
Created VM dask-1e6dac4e-worker-c7c4ca23
0.4996427609642539
Terminated VM dask-1e6dac4e-worker-c7c4ca23
Removed disks for VM dask-1e6dac4e-worker-c7c4ca23
Deleted network interface
Terminated VM dask-1e6dac4e-scheduler
Removed disks for VM dask-1e6dac4e-scheduler
Deleted network interface
Unassigned public IP

RAPIDS 示例

您也可以使用 AzureVMCluster 运行启用 GPU 的集群并利用 RAPIDS 加速库。

>>> cluster = AzureVMCluster(resource_group="<resource group>",
...                          vnet="<vnet>",
...                          security_group="<security group>",
...                          n_workers=1,
...                          vm_size="Standard_NC12s_v3",  # Or any NVIDIA GPU enabled size
...                          docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9",
...                          worker_class="dask_cuda.CUDAWorker")
>>> from dask.distributed import Client
>>> client = Client(cluster)

运行一些 GPU 代码。

>>> def get_gpu_model():
...     import pynvml
...     pynvml.nvmlInit()
...     return pynvml.nvmlDeviceGetName(pynvml.nvmlDeviceGetHandleByIndex(0))
>>> client.submit(get_gpu_model).result()
b'Tesla V100-PCIE-16GB'

关闭集群。

>>> client.close()
>>> cluster.close()
属性
asynchronous

我们正在事件循环中运行吗?

auto_shutdown
bootstrap
called_from_running_loop
command
dashboard_link
docker_image
gpu_instance
loop
name
observed
plan
requested
scheduler_address
scheduler_class
worker_class

方法

adapt([Adaptive, minimum, maximum, ...])

开启自适应

call_async(f, *args, **kwargs)

在线程中作为协程运行阻塞函数。

from_name(name)

创建此类实例以按名称表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度程序和工作节点的日志

get_tags()

生成要应用于所有资源的标签。

new_worker_spec()

返回下一个工作节点的名称和规范

scale([n, memory, cores])

将集群扩展到 n 个工作节点

scale_up([n, memory, cores])

将集群扩展到 n 个工作节点

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 funcargs

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个工作节点后继续

close

get_cloud_init

logs

render_cloud_init

render_process_cloud_init

scale_down

Azure Spot 实例插件

class dask_cloudprovider.azure.AzurePreemptibleWorkerPlugin(poll_interval_s=1, metadata_url=None, termination_events=None, termination_offset_minutes=0)[source]

用于 Azure Spot 实例的工作节点插件

此工作节点插件将轮询 Azure 的元数据服务以获取抢占通知。当节点被抢占时,插件将尝试优雅地关闭该节点上的所有工作节点。

此插件可用于在 Azure Spot 实例上运行的任何工作节点,而不仅仅是由 dask-cloudprovider 创建的工作节点。

有关 Azure Spot 实例的更多详细信息,请参阅:https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events

参数
poll_interval_s: int(可选)

插件轮询元数据服务的速率,以秒为单位。

默认为 1

metadata_url: str(可选)

要轮询的元数据服务的 URL。

默认为“http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01

termination_events: List[str](可选)

将触发优雅关闭的事件类型

默认为 ['Preempt', 'Terminate']

termination_offset_minutes: int(可选)

应用于抢占日期的额外偏移量。这可以是负数,以便在 NotBefore 日期之前开始优雅关闭。它也可以是正数,以便在 NotBefore 日期之后开始关闭,但这需要您自担风险。

默认为 0

示例

假设您有一个集群和一个客户端实例。例如使用 dask_kubernetes.KubeCluster

>>> from dask_kubernetes import KubeCluster
>>> from distributed import Client
>>> cluster = KubeCluster()
>>> client = Client(cluster)

您可以使用以下方式添加工作节点插件

>>> from dask_cloudprovider.azure import AzurePreemptibleWorkerPlugin
>>> client.register_worker_plugin(AzurePreemptibleWorkerPlugin())

方法

setup(worker)

插件附加到工作节点时运行。

teardown(worker)

当附加插件的工作节点关闭时运行,或当插件被移除时运行。

transition(key, start, finish, **kwargs)

在任务的整个生命周期中(参阅 工作节点状态),工作节点会收到调度程序的指令来计算某些任务,从而导致每个任务状态的转换。

poll_status

setup(worker)[source]

插件附加到工作节点时运行。这发生在插件注册并附加到现有工作节点时,或在插件注册后创建工作节点时。

teardown(worker)[source]

当附加插件的工作节点关闭时运行,或当插件被移除时运行。