Amazon Web Services (AWS)

EC2Cluster([region, availability_zone, ...])

使用 EC2 部署 Dask 集群。

ECSCluster([fargate_scheduler, ...])

使用 ECS 部署 Dask 集群

FargateCluster(**kwargs)

在 ECS 上使用 Fargate 部署 Dask 集群

概述

身份验证

要在 AWS 上创建集群,您需要设置访问密钥、秘密密钥和区域。最简单的方法是使用 aws 命令行工具。

$ pip install awscli
$ aws configure

凭证

为了让您的 Dask Worker 能够连接到其他 AWS 资源(例如 S3),它们将需要凭证。

这可以通过将 IAM 角色附加到单个资源或将凭证作为环境变量传递来完成。有关更多信息,请参阅每个集群管理器的文档字符串。

Elastic Compute Cloud (EC2)

class dask_cloudprovider.aws.EC2Cluster(region=None, availability_zone=None, bootstrap=None, auto_shutdown=None, ami=None, instance_type=None, scheduler_instance_type=None, worker_instance_type=None, vpc=None, subnet_id=None, security_groups=None, filesystem_size=None, key_name=None, iam_instance_profile=None, docker_image=None, debug=False, instance_tags=None, volume_tags=None, use_private_ip=None, enable_detailed_monitoring=None, **kwargs)[source]

使用 EC2 部署 Dask 集群。

这将在 EC2 实例上创建 Dask 调度器和 Worker。

所有实例都将运行一个可配置的 Docker 容器,该容器应包含有效的 Python 环境,其中包含 Dask 和任何其他依赖项。

所有可选参数也可以在您的 Dask 配置目录中的 cloudprovider.yaml 文件中或通过环境变量进行配置。

例如,可以通过 DASK_CLOUDPROVIDER__EC2__AMI 设置 ami

有关更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html

参数
region: 字符串(可选)

启动集群的区域。默认情况下,将从您的配置中检测到。

availability_zone: 字符串或 List(字符串)(可选)

启动集群的可用区。默认情况下,AWS 将选择可用容量最多的可用区 (AZ)。如果您指定多个可用区,则调度器和 Worker VM 将被随机分配到您选择的一个可用区中。

bootstrap: 布尔值(可选)

假定 ami 未安装 Docker(或 GPU 实例的 NVIDIA 驱动程序)。如果 bootstrapTrue,则这些依赖项将在实例启动时安装。如果您使用的是已包含这些依赖项的自定义 AMI,请将其设置为 False。

worker_command: 字符串(可选)

Worker 启动时应运行的命令。默认情况下为 "dask-worker",除非 instance_type 是 GPU 实例,在这种情况下将使用 dask-cuda-worker

ami: 字符串(可选)

用于调度器和 Worker 的基础操作系统 AMI。

这必须是 Debian 风格的发行版。默认情况下,这将是 canonical 发布的最新官方 Ubuntu 20.04 LTS 版本。

如果 AMI 不包含 Docker,则将在运行时安装。如果 instance_type 是 GPU 实例,则 NVIDIA 驱动程序和 Docker GPU 运行时将在运行时安装。

instance_type: 字符串(可选)

有效的 EC2 实例类型。这将决定可供调度器和所有 Worker 使用的资源。如果提供此参数,则不能指定 scheduler_instance_typeworker_instance_type

请参阅 https://aws.amazon.com/ec2/instance-types/

默认情况下将使用 t2.micro

scheduler_instance_type: 字符串(可选)

有效的 EC2 实例类型。这将决定可供调度器使用的资源。

请参阅 https://aws.amazon.com/ec2/instance-types/

默认情况下将使用 t2.micro

worker_instance_type: 字符串(可选)

有效的 EC2 实例类型。这将决定可供所有 Worker 使用的资源。

请参阅 https://aws.amazon.com/ec2/instance-types/

默认情况下将使用 t2.micro

vpc: 字符串(可选)

启动实例所在的 VPC ID。

如果未指定,将检测并使用默认 VPC。

subnet_id: 字符串(可选)

启动实例所在的子网 ID。

如果未指定,将使用该 VPC 的所有子网。

security_groups: List(字符串)(可选)

将附加到 Worker 的安全组 ID。

必须允许安全组中实例之间的所有流量,以及调度器实例与您调用 EC2Cluster 的位置之间的 8786 和 8787 端口流量。

默认情况下,将创建一个 Dask 安全组,开放 8786 和 8787 端口到互联网。

filesystem_size: int(可选)

实例的文件系统大小(单位:GB)。

默认为 40

key_name: str(可选)

要分配给集群管理器创建的所有实例的 SSH 密钥对名称。您可以使用 aws ec2 describe-key-pairs  --query 'KeyPairs[*].KeyName' --output text 命令列出您现有的密钥对名称。

注意:您需要确保您的安全组允许访问 22 端口。如果未设置 security_groups,默认组将不包含此规则,您需要手动添加。

iam_instance_profile: 字典(可选)

要分配给 VM 的 IAM 配置文件。可用于允许访问其他 AWS 资源,例如 S3。请参阅 https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html

n_workers: int

初始化集群的 Worker 数量。默认为 0

worker_module: str

Worker 要运行的 Python 模块。默认为 distributed.cli.dask_worker

worker_options: 字典

要传递给 Worker 类的参数。有关默认 Worker 类,请参阅 distributed.worker.Worker。如果您设置了 worker_module,则请参考自定义 Worker 类的文档字符串。

scheduler_options: 字典

要传递给调度器类的参数。请参阅 distributed.scheduler.Scheduler

docker_image: 字符串(可选)

要在所有实例上运行的 Docker 镜像。

此镜像必须具有有效的 Python 环境,并且已安装 dask,以便 dask-schedulerdask-worker 命令可用。建议 Python 环境与您创建 EC2Cluster 的本地环境匹配。

对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和 dask-cuda

默认情况下将使用 daskdev/dask:latest 镜像。

docker_args: 字符串(可选)

要传递给 Docker 的额外命令行参数。

env_vars: 字典(可选)

要传递给 Worker 的环境变量。

silence_logs: 布尔值

在设置集群时是否应静默日志记录。

asynchronous: 布尔值

如果这旨在直接在带有 async/await 的事件循环中使用

security安全对象或布尔值,可选

配置此集群中的通信安全。可以是安全对象或布尔值 True。如果为 True,将自动创建临时的自签名凭证。默认为 True

debug: 布尔值,可选

在构建集群时将打印更多信息以启用调试。

instance_tags: 字典,可选

创建时应用于所有 EC2 实例的标签。默认情况下,包含“createdBy”: “dask-cloudprovider”

volume_tags: 字典,可选

创建时应用于所有 EBS 卷的标签。默认情况下,包含“createdBy”: “dask-cloudprovider”

use_private_ip: 布尔值(可选)

是使用私有 IP(如果为 True)还是公有 IP(如果为 False)。

默认为 False

enable_detailed_monitoring: 布尔值(可选)

是否为创建的实例启用详细监控。请参阅 https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html。默认为 False

注意

创建的资源

资源

名称

用途

费用

EC2 实例

dask-scheduler-{cluster uuid}

Dask 调度器

EC2 定价

EC2 实例

dask-worker-{cluster uuid}-{worker uuid}

Dask Worker

EC2 定价

凭证

为了让 Dask Worker 访问 AWS 资源(例如 S3),它们将需要凭证。

执行此操作的最佳实践方式是传递一个供 worker 使用的 IAM 角色。更多信息请参阅 iam_instance_profile 关键字。

或者,您可以读取使用 aws configure 创建的本地凭据,并将它们作为环境变量传递。这里有一个小例子来帮助您完成此操作。

>>> def get_aws_credentials():
...     parser = configparser.RawConfigParser()
...     parser.read(os.path.expanduser('~/.aws/config'))
...     config = parser.items('default')
...     parser.read(os.path.expanduser('~/.aws/credentials'))
...     credentials = parser.items('default')
...     all_credentials = {key.upper(): value for key, value in [*config, *credentials]}
...     with contextlib.suppress(KeyError):
...         all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
...     return all_credentials
>>> cluster = EC2Cluster(env_vars=get_aws_credentials())

手动清理

如果由于某些原因,集群管理器在未能执行清理的情况下终止,EC2Cluster 的默认行为是调度器和 worker 超时。这将导致主机 VM 关闭。此集群管理器创建的实例也带有“关机时终止”设置,因此所有资源都应自动移除。

如果由于某些原因您选择覆盖这些设置并禁用自动清理,您可以使用以下 CLI 命令销毁资源。

export CLUSTER_ID="cluster id printed during creation"
aws ec2 describe-instances \
    --filters "Name=tag:Dask Cluster,Values=${CLUSTER_ID}" \
    --query "Reservations[*].Instances[*].[InstanceId]" \
    --output text | xargs aws ec2 terminate-instances --instance-ids

启用 SSH 进行调试

>>> from dask_cloudprovider.aws import EC2Cluster
>>> cluster = EC2Cluster(key_name="myawesomekey",
                         # Security group which allows ports 22, 8786, 8787 and all internal traffic
                         security_groups=["sg-aabbcc112233"])

# 您现在可以使用 ssh ubuntu@public_ip 通过 SSH 连接到实例

>>> cluster.close()
属性
asynchronous

我们正在事件循环中运行吗?

auto_shutdown
bootstrap
called_from_running_loop
command
dashboard_link
docker_image
gpu_instance
loop
name
observed
plan
requested
scheduler_address
scheduler_class
worker_class

方法

adapt([Adaptive, minimum, maximum, ...])

开启自适应功能

call_async(f, *args, **kwargs)

将阻塞函数作为协程在线程中运行。

from_name(name)

创建此类的实例以按名称表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度器和 worker 的日志

get_tags()

生成要应用于所有资源的标签。

new_worker_spec()

返回下一个 worker 的名称和 spec

scale([n, memory, cores])

将集群扩展到 n 个 worker

scale_up([n, memory, cores])

将集群扩展到 n 个 worker

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 并传递 args

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个 worker 启动后再继续

close

get_cloud_init

logs

render_cloud_init

render_process_cloud_init

scale_down

弹性容器服务 (ECS)

class dask_cloudprovider.aws.ECSCluster(fargate_scheduler=None, fargate_workers=None, fargate_spot=None, image=None, cpu_architecture='X86_64', scheduler_cpu=None, scheduler_mem=None, scheduler_port=8786, scheduler_timeout=None, scheduler_extra_args=None, scheduler_task_definition_arn=None, scheduler_task_kwargs=None, scheduler_address=None, worker_cpu=None, worker_nthreads=None, worker_mem=None, worker_gpu=None, worker_extra_args=None, worker_task_definition_arn=None, worker_task_kwargs=None, n_workers=None, workers_name_start=0, workers_name_step=1, cluster_arn=None, cluster_name_template=None, execution_role_arn=None, task_role_arn=None, task_role_policies=None, cloudwatch_logs_group=None, cloudwatch_logs_stream_prefix=None, cloudwatch_logs_default_retention=None, vpc=None, subnets=None, security_groups=None, environment=None, tags=None, skip_cleanup=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None, platform_version=None, fargate_use_private_ip=False, mount_points=None, volumes=None, mount_volumes_on_scheduler=False, **kwargs)[源代码]

使用 ECS 部署 Dask 集群

这会在现有的 ECS 集群上创建 Dask 调度器和 worker。

所有其他必需的资源,例如角色、任务定义、任务等,都将像在 FargateCluster 中一样自动创建。

参数
fargate_scheduler: bool (可选)

选择是否对调度器使用 Fargate。

默认为 False。您必须提供一个现有集群。

fargate_workers: bool (可选)

选择是否对 worker 使用 Fargate。

默认为 False。您必须提供一个现有集群。

fargate_spot: bool (可选)

选择是否使用 Fargate Spot 运行集群,worker 在 Spot 容量上运行。如果 fargate_scheduler=Truefargate_workers=True,这将确保 worker 任务使用 fargate_capacity_provider=FARGATE_SPOT 容量提供者,而调度器任务使用 fargate_capacity_provider=FARGATE 容量提供者。

默认为 False。您必须提供一个现有集群。

image: str (可选)

用于调度器和 worker 任务的 Docker 镜像。

如果设置了 worker_gpu,则默认为 daskdev/dask:latestrapidsai/rapidsai:latest

cpu_architecture: str (可选)

运行时平台的 CPU 架构。通常是 X86_64ARM64。有效值在此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/fargate-tasks-services.html#fargate-task-os

默认为 X86_64

scheduler_cpu: int (可选)

为调度器请求的 CPU 量,单位为 milli-cpu (1/1024)。

默认为 1024 (一个 vCPU)。有关此参数的有效值信息,请参阅故障排除指南

scheduler_mem: int (可选)

为调度器请求的内存量,单位为 MB。

默认为 4096 (4GB)。有关此参数的有效值信息,请参阅故障排除指南

scheduler_timeout: str (可选)

如果没有客户端连接,调度器任务将在此时长后退出。

默认为 5 minutes (5 分钟)。

scheduler_port: int (可选)

调度器应监听的端口。

默认为 8786

scheduler_extra_args: List[str] (可选)

传递给 dask-scheduler 的任何额外命令行参数,例如 ["--tls-cert", "/path/to/cert.pem"]

默认为 None,即没有额外的命令行参数。

scheduler_task_definition_arn: str (可选)

集群应使用此任务定义 ARN 来启动调度器任务。如果提供,这将覆盖 imagescheduler_cpuscheduler_mem、任何角色设置、任何网络/VPC 设置,因为这些都是任务定义的一部分。

默认为 None,表示任务定义将与集群一起创建,并在集群关闭后清理。

scheduler_task_kwargs: dict (可选)

调度器 ECS 任务的额外关键字参数。

scheduler_address: str (可选)

如果传递此参数,将不会启动调度器任务,而是 worker 会连接到传递的地址。

默认为 None,将启动一个调度器任务。

worker_cpu: int (可选)

为 worker 任务请求的 CPU 量,单位为 milli-cpu (1/1024)。

默认为 4096 (四个 vCPU)。有关此参数的有效值信息,请参阅故障排除指南

worker_nthreads: int (可选)

每个 worker 中使用的线程数。

默认为每个 vCPU 1 个线程。

worker_mem: int (可选)

为 worker 任务请求的内存量,单位为 MB。

默认为 16384 (16GB)。有关此参数的有效值信息,请参阅故障排除指南

worker_gpu: int (可选)

暴露给 worker 的 GPU 数量。

要为 worker 提供 GPU,您需要使用一个已安装 dask-cuda 并可在您的 ECS 集群中提供 GPU 节点的 GPU 就绪 Docker 镜像。目前不支持 Fargate。

默认为 None,即没有 GPU。

worker_task_definition_arn: str (可选)

集群应使用此任务定义 ARN 来启动 worker 任务。如果提供,这将覆盖 imageworker_cpuworker_mem、任何角色设置、任何网络/VPC 设置,因为这些都是任务定义的一部分。

默认为 None,表示任务定义将与集群一起创建,并在集群关闭后清理。

worker_extra_args: List[str] (可选)

传递给 dask-worker 的任何额外命令行参数,例如 ["--tls-cert", "/path/to/cert.pem"]

默认为 None,即没有额外的命令行参数。

worker_task_kwargs: dict (可选)

worker ECS 任务的额外关键字参数。

n_workers: int (可选)

集群创建时启动的 worker 数量。

默认为 None

workers_name_start: int

worker 从此编号开始命名。

默认为 0

workers_name_step: int

通过向 workers_name_start 添加 workers_name_step 的倍数来命名 worker。

默认为 1

cluster_arn: str (如果 fargate 为 true 则可选)

用于启动任务的现有 ECS 集群的 ARN。

默认为 None,这将导致为您创建一个新集群。

cluster_name_template: str (可选)

如果将 cluster_arn 设置为 None,则用于集群名称的模板。

默认为 'dask-{uuid}'

execution_role_arn: str (可选)

用于 ECS 执行的现有 IAM 角色的 ARN。

此 ARN 必须允许 ecs-tasks.amazonaws.com 执行 sts:AssumeRole,并允许以下权限:

  • ecr:GetAuthorizationToken

  • ecr:BatchCheckLayerAvailability

  • ecr:GetDownloadUrlForLayer

  • ecr:GetRepositoryPolicy

  • ecr:DescribeRepositories

  • ecr:ListImages

  • ecr:DescribeImages

  • ecr:BatchGetImage

  • logs:*

  • ec2:AuthorizeSecurityGroupIngress

  • ec2:Describe*

  • elasticloadbalancing:DeregisterInstancesFromLoadBalancer

  • elasticloadbalancing:DeregisterTargets

  • elasticloadbalancing:Describe*

  • elasticloadbalancing:RegisterInstancesWithLoadBalancer

  • elasticloadbalancing:RegisterTargets

默认为 None (将为您创建一个)。

task_role_arn: str (可选)

任务要承担的现有 IAM 角色的 ARN。这定义了 Dask worker 可以直接访问哪些 AWS 资源。如果您需要在不传递凭据的情况下从 S3 或数据库读取数据,这很有用。

默认为 None (将创建一个仅具有 S3 读取权限的角色)。

task_role_policies: List[str] (可选)

如果您未指定 task_role_arn,您可能希望列出一些 IAM 策略 ARN,以便附加到将为您创建的角色。

例如,如果您的 worker 需要从 S3 读取数据,您可以添加 arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess

默认为 None (不会向角色附加任何策略)

cloudwatch_logs_group: str (可选)

用于放置日志的现有 CloudWatch 日志组的名称。

默认为 None (将创建一个名为 dask-ecs 的日志组)

cloudwatch_logs_stream_prefix: str (可选)

日志流的前缀。

默认为集群名称。

cloudwatch_logs_default_retention: int (可选)

日志保留天数。用于自动创建日志组时。

默认为 30

vpc: str (可选)

您希望在其中启动集群的 VPC ID。

默认为 None (将使用您的默认 VPC)。

subnets: List[str] (可选)

运行任务时要使用的子网列表。

默认为 None。(将使用 VPC 中所有可用的子网)

security_groups: List[str] (可选)

启动任务时要使用的安全组 ID 列表。

默认为 None (将创建一个允许任务之间所有流量以及从任何地方访问端口 87868787 的安全组)。

environment: dict (可选)

传递给调度器和 worker 任务的额外环境变量。

如果您使用默认镜像,这对设置 EXTRA_APT_PACKAGESEXTRA_CONDA_PACKAGES`EXTRA_PIP_PACKAGES 很有用。

默认为 None

tags: dict (可选)

应用于所有自动创建资源的标签。

默认为 None。标签将始终包含 {"createdBy": "dask-cloudprovider"}

skip_cleanup: bool (可选)

跳过清理陈旧资源。如果您有很多资源并且此操作需要很长时间,这很有用。

默认为 False

platform_version: str (可选)

要使用的 AWS Fargate 平台的版本,例如“1.4.0”或“LATEST”。此设置对 EC2 启动类型没有影响。

默认为 None

fargate_use_private_ip: bool (可选)

是否使用私有 IP (如果为 True) 或公有 IP (如果为 False) 与 Fargate 结合使用。

默认为 False

mount_points: list (可选)

挂载点列表,此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html

默认为 None

volumes: list (可选)

卷列表,此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html

默认为 None

mount_volumes_on_scheduler: bool (可选)

是否也在调度器任务中挂载卷。任何指定的卷和挂载点将始终挂载在 worker 任务中。此设置控制卷是否也挂载在调度器任务中。

默认为 False

**kwargs

传递给 SpecCluster 的额外关键字参数。

示例

>>> from dask_cloudprovider.aws import ECSCluster
>>> cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")

ECSCluster 还支持 GPU 感知的 Dask 集群。为此,您需要使用支持 GPU 的实例 (来自 g3p3p3dn 系列) 创建一个 ECS 集群,并指定每个 worker 任务应拥有的 GPU 数量。

>>> from dask_cloudprovider.aws import ECSCluster
>>> cluster = ECSCluster(
...     cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>",
...     worker_gpu=1)

通过将 worker_gpu 选项设置为 None 以外的值,将导致集群运行 dask-cuda-worker 作为 worker 启动命令。设置此选项还将更改默认 Docker 镜像为 rapidsai/rapidsai:latest。如果您使用自定义镜像,必须确保安装了与主机匹配的 NVIDIA CUDA 工具包以及 dask-cuda

属性
asynchronous

我们正在事件循环中运行吗?

called_from_running_loop
dashboard_link
loop
name
observed
plan
requested
scheduler_address
tags

方法

adapt([Adaptive, minimum, maximum, ...])

开启自适应功能

from_name(name)

创建此类的实例以按名称表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度器和 worker 的日志

new_worker_spec()

返回下一个 worker 的名称和 spec

scale([n, memory, cores])

将集群扩展到 n 个 worker

scale_up([n, memory, cores])

将集群扩展到 n 个 worker

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 并传递 args

update_attr_from_config(attr, private)

如果尚未设置,则根据配置更新给定集群的类属性。

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个 worker 启动后再继续

close

logs

scale_down

Fargate

class dask_cloudprovider.aws.FargateCluster(**kwargs)[源代码]

在 ECS 上使用 Fargate 部署 Dask 集群

这会在 Fargate 支持的 ECS 集群上创建 Dask 调度器和 worker。如果您未配置集群,系统将为您创建一个具有合理默认设置的集群。

参数
kwargs

要传递给 ECSCluster 的关键字参数。

注意

IAM 权限

要创建 FargateCluster,集群管理器需要使用各种 AWS 资源,从 IAM 角色到 VPC,再到 ECS 任务。根据您的用例,您可能希望集群为您创建所有这些资源,或者您可能希望提前自行指定它们。

以下是创建整个集群所需的完整最小 IAM 策略

{
    "Statement": [
        {
            "Action": [
                "ec2:AuthorizeSecurityGroupIngress",
                "ec2:CreateSecurityGroup",
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:DeleteSecurityGroup",
                "ecs:CreateCluster",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:DeleteCluster",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:AttachRolePolicy",
                "iam:CreateRole",
                "iam:TagRole",
                "iam:PassRole",
                "iam:DeleteRole",
                "iam:ListRoles",
                "iam:ListRoleTags",
                "iam:ListAttachedRolePolicies",
                "iam:DetachRolePolicy",
                "logs:DescribeLogGroups",
                "logs:GetLogEvents",
                "logs:CreateLogGroup",
                "logs:PutRetentionPolicy"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}

如果您自行指定所有资源,则需要一个最小策略,如下所示:

{
    "Statement": [
        {
            "Action": [
                "ec2:CreateTags",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ecs:DescribeTasks",
                "ecs:ListAccountSettings",
                "ecs:RegisterTaskDefinition",
                "ecs:RunTask",
                "ecs:StopTask",
                "ecs:ListClusters",
                "ecs:DescribeClusters",
                "ecs:ListTaskDefinitions",
                "ecs:DescribeTaskDefinition",
                "ecs:DeregisterTaskDefinition",
                "iam:ListRoles",
                "iam:ListRoleTags",
                "logs:DescribeLogGroups",
                "logs:GetLogEvents"
            ],
            "Effect": "Allow",
            "Resource": [
                "*"
            ]
        }
    ],
    "Version": "2012-10-17"
}

示例

FargateCluster 默认会创建一个新的 Fargate ECS 集群,以及运行所需的所有 IAM 角色、安全组等。

>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster()

请注意,在许多情况下,您会希望为 FargateCluster 指定自定义 Docker 镜像,以便 Dask 拥有执行工作流所需的软件包。

>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster(image="<hub-user>/<repo-name>[:<tag>]")

要运行 worker 使用 Fargate Spot (https://aws.amazon.com/blogs/aws/aws-fargate-spot-now-generally-available/) 的集群,请设置 fargate_spot=True

>>> from dask_cloudprovider.aws import FargateCluster
>>> cluster = FargateCluster(fargate_spot=True)

确保自定义环境和 Docker 容器之间软件包版本匹配的一种策略是:从 environment.yml 文件创建环境,使用 conda list --export > package-list.txt 导出该环境的精确软件包列表,然后在 Dockerfile 中使用 package-list.txt 中固定的软件包版本。您可以将默认的 Dask Dockerfile 用作模板,然后简单地添加您固定的额外软件包。

属性
asynchronous

我们正在事件循环中运行吗?

called_from_running_loop
dashboard_link
loop
name
observed
plan
requested
scheduler_address
tags

方法

adapt([Adaptive, minimum, maximum, ...])

开启自适应功能

from_name(name)

创建此类的实例以按名称表示现有集群。

get_client()

返回集群的客户端

get_logs([cluster, scheduler, workers])

返回集群、调度器和 worker 的日志

new_worker_spec()

返回下一个 worker 的名称和 spec

scale([n, memory, cores])

将集群扩展到 n 个 worker

scale_up([n, memory, cores])

将集群扩展到 n 个 worker

sync(func, *args[, asynchronous, ...])

根据调用上下文同步或异步调用 func 并传递 args

update_attr_from_config(attr, private)

如果尚未设置,则根据配置更新给定集群的类属性。

wait_for_workers(n_workers[, timeout])

阻塞调用,等待 n 个 worker 启动后再继续

close

logs

scale_down