Microsoft Azure
目录
Microsoft Azure¶
|
在 Azure 虚拟机上运行的集群。 |
概览¶
身份验证¶
为了在 Azure 上创建集群,您需要设置身份验证凭据。您可以通过 az
命令行工具 来完成此操作。
$ az login
注意
使用 az configure
将默认输出设置为 table
会使 az
工具更容易使用。
资源组¶
要在 Azure 上创建资源,必须将其放置在资源组中。Dask Cloudprovider 需要一个组来创建 Dask 组件。
您可以通过 CLI 列出现有组。
$ az group list
如果您没有现有资源组,也可以创建一个新的。
$ az group create --location <location> --name <resource group name> --subscription <subscription>
您可以使用 az account list-locations
获取位置的完整列表,使用 az account list
获取订阅列表。
记下您的资源组名称以备后用。
虚拟网络¶
Azure 上的计算资源必须放置在虚拟网络 (vnet) 中。Dask Cloudprovider 需要一个现有的 vnet 来连接计算资源。
您可以通过 CLI 列出现有 vnet。
$ az network vnet list
您也可以通过 CLI 创建一个新的 vnet。
$ az network vnet create -g <resource group name> -n <vnet name> --address-prefix 10.0.0.0/16 \
--subnet-name <subnet name> --subnet-prefix 10.0.0.0/24
此命令将在您的资源组中创建一个新的 vnet,其中包含一个具有 10.0.0.0/24
前缀的子网。对于超过 255 个计算资源,您将需要额外的子网。
记下您的 vnet 名称以备后用。
安全组¶
为了允许网络流量到达您的 Dask 集群,您需要创建一个安全组,该安全组允许来自您所在位置的端口 8786-8787 上的流量。
您可以通过 CLI 列出现有安全组。
$ az network nsg list
或者您可以创建一个新的安全组。
$ az network nsg create -g <resource group name> --name <security group name>
$ az network nsg rule create -g <resource group name> --nsg-name <security group name> -n MyNsgRuleWithAsg \
--priority 500 --source-address-prefixes Internet --destination-port-ranges 8786 8787 \
--destination-address-prefixes '*' --access Allow --protocol Tcp --description "Allow Internet to Dask on ports 8786,8787."
此示例允许来自互联网的所有流量到达 8786-8787 端口。建议您通过将其限制到您的公司网络或特定 IP 来使您的规则更具限制性。
同样,记下此安全组名称以备后用。
额外选项¶
为了进一步自定义创建的 VM,您可以为 AzureVMCluster
提供 extra_vm_options
。例如,要将虚拟机的身份设置为(之前创建的)用户分配的身份,请创建一个 azure.mgmt.compute.models.VirtualMachineIdentity
>>> import os
>>> import azure.identity
>>> import dask_cloudprovider.azure
>>> import azure.mgmt.compute.models
>>> subscription_id = os.environ["DASK_CLOUDPROVIDER__AZURE__SUBSCRIPTION_ID"]
>>> rg_name = os.environ["DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP"]
>>> identity_name = "dask-cloudprovider-identity"
>>> v = azure.mgmt.compute.models.UserAssignedIdentitiesValue()
>>> user_assigned_identities = {
... f"/subscriptions/{subscription_id}/resourcegroups/{rg_name}/providers/Microsoft.ManagedIdentity/userAssignedIdentities/{identity_name}": v
... }
>>> identity = azure.mgmt.compute.models.VirtualMachineIdentity(
... type="UserAssigned",
... user_assigned_identities=user_assigned_identities
... )
然后将其提供给 AzureVMCluster
>>> cluster = dask_cloudprovider.azure.AzureVMCluster(extra_vm_options={"identity": identity.as_dict()})
>>> cluster.scale(1)
Dask 配置¶
创建 AzureVMCluster
时,您将提供 Azure 资源的名称或 ID。您可以手动指定这些值,或使用 Dask 的配置系统。例如,可以使用环境变量指定 resource_group
值
$ export DASK_CLOUDPROVIDER__AZURE__RESOURCE_GROUP="<resource group name>"
$ python
或者您可以在 YAML 配置文件中设置它。
cloudprovider:
azure:
resource_group: "<resource group name>"
azurevm:
vnet: "<vnet name>"
请注意,控制 VM 的选项位于 cloudprovider.azure.azurevm 键下。
有关更多信息,请参阅配置。
AzureVM¶
- class dask_cloudprovider.azure.AzureVMCluster(location: str = None, resource_group: str = None, vnet: str = None, subnet: str = None, security_group: str = None, public_ingress: bool = None, vm_size: str = None, scheduler_vm_size: str = None, vm_image: dict = {}, disk_size: int = None, bootstrap: bool = None, auto_shutdown: bool = None, docker_image=None, debug: bool = False, marketplace_plan: dict = {}, subscription_id: Optional[str] = None, extra_vm_options: Optional[dict] = None, **kwargs)[source]¶
在 Azure 虚拟机上运行的集群。
此集群管理器构建一个在 Azure 虚拟机上运行的 Dask 集群。
配置集群时,您可能会发现安装
az
工具以查询 Azure API 可用选项非常有用。https://docs.microsoft.com/en-us/cli/azure/install-azure-cli
- 参数
- location: str
启动集群的 Azure 位置。使用
az account list-locations
列出可用位置。- resource_group: str
创建组件的资源组。使用
az group list
列出您的资源组。- vnet: str
附加 VM 网络接口的 vnet。使用
az network vnet list
列出您的 vnet。- subnet: str(可选)
附加 VM 网络接口的 vnet 子网。如果省略,它将自动使用您的 vnet 中的第一个子网。
- security_group: str
应用于您的 VM 的安全组。此安全组必须允许来自您运行此程序的位置的端口 8786-8787。使用
az network nsg list
列出您的安全组。- public_ingress: bool
为调度程序分配一个公共 IP 地址。默认为
True
。- vm_size: str
调度程序和工作节点使用的 Azure VM 大小。默认为
Standard_DS1_v2
。使用az vm list-sizes --location <location>
列出可用的 VM 大小。- disk_size: int
指定 VM 主机操作系统磁盘的大小(以千兆字节为单位)。默认为
50
。此值不能大于1023
。- scheduler_vm_size: str
调度程序使用的 Azure VM 大小。如果未设置,将使用
vm_size
。- vm_image: dict
默认情况下,所有 VM 将使用最新的 Ubuntu LTS 版本,配置如下
{"publisher": "Canonical", "offer": "UbuntuServer","sku": "18.04-LTS", "version": "latest"}
您可以通过在此处传递具有匹配键的字典来覆盖其中任何选项。例如,如果您希望尝试 Ubuntu 19.04,您可以传递
{"sku": "19.04"}
,并将使用默认的publisher
、offer
和version
。- bootstrap: bool(可选)
假定
VHD
未安装 Docker(或 GPU 实例的 NVIDIA 驱动程序)。如果bootstrap
设置为True
,则在实例启动时将安装这些依赖项。如果您正在使用已安装这些依赖项的自定义 VHD,请将其设置为False。
- auto_shutdown: bool(可选)
如果 Dask 进程退出,则关闭 VM。默认为
True
。- worker_module: str
要在工作节点 VM 上启动的 Dask worker 模块。
- n_workers: int
初始化集群的工作节点数量。默认为
0
。- worker_module: str
为工作节点运行的 Python 模块。默认为
distributed.cli.dask_worker
- worker_options: dict
要传递给 worker 类的参数。有关默认 worker 类,请参阅
distributed.worker.Worker
。如果您设置了worker_module
,则请参阅自定义 worker 类的 docstring。- scheduler_options: dict
要传递给 scheduler 类的参数。请参阅
distributed.scheduler.Scheduler
。- docker_image: string(可选)
要在所有实例上运行的 Docker 镜像。
此镜像必须具有有效的 Python 环境并已安装
dask
,以便dask-scheduler
和dask-worker
命令可用。建议 Python 环境与您创建AzureVMCluster
的本地环境匹配。对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和
dask-cuda
。默认情况下将使用
daskdev/dask:latest
镜像。- docker_args: string(可选)
传递给 Docker 的额外命令行参数。
- extra_bootstrap: list[str](可选)
在引导阶段运行的额外命令。
- silence_logs: bool
在设置集群时是否应静默日志记录。
- asynchronous: bool
如果打算在事件循环中直接使用 async/await
- securitySecurity 或 bool,可选
配置此集群中的通信安全。可以是 security 对象,或 True。如果为 True,将自动创建临时的自签名凭据。默认为
True
。- debug: bool,可选
构建集群时将打印更多信息以启用调试。
- marketplace_plan: dict(可选)
从 Azure Marketplace 镜像或源自带有计划的 Marketplace 镜像的自定义镜像创建虚拟机所需的计划信息字典。默认为 {}。
如果设置了,字典中必须传递“name”、“publisher”、“product”这三个字段。例如
{"name": "ngc-base-version-21-02-2", "publisher": "nvidia","product": "ngc_azure_17_11"}
- subscription_id: str(可选)
创建虚拟机的 Azure 订阅 ID。如果未指定,则 dask-cloudprovider 将尝试使用 Azure CLI 的配置默认值。使用
az account list
列出您的订阅。- extra_vm_options: dict[str, Any]
在创建调度程序和工作节点 VM 时,提供给 Azure 的
VirtualMachinesOperations.begin_create_or_update
的额外参数。
示例
最小示例
创建集群
>>> from dask_cloudprovider.azure import AzureVMCluster >>> cluster = AzureVMCluster(resource_group="<resource group>", ... vnet="<vnet>", ... security_group="<security group>", ... n_workers=1) Creating scheduler instance Assigned public IP Network interface ready Creating VM Created VM dask-5648cc8b-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance Network interface ready Creating VM Created VM dask-5648cc8b-worker-e1ebfc0e
连接客户端。
>>> from dask.distributed import Client >>> client = Client(cluster)
做一些工作。
>>> import dask.array as da >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) >>> arr.mean().compute() 0.5004117488368686
关闭集群。
>>> client.close() >>> cluster.close() Terminated VM dask-5648cc8b-worker-e1ebfc0e Removed disks for VM dask-5648cc8b-worker-e1ebfc0e Deleted network interface Terminated VM dask-5648cc8b-scheduler Removed disks for VM dask-5648cc8b-scheduler Deleted network interface Unassigned public IP
您也可以使用上下文管理器一次性完成所有操作,以确保集群被创建和清理。
>>> with AzureVMCluster(resource_group="<resource group>", ... vnet="<vnet>", ... security_group="<security group>", ... n_workers=1) as cluster: ... with Client(cluster) as client: ... print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) Creating scheduler instance Assigned public IP Network interface ready Creating VM Created VM dask-1e6dac4e-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance Network interface ready Creating VM Created VM dask-1e6dac4e-worker-c7c4ca23 0.4996427609642539 Terminated VM dask-1e6dac4e-worker-c7c4ca23 Removed disks for VM dask-1e6dac4e-worker-c7c4ca23 Deleted network interface Terminated VM dask-1e6dac4e-scheduler Removed disks for VM dask-1e6dac4e-scheduler Deleted network interface Unassigned public IP
RAPIDS 示例
您也可以使用
AzureVMCluster
运行启用 GPU 的集群并利用 RAPIDS 加速库。>>> cluster = AzureVMCluster(resource_group="<resource group>", ... vnet="<vnet>", ... security_group="<security group>", ... n_workers=1, ... vm_size="Standard_NC12s_v3", # Or any NVIDIA GPU enabled size ... docker_image="rapidsai/rapidsai:cuda11.0-runtime-ubuntu18.04-py3.9", ... worker_class="dask_cuda.CUDAWorker") >>> from dask.distributed import Client >>> client = Client(cluster)
运行一些 GPU 代码。
>>> def get_gpu_model(): ... import pynvml ... pynvml.nvmlInit() ... return pynvml.nvmlDeviceGetName(pynvml.nvmlDeviceGetHandleByIndex(0))
>>> client.submit(get_gpu_model).result() b'Tesla V100-PCIE-16GB'
关闭集群。
>>> client.close() >>> cluster.close()
- 属性
asynchronous
我们正在事件循环中运行吗?
- auto_shutdown
- bootstrap
- called_from_running_loop
- command
- dashboard_link
- docker_image
- gpu_instance
- loop
- name
- observed
- plan
- requested
- scheduler_address
- scheduler_class
- worker_class
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应
call_async
(f, *args, **kwargs)在线程中作为协程运行阻塞函数。
from_name
(name)创建此类实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度程序和工作节点的日志
get_tags
()生成要应用于所有资源的标签。
new_worker_spec
()返回下一个工作节点的名称和规范
scale
([n, memory, cores])将集群扩展到 n 个工作节点
scale_up
([n, memory, cores])将集群扩展到 n 个工作节点
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 和 args
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个工作节点后继续
close
get_cloud_init
logs
render_cloud_init
render_process_cloud_init
scale_down
Azure Spot 实例插件¶
- class dask_cloudprovider.azure.AzurePreemptibleWorkerPlugin(poll_interval_s=1, metadata_url=None, termination_events=None, termination_offset_minutes=0)[source]¶
用于 Azure Spot 实例的工作节点插件
此工作节点插件将轮询 Azure 的元数据服务以获取抢占通知。当节点被抢占时,插件将尝试优雅地关闭该节点上的所有工作节点。
此插件可用于在 Azure Spot 实例上运行的任何工作节点,而不仅仅是由
dask-cloudprovider
创建的工作节点。有关 Azure Spot 实例的更多详细信息,请参阅:https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
- 参数
- poll_interval_s: int(可选)
插件轮询元数据服务的速率,以秒为单位。
默认为
1
- metadata_url: str(可选)
要轮询的元数据服务的 URL。
默认为“http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01”
- termination_events: List[str](可选)
将触发优雅关闭的事件类型
默认为
['Preempt', 'Terminate']
- termination_offset_minutes: int(可选)
应用于抢占日期的额外偏移量。这可以是负数,以便在
NotBefore
日期之前开始优雅关闭。它也可以是正数,以便在NotBefore
日期之后开始关闭,但这需要您自担风险。默认为
0
示例
假设您有一个集群和一个客户端实例。例如使用
dask_kubernetes.KubeCluster
>>> from dask_kubernetes import KubeCluster >>> from distributed import Client >>> cluster = KubeCluster() >>> client = Client(cluster)
您可以使用以下方式添加工作节点插件
>>> from dask_cloudprovider.azure import AzurePreemptibleWorkerPlugin >>> client.register_worker_plugin(AzurePreemptibleWorkerPlugin())
方法
setup
(worker)插件附加到工作节点时运行。
teardown
(worker)当附加插件的工作节点关闭时运行,或当插件被移除时运行。
transition
(key, start, finish, **kwargs)在任务的整个生命周期中(参阅 工作节点状态),工作节点会收到调度程序的指令来计算某些任务,从而导致每个任务状态的转换。
poll_status