IBM Cloud
目录
IBM Cloud¶
|
在 IBM Code Engine 上运行的集群。 |
概览¶
Code Engine¶
- class dask_cloudprovider.ibm.IBMCodeEngineCluster(image: str = None, region: str = None, project_id: str = None, scheduler_cpu: str = None, scheduler_mem: str = None, scheduler_disk: str = None, scheduler_timeout: int = None, worker_cpu: str = None, worker_mem: str = None, worker_disk: str = None, worker_threads: int = 1, debug: bool = False, **kwargs)[source]¶
在 IBM Code Engine 上运行的集群。
此集群管理器构建一个在 IBM Code Engine 上运行的 Dask 集群。
配置集群时,您可以参考 IBM Cloud 文档以获取可用选项,这可能会有所帮助。
https://cloud.ibm.com/docs/codeengine
- 参数
- image: str
在所有实例上运行的 Docker 镜像。此镜像必须具有有效的 Python 环境并已安装
dask
,以便dask-scheduler
和dask-worker
命令可用。- region: str
在其中启动集群的 IBM Cloud 区域。
参见:https://cloud.ibm.com/docs/codeengine?topic=codeengine-regions
- project_id: str
您的 IBM Cloud 项目 ID。必须在此处或 Dask 配置中设置此项。
- scheduler_cpu: str
分配给调度器的 CPU 量。
参见:https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
- scheduler_mem: str
分配给调度器的内存量。
参见:https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
- scheduler_disk: str
分配给调度器的临时存储量。此值必须小于 scheduler_mem。
- scheduler_timeout: int
调度器的超时时间(秒)。
- worker_cpu: str
分配给每个工作节点的 CPU 量。
参见:https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
- worker_mem: str
分配给每个工作节点的内存量。
参见:https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
- worker_disk: str
分配给每个工作节点的临时存储量。此值必须小于 worker_mem。
- worker_threads: int
每个工作节点使用的线程数。
- debug: bool, 可选
构建集群时将打印更多信息以启用调试。
注意
凭证
为了使用 IBM Cloud API,您需要设置一个 API 密钥。您可以在 IBM Cloud 控制台中创建 API 密钥。
最佳实践方法是传递一个供工作节点使用的 API 密钥。您可以将此 API 密钥设置为环境变量。这里有一个小示例来帮助您完成此操作。
要公开您的 IBM API 密钥,请使用此命令:export DASK_CLOUDPROVIDER__IBM__API_KEY=xxxxx
证书
此后端需要使用 Let’s Encrypt 证书 (ISRG Root X1) 通过 websockets 连接客户端和调度器。更多信息请参见此处:https://letsencrypt.openssl.ac.cn/certificates/
示例
创建集群。
>>> from dask_cloudprovider.ibm import IBMCodeEngineCluster >>> cluster = IBMCodeEngineCluster(n_workers=1) Launching cluster with the following configuration: Source Image: daskdev/dask:latest Region: eu-de Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 Scheduler CPU: 0.25 Scheduler Memory: 1G Scheduler Disk: 400M Scheduler Timeout: 600 Worker CPU: 2 Worker Memory: 4G Worker Disk: 400M Creating scheduler dask-xxxxxxxx-scheduler Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 Scheduler is running Creating worker instance dask-xxxxxxxx-worker-xxxxxxxx
>>> from dask.distributed import Client >>> client = Client(cluster)
执行一些工作。
>>> import dask.array as da >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) >>> arr.mean().compute() 0.5001550986751964
关闭集群
>>> cluster.close() Deleting Instance: dask-xxxxxxxx-worker-xxxxxxxx Deleting Instance: dask-xxxxxxxx-scheduler
您也可以使用上下文管理器一次性完成所有操作,以确保集群被创建并清理干净。
>>> with IBMCodeEngineCluster(n_workers=1) as cluster: ... with Client(cluster) as client: ... print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) Launching cluster with the following configuration: Source Image: daskdev/dask:latest Region: eu-de Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0 Scheduler CPU: 0.25 Scheduler Memory: 1G Scheduler Disk: 400M Scheduler Timeout: 600 Worker CPU: 2 Worker Memory: 4G Worker Disk: 400M Worker Threads: 1 Creating scheduler dask-xxxxxxxx-scheduler Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443 Scheduler is running Creating worker instance dask-xxxxxxxx-worker-xxxxxxxx 0.5000812282861661 Deleting Instance: dask-xxxxxxxx-worker-xxxxxxxx Deleting Instance: dask-xxxxxxxx-scheduler
- 属性
asynchronous
是否在事件循环中运行?
- auto_shutdown
- bootstrap
- called_from_running_loop
- command
- dashboard_link
- docker_image
- gpu_instance
- loop
- name
- observed
- plan
- requested
- scheduler_address
- scheduler_class
- worker_class
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应
call_async
(f, *args, **kwargs)在线程中作为协程运行一个阻塞函数。
from_name
(name)创建此类的实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和工作节点的日志
get_tags
()生成应用于所有资源的标签。
new_worker_spec
()返回下一个工作节点的名称和规格
scale
([n, memory, cores])将集群扩展到 n 个工作节点
scale_up
([n, memory, cores])将集群扩展到 n 个工作节点
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 及 args
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个工作节点就绪后继续
close
get_cloud_init
logs
render_cloud_init
render_process_cloud_init
scale_down