# mypy: allow-untyped-defs
from numbers import Number

import torch
from torch.distributions import constraints
from torch.distributions.distribution import Distribution
from torch.distributions.utils import (
    broadcast_all,
    lazy_property,
    logits_to_probs,
    probs_to_logits,
)
from torch.nn.functional import binary_cross_entropy_with_logits


__all__ = ["Geometric"]


class Geometric(Distribution):
    r"""
    Geometric distribution over the number of failed Bernoulli trials that
    precede the first success, where :attr:`probs` is the per-trial success
    probability.

    .. math::

        P(X=k) = (1-p)^{k} p, k = 0, 1, ...

    .. note::
        With :func:`torch.distributions.geometric.Geometric` the first success
        is the :math:`(k+1)`-th trial, so samples lie in
        :math:`\{0, 1, \ldots\}`. With :func:`torch.Tensor.geometric_` the
        first success is the `k`-th trial, so samples lie in
        :math:`\{1, 2, \ldots\}`.

    Example::

        >>> # xdoctest: +IGNORE_WANT("non-deterministic")
        >>> m = Geometric(torch.tensor([0.3]))
        >>> m.sample()  # underlying Bernoulli has 30% chance 1; 70% chance 0
        tensor([ 2.])

    Exactly one of :attr:`probs` and :attr:`logits` must be given.

    Args:
        probs (Number, Tensor): the probability of sampling `1`. Must be in
            range (0, 1]
        logits (Number, Tensor): the log-odds of sampling `1`.
    """

    arg_constraints = {"probs": constraints.unit_interval, "logits": constraints.real}
    support = constraints.nonnegative_integer

    def __init__(self, probs=None, logits=None, validate_args=None):
        """Store the given parameterization and set up the batch shape.

        Raises:
            ValueError: if both or neither of ``probs`` and ``logits`` are
                supplied, or if validation is enabled and ``probs`` contains
                a value that is not strictly positive.
        """
        has_probs = probs is not None
        if has_probs == (logits is not None):
            raise ValueError(
                "Either `probs` or `logits` must be specified, but not both."
            )

        if has_probs:
            raw_param = probs
            (self.probs,) = broadcast_all(probs)
        else:
            raw_param = logits
            (self.logits,) = broadcast_all(logits)

        # A plain Python number describes a scalar (unbatched) distribution.
        batch_shape = (
            torch.Size() if isinstance(raw_param, Number) else raw_param.size()
        )
        super().__init__(batch_shape, validate_args=validate_args)

        if not (self._validate_args and has_probs):
            return

        # unit_interval admits 0, so additionally require probs > 0.
        param = self.probs
        positive = param > 0
        if positive.all():
            return
        offending = param.data[~positive]
        raise ValueError(
            "Expected parameter probs "
            f"({type(param).__name__} of shape {tuple(param.shape)}) "
            f"of distribution {repr(self)} "
            f"to be positive but found invalid values:\n{offending}"
        )

    def expand(self, batch_shape, _instance=None):
        """Return a distribution whose parameters are expanded to ``batch_shape``.

        Only the parameterizations already materialized on this instance
        (``probs`` and/or ``logits``) are carried over to the result.
        """
        clone = self._get_checked_instance(Geometric, _instance)
        target_shape = torch.Size(batch_shape)
        for name in ("probs", "logits"):
            if name in self.__dict__:
                setattr(clone, name, getattr(self, name).expand(target_shape))
        # Initialize with validation off, then copy this instance's setting.
        super(Geometric, clone).__init__(target_shape, validate_args=False)
        clone._validate_args = self._validate_args
        return clone

    @property
    def mean(self):
        """Mean of the distribution, ``1 / probs - 1``."""
        inverse_p = 1.0 / self.probs
        return inverse_p - 1.0

    @property
    def mode(self):
        """Mode of the distribution: zeros shaped like ``probs``."""
        p = self.probs
        return torch.zeros_like(p)

    @property
    def variance(self):
        """Variance of the distribution, ``(1 / probs - 1) / probs``."""
        numerator = 1.0 / self.probs - 1.0
        return numerator / self.probs

    @lazy_property
    def logits(self):
        """Log-odds of success, derived lazily from ``probs``."""
        p = self.probs
        return probs_to_logits(p, is_binary=True)

    @lazy_property
    def probs(self):
        """Success probability, derived lazily from ``logits``."""
        log_odds = self.logits
        return logits_to_probs(log_odds, is_binary=True)

    def sample(self, sample_shape=torch.Size()):
        """Draw samples of shape ``sample_shape + batch_shape`` without gradients.

        Uniform noise ``u`` is mapped through
        ``floor(log(u) / log1p(-probs))``.
        """
        out_shape = self._extended_shape(sample_shape)
        p = self.probs
        # Lower bound for the noise so that log(u) stays finite.
        smallest = torch.finfo(p.dtype).tiny
        with torch.no_grad():
            if torch._C._get_tracing_state():
                # [JIT WORKAROUND] .uniform_() is unsupported while tracing,
                # so draw with torch.rand and clamp away from zero instead.
                noise = torch.rand(out_shape, dtype=p.dtype, device=p.device)
                noise = noise.clamp(min=smallest)
            else:
                noise = p.new(out_shape).uniform_(smallest, 1)
            ratio = noise.log() / (-p).log1p()
            return ratio.floor()

    def log_prob(self, value):
        """Return the log-probability of ``value``, broadcast against ``probs``."""
        if self._validate_args:
            self._validate_sample(value)
        value, p = broadcast_all(value, self.probs)
        # Work on a copy so the in-place edit below never touches self.probs.
        p = p.clone(memory_format=torch.contiguous_format)
        # For probs == 1 and value == 0 the product 0 * log1p(-1) would be
        # 0 * -inf = nan; zeroing probs there makes that term 0 instead.
        degenerate = (p == 1) & (value == 0)
        p[degenerate] = 0
        failure_term = value * (-p).log1p()
        return failure_term + self.probs.log()

    def entropy(self):
        """Return the entropy: the Bernoulli entropy of ``probs`` over ``probs``."""
        bernoulli_entropy = binary_cross_entropy_with_logits(
            self.logits, self.probs, reduction="none"
        )
        return bernoulli_entropy / self.probs