torchrec.distributed.planner¶
Torchrec Planner
The planner provides the specifications necessary for a module to be sharded, considering the possible options to build an optimized plan.
- The features include:
generating all possible sharding options.
estimating perf and storage for every shard.
estimating peak memory usage to eliminate sharding plans that might OOM.
customizability for parameter constraints, partitioning, proposers, or performance modeling.
automatically building and selecting an optimized sharding plan.
torchrec.distributed.planner.constants¶
- torchrec.distributed.planner.constants.kernel_bw_lookup(compute_device: str, compute_kernel: str, hbm_mem_bw: float, ddr_mem_bw: float, caching_ratio: Optional[float] = None, prefetch_pipeline: bool = False) Optional[float] ¶
Calculates the device bandwidth based on given compute device, compute kernel, and caching ratio.
- Parameters:
compute_kernel (str) – compute kernel.
compute_device (str) – compute device.
hbm_mem_bw (float) – the bandwidth of the device HBM.
ddr_mem_bw (float) – the bandwidth of the system DDR memory.
caching_ratio (Optional[float]) – caching ratio used to determine device bandwidth if UVM caching is enabled.
prefetch_pipeline (bool) – whether prefetch pipeline is enabled.
- Returns:
the device bandwidth.
- Return type:
Optional[float]
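Example (illustrative): a minimal sketch of calling kernel_bw_lookup. The kernel name "fused" is assumed to be a valid EmbeddingComputeKernel value, and the bandwidth figures reuse the Topology defaults listed further down this page.

    from torchrec.distributed.planner.constants import kernel_bw_lookup

    device_bw = kernel_bw_lookup(
        compute_device="cuda",
        compute_kernel="fused",      # assumed EmbeddingComputeKernel value
        hbm_mem_bw=963146416.128,    # Topology default HBM bandwidth
        ddr_mem_bw=54760833.024,     # Topology default DDR bandwidth
    )
    # Returns None for unknown (device, kernel) combinations.
    print(device_bw)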
torchrec.distributed.planner.enumerators¶
- class torchrec.distributed.planner.enumerators.EmbeddingEnumerator(topology: Topology, batch_size: int, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None, use_exact_enumerate_order: Optional[bool] = False)¶
Bases:
Enumerator
Generates embedding sharding options for given nn.Module, considering user provided constraints.
- Parameters:
topology (Topology) – device topology.
batch_size (int) – batch size.
constraints (Optional[Dict[str, ParameterConstraints]]) – dict of parameter names to provided ParameterConstraints.
estimator (Optional[Union[ShardEstimator, List[ShardEstimator]]]) – shard performance estimators.
use_exact_enumerate_order (bool) – whether to enumerate shardable parameters in the exact named_children enumeration order.
- enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption] ¶
Generates relevant sharding options given module and sharders.
- Parameters:
module (nn.Module) – module to be sharded.
sharders (List[ModuleSharder[nn.Module]]) – provided sharders for module.
- Returns:
valid sharding options with values populated.
- Return type:
List[ShardingOption]
- populate_estimates(sharding_options: List[ShardingOption]) None ¶
See class description.
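Example (illustrative): a minimal sketch of enumerating sharding options for a single-table EmbeddingBagCollection built on the meta device; the table and feature names are made up for the example.

    import torch
    from torchrec.modules.embedding_configs import EmbeddingBagConfig
    from torchrec.modules.embedding_modules import EmbeddingBagCollection
    from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
    from torchrec.distributed.planner.enumerators import EmbeddingEnumerator
    from torchrec.distributed.planner.types import Topology

    ebc = EmbeddingBagCollection(
        tables=[
            EmbeddingBagConfig(
                name="t1", embedding_dim=64, num_embeddings=100_000, feature_names=["f1"]
            )
        ],
        device=torch.device("meta"),
    )

    enumerator = EmbeddingEnumerator(
        topology=Topology(world_size=2, compute_device="cuda"), batch_size=512
    )
    # One ShardingOption per valid (table, sharding type, compute kernel) combination.
    sharding_options = enumerator.enumerate(
        module=ebc, sharders=[EmbeddingBagCollectionSharder()]
    )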
- torchrec.distributed.planner.enumerators.get_partition_by_type(sharding_type: str) str ¶
Gets corresponding partition by type for provided sharding type.
- Parameters:
sharding_type (str) – sharding type string.
- Returns:
the corresponding PartitionByType value.
- Return type:
str
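Example (illustrative), mapping ShardingType values to their partition-by types; the exact return values shown in the comments are assumptions:

    from torchrec.distributed.planner.enumerators import get_partition_by_type
    from torchrec.distributed.types import ShardingType

    print(get_partition_by_type(ShardingType.TABLE_WISE.value))      # e.g. "device"
    print(get_partition_by_type(ShardingType.TABLE_ROW_WISE.value))  # e.g. "host"
    print(get_partition_by_type(ShardingType.ROW_WISE.value))        # e.g. "uniform"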
torchrec.distributed.planner.partitioners¶
- class torchrec.distributed.planner.partitioners.GreedyPerfPartitioner(sort_by: SortBy = SortBy.STORAGE, balance_modules: bool = False)¶
Bases:
Partitioner
Greedy Partitioner
- Parameters:
sort_by (SortBy) – Sort sharding options by storage or perf in descending order (i.e., large tables will be placed first).
balance_modules (bool) – Whether to sort by modules first, where smaller modules will be sorted first. In effect, this will place tables in each module in a balanced way.
- partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption] ¶
Places sharding options on topology based on each sharding option’s partition_by attribute. The topology, storage, and perfs are updated at the end of the placement.
- Parameters:
proposal (List[ShardingOption]) – list of populated sharding options.
storage_constraint (Topology) – device topology.
- Returns:
list of sharding options for selected plan.
- Return type:
List[ShardingOption]
Example:
    sharding_options = [
        ShardingOption(partition_by="uniform",
            shards=[
                Shards(storage=1, perf=1),
                Shards(storage=1, perf=1),
            ]),
        ShardingOption(partition_by="uniform",
            shards=[
                Shards(storage=2, perf=2),
                Shards(storage=2, perf=2),
            ]),
        ShardingOption(partition_by="device",
            shards=[
                Shards(storage=3, perf=3),
                Shards(storage=3, perf=3),
            ]),
        ShardingOption(partition_by="device",
            shards=[
                Shards(storage=4, perf=4),
                Shards(storage=4, perf=4),
            ]),
    ]
    topology = Topology(world_size=2)

    # First, sharding_options[0] and sharding_options[1] are placed on the
    # topology with the uniform strategy, resulting in:
    #   topology.devices[0].perf.total = (1,2)
    #   topology.devices[1].perf.total = (1,2)
    # Then sharding_options[2] and sharding_options[3] are placed on the
    # topology with the device strategy (see the docstring of
    # `partition_by_device` for more details), resulting in:
    #   topology.devices[0].perf.total = (1,2) + (3,4)
    #   topology.devices[1].perf.total = (1,2) + (3,4)
    # The topology updates are applied only after all placements are done
    # (the ordering above is shown just for clarity).
- class torchrec.distributed.planner.partitioners.MemoryBalancedPartitioner(max_search_count: int = 10, tolerance: float = 0.02, balance_modules: bool = False)¶
Bases:
Partitioner
Memory balanced Partitioner.
- Parameters:
max_search_count (int) – Maximum number of times to call the GreedyPartitioner.
tolerance (float) – The maximum acceptable difference between the original plan and the new plan. If tolerance is 1, that means a new plan will be rejected if its perf is 200% of the original plan (i.e., the plan is 100% worse).
balance_modules (bool) – Whether to sort by modules first, where smaller modules will be sorted first. In effect, this will place tables in each module in a balanced way.
- partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption] ¶
Repeatedly calls the GreedyPerfPartitioner to find a plan with perf within the tolerance of the original plan that uses the least amount of memory.
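Example (illustrative): a minimal sketch of plugging the partitioner into a planner, using the default retry count and tolerance shown above:

    from torchrec.distributed.planner.partitioners import MemoryBalancedPartitioner
    from torchrec.distributed.planner.planners import EmbeddingShardingPlanner
    from torchrec.distributed.planner.types import Topology

    planner = EmbeddingShardingPlanner(
        topology=Topology(world_size=2, compute_device="cuda"),
        partitioner=MemoryBalancedPartitioner(max_search_count=10, tolerance=0.02),
    )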
- class torchrec.distributed.planner.partitioners.OrderedDeviceHardware(device: torchrec.distributed.planner.types.DeviceHardware, local_world_size: int)¶
Bases:
object
- device: DeviceHardware¶
- local_world_size: int¶
- class torchrec.distributed.planner.partitioners.ShardingOptionGroup(sharding_options: List[torchrec.distributed.planner.types.ShardingOption], storage_sum: torchrec.distributed.planner.types.Storage, perf_sum: float, param_count: int)¶
Bases:
object
- param_count: int¶
- perf_sum: float¶
- sharding_options: List[ShardingOption]¶
torchrec.distributed.planner.perf_models¶
- class torchrec.distributed.planner.perf_models.NoopPerfModel(topology: Topology)¶
Bases:
PerfModel
A no-op model that returns the maximum perf among all shards. Here, no-op means we estimate the performance of a model without actually running it.
- rate(plan: List[ShardingOption]) float ¶
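Conceptual sketch (not the library implementation): one way such a no-op model can rate a plan is to sum shard perf per rank and report the busiest rank's total, as shown below.

    from typing import List
    from torchrec.distributed.planner.types import ShardingOption

    def rate_plan(plan: List[ShardingOption], world_size: int) -> float:
        # Accumulate estimated perf per rank, then return the slowest (max) rank.
        per_rank = [0.0] * world_size
        for sharding_option in plan:
            for shard in sharding_option.shards:
                if shard.rank is not None and shard.perf is not None:
                    per_rank[shard.rank] += shard.perf.total
        return max(per_rank)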
- class torchrec.distributed.planner.perf_models.NoopStorageModel(topology: Topology)¶
Bases:
PerfModel
A no-op model that returns the maximum HBM usage among all shards. Here, no-op means we estimate this without actually running the model.
- rate(plan: List[ShardingOption]) float ¶
torchrec.distributed.planner.planners¶
- class torchrec.distributed.planner.planners.EmbeddingShardingPlanner(topology: Optional[Topology] = None, batch_size: Optional[int] = None, enumerator: Optional[Enumerator] = None, storage_reservation: Optional[StorageReservation] = None, proposer: Optional[Union[Proposer, List[Proposer]]] = None, partitioner: Optional[Partitioner] = None, performance_model: Optional[PerfModel] = None, stats: Optional[Union[Stats, List[Stats]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True)¶
Bases:
ShardingPlanner
Provides an optimized sharding plan for a given module with shardable parameters according to the provided sharders, topology, and constraints.
- collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan ¶
Calls self.plan(…) on rank 0 and broadcasts the resulting plan to all ranks.
- plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan ¶
Plans sharding for provided module and given sharders.
- Parameters:
module (nn.Module) – module that sharding is planned for.
sharders (List[ModuleSharder[nn.Module]]) – provided sharders for module.
- Returns:
the computed sharding plan.
- Return type:
ShardingPlan
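Example (illustrative): a minimal single-process sketch; the table name, constraint, and world size are assumptions for the example. In a multi-rank job, collective_plan(…) is typically used instead so that rank 0 plans and broadcasts.

    import torch
    from torchrec.modules.embedding_configs import EmbeddingBagConfig
    from torchrec.modules.embedding_modules import EmbeddingBagCollection
    from torchrec.distributed.embeddingbag import EmbeddingBagCollectionSharder
    from torchrec.distributed.planner.planners import EmbeddingShardingPlanner
    from torchrec.distributed.planner.types import ParameterConstraints, Topology

    ebc = EmbeddingBagCollection(
        tables=[
            EmbeddingBagConfig(
                name="t1", embedding_dim=64, num_embeddings=100_000, feature_names=["f1"]
            )
        ],
        device=torch.device("meta"),
    )

    planner = EmbeddingShardingPlanner(
        topology=Topology(world_size=2, compute_device="cuda"),
        batch_size=512,
        # Constrain "t1" to table-wise sharding; the string is assumed to be
        # ShardingType.TABLE_WISE.value.
        constraints={"t1": ParameterConstraints(sharding_types=["table_wise"])},
    )

    plan = planner.plan(module=ebc, sharders=[EmbeddingBagCollectionSharder()])
    print(plan)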
- class torchrec.distributed.planner.planners.HeteroEmbeddingShardingPlanner(topology_groups: Optional[Dict[str, Topology]] = None, batch_size: Optional[int] = None, enumerators: Optional[Dict[str, Enumerator]] = None, storage_reservations: Optional[Dict[str, StorageReservation]] = None, proposers: Optional[Dict[str, Union[Proposer, List[Proposer]]]] = None, partitioners: Optional[Dict[str, Partitioner]] = None, performance_models: Optional[Dict[str, PerfModel]] = None, stats: Optional[Dict[str, Union[Stats, List[Stats]]]] = None, constraints: Optional[Dict[str, ParameterConstraints]] = None, debug: bool = True)¶
Bases:
ShardingPlanner
Provides an optimized sharding plan for a given module with shardable parameters according to the provided sharders, topology, and constraints.
- collective_plan(module: Module, sharders: Optional[List[ModuleSharder[Module]]] = None, pg: Optional[ProcessGroup] = None) ShardingPlan ¶
Calls self.plan(…) on rank 0 and broadcasts the resulting plan to all ranks.
- plan(module: Module, sharders: List[ModuleSharder[Module]]) ShardingPlan ¶
Plans sharding for provided module and given sharders.
- Parameters:
module (nn.Module) – module that sharding is planned for.
sharders (List[ModuleSharder[nn.Module]]) – provided sharders for module.
- Returns:
the computed sharding plan.
- Return type:
ShardingPlan
torchrec.distributed.planner.proposers¶
- class torchrec.distributed.planner.proposers.EmbeddingOffloadScaleupProposer(use_depth: bool = True)¶
Bases:
Proposer
- static allocate_budget(model: Tensor, clfs: Tensor, budget: int, allocation_priority: Tensor) Tensor ¶
- static build_affine_storage_model(uvm_caching_sharding_options: List[ShardingOption], enumerator: Enumerator) Tensor ¶
- static clf_to_bytes(model: Tensor, clfs: Union[float, Tensor]) Tensor ¶
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
- static get_budget(proposal: List[ShardingOption], storage_constraint: Topology) int ¶
Returns the additional HBM budget available for GPU caches.
- static get_cacheability(sharding_option: ShardingOption) Optional[float] ¶
- static get_expected_lookups(sharding_option: ShardingOption) Optional[float] ¶
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
- static next_plan(starting_proposal: List[ShardingOption], budget: Optional[int], enumerator: Optional[Enumerator]) Optional[List[ShardingOption]] ¶
- static promote_high_prefetch_overheaad_table_to_hbm(enumerator: Optional[Enumerator], proposal: List[ShardingOption]) None ¶
Prefetch overhead is related to IO. When it exceeds the memory saved by embedding offloading, we undo the offloading and promote the table to HBM for better memory efficiency.
This function updates the proposal in place.
- propose() Optional[List[ShardingOption]] ¶
- class torchrec.distributed.planner.proposers.GreedyProposer(use_depth: bool = True, threshold: Optional[int] = None)¶
Bases:
Proposer
Proposes sharding plans in greedy fashion.
Sorts sharding options for each shardable parameter by perf. On each iteration, finds parameter with largest current storage usage and tries its next sharding option.
- Parameters:
use_depth (bool) – When enabled, sharding_options of a fqn are sorted based on max(shard.perf.total), otherwise sharding_options are sorted by sum(shard.perf.total).
threshold (Optional[int]) – Threshold for early stopping. When specified, the proposer stops proposing after this many consecutive proposals with a worse perf_rating than the best_perf_rating seen so far.
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
- propose() Optional[List[ShardingOption]] ¶
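Example (illustrative): a sketch of the load / propose / feedback loop that a planner drives for any Proposer; `sharding_options`, `enumerator`, `partitioner`, `perf_model`, and `topology` are assumed to come from the components documented on this page, and the error handling shown is an assumption.

    from torchrec.distributed.planner.proposers import GreedyProposer
    from torchrec.distributed.planner.types import PlannerError

    proposer = GreedyProposer(use_depth=True)
    proposer.load(search_space=sharding_options, enumerator=enumerator)

    best_rating, best_plan = float("inf"), None
    proposal = proposer.propose()
    while proposal is not None:
        try:
            plan = partitioner.partition(proposal, storage_constraint=topology)
            rating = perf_model.rate(plan)
            if rating < best_rating:
                best_rating, best_plan = rating, plan
            proposer.feedback(partitionable=True, plan=plan, perf_rating=rating)
        except PlannerError:
            # Proposal did not fit within the storage constraint.
            proposer.feedback(partitionable=False, plan=proposal)
        proposal = proposer.propose()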
- class torchrec.distributed.planner.proposers.GridSearchProposer(max_proposals: int = 10000)¶
Bases:
Proposer
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
- propose() Optional[List[ShardingOption]] ¶
- class torchrec.distributed.planner.proposers.UniformProposer(use_depth: bool = True)¶
Bases:
Proposer
Proposes uniform sharding plans, i.e. plans that use the same sharding type for all sharding options.
- feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
- load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
- propose() Optional[List[ShardingOption]] ¶
- torchrec.distributed.planner.proposers.proposers_to_proposals_list(proposers_list: List[Proposer], search_space: List[ShardingOption]) List[List[ShardingOption]] ¶
Only works for static-feedback proposers (i.e., the sequence of proposals to check is independent of the performance of earlier proposals).
torchrec.distributed.planner.shard_estimators¶
- class torchrec.distributed.planner.shard_estimators.EmbeddingOffloadStats(cacheability: float, expected_lookups: int, mrc_hist_counts: Tensor, height: int)¶
Bases:
CacheStatistics
Computes cache statistics for uvm_fused_cache tables.
Args:
- cacheability (float):
The area-under-the-curve of miss-ratio curve.
- expected_lookups (float):
The expected number of unique embedding ids per global batch.
- mrc_hist_counts (torch.Tensor):
A 1d tensor (size n) holding a histogram of LRU miss ratio curve. Each bin represents 1/nth of possible LRU cache sizes (from load_factor 0 to load_factor 1.0). The bin contains the number of expected LRU operations that could be handled without a cache miss if the LRU load_factor was at least that size.
- height (int):
The height (num_embeddings) of the embedding table.
- property cacheability: float¶
Summarized measure of the difficulty to cache a dataset that is independent of cache size. A score of 0 means the dataset is very cacheable (e.g. high locality between accesses), while a score of 1 means it is very difficult to cache.
- static estimate_cache_miss_rate(cache_sizes: Tensor, hist: Tensor, bins: Tensor) Tensor ¶
Calculate estimated cache miss ratio for the proposed cache_sizes, given the MRC histogram.
- property expected_lookups: int¶
Number of expected cache lookups per training step.
This is the expected number of distinct values in a global training batch.
- expected_miss_rate(clf: float) float ¶
Expected cache lookup miss rate for a given cache size.
When clf (cache load factor) is 0, returns 1.0 (100% miss). When clf is 1.0, returns 0 (100% hit). For values of clf between these extremes, returns the estimated miss rate of the cache, e.g. based on knowledge of the statistical properties of the training data set.
- class torchrec.distributed.planner.shard_estimators.EmbeddingPerfEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, is_inference: bool = False)¶
Bases:
ShardEstimator
Embedding Wall Time Perf Estimator
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None ¶
- classmethod perf_func_emb_wall_time(shard_sizes: List[List[int]], compute_kernel: str, compute_device: str, sharding_type: str, batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], input_data_type_size: float, table_data_type_size: float, output_data_type_size: float, fwd_a2a_comm_data_type_size: float, bwd_a2a_comm_data_type_size: float, fwd_sr_comm_data_type_size: float, bwd_sr_comm_data_type_size: float, num_poolings: List[float], hbm_mem_bw: float, ddr_mem_bw: float, intra_host_bw: float, inter_host_bw: float, bwd_compute_multiplier: float, weighted_feature_bwd_compute_multiplier: float, is_pooled: bool, is_weighted: bool = False, caching_ratio: Optional[float] = None, is_inference: bool = False, prefetch_pipeline: bool = False, expected_cache_fetches: float = 0, uneven_sharding_perf_multiplier: float = 1.0) List[Perf] ¶
Attempts to model perfs as a function of relative wall times.
- Parameters:
shard_sizes (List[List[int]]) – the list of (local_rows, local_cols) of each shard.
compute_kernel (str) – compute kernel.
compute_device (str) – compute device.
sharding_type (str) – sharding type, one of tw, rw, cw, twrw, or dp.
batch_sizes (List[int]) – batch size for each input feature.
world_size (int) – the number of devices for all hosts.
local_world_size (int) – the number of devices per host.
input_lengths (List[float]) – the list of the average number of lookups of each input query feature.
input_data_type_size (float) – the data type size of the distributed data_parallel input.
table_data_type_size (float) – the data type size of the table.
output_data_type_size (float) – the data type size of the output embeddings.
fwd_comm_data_type_size (float) – the data type size of the distributed data_parallel input during forward communication.
bwd_comm_data_type_size (float) – the data type size of the distributed data_parallel input during backward communication.
num_poolings (List[float]) – number of poolings per sample, typically 1.0.
hbm_mem_bw (float) – the bandwidth of the device HBM.
ddr_mem_bw (float) – the bandwidth of the system DDR memory.
intra_host_bw (float) – the bandwidth between devices within a single host.
inter_host_bw (float) – the bandwidth between hosts (i.e., across machines).
is_pooled (bool) – True if embedding output is pooled (ie. EmbeddingBag), False if unpooled/sequential (ie. Embedding).
is_weighted (bool = False) – if the module is an EBC and is weighted, typically signifying an id score list feature.
is_inference (bool = False) – if planning for inference.
caching_ratio (Optional[float] = None) – cache ratio to determine the bandwidth of device.
prefetch_pipeline (bool = False) – whether prefetch pipeline is enabled.
expected_cache_fetches (float) – number of expected cache fetches across global batch
- Returns:
the list of perf for each shard.
- Return type:
List[Perf]
- class torchrec.distributed.planner.shard_estimators.EmbeddingStorageEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None, pipeline_type: PipelineType = PipelineType.NONE, is_inference: bool = False)¶
Bases:
ShardEstimator
Embedding Storage Usage Estimator
- estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None ¶
- torchrec.distributed.planner.shard_estimators.calculate_pipeline_io_cost(input_size: int, output_size: int, prefetch_size: int, pipeline_type: PipelineType, multipass_prefetch_max_pass: Optional[int], is_inference: bool = False) int ¶
- torchrec.distributed.planner.shard_estimators.calculate_shard_storages(sharder: ModuleSharder[Module], sharding_type: str, tensor: Tensor, compute_device: str, compute_kernel: str, shard_sizes: List[List[int]], batch_sizes: List[int], world_size: int, local_world_size: int, input_lengths: List[float], num_poolings: List[float], caching_ratio: float, is_pooled: bool, input_data_type_size: float, output_data_type_size: float, pipeline_type: PipelineType = PipelineType.NONE, is_inference: bool = False, multipass_prefetch_max_pass: Optional[int] = None) List[Storage] ¶
Calculates estimated storage sizes for each sharded tensor, comprised of input, output, tensor, gradient, and optimizer sizes.
- Parameters:
sharder (ModuleSharder[nn.Module]) – sharder for module that supports sharding.
sharding_type (str) – provided ShardingType value.
tensor (torch.Tensor) – tensor to be sharded.
compute_device (str) – compute device to be used.
compute_kernel (str) – compute kernel to be used.
shard_sizes (List[List[int]]) – list of dimensions of each sharded tensor.
batch_sizes (List[int]) – batch size for each input feature.
world_size (int) – total number of devices in topology.
local_world_size (int) – total number of devices in host group topology.
input_lengths (List[float]) – average input lengths synonymous with pooling factors.
num_poolings (List[float]) – average number of poolings per sample (typically 1.0).
caching_ratio (float) – ratio of HBM to DDR memory for UVM caching.
is_pooled (bool) – True if embedding output is pooled (ie. EmbeddingBag), False if unpooled/sequential (ie. Embedding).
input_data_type_size (int) – number of bytes of input data type.
output_data_type_size (int) – number of bytes of output data type.
pipeline_type (PipelineType) – pipeline type, if for training.
is_inference (bool) – whether the model is for inference.
- Returns:
storage object for each device in topology.
- Return type:
List[Storage]
torchrec.distributed.planner.stats¶
- class torchrec.distributed.planner.stats.EmbeddingStats¶
Bases:
Stats
Stats for a sharding planner execution.
- log(sharding_plan: ShardingPlan, topology: Topology, batch_size: int, storage_reservation: StorageReservation, num_proposals: int, num_plans: int, run_time: float, best_plan: List[ShardingOption], constraints: Optional[Dict[str, ParameterConstraints]] = None, sharders: Optional[List[ModuleSharder[Module]]] = None, debug: bool = True) None ¶
Logs stats for a given sharding plan.
Provides a tabular view of stats for the given sharding plan with per device storage usage (HBM and DDR), perf, input, output, and number/type of shards.
- Parameters:
sharding_plan (ShardingPlan) – sharding plan chosen by the planner.
topology (Topology) – device topology.
batch_size (int) – batch size.
storage_reservation (StorageReservation) – reserves storage for unsharded parts of the model.
num_proposals (int) – number of proposals evaluated.
num_plans (int) – number of proposals successfully partitioned.
run_time (float) – time taken to find plan (in seconds).
best_plan (List[ShardingOption]) – plan with expected performance.
constraints (Optional[Dict[str, ParameterConstraints]]) – dict of parameter names to provided ParameterConstraints.
debug (bool) – whether to enable debug mode.
- class torchrec.distributed.planner.stats.NoopEmbeddingStats¶
Bases:
Stats
Noop Stats for a sharding planner execution.
- log(sharding_plan: ShardingPlan, topology: Topology, batch_size: int, storage_reservation: StorageReservation, num_proposals: int, num_plans: int, run_time: float, best_plan: List[ShardingOption], constraints: Optional[Dict[str, ParameterConstraints]] = None, sharders: Optional[List[ModuleSharder[Module]]] = None, debug: bool = True) None ¶
See class description
- torchrec.distributed.planner.stats.round_to_one_sigfig(x: float) str ¶
torchrec.distributed.planner.storage_reservations¶
- class torchrec.distributed.planner.storage_reservations.FixedPercentageStorageReservation(percentage: float)¶
Bases:
StorageReservation
- reserve(topology: Topology, batch_size: int, module: Module, sharders: List[ModuleSharder[Module]], constraints: Optional[Dict[str, ParameterConstraints]] = None) Topology ¶
- class torchrec.distributed.planner.storage_reservations.HeuristicalStorageReservation(percentage: float, parameter_multiplier: float = 6.0, dense_tensor_estimate: Optional[int] = None)¶
Bases:
StorageReservation
Reserves storage for a model to be sharded, using a heuristic calculation. The storage reservation is comprised of dense tensor storage, KJT storage, and an extra percentage of total storage.
- Parameters:
percentage (float) – extra storage percent to reserve that acts as a margin of error beyond heuristic calculation of storage.
parameter_multiplier (float) – heuristic multiplier for total parameter storage.
dense_tensor_estimate (Optional[int]) – storage estimate for dense tensors, uses default heuristic estimate if not provided.
- reserve(topology: Topology, batch_size: int, module: Module, sharders: List[ModuleSharder[Module]], constraints: Optional[Dict[str, ParameterConstraints]] = None) Topology ¶
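Example (illustrative): reserving a 15% margin before planning; the percentage is an assumption made for the example.

    from torchrec.distributed.planner.planners import EmbeddingShardingPlanner
    from torchrec.distributed.planner.storage_reservations import (
        HeuristicalStorageReservation,
    )
    from torchrec.distributed.planner.types import Topology

    planner = EmbeddingShardingPlanner(
        topology=Topology(world_size=2, compute_device="cuda"),
        storage_reservation=HeuristicalStorageReservation(percentage=0.15),
    )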
- class torchrec.distributed.planner.storage_reservations.InferenceStorageReservation(percentage: float, dense_tensor_estimate: Optional[int] = None)¶
Bases:
StorageReservation
Reserves storage for a model to be sharded for inference. The storage reservation is comprised of dense tensor storage, KJT storage, and an extra percentage of total storage. Note that when estimating storage, dense modules are assumed to be on GPUs and replicated across ranks. If this is not the case, please override the estimates with dense_tensor_estimate.
- Parameters:
percentage (float) – extra storage percentage to reserve that acts as a margin of error beyond storage calculation.
dense_tensor_estimate (Optional[int]) – storage estimate for dense tensors, uses the default heuristic estimate if not provided.
- reserve(topology: Topology, batch_size: int, module: Module, sharders: List[ModuleSharder[Module]], constraints: Optional[Dict[str, ParameterConstraints]] = None) Topology ¶
torchrec.distributed.planner.types¶
- class torchrec.distributed.planner.types.CustomTopologyData(data: Dict[str, List[int]], world_size: int)¶
Bases:
object
Custom device data for individual devices in a topology.
- get_data(key: str) List[int] ¶
- has_data(key: str) bool ¶
- supported_fields = ['ddr_cap', 'hbm_cap']¶
- class torchrec.distributed.planner.types.DeviceHardware(rank: int, storage: Storage, perf: Perf)¶
Bases:
object
Representation of a device in a process group. ‘perf’ is an estimation of network, CPU, and storage usages.
- rank: int¶
- class torchrec.distributed.planner.types.Enumerator(topology: Topology, batch_size: int = 512, constraints: Optional[Dict[str, ParameterConstraints]] = None, estimator: Optional[Union[ShardEstimator, List[ShardEstimator]]] = None)¶
Bases:
ABC
Generates all relevant sharding options for given topology, constraints, nn.Module, and sharders.
- abstract enumerate(module: Module, sharders: List[ModuleSharder[Module]]) List[ShardingOption] ¶
See class description.
- abstract populate_estimates(sharding_options: List[ShardingOption]) None ¶
See class description.
- class torchrec.distributed.planner.types.ParameterConstraints(sharding_types: ~typing.Optional[~typing.List[str]] = None, compute_kernels: ~typing.Optional[~typing.List[str]] = None, min_partition: ~typing.Optional[int] = None, pooling_factors: ~typing.List[float] = <factory>, num_poolings: ~typing.Optional[~typing.List[float]] = None, batch_sizes: ~typing.Optional[~typing.List[int]] = None, is_weighted: bool = False, cache_params: ~typing.Optional[~torchrec.distributed.types.CacheParams] = None, enforce_hbm: ~typing.Optional[bool] = None, stochastic_rounding: ~typing.Optional[bool] = None, bounds_check_mode: ~typing.Optional[~fbgemm_gpu.split_table_batched_embeddings_ops_common.BoundsCheckMode] = None, feature_names: ~typing.Optional[~typing.List[str]] = None, output_dtype: ~typing.Optional[~torchrec.types.DataType] = None, device_group: ~typing.Optional[str] = None, key_value_params: ~typing.Optional[~torchrec.distributed.types.KeyValueParams] = None)¶
Bases:
object
Stores user provided constraints around the sharding plan.
If provided, pooling_factors, num_poolings, and batch_sizes must have matching lengths, with one entry per feature of the table (see the example after the attribute list below).
- sharding_types¶
sharding types allowed for the table. Values of enum ShardingType.
- Type:
Optional[List[str]]
- compute_kernels¶
compute kernels allowed for the table. Values of enum EmbeddingComputeKernel.
- Type:
Optional[List[str]]
- min_partition¶
lower bound for dimension of column wise shards. Planner will search for the column wise shard dimension in the range of [min_partition, embedding_dim], as long as the column wise shard dimension divides embedding_dim and is divisible by 4. Used for column wise sharding only.
- Type:
Optional[int]
- pooling_factors¶
pooling factors for each feature of the table. This is the average number of values each sample has for the feature. Length of pooling_factors should match the number of features.
- Type:
Optional[List[float]]
- num_poolings¶
number of poolings for each feature of the table. Length of num_poolings should match the number of features.
- Type:
Optional[List[float]]
- batch_sizes¶
batch sizes for each feature of the table. Length of batch_sizes should match the number of features.
- Type:
Optional[List[int]]
- is_weighted¶
whether the table is weighted.
- Type:
Optional[bool]
- cache_params¶
cache parameters to be used by this table. These are passed to FBGEMM’s Split TBE kernel.
- Type:
Optional[CacheParams]
- enforce_hbm¶
whether to place all weights/momentums in HBM when using cache.
- Type:
Optional[bool]
- stochastic_rounding¶
whether to do stochastic rounding. This is passed to FBGEMM’s Split TBE kernel. Stochastic rounding is non-deterministic, but important to maintain accuracy in longer term with FP16 embedding tables.
- Type:
Optional[bool]
- bounds_check_mode¶
bounds check mode to be used by FBGEMM’s Split TBE kernel. Bounds check means checking if values (i.e. row id) is within the table size. If row id exceeds table size, it will be set to 0.
- Type:
Optional[BoundsCheckMode]
- feature_names¶
list of feature names for this table.
- Type:
Optional[List[str]]
- output_dtype¶
output dtype to be used by this table. The default is FP32. If not None, the output dtype will also be used by the planner to produce a more balanced plan.
- Type:
Optional[DataType]
- device_group¶
device group to be used by this table. It can be cpu or cuda. This specifies if the table should be placed on a cpu device or a gpu device.
- Type:
Optional[str]
- key_value_params¶
key value params for SSD TBE, either for SSD or PS.
- Type:
Optional[KeyValueParams]
- batch_sizes: Optional[List[int]] = None¶
- bounds_check_mode: Optional[BoundsCheckMode] = None¶
- cache_params: Optional[CacheParams] = None¶
- compute_kernels: Optional[List[str]] = None¶
- device_group: Optional[str] = None¶
- enforce_hbm: Optional[bool] = None¶
- feature_names: Optional[List[str]] = None¶
- is_weighted: bool = False¶
- key_value_params: Optional[KeyValueParams] = None¶
- min_partition: Optional[int] = None¶
- num_poolings: Optional[List[float]] = None¶
- output_dtype: Optional[DataType] = None¶
- pooling_factors: List[float]¶
- sharding_types: Optional[List[str]] = None¶
- stochastic_rounding: Optional[bool] = None¶
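Example (illustrative): a constraints dict keyed by table name, as accepted by EmbeddingShardingPlanner and EmbeddingEnumerator; the table name, feature name, kernel string, and numeric values are assumptions made for the example.

    from torchrec.distributed.planner.types import ParameterConstraints

    constraints = {
        "t1": ParameterConstraints(
            sharding_types=["table_wise", "column_wise"],  # assumed ShardingType values
            compute_kernels=["fused"],                     # assumed EmbeddingComputeKernel value
            pooling_factors=[20.0],  # average ids per sample for the single feature
            feature_names=["f1"],
        ),
    }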
- class torchrec.distributed.planner.types.PartitionByType(value)¶
Bases:
Enum
Well-known partition types.
- DEVICE = 'device'¶
- HOST = 'host'¶
- UNIFORM = 'uniform'¶
- class torchrec.distributed.planner.types.Partitioner¶
Bases:
ABC
Partitions shards.
Multiple strategies exist today, e.g. Greedy, BLDM, and Linear.
- abstract partition(proposal: List[ShardingOption], storage_constraint: Topology) List[ShardingOption] ¶
- class torchrec.distributed.planner.types.Perf(fwd_compute: float, fwd_comms: float, bwd_compute: float, bwd_comms: float, prefetch_compute: float = 0.0)¶
Bases:
object
Representation of the breakdown of the perf estimate for a single shard of an embedding table.
- bwd_comms: float¶
- bwd_compute: float¶
- fwd_comms: float¶
- fwd_compute: float¶
- prefetch_compute: float = 0.0¶
- property total: float¶
- class torchrec.distributed.planner.types.PerfModel¶
Bases:
ABC
- abstract rate(plan: List[ShardingOption]) float ¶
- exception torchrec.distributed.planner.types.PlannerError(message: str, error_type: PlannerErrorType = PlannerErrorType.OTHER)¶
Bases:
Exception
- class torchrec.distributed.planner.types.PlannerErrorType(value)¶
Bases:
Enum
Classify PlannerError based on the following cases.
- INSUFFICIENT_STORAGE = 'insufficient_storage'¶
- OTHER = 'other'¶
- PARTITION = 'partition'¶
- STRICT_CONSTRAINTS = 'strict_constraints'¶
- class torchrec.distributed.planner.types.Proposer¶
Bases:
ABC
Proposes complete lists of sharding options which can be partitioned to generate a plan.
- abstract feedback(partitionable: bool, plan: Optional[List[ShardingOption]] = None, perf_rating: Optional[float] = None, storage_constraint: Optional[Topology] = None) None ¶
- abstract load(search_space: List[ShardingOption], enumerator: Optional[Enumerator] = None) None ¶
- abstract propose() Optional[List[ShardingOption]] ¶
- class torchrec.distributed.planner.types.Shard(size: List[int], offset: List[int], storage: Optional[Storage] = None, perf: Optional[Perf] = None, rank: Optional[int] = None)¶
Bases:
object
Representation of a subset of an embedding table. ‘size’ and ‘offset’ fully determine the tensors in the shard. ‘storage’ is an estimate of how much memory is needed to store the shard, and ‘perf’ is an estimate of its performance.
- offset: List[int]¶
- rank: Optional[int] = None¶
- size: List[int]¶
- class torchrec.distributed.planner.types.ShardEstimator(topology: Topology, constraints: Optional[Dict[str, ParameterConstraints]] = None)¶
Bases:
ABC
Estimates shard perf or storage, requires fully specified sharding options.
- abstract estimate(sharding_options: List[ShardingOption], sharder_map: Optional[Dict[str, ModuleSharder[Module]]] = None) None ¶
- class torchrec.distributed.planner.types.ShardingOption(name: str, tensor: Tensor, module: Tuple[str, Module], input_lengths: List[float], batch_size: int, sharding_type: str, partition_by: str, compute_kernel: str, shards: List[Shard], cache_params: Optional[CacheParams] = None, enforce_hbm: Optional[bool] = None, stochastic_rounding: Optional[bool] = None, bounds_check_mode: Optional[BoundsCheckMode] = None, dependency: Optional[str] = None, is_pooled: Optional[bool] = None, feature_names: Optional[List[str]] = None, output_dtype: Optional[DataType] = None, key_value_params: Optional[KeyValueParams] = None)¶
Bases:
object
One way of sharding an embedding table. In the enumerator, we generate multiple sharding options per table, but in the planner output, there should only be one sharding option per table.
- name¶
name of the sharding option.
- Type:
str
- tensor¶
tensor of the sharding option. Usually on meta device.
- Type:
torch.Tensor
- module¶
module and its fqn that contains the table.
- Type:
Tuple[str, nn.Module]
- input_lengths¶
list of pooling factors of the feature for the table.
- Type:
List[float]
- batch_size¶
batch size of training / eval job.
- Type:
int
- sharding_type¶
sharding type of the table. Value of enum ShardingType.
- Type:
str
- compute_kernel¶
compute kernel of the table. Value of enum EmbeddingComputeKernel.
- Type:
str
- cache_params¶
cache parameters to be used by this table. These are passed to FBGEMM’s Split TBE kernel.
- Type:
Optional[CacheParams]
- enforce_hbm¶
whether to place all weights/momentums in HBM when using cache.
- Type:
Optional[bool]
- stochastic_rounding¶
whether to do stochastic rounding. This is passed to FBGEMM’s Split TBE kernel. Stochastic rounding is non-deterministic, but important to maintain accuracy in longer term with FP16 embedding tables.
- Type:
Optional[bool]
- bounds_check_mode¶
bounds check mode to be used by FBGEMM’s Split TBE kernel. Bounds check means checking if values (i.e. row id) is within the table size. If row id exceeds table size, it will be set to 0.
- Type:
Optional[BoundsCheckMode]
- dependency¶
dependency of the table. Related to Embedding tower.
- Type:
Optional[str]
- is_pooled¶
whether the table is pooled. Pooling can be sum pooling or mean pooling. Unpooled tables are also known as sequence embeddings.
- Type:
Optional[bool]
- feature_names¶
list of feature names for this table.
- Type:
Optional[List[str]]
- output_dtype¶
output dtype to be used by this table. The default is FP32. If not None, the output dtype will also be used by the planner to produce a more balanced plan.
- Type:
Optional[DataType]
- key_value_params¶
Params for SSD TBE, either for SSD or PS.
- Type:
Optional[KeyValueParams]
- property cache_load_factor: Optional[float]¶
- property fqn: str¶
- property is_pooled: bool¶
- property module: Tuple[str, Module]¶
- static module_pooled(module: Module, sharding_option_name: str) bool ¶
Determine if module pools output (e.g. EmbeddingBag) or uses unpooled/sequential output.
- property num_inputs: int¶
- property num_shards: int¶
- property path: str¶
- property tensor: Tensor¶
- property total_perf: float¶
- class torchrec.distributed.planner.types.Stats¶
Bases:
ABC
Logs statistics related to the sharding plan.
- abstract log(sharding_plan: ShardingPlan, topology: Topology, batch_size: int, storage_reservation: StorageReservation, num_proposals: int, num_plans: int, run_time: float, best_plan: List[ShardingOption], constraints: Optional[Dict[str, ParameterConstraints]] = None, sharders: Optional[List[ModuleSharder[Module]]] = None, debug: bool = False) None ¶
See class description
- class torchrec.distributed.planner.types.Storage(hbm: int, ddr: int)¶
Bases:
object
Representation of the storage capacities of a hardware device used in training.
- ddr: int¶
- hbm: int¶
- class torchrec.distributed.planner.types.StorageReservation¶
Bases:
ABC
Reserves storage space for non-sharded parts of the model.
- abstract reserve(topology: Topology, batch_size: int, module: Module, sharders: List[ModuleSharder[Module]], constraints: Optional[Dict[str, ParameterConstraints]] = None) Topology ¶
- class torchrec.distributed.planner.types.Topology(world_size: int, compute_device: str, hbm_cap: Optional[int] = None, ddr_cap: Optional[int] = None, local_world_size: Optional[int] = None, hbm_mem_bw: float = 963146416.128, ddr_mem_bw: float = 54760833.024, intra_host_bw: float = 644245094.4, inter_host_bw: float = 13421772.8, bwd_compute_multiplier: float = 2, custom_topology_data: Optional[CustomTopologyData] = None, weighted_feature_bwd_compute_multiplier: float = 1, uneven_sharding_perf_multiplier: float = 1.0)¶
Bases:
object
- property bwd_compute_multiplier: float¶
- property compute_device: str¶
- property ddr_mem_bw: float¶
- property devices: List[DeviceHardware]¶
- property hbm_mem_bw: float¶
- property inter_host_bw: float¶
- property intra_host_bw: float¶
- property local_world_size: int¶
- property uneven_sharding_perf_multiplier: float¶
- property weighted_feature_bwd_compute_multiplier: float¶
- property world_size: int¶
torchrec.distributed.planner.utils¶
- class torchrec.distributed.planner.utils.BinarySearchPredicate(A: int, B: int, tolerance: int)¶
Bases:
object
Generates values of X between A & B to invoke on an external predicate F(X) to discover the largest X for which F(X) is true. Uses binary search to minimize the number of invocations of F. Assumes F is a step function, i.e. if F(X) is false, there is no point trying F(X+1).
- next(prior_result: bool) Optional[int] ¶
next() returns the next value to probe, given the result of the prior probe. The first time next() is invoked the prior_result is ignored. Returns None if entire range explored or threshold reached.
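Example (illustrative): finding the largest x in [0, 100] for which a made-up step predicate holds.

    from torchrec.distributed.planner.utils import BinarySearchPredicate

    def fits(x: int) -> bool:
        return x <= 62  # stand-in for an expensive check, e.g. "does this budget fit?"

    search = BinarySearchPredicate(A=0, B=100, tolerance=1)
    best = None
    probe = search.next(True)  # prior_result is ignored on the first call
    while probe is not None:
        ok = fits(probe)
        if ok:
            best = probe
        probe = search.next(ok)
    print(best)  # largest probed value for which fits(...) was True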
- class torchrec.distributed.planner.utils.LuusJaakolaSearch(A: float, B: float, max_iterations: int, seed: int = 42, left_cost: Optional[float] = None)¶
Bases:
object
Implements a clamped variant of Luus Jaakola search.
See https://en.wikipedia.org/wiki/Luus-Jaakola.
- best() Tuple[float, float] ¶
Return the best position so far, and its associated cost.
- clamp(x: float) float ¶
Clamp x into range [left, right]
- next(fy: float) Optional[float] ¶
Return the next probe point ‘y’ to evaluate, given the previous result.
The first time around fy is ignored. Subsequent invocations should provide the result of evaluating the function being minimized, i.e. f(y).
Returns None when the maximum number of iterations has been reached.
- shrink_right(B: float) None ¶
Shrink right boundary given [B,infinity) -> infinity
- uniform(A: float, B: float) float ¶
Return a random uniform position in range [A,B].
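Example (illustrative): minimizing a made-up one-dimensional cost over [0, 10] with a small iteration budget.

    from torchrec.distributed.planner.utils import LuusJaakolaSearch

    def cost(y: float) -> float:
        return (y - 3.0) ** 2  # stand-in for a plan cost being minimized

    search = LuusJaakolaSearch(A=0.0, B=10.0, max_iterations=16)
    y = search.next(0.0)  # fy is ignored on the first call
    while y is not None:
        y = search.next(cost(y))

    best_y, best_cost = search.best()
    print(best_y, best_cost)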
- torchrec.distributed.planner.utils.bytes_to_gb(num_bytes: int) float ¶
- torchrec.distributed.planner.utils.bytes_to_mb(num_bytes: Union[float, int]) float ¶
- torchrec.distributed.planner.utils.gb_to_bytes(gb: float) int ¶
- torchrec.distributed.planner.utils.placement(compute_device: str, rank: int, local_size: int) str ¶
Returns placement, formatted as string
- torchrec.distributed.planner.utils.prod(iterable: Iterable[int]) int ¶
- torchrec.distributed.planner.utils.reset_shard_rank(proposal: List[ShardingOption]) None ¶
- torchrec.distributed.planner.utils.sharder_name(t: Type[Any]) str ¶