DigitalOcean
目录
DigitalOcean¶
|
在 Digital Ocean Droplets 上运行的集群。 |
概述¶
Droplet¶
- class dask_cloudprovider.digitalocean.DropletCluster(region: str = None, size: str = None, image: str = None, debug: bool = False, **kwargs)[source]¶
在 Digital Ocean Droplets 上运行的集群。
DigitalOcean (DO) 中的虚拟机被称为 Droplets。此集群管理器可在虚拟机上构建 Dask 集群。
配置集群时,您可能会发现安装
doctl
工具来查询 DO API 以获取可用选项很有用。https://www.digitalocean.com/docs/apis-clis/doctl/how-to/install/
- 参数
- region: str
启动集群所在的 DO 区域。可以使用
doctl compute region list
获取完整列表。- size: str
虚拟机大小 slug。您可以使用
doctl compute size list
获取完整列表。默认值为s-1vcpu-1gb
,即 1GB RAM 和 1 vCPU。- image: str
用于宿主操作系统的镜像 ID。这应该是 Ubuntu 变体。您可以使用
doctl compute image list --public | grep ubuntu.*x64
列出可用镜像。- worker_module: str
要在工作节点虚拟机上启动的 Dask worker 模块。
- n_workers: int
用于初始化集群的工作节点数量。默认为
0
。- worker_module: str
用于运行 worker 的 Python 模块。默认为
distributed.cli.dask_worker
- worker_options: dict
要传递给 worker 类的参数。请参阅
distributed.worker.Worker
获取默认 worker 类。如果您设置了worker_module
,请参阅自定义 worker 类的文档字符串。- scheduler_options: dict
要传递给 scheduler 类的参数。请参阅
distributed.scheduler.Scheduler
。- docker_image: string (可选)
要在所有实例上运行的 Docker 镜像。
此镜像必须具有有效的 Python 环境并已安装
dask
,以便dask-scheduler
和dask-worker
命令可用。建议 Python 环境与创建EC2Cluster
的本地环境一致。对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和
dask-cuda
。默认情况下将使用
daskdev/dask:latest
镜像。- docker_args: string (可选)
要传递给 Docker 的额外命令行参数。
- extra_bootstrap: list[str] (可选)
在引导阶段要运行的额外命令。
- env_vars: dict (可选)
要传递给 worker 的环境变量。
- silence_logs: bool
设置集群时是否应静默日志记录。
- asynchronous: bool
如果打算在事件循环中直接与 async/await 一起使用
- securitySecurity 或 bool,可选
配置此集群中的通信安全。可以是 security 对象,也可以是 True。如果为 True,则会自动创建临时的自签名凭据。默认为
True
。- debug: bool, 可选
构建集群时将打印更多信息以启用调试。
示例
创建集群。
>>> from dask_cloudprovider.digitalocean import DropletCluster >>> cluster = DropletCluster(n_workers=1) Creating scheduler instance Created droplet dask-38b817c1-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance Created droplet dask-38b817c1-worker-dc95260d
连接客户端。
>>> from dask.distributed import Client >>> client = Client(cluster)
执行一些工作。
>>> import dask.array as da >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) >>> arr.mean().compute() 0.5001550986751964
关闭集群
>>> client.close() >>> cluster.close() Terminated droplet dask-38b817c1-worker-dc95260d Terminated droplet dask-38b817c1-scheduler
您还可以一次性使用上下文管理器来确保集群被创建和清理。
>>> with DropletCluster(n_workers=1) as cluster: ... with Client(cluster) as client: ... print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) Creating scheduler instance Created droplet dask-48efe585-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance Created droplet dask-48efe585-worker-5181aaf1 0.5000558682356162 Terminated droplet dask-48efe585-worker-5181aaf1 Terminated droplet dask-48efe585-scheduler
- 属性
asynchronous
我们是否在事件循环中运行?
- auto_shutdown
- bootstrap
- called_from_running_loop
- command
- dashboard_link
- docker_image
- gpu_instance
- loop
- name
- observed
- plan
- requested
- scheduler_address
- scheduler_class
- worker_class
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应性
call_async
(f, *args, **kwargs)在线程中以协程方式运行阻塞函数。
from_name
(name)创建此类的一个实例,按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和工作节点的日志
get_tags
()生成要应用于所有资源的标签。
new_worker_spec
()返回下一个工作节点的名称和规范
scale
([n, memory, cores])将集群扩展到 n 个工作节点
scale_up
([n, memory, cores])将集群扩展到 n 个工作节点
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 并传入 args
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个工作节点就绪后继续
close
get_cloud_init
logs
render_cloud_init
render_process_cloud_init
scale_down