DigitalOcean

DigitalOcean

DropletCluster([region, size, image, debug])

在 Digital Ocean Droplets 上运行的集群。

概述

身份验证

要使用 DigitalOcean 进行身份验证,您必须首先生成一个个人访问令牌

然后您必须将此令牌放在 Dask 配置的 cloudprovider.digitalocean.token 中。这可以通过将令牌添加到 YAML 配置或导出环境变量来完成。

# ~/.config/dask/cloudprovider.yaml

cloudprovider:
  digitalocean:
    token: "yourtoken"
$ export DASK_CLOUDPROVIDER__DIGITALOCEAN__TOKEN="yourtoken"

Droplet

class dask_cloudprovider.digitalocean.DropletCluster(region: str = None, size: str = None, image: str = None, debug: bool = False, **kwargs)[source]

在 Digital Ocean Droplets 上运行的集群。

DigitalOcean (DO) 中的虚拟机被称为 Droplets。此集群管理器可在虚拟机上构建 Dask 集群。

配置集群时,您可能会发现安装 doctl 工具来查询 DO API 以获取可用选项很有用。

https://www.digitalocean.com/docs/apis-clis/doctl/how-to/install/

参数
region: str

启动集群所在的 DO 区域。可以使用 doctl compute region list 获取完整列表。

size: str

虚拟机大小 slug。您可以使用 doctl compute size list 获取完整列表。默认值为 s-1vcpu-1gb,即 1GB RAM 和 1 vCPU。

image: str

用于宿主操作系统的镜像 ID。这应该是 Ubuntu 变体。您可以使用 doctl compute image list --public | grep ubuntu.*x64 列出可用镜像。

worker_module: str

要在工作节点虚拟机上启动的 Dask worker 模块。

n_workers: int

用于初始化集群的工作节点数量。默认为 0

worker_module: str

用于运行 worker 的 Python 模块。默认为 distributed.cli.dask_worker

worker_options: dict

要传递给 worker 类的参数。请参阅 distributed.worker.Worker 获取默认 worker 类。如果您设置了 worker_module,请参阅自定义 worker 类的文档字符串。

scheduler_options: dict

要传递给 scheduler 类的参数。请参阅 distributed.scheduler.Scheduler

docker_image: string (可选)

要在所有实例上运行的 Docker 镜像。

此镜像必须具有有效的 Python 环境并已安装 dask,以便 dask-schedulerdask-worker 命令可用。建议 Python 环境与创建 EC2Cluster 的本地环境一致。

对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和 dask-cuda

默认情况下将使用 daskdev/dask:latest 镜像。

docker_args: string (可选)

要传递给 Docker 的额外命令行参数。

extra_bootstrap: list[str] (可选)

在引导阶段要运行的额外命令。

env_vars: dict (可选)

要传递给 worker 的环境变量。

silence_logs: bool

设置集群时是否应静默日志记录。

asynchronous: bool

如果打算在事件循环中直接与 async/await 一起使用

securitySecurity 或 bool,可选

配置此集群中的通信安全。可以是 security 对象,也可以是 True。如果为 True,则会自动创建临时的自签名凭据。默认为 True

debug: bool, 可选

构建集群时将打印更多信息以启用调试。

示例

创建集群。

>>> from dask_cloudprovider.digitalocean import DropletCluster
>>> cluster = DropletCluster(n_workers=1)
Creating scheduler instance
Created droplet dask-38b817c1-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Created droplet dask-38b817c1-worker-dc95260d

连接客户端。

>>> from dask.distributed import Client
>>> client = Client(cluster)

执行一些工作。

>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(100, 100))
>>> arr.mean().compute()
0.5001550986751964

关闭集群

>>> client.close()
>>> cluster.close()
Terminated droplet dask-38b817c1-worker-dc95260d
Terminated droplet dask-38b817c1-scheduler

您还可以一次性使用上下文管理器来确保集群被创建和清理。

>>> with DropletCluster(n_workers=1) as cluster:
...     with Client(cluster) as client:
...         print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute())
Creating scheduler instance
Created droplet dask-48efe585-scheduler
Waiting for scheduler to run
Scheduler is running
Creating worker instance
Created droplet dask-48efe585-worker-5181aaf1
0.5000558682356162
Terminated droplet dask-48efe585-worker-5181aaf1
Terminated droplet dask-48efe585-scheduler
属性
asynchronous

我们是否在事件循环中运行?

auto_shutdown
bootstrap
called_from_running_loop
command
dashboard_link
docker_image
gpu_instance
loop
name
observed
plan
requested
scheduler_address
scheduler_class
worker_class

方法

adapt([Adaptive, minimum, maximum, ...])

开启自适应性

call_async(f, *args, **kwargs)

在线程中以协程方式运行阻塞函数。

from_name(name)

创建此类的一个实例,按名称表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度器和工作节点的日志

get_tags()

生成要应用于所有资源的标签。

new_worker_spec()

返回下一个工作节点的名称和规范

scale([n, memory, cores])

将集群扩展到 n 个工作节点

scale_up([n, memory, cores])

将集群扩展到 n 个工作节点

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 并传入 args

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个工作节点就绪后继续

close

get_cloud_init

logs

render_cloud_init

render_process_cloud_init

scale_down