1# mypy: allow-untyped-defs 2import torch 3from torch import nan 4from torch.distributions import constraints 5from torch.distributions.distribution import Distribution 6from torch.distributions.utils import lazy_property, logits_to_probs, probs_to_logits 7 8 9__all__ = ["Categorical"] 10 11 12class Categorical(Distribution): 13 r""" 14 Creates a categorical distribution parameterized by either :attr:`probs` or 15 :attr:`logits` (but not both). 16 17 .. note:: 18 It is equivalent to the distribution that :func:`torch.multinomial` 19 samples from. 20 21 Samples are integers from :math:`\{0, \ldots, K-1\}` where `K` is ``probs.size(-1)``. 22 23 If `probs` is 1-dimensional with length-`K`, each element is the relative probability 24 of sampling the class at that index. 25 26 If `probs` is N-dimensional, the first N-1 dimensions are treated as a batch of 27 relative probability vectors. 28 29 .. note:: The `probs` argument must be non-negative, finite and have a non-zero sum, 30 and it will be normalized to sum to 1 along the last dimension. :attr:`probs` 31 will return this normalized value. 32 The `logits` argument will be interpreted as unnormalized log probabilities 33 and can therefore be any real number. It will likewise be normalized so that 34 the resulting probabilities sum to 1 along the last dimension. :attr:`logits` 35 will return this normalized value. 36 37 See also: :func:`torch.multinomial` 38 39 Example:: 40 41 >>> # xdoctest: +IGNORE_WANT("non-deterministic") 42 >>> m = Categorical(torch.tensor([ 0.25, 0.25, 0.25, 0.25 ])) 43 >>> m.sample() # equal probability of 0, 1, 2, 3 44 tensor(3) 45 46 Args: 47 probs (Tensor): event probabilities 48 logits (Tensor): event log probabilities (unnormalized) 49 """ 50 arg_constraints = {"probs": constraints.simplex, "logits": constraints.real_vector} 51 has_enumerate_support = True 52 53 def __init__(self, probs=None, logits=None, validate_args=None): 54 if (probs is None) == (logits is None): 55 raise ValueError( 56 "Either `probs` or `logits` must be specified, but not both." 57 ) 58 if probs is not None: 59 if probs.dim() < 1: 60 raise ValueError("`probs` parameter must be at least one-dimensional.") 61 self.probs = probs / probs.sum(-1, keepdim=True) 62 else: 63 if logits.dim() < 1: 64 raise ValueError("`logits` parameter must be at least one-dimensional.") 65 # Normalize 66 self.logits = logits - logits.logsumexp(dim=-1, keepdim=True) 67 self._param = self.probs if probs is not None else self.logits 68 self._num_events = self._param.size()[-1] 69 batch_shape = ( 70 self._param.size()[:-1] if self._param.ndimension() > 1 else torch.Size() 71 ) 72 super().__init__(batch_shape, validate_args=validate_args) 73 74 def expand(self, batch_shape, _instance=None): 75 new = self._get_checked_instance(Categorical, _instance) 76 batch_shape = torch.Size(batch_shape) 77 param_shape = batch_shape + torch.Size((self._num_events,)) 78 if "probs" in self.__dict__: 79 new.probs = self.probs.expand(param_shape) 80 new._param = new.probs 81 if "logits" in self.__dict__: 82 new.logits = self.logits.expand(param_shape) 83 new._param = new.logits 84 new._num_events = self._num_events 85 super(Categorical, new).__init__(batch_shape, validate_args=False) 86 new._validate_args = self._validate_args 87 return new 88 89 def _new(self, *args, **kwargs): 90 return self._param.new(*args, **kwargs) 91 92 @constraints.dependent_property(is_discrete=True, event_dim=0) 93 def support(self): 94 return constraints.integer_interval(0, self._num_events - 1) 95 96 @lazy_property 97 def logits(self): 98 return probs_to_logits(self.probs) 99 100 @lazy_property 101 def probs(self): 102 return logits_to_probs(self.logits) 103 104 @property 105 def param_shape(self): 106 return self._param.size() 107 108 @property 109 def mean(self): 110 return torch.full( 111 self._extended_shape(), 112 nan, 113 dtype=self.probs.dtype, 114 device=self.probs.device, 115 ) 116 117 @property 118 def mode(self): 119 return self.probs.argmax(axis=-1) 120 121 @property 122 def variance(self): 123 return torch.full( 124 self._extended_shape(), 125 nan, 126 dtype=self.probs.dtype, 127 device=self.probs.device, 128 ) 129 130 def sample(self, sample_shape=torch.Size()): 131 if not isinstance(sample_shape, torch.Size): 132 sample_shape = torch.Size(sample_shape) 133 probs_2d = self.probs.reshape(-1, self._num_events) 134 samples_2d = torch.multinomial(probs_2d, sample_shape.numel(), True).T 135 return samples_2d.reshape(self._extended_shape(sample_shape)) 136 137 def log_prob(self, value): 138 if self._validate_args: 139 self._validate_sample(value) 140 value = value.long().unsqueeze(-1) 141 value, log_pmf = torch.broadcast_tensors(value, self.logits) 142 value = value[..., :1] 143 return log_pmf.gather(-1, value).squeeze(-1) 144 145 def entropy(self): 146 min_real = torch.finfo(self.logits.dtype).min 147 logits = torch.clamp(self.logits, min=min_real) 148 p_log_p = logits * self.probs 149 return -p_log_p.sum(-1) 150 151 def enumerate_support(self, expand=True): 152 num_events = self._num_events 153 values = torch.arange(num_events, dtype=torch.long, device=self._param.device) 154 values = values.view((-1,) + (1,) * len(self._batch_shape)) 155 if expand: 156 values = values.expand((-1,) + self._batch_shape) 157 return values 158