Amazon Web Services (AWS)
目录
Amazon Web Services (AWS)¶
|
使用 EC2 部署 Dask 集群。 |
|
使用 ECS 部署 Dask 集群 |
|
在 ECS 上使用 Fargate 部署 Dask 集群 |
概述¶
凭证¶
为了让您的 Dask Worker 能够连接到其他 AWS 资源(例如 S3),它们将需要凭证。
这可以通过将 IAM 角色附加到单个资源或将凭证作为环境变量传递来完成。有关更多信息,请参阅每个集群管理器的文档字符串。
Elastic Compute Cloud (EC2)¶
- class dask_cloudprovider.aws.EC2Cluster(region=None, availability_zone=None, bootstrap=None, auto_shutdown=None, ami=None, instance_type=None, scheduler_instance_type=None, worker_instance_type=None, vpc=None, subnet_id=None, security_groups=None, filesystem_size=None, key_name=None, iam_instance_profile=None, docker_image=None, debug=False, instance_tags=None, volume_tags=None, use_private_ip=None, enable_detailed_monitoring=None, **kwargs)[source]¶
使用 EC2 部署 Dask 集群。
这将在 EC2 实例上创建 Dask 调度器和 Worker。
所有实例都将运行一个可配置的 Docker 容器,该容器应包含有效的 Python 环境,其中包含 Dask 和任何其他依赖项。
所有可选参数也可以在您的 Dask 配置目录中的 cloudprovider.yaml 文件中或通过环境变量进行配置。
例如,可以通过
DASK_CLOUDPROVIDER__EC2__AMI
设置ami
。有关更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html。
- 参数
- region: 字符串(可选)
启动集群的区域。默认情况下,将从您的配置中检测到。
- availability_zone: 字符串或 List(字符串)(可选)
启动集群的可用区。默认情况下,AWS 将选择可用容量最多的可用区 (AZ)。如果您指定多个可用区,则调度器和 Worker VM 将被随机分配到您选择的一个可用区中。
- bootstrap: 布尔值(可选)
假定
ami
未安装 Docker(或 GPU 实例的 NVIDIA 驱动程序)。如果bootstrap
为True
,则这些依赖项将在实例启动时安装。如果您使用的是已包含这些依赖项的自定义 AMI,请将其设置为False。
- worker_command: 字符串(可选)
Worker 启动时应运行的命令。默认情况下为
"dask-worker"
,除非instance_type
是 GPU 实例,在这种情况下将使用dask-cuda-worker
。- ami: 字符串(可选)
用于调度器和 Worker 的基础操作系统 AMI。
这必须是 Debian 风格的发行版。默认情况下,这将是 canonical 发布的最新官方 Ubuntu 20.04 LTS 版本。
如果 AMI 不包含 Docker,则将在运行时安装。如果
instance_type
是 GPU 实例,则 NVIDIA 驱动程序和 Docker GPU 运行时将在运行时安装。- instance_type: 字符串(可选)
有效的 EC2 实例类型。这将决定可供调度器和所有 Worker 使用的资源。如果提供此参数,则不能指定
scheduler_instance_type
或worker_instance_type
。请参阅 https://aws.amazon.com/ec2/instance-types/。
默认情况下将使用
t2.micro
。- scheduler_instance_type: 字符串(可选)
有效的 EC2 实例类型。这将决定可供调度器使用的资源。
请参阅 https://aws.amazon.com/ec2/instance-types/。
默认情况下将使用
t2.micro
。- worker_instance_type: 字符串(可选)
有效的 EC2 实例类型。这将决定可供所有 Worker 使用的资源。
请参阅 https://aws.amazon.com/ec2/instance-types/。
默认情况下将使用
t2.micro
。- vpc: 字符串(可选)
启动实例所在的 VPC ID。
如果未指定,将检测并使用默认 VPC。
- subnet_id: 字符串(可选)
启动实例所在的子网 ID。
如果未指定,将使用该 VPC 的所有子网。
- security_groups: List(字符串)(可选)
将附加到 Worker 的安全组 ID。
必须允许安全组中实例之间的所有流量,以及调度器实例与您调用
EC2Cluster
的位置之间的 8786 和 8787 端口流量。默认情况下,将创建一个 Dask 安全组,开放 8786 和 8787 端口到互联网。
- filesystem_size: int(可选)
实例的文件系统大小(单位:GB)。
默认为
40
。- key_name: str(可选)
要分配给集群管理器创建的所有实例的 SSH 密钥对名称。您可以使用
aws ec2 describe-key-pairs --query 'KeyPairs[*].KeyName' --output text
命令列出您现有的密钥对名称。注意:您需要确保您的安全组允许访问 22 端口。如果未设置
security_groups
,默认组将不包含此规则,您需要手动添加。- iam_instance_profile: 字典(可选)
要分配给 VM 的 IAM 配置文件。可用于允许访问其他 AWS 资源,例如 S3。请参阅 https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html。
- n_workers: int
初始化集群的 Worker 数量。默认为
0
。- worker_module: str
Worker 要运行的 Python 模块。默认为
distributed.cli.dask_worker
- worker_options: 字典
要传递给 Worker 类的参数。有关默认 Worker 类,请参阅
distributed.worker.Worker
。如果您设置了worker_module
,则请参考自定义 Worker 类的文档字符串。- scheduler_options: 字典
要传递给调度器类的参数。请参阅
distributed.scheduler.Scheduler
。- docker_image: 字符串(可选)
要在所有实例上运行的 Docker 镜像。
此镜像必须具有有效的 Python 环境,并且已安装
dask
,以便dask-scheduler
和dask-worker
命令可用。建议 Python 环境与您创建EC2Cluster
的本地环境匹配。对于 GPU 实例类型,Docker 镜像必须安装 NVIDIA 驱动程序和
dask-cuda
。默认情况下将使用
daskdev/dask:latest
镜像。- docker_args: 字符串(可选)
要传递给 Docker 的额外命令行参数。
- env_vars: 字典(可选)
要传递给 Worker 的环境变量。
- silence_logs: 布尔值
在设置集群时是否应静默日志记录。
- asynchronous: 布尔值
如果这旨在直接在带有 async/await 的事件循环中使用
- security安全对象或布尔值,可选
配置此集群中的通信安全。可以是安全对象或布尔值 True。如果为 True,将自动创建临时的自签名凭证。默认为
True
。- debug: 布尔值,可选
在构建集群时将打印更多信息以启用调试。
- instance_tags: 字典,可选
创建时应用于所有 EC2 实例的标签。默认情况下,包含“createdBy”: “dask-cloudprovider”
- volume_tags: 字典,可选
创建时应用于所有 EBS 卷的标签。默认情况下,包含“createdBy”: “dask-cloudprovider”
- use_private_ip: 布尔值(可选)
是使用私有 IP(如果为 True)还是公有 IP(如果为 False)。
默认为
False
。- enable_detailed_monitoring: 布尔值(可选)
是否为创建的实例启用详细监控。请参阅 https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html。默认为
False
。
注意
创建的资源
资源
名称
用途
费用
EC2 实例
dask-scheduler-{cluster uuid}
Dask 调度器
EC2 实例
dask-worker-{cluster uuid}-{worker uuid}
Dask Worker
凭证
为了让 Dask Worker 访问 AWS 资源(例如 S3),它们将需要凭证。
执行此操作的最佳实践方式是传递一个供 worker 使用的 IAM 角色。更多信息请参阅
iam_instance_profile
关键字。或者,您可以读取使用
aws configure
创建的本地凭据,并将它们作为环境变量传递。这里有一个小例子来帮助您完成此操作。>>> def get_aws_credentials(): ... parser = configparser.RawConfigParser() ... parser.read(os.path.expanduser('~/.aws/config')) ... config = parser.items('default') ... parser.read(os.path.expanduser('~/.aws/credentials')) ... credentials = parser.items('default') ... all_credentials = {key.upper(): value for key, value in [*config, *credentials]} ... with contextlib.suppress(KeyError): ... all_credentials["AWS_REGION"] = all_credentials.pop("REGION") ... return all_credentials >>> cluster = EC2Cluster(env_vars=get_aws_credentials())
手动清理
如果由于某些原因,集群管理器在未能执行清理的情况下终止,
EC2Cluster
的默认行为是调度器和 worker 超时。这将导致主机 VM 关闭。此集群管理器创建的实例也带有“关机时终止”设置,因此所有资源都应自动移除。如果由于某些原因您选择覆盖这些设置并禁用自动清理,您可以使用以下 CLI 命令销毁资源。
export CLUSTER_ID="cluster id printed during creation" aws ec2 describe-instances \ --filters "Name=tag:Dask Cluster,Values=${CLUSTER_ID}" \ --query "Reservations[*].Instances[*].[InstanceId]" \ --output text | xargs aws ec2 terminate-instances --instance-ids
启用 SSH 进行调试
>>> from dask_cloudprovider.aws import EC2Cluster >>> cluster = EC2Cluster(key_name="myawesomekey", # Security group which allows ports 22, 8786, 8787 and all internal traffic security_groups=["sg-aabbcc112233"])
# 您现在可以使用 ssh ubuntu@public_ip 通过 SSH 连接到实例
>>> cluster.close()
- 属性
asynchronous
我们正在事件循环中运行吗?
- auto_shutdown
- bootstrap
- called_from_running_loop
- command
- dashboard_link
- docker_image
- gpu_instance
- loop
- name
- observed
- plan
- requested
- scheduler_address
- scheduler_class
- worker_class
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应功能
call_async
(f, *args, **kwargs)将阻塞函数作为协程在线程中运行。
from_name
(name)创建此类的实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和 worker 的日志
get_tags
()生成要应用于所有资源的标签。
new_worker_spec
()返回下一个 worker 的名称和 spec
scale
([n, memory, cores])将集群扩展到 n 个 worker
scale_up
([n, memory, cores])将集群扩展到 n 个 worker
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 并传递 args
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个 worker 启动后再继续
close
get_cloud_init
logs
render_cloud_init
render_process_cloud_init
scale_down
弹性容器服务 (ECS)¶
- class dask_cloudprovider.aws.ECSCluster(fargate_scheduler=None, fargate_workers=None, fargate_spot=None, image=None, cpu_architecture='X86_64', scheduler_cpu=None, scheduler_mem=None, scheduler_port=8786, scheduler_timeout=None, scheduler_extra_args=None, scheduler_task_definition_arn=None, scheduler_task_kwargs=None, scheduler_address=None, worker_cpu=None, worker_nthreads=None, worker_mem=None, worker_gpu=None, worker_extra_args=None, worker_task_definition_arn=None, worker_task_kwargs=None, n_workers=None, workers_name_start=0, workers_name_step=1, cluster_arn=None, cluster_name_template=None, execution_role_arn=None, task_role_arn=None, task_role_policies=None, cloudwatch_logs_group=None, cloudwatch_logs_stream_prefix=None, cloudwatch_logs_default_retention=None, vpc=None, subnets=None, security_groups=None, environment=None, tags=None, skip_cleanup=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None, platform_version=None, fargate_use_private_ip=False, mount_points=None, volumes=None, mount_volumes_on_scheduler=False, **kwargs)[源代码]¶
使用 ECS 部署 Dask 集群
这会在现有的 ECS 集群上创建 Dask 调度器和 worker。
所有其他必需的资源,例如角色、任务定义、任务等,都将像在
FargateCluster
中一样自动创建。- 参数
- fargate_scheduler: bool (可选)
选择是否对调度器使用 Fargate。
默认为
False
。您必须提供一个现有集群。- fargate_workers: bool (可选)
选择是否对 worker 使用 Fargate。
默认为
False
。您必须提供一个现有集群。- fargate_spot: bool (可选)
选择是否使用 Fargate Spot 运行集群,worker 在 Spot 容量上运行。如果 fargate_scheduler=True 且 fargate_workers=True,这将确保 worker 任务使用 fargate_capacity_provider=FARGATE_SPOT 容量提供者,而调度器任务使用 fargate_capacity_provider=FARGATE 容量提供者。
默认为
False
。您必须提供一个现有集群。- image: str (可选)
用于调度器和 worker 任务的 Docker 镜像。
如果设置了
worker_gpu
,则默认为daskdev/dask:latest
或rapidsai/rapidsai:latest
。- cpu_architecture: str (可选)
运行时平台的 CPU 架构。通常是
X86_64
或ARM64
。有效值在此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/fargate-tasks-services.html#fargate-task-os默认为
X86_64
。- scheduler_cpu: int (可选)
为调度器请求的 CPU 量,单位为 milli-cpu (1/1024)。
默认为
1024
(一个 vCPU)。有关此参数的有效值信息,请参阅故障排除指南。- scheduler_mem: int (可选)
为调度器请求的内存量,单位为 MB。
默认为
4096
(4GB)。有关此参数的有效值信息,请参阅故障排除指南。- scheduler_timeout: str (可选)
如果没有客户端连接,调度器任务将在此时长后退出。
默认为
5 minutes
(5 分钟)。- scheduler_port: int (可选)
调度器应监听的端口。
默认为
8786
- scheduler_extra_args: List[str] (可选)
传递给 dask-scheduler 的任何额外命令行参数,例如
["--tls-cert", "/path/to/cert.pem"]
默认为 None,即没有额外的命令行参数。
- scheduler_task_definition_arn: str (可选)
集群应使用此任务定义 ARN 来启动调度器任务。如果提供,这将覆盖 image、scheduler_cpu、scheduler_mem、任何角色设置、任何网络/VPC 设置,因为这些都是任务定义的一部分。
默认为 None,表示任务定义将与集群一起创建,并在集群关闭后清理。
- scheduler_task_kwargs: dict (可选)
调度器 ECS 任务的额外关键字参数。
- scheduler_address: str (可选)
如果传递此参数,将不会启动调度器任务,而是 worker 会连接到传递的地址。
默认为 None,将启动一个调度器任务。
- worker_cpu: int (可选)
为 worker 任务请求的 CPU 量,单位为 milli-cpu (1/1024)。
默认为
4096
(四个 vCPU)。有关此参数的有效值信息,请参阅故障排除指南。- worker_nthreads: int (可选)
每个 worker 中使用的线程数。
默认为每个 vCPU 1 个线程。
- worker_mem: int (可选)
为 worker 任务请求的内存量,单位为 MB。
默认为
16384
(16GB)。有关此参数的有效值信息,请参阅故障排除指南。- worker_gpu: int (可选)
暴露给 worker 的 GPU 数量。
要为 worker 提供 GPU,您需要使用一个已安装
dask-cuda
并可在您的 ECS 集群中提供 GPU 节点的 GPU 就绪 Docker 镜像。目前不支持 Fargate。默认为 None,即没有 GPU。
- worker_task_definition_arn: str (可选)
集群应使用此任务定义 ARN 来启动 worker 任务。如果提供,这将覆盖 image、worker_cpu、worker_mem、任何角色设置、任何网络/VPC 设置,因为这些都是任务定义的一部分。
默认为 None,表示任务定义将与集群一起创建,并在集群关闭后清理。
- worker_extra_args: List[str] (可选)
传递给 dask-worker 的任何额外命令行参数,例如
["--tls-cert", "/path/to/cert.pem"]
默认为 None,即没有额外的命令行参数。
- worker_task_kwargs: dict (可选)
worker ECS 任务的额外关键字参数。
- n_workers: int (可选)
集群创建时启动的 worker 数量。
默认为
None
。- workers_name_start: int
worker 从此编号开始命名。
默认为 0。
- workers_name_step: int
通过向 workers_name_start 添加 workers_name_step 的倍数来命名 worker。
默认为 1。
- cluster_arn: str (如果 fargate 为 true 则可选)
用于启动任务的现有 ECS 集群的 ARN。
默认为
None
,这将导致为您创建一个新集群。- cluster_name_template: str (可选)
如果将
cluster_arn
设置为None
,则用于集群名称的模板。默认为
'dask-{uuid}'
- execution_role_arn: str (可选)
用于 ECS 执行的现有 IAM 角色的 ARN。
此 ARN 必须允许
ecs-tasks.amazonaws.com
执行sts:AssumeRole
,并允许以下权限:ecr:GetAuthorizationToken
ecr:BatchCheckLayerAvailability
ecr:GetDownloadUrlForLayer
ecr:GetRepositoryPolicy
ecr:DescribeRepositories
ecr:ListImages
ecr:DescribeImages
ecr:BatchGetImage
logs:*
ec2:AuthorizeSecurityGroupIngress
ec2:Describe*
elasticloadbalancing:DeregisterInstancesFromLoadBalancer
elasticloadbalancing:DeregisterTargets
elasticloadbalancing:Describe*
elasticloadbalancing:RegisterInstancesWithLoadBalancer
elasticloadbalancing:RegisterTargets
默认为
None
(将为您创建一个)。- task_role_arn: str (可选)
任务要承担的现有 IAM 角色的 ARN。这定义了 Dask worker 可以直接访问哪些 AWS 资源。如果您需要在不传递凭据的情况下从 S3 或数据库读取数据,这很有用。
默认为
None
(将创建一个仅具有 S3 读取权限的角色)。- task_role_policies: List[str] (可选)
如果您未指定
task_role_arn
,您可能希望列出一些 IAM 策略 ARN,以便附加到将为您创建的角色。例如,如果您的 worker 需要从 S3 读取数据,您可以添加
arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
。默认为
None
(不会向角色附加任何策略)- cloudwatch_logs_group: str (可选)
用于放置日志的现有 CloudWatch 日志组的名称。
默认为
None
(将创建一个名为dask-ecs
的日志组)- cloudwatch_logs_stream_prefix: str (可选)
日志流的前缀。
默认为集群名称。
- cloudwatch_logs_default_retention: int (可选)
日志保留天数。用于自动创建日志组时。
默认为
30
。- vpc: str (可选)
您希望在其中启动集群的 VPC ID。
默认为
None
(将使用您的默认 VPC)。- subnets: List[str] (可选)
运行任务时要使用的子网列表。
默认为
None
。(将使用 VPC 中所有可用的子网)- security_groups: List[str] (可选)
启动任务时要使用的安全组 ID 列表。
默认为
None
(将创建一个允许任务之间所有流量以及从任何地方访问端口8786
和8787
的安全组)。- environment: dict (可选)
传递给调度器和 worker 任务的额外环境变量。
如果您使用默认镜像,这对设置
EXTRA_APT_PACKAGES
、EXTRA_CONDA_PACKAGES
和`EXTRA_PIP_PACKAGES
很有用。默认为
None
。- tags: dict (可选)
应用于所有自动创建资源的标签。
默认为
None
。标签将始终包含{"createdBy": "dask-cloudprovider"}
- skip_cleanup: bool (可选)
跳过清理陈旧资源。如果您有很多资源并且此操作需要很长时间,这很有用。
默认为
False
。- platform_version: str (可选)
要使用的 AWS Fargate 平台的版本,例如“1.4.0”或“LATEST”。此设置对 EC2 启动类型没有影响。
默认为
None
- fargate_use_private_ip: bool (可选)
是否使用私有 IP (如果为 True) 或公有 IP (如果为 False) 与 Fargate 结合使用。
默认为
False
。- mount_points: list (可选)
挂载点列表,此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html
默认为
None
。- volumes: list (可选)
卷列表,此处记录:https://docs.aws.amazon.com/AmazonECS/latest/developerguide/efs-volumes.html
默认为
None
。- mount_volumes_on_scheduler: bool (可选)
是否也在调度器任务中挂载卷。任何指定的卷和挂载点将始终挂载在 worker 任务中。此设置控制卷是否也挂载在调度器任务中。
默认为
False
。- **kwargs
传递给
SpecCluster
的额外关键字参数。
示例
>>> from dask_cloudprovider.aws import ECSCluster >>> cluster = ECSCluster(cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<clustername>")
ECSCluster
还支持 GPU 感知的 Dask 集群。为此,您需要使用支持 GPU 的实例 (来自g3
、p3
或p3dn
系列) 创建一个 ECS 集群,并指定每个 worker 任务应拥有的 GPU 数量。>>> from dask_cloudprovider.aws import ECSCluster >>> cluster = ECSCluster( ... cluster_arn="arn:aws:ecs:<region>:<acctid>:cluster/<gpuclustername>", ... worker_gpu=1)
通过将
worker_gpu
选项设置为None
以外的值,将导致集群运行dask-cuda-worker
作为 worker 启动命令。设置此选项还将更改默认 Docker 镜像为rapidsai/rapidsai:latest
。如果您使用自定义镜像,必须确保安装了与主机匹配的 NVIDIA CUDA 工具包以及dask-cuda
。- 属性
asynchronous
我们正在事件循环中运行吗?
- called_from_running_loop
- dashboard_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
- tags
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应功能
from_name
(name)创建此类的实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和 worker 的日志
new_worker_spec
()返回下一个 worker 的名称和 spec
scale
([n, memory, cores])将集群扩展到 n 个 worker
scale_up
([n, memory, cores])将集群扩展到 n 个 worker
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 并传递 args
update_attr_from_config
(attr, private)如果尚未设置,则根据配置更新给定集群的类属性。
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个 worker 启动后再继续
close
logs
scale_down
Fargate¶
- class dask_cloudprovider.aws.FargateCluster(**kwargs)[源代码]¶
在 ECS 上使用 Fargate 部署 Dask 集群
这会在 Fargate 支持的 ECS 集群上创建 Dask 调度器和 worker。如果您未配置集群,系统将为您创建一个具有合理默认设置的集群。
- 参数
- kwargs
要传递给
ECSCluster
的关键字参数。
注意
IAM 权限
要创建
FargateCluster
,集群管理器需要使用各种 AWS 资源,从 IAM 角色到 VPC,再到 ECS 任务。根据您的用例,您可能希望集群为您创建所有这些资源,或者您可能希望提前自行指定它们。以下是创建整个集群所需的完整最小 IAM 策略
{ "Statement": [ { "Action": [ "ec2:AuthorizeSecurityGroupIngress", "ec2:CreateSecurityGroup", "ec2:CreateTags", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DeleteSecurityGroup", "ecs:CreateCluster", "ecs:DescribeTasks", "ecs:ListAccountSettings", "ecs:RegisterTaskDefinition", "ecs:RunTask", "ecs:StopTask", "ecs:ListClusters", "ecs:DescribeClusters", "ecs:DeleteCluster", "ecs:ListTaskDefinitions", "ecs:DescribeTaskDefinition", "ecs:DeregisterTaskDefinition", "iam:AttachRolePolicy", "iam:CreateRole", "iam:TagRole", "iam:PassRole", "iam:DeleteRole", "iam:ListRoles", "iam:ListRoleTags", "iam:ListAttachedRolePolicies", "iam:DetachRolePolicy", "logs:DescribeLogGroups", "logs:GetLogEvents", "logs:CreateLogGroup", "logs:PutRetentionPolicy" ], "Effect": "Allow", "Resource": [ "*" ] } ], "Version": "2012-10-17" }
如果您自行指定所有资源,则需要一个最小策略,如下所示:
{ "Statement": [ { "Action": [ "ec2:CreateTags", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ecs:DescribeTasks", "ecs:ListAccountSettings", "ecs:RegisterTaskDefinition", "ecs:RunTask", "ecs:StopTask", "ecs:ListClusters", "ecs:DescribeClusters", "ecs:ListTaskDefinitions", "ecs:DescribeTaskDefinition", "ecs:DeregisterTaskDefinition", "iam:ListRoles", "iam:ListRoleTags", "logs:DescribeLogGroups", "logs:GetLogEvents" ], "Effect": "Allow", "Resource": [ "*" ] } ], "Version": "2012-10-17" }
示例
FargateCluster
默认会创建一个新的 Fargate ECS 集群,以及运行所需的所有 IAM 角色、安全组等。>>> from dask_cloudprovider.aws import FargateCluster >>> cluster = FargateCluster()
请注意,在许多情况下,您会希望为
FargateCluster
指定自定义 Docker 镜像,以便 Dask 拥有执行工作流所需的软件包。>>> from dask_cloudprovider.aws import FargateCluster >>> cluster = FargateCluster(image="<hub-user>/<repo-name>[:<tag>]")
要运行 worker 使用 Fargate Spot (https://aws.amazon.com/blogs/aws/aws-fargate-spot-now-generally-available/) 的集群,请设置
fargate_spot=True
>>> from dask_cloudprovider.aws import FargateCluster >>> cluster = FargateCluster(fargate_spot=True)
确保自定义环境和 Docker 容器之间软件包版本匹配的一种策略是:从
environment.yml
文件创建环境,使用conda list --export > package-list.txt
导出该环境的精确软件包列表,然后在 Dockerfile 中使用package-list.txt
中固定的软件包版本。您可以将默认的 Dask Dockerfile 用作模板,然后简单地添加您固定的额外软件包。- 属性
asynchronous
我们正在事件循环中运行吗?
- called_from_running_loop
- dashboard_link
- loop
- name
- observed
- plan
- requested
- scheduler_address
- tags
方法
adapt
([Adaptive, minimum, maximum, ...])开启自适应功能
from_name
(name)创建此类的实例以按名称表示现有集群。
get_client
()返回集群的客户端
get_logs
([cluster, scheduler, workers])返回集群、调度器和 worker 的日志
new_worker_spec
()返回下一个 worker 的名称和 spec
scale
([n, memory, cores])将集群扩展到 n 个 worker
scale_up
([n, memory, cores])将集群扩展到 n 个 worker
sync
(func, *args[, asynchronous, ...])根据调用上下文同步或异步调用 func 并传递 args
update_attr_from_config
(attr, private)如果尚未设置,则根据配置更新给定集群的类属性。
wait_for_workers
(n_workers[, timeout])阻塞调用,等待 n 个 worker 启动后再继续
close
logs
scale_down