# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Input pipeline.

Please see the [reading data
how-to](https://tensorflow.org/api_guides/python/reading_data)
for context.
"""


from tensorflow.python.eager import context
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import sparse_tensor
from tensorflow.python.framework import tensor_shape
from tensorflow.python.layers import utils
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import io_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import random_ops
from tensorflow.python.ops import sparse_ops
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner
from tensorflow.python.util import deprecation
from tensorflow.python.util.compat import collections_abc
from tensorflow.python.util.tf_export import tf_export


# pylint: disable=protected-access
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access


@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned is deterministic.

  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the pattern(s).
  """
  with ops.name_scope(name, "matching_filenames", [pattern]) as name:
    return vs.variable(
        name=name, initial_value=io_ops.matching_files(pattern),
        trainable=False, validate_shape=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])

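# A minimal usage sketch for `match_filenames_once`, assuming TF1 graph mode
# and a hypothetical glob "data/part-*.tfrecord"; since the result is a local
# variable, it must be initialized before the first read:
#
#   filenames = tf.io.match_filenames_once("data/part-*.tfrecord")
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.local_variables_initializer())
#     print(sess.run(filenames))  # the matched file names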

@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()` to
  initialize local variables.

  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional).  If specified, limits the number
      of steps the output tensor may be evaluated.
    name: A name for the operations (optional).

  Returns:
    tensor or `OutOfRange`.

  Raises:
    ValueError: if `num_epochs` is invalid.
  """
  if num_epochs is None:
    return tensor
  if num_epochs <= 0:
    raise ValueError("num_epochs must be > 0, not %d." % num_epochs)
  with ops.name_scope(name, "limit_epochs", [tensor]) as name:
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    epochs = vs.variable(
        zero64, name="epochs", trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    counter = epochs.count_up_to(num_epochs)
    with ops.control_dependencies([counter]):
      return array_ops.identity(tensor, name=name)

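# A minimal sketch of `limit_epochs`, assuming TF1 graph mode: the local
# `epochs` counter increments once per evaluation, so with `num_epochs=2` the
# third run raises `tf.errors.OutOfRangeError`:
#
#   limited = tf.compat.v1.train.limit_epochs(tf.constant([1, 2, 3]),
#                                             num_epochs=2)
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.local_variables_initializer())
#     sess.run(limited)  # first epoch: ok
#     sess.run(limited)  # second epoch: ok
#     sess.run(limited)  # raises tf.errors.OutOfRangeError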

@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified, `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size will be generated, using this name as part of the tag.
    name: (Optional.) A name for the queue.
    cancel_op: (Optional.) A cancel op for the queue.

  Returns:
    A queue with the output rows.  A `QueueRunner` for the queue is
    added to the `QUEUE_RUNNER` collection of the current graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  if context.executing_eagerly():
    raise RuntimeError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  with ops.name_scope(name, "input_producer", [input_tensor]):
    input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor")
    element_shape = input_tensor.shape[1:].merge_with(element_shape)
    if not element_shape.is_fully_defined():
      raise ValueError("Either `input_tensor` must have a fully defined shape "
                       "or `element_shape` must be specified")

    if shuffle:
      input_tensor = random_ops.random_shuffle(input_tensor, seed=seed)

    input_tensor = limit_epochs(input_tensor, num_epochs)

    q = data_flow_ops.FIFOQueue(capacity=capacity,
                                dtypes=[input_tensor.dtype.base_dtype],
                                shapes=[element_shape],
                                shared_name=shared_name, name=name)
    enq = q.enqueue_many([input_tensor])
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(
            q, [enq], cancel_op=cancel_op))
    if summary_name is not None:
      summary.scalar(summary_name,
                     math_ops.cast(q.size(), dtypes.float32) * (1. / capacity))
    return q

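# A minimal end-to-end sketch for `input_producer`, assuming TF1 graph mode
# and a small hypothetical `rows` tensor; the queue is fed by a `QueueRunner`,
# so queue-runner threads must be started before dequeuing:
#
#   rows = tf.constant([[1., 2.], [3., 4.], [5., 6.]])
#   q = tf.compat.v1.train.input_producer(rows, num_epochs=1, shuffle=False)
#   row = q.dequeue()
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.local_variables_initializer())
#     coord = tf.train.Coordinator()
#     threads = tf.compat.v1.train.start_queue_runners(sess, coord)
#     print(sess.run(row))  # [1., 2.]
#     coord.request_stop()
#     coord.join(threads)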

@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
204    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
205    "`shuffle=False`, omit the `.shuffle(...)`.")
206def string_input_producer(string_tensor,
207                          num_epochs=None,
208                          shuffle=True,
209                          seed=None,
210                          capacity=32,
211                          shared_name=None,
212                          name=None,
213                          cancel_op=None):
214  """Output strings (e.g. filenames) to a queue for an input pipeline.
215
216  Note: if `num_epochs` is not `None`, this function creates local counter
217  `epochs`. Use `local_variables_initializer()` to initialize local variables.
218
219  Args:
220    string_tensor: A 1-D string tensor with the strings to produce.
221    num_epochs: An integer (optional). If specified, `string_input_producer`
222      produces each string from `string_tensor` `num_epochs` times before
223      generating an `OutOfRange` error. If not specified,
224      `string_input_producer` can cycle through the strings in `string_tensor`
225      an unlimited number of times.
226    shuffle: Boolean. If true, the strings are randomly shuffled within each
227      epoch.
228    seed: An integer (optional). Seed used if shuffle == True.
229    capacity: An integer. Sets the queue capacity.
230    shared_name: (optional). If set, this queue will be shared under the given
231      name across multiple sessions. All sessions open to the device which has
232      this queue will be able to access it via the shared_name. Using this in
233      a distributed setting means each name will only be seen by one of the
234      sessions which has access to this operation.
235    name: A name for the operations (optional).
236    cancel_op: Cancel op for the queue (optional).
237
238  Returns:
239    A queue with the output strings.  A `QueueRunner` for the Queue
240    is added to the current `Graph`'s `QUEUE_RUNNER` collection.
241
  Raises:
    ValueError: If the string_tensor is an empty Python list.  At runtime,
    will fail with an assertion if string_tensor becomes an empty tensor.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  not_null_err = "string_input_producer requires a non-null input tensor"
  if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
    raise ValueError(not_null_err)

  with ops.name_scope(name, "input_producer", [string_tensor]) as name:
    string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
    with ops.control_dependencies([
        control_flow_ops.Assert(
            math_ops.greater(array_ops.size(string_tensor), 0),
            [not_null_err])]):
      string_tensor = array_ops.identity(string_tensor)
    return input_producer(
        input_tensor=string_tensor,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        name=name,
        summary_name="fraction_of_%d_full" % capacity,
        cancel_op=cancel_op)

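# A minimal sketch of the classic filename-queue pattern built on
# `string_input_producer`, assuming TF1 graph mode and hypothetical TFRecord
# file names; a reader dequeues file names from the queue as it needs them:
#
#   filename_queue = tf.compat.v1.train.string_input_producer(
#       ["a.tfrecord", "b.tfrecord"], num_epochs=1)
#   reader = tf.compat.v1.TFRecordReader()
#   key, serialized_example = reader.read(filename_queue)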

@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", [limit]) as name:
    range_tensor = math_ops.range(limit)
    return input_producer(
        range_tensor, [], num_epochs, shuffle, seed, capacity,
        shared_name, "fraction_of_%d_full" % capacity, name)

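# A minimal sketch of the `tf.data` replacement suggested by the deprecation
# message above, producing the same stream of integers without a queue:
#
#   dataset = tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)
#   # With shuffle=False, omit the .shuffle(...) call.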

@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
321    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
322    "`shuffle=False`, omit the `.shuffle(...)`.")
323def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
324                         capacity=32, shared_name=None, name=None):
325  """Produces a slice of each `Tensor` in `tensor_list`.
326
327  Implemented using a Queue -- a `QueueRunner` for the Queue
328  is added to the current `Graph`'s `QUEUE_RUNNER` collection.
329
330  Args:
331    tensor_list: A list of `Tensor` objects. Every `Tensor` in
332      `tensor_list` must have the same size in the first dimension.
333    num_epochs: An integer (optional). If specified, `slice_input_producer`
334      produces each slice `num_epochs` times before generating
335      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
336      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the slices are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`.  If the tensor
    in `tensor_list` has shape `[N, a, b, ..., z]`, then the corresponding
    output tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` is given an empty `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", tensor_list):
    tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
    if not tensor_list:
      raise ValueError(
          "Expected at least one tensor in slice_input_producer().")
    range_size = array_ops.shape(tensor_list[0])[0]
    # TODO(josh11b): Add an assertion that the first dimension of
    # everything in TensorList matches. Maybe just check the inferred shapes?
    queue = range_input_producer(range_size, num_epochs=num_epochs,
                                 shuffle=shuffle, seed=seed, capacity=capacity,
                                 shared_name=shared_name)
    index = queue.dequeue()
    output = [array_ops.gather(t, index) for t in tensor_list]
    return output

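# A minimal usage sketch for `slice_input_producer`, assuming TF1 graph mode
# and hypothetical parallel `images` and `labels` tensors sliced along their
# shared first dimension; each dequeue yields one (image, label) pair:
#
#   images = tf.zeros([100, 28, 28])     # 100 examples
#   labels = tf.zeros([100], tf.int32)   # 100 matching labels
#   image, label = tf.compat.v1.train.slice_input_producer(
#       [images, labels], num_epochs=1, shuffle=True)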

# Helpers for the batching functions ------------------------------------------


def _flatten(tensor_list_list):
  return [tensor for tensor_list in tensor_list_list for tensor in tensor_list]


class _SparseMetaData:
382  """Store information about the Tensor: Is it sparse?, map_op, and rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data.
      rank: The statically known rank of the `SparseTensor`.
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.as_dimension(rank)

  def __eq__(self, other):
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name,
                                             self.rank)

  def merge_with(self, other):
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank


def _as_tensor_list(tensors):
  if isinstance(tensors, dict):
    return [tensors[k] for k in sorted(tensors, key=str)]
  else:
    return tensors


def _as_tensor_list_list(tensors_list):
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  if isinstance(tensors_list[0], dict):
    expected_keys = set(tensors_list[0].keys())
    for tensors in tensors_list[1:]:
      if set(tensors.keys()) != expected_keys:
        raise ValueError("All dictionaries in tensors_list must have "
                         "the same keys")
    return [_as_tensor_list(tensors) for tensors in tensors_list]
  else:
    return tensors_list


def _as_original_type(original_tensors, tensor_list):
  if isinstance(original_tensors, dict):
    if len(original_tensors) == 1:
      # tensor_list is bogusly returned as a single tensor if only one tensor
      # was enqueued.  Make it a list again.  See b/28117485.
      tensor_list = [tensor_list]
    return {k: tensor_list[i]
            for i, k in enumerate(sorted(original_tensors, key=str))}
  else:
    return tensor_list


def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared).  This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects.  In this case, the sparse
  restore must have a single `SparseTensorsMap` from which to read out the
  handles; so a single `SparseTensorsMap` must be shared for storing
  across the multiple enqueue operations.  This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent calls with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: Must be a scalar bool Tensor (not a Python bool). If False,
      don't store.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`.  If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)`, where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of `_SparseMetaData` objects of the same length.
  """
505
506  def _sparse_meta_data(t, storing_op, map_op):
507    if not isinstance(t, sparse_tensor.SparseTensor):
508      return _SparseMetaData(False, None, None)
509    rank = t.dense_shape.shape.with_rank(1).dims[0]
510    if enqueue_many:
511      rank -= 1
512    # If a shared map_op was provided, use that. Otherwise use the name of
513    # the operation used to store the SparseTensor.
514    return _SparseMetaData(
515        sparse=True, map_op=map_op or storing_op, rank=rank)
516
517  def _maybe_store(t, shared_map_op):
518    """Store Sparse tensor, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` is wrapped in a tf.cond `Merge`,
  # we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # If there was no sparse input, then the original stored Tensor wasn't
    # sparse and we can just return the original Tensor's Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored tensors by 1 for proper enqueue shape
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list


def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc."""
  (s0, sparse_info_list) = _store_sparse_tensors(
      tensor_list_list[0], enqueue_many, keep_input)
  stored_list_list = [s0]
  for tensor_list in tensor_list_list[1:]:
    s, sparse_info_candidate = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input,
        [st.map_op for st in sparse_info_list])
    if sparse_info_list != sparse_info_candidate:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (tensor_list_list[0], tensor_list))
    sparse_info_list = [
        info.merge_with(candidate)
        for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)]
    stored_list_list.append(s)

  return (stored_list_list, sparse_info_list)


def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc."""
  received_sequence = isinstance(stored_list, collections_abc.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  return ensure_restore_tensors if received_sequence else tensors[0]


def _validate(tensor_list):
  tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
  if not tensor_list:
    raise ValueError("Expected at least one tensor in batch().")
  return tensor_list


def _validate_join(tensor_list_list):
  tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl)
                      for tl in tensor_list_list]
  if not tensor_list_list:
    raise ValueError("Expected at least one input in batch_join().")
  return tensor_list_list


def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions."""
  keep_input = ops.convert_to_tensor(keep_input)
  if keep_input.shape.ndims is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if not enqueue_many and keep_input.shape.ndims == 1:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if keep_input.shape.ndims > 1:
    raise ValueError("`keep_input` must have 0 or 1 dimensions.")
  return keep_input

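# A short sketch of the `keep_input` contract enforced above (inputs are
# hypothetical): a scalar gates an entire enqueue, while a vector is only
# legal with `enqueue_many=True`, where it filters individual examples:
#
#   _validate_keep_input(tf.constant(True), enqueue_many=False)   # ok: scalar
#   _validate_keep_input(tf.constant([True, False]),
#                        enqueue_many=True)                       # ok: vector
#   _validate_keep_input(tf.constant([True, False]),
#                        enqueue_many=False)                      # ValueError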

def _dtypes(tensor_list_list):
  all_types = [[t.dtype for t in tl] for tl in tensor_list_list]
  types = all_types[0]
  for other_types in all_types[1:]:
    if other_types != types:
      raise TypeError("Expected types to be consistent: %s vs. %s." %
                      (", ".join(x.name for x in types),
                       ", ".join(x.name for x in other_types)))
  return types


def _merge_shapes(shape_list, enqueue_many):
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    merged_shape.merge_with(s)
  return merged_shape.as_list()


def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    A list of shapes aggregating shape inference info from `tensor_list_list`,
    or returning `shapes` if it is not `None`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well-defined rank.
692  """
693  if shapes is None:
694    len0 = len(tensor_list_list[0])
695
696    for tl in tensor_list_list:
697      for i in range(len0):
698        if tl[i].shape.ndims is None:
699          raise ValueError("Cannot infer Tensor's rank: %s" % tl[i])
700
701    shapes = [
702        _merge_shapes([tl[i].shape.as_list()
703                       for tl in tensor_list_list], enqueue_many)
704        for i in range(len0)
705    ]
706  return shapes
707
708
709def _select_which_to_enqueue(tensor_list, keep_input):
710  """Select which examples to enqueue based on vector `keep_input`."""
711  select_i = math_ops.cast(keep_input, dtypes.int32)
712  tensor_list = [
713      data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1]
714      for x in tensor_list]
715  return tensor_list
716
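# A short sketch of the selection above, with hypothetical inputs:
# `dynamic_partition` splits each tensor into rows to drop (partition 0) and
# rows to keep (partition 1), and only partition 1 is enqueued:
#
#   x = tf.constant([10, 20, 30])
#   keep = tf.constant([True, False, True])
#   _select_which_to_enqueue([x], keep)  # -> [tensor([10, 30])]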

def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input))
                   for x in tensor_list_list]
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tl),  # pylint:disable=cell-var-from-loop
        control_flow_ops.no_op) for tl in tensor_list_list]
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Enqueue `tensor_list` in `queue`."""
  if enqueue_many:
    enqueue_fn = queue.enqueue_many
  else:
    enqueue_fn = queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads
  else:
    enqueue_ops = [utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)] * threads
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))


def _which_queue(dynamic_pad):
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Helper function for `batch` and `maybe_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name:
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    (tensor_list, sparse_info) = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support with
# different readers (either because you don't have that many files or
# can't read that many files in parallel due to the number of seeks
# required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Helper function for `batch_join` and `maybe_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue = _which_queue(dynamic_pad)(
        capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    summary.scalar(
        "fraction_of_%d_full" % capacity,
        math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity))

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)


def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Helper function for `shuffle_batch` and `maybe_shuffle_batch`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list = _as_tensor_list(tensors)
  with ops.name_scope(name, "shuffle_batch",
                      list(tensor_list) + [keep_input]) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list = _validate(tensor_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list, sparse_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input)
    types = _dtypes([tensor_list])
    shapes = _shapes([tensor_list], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)


def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`."""
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    # Note that name contains a '/' at the end so we intentionally do not place
    # a '/' after %s below.
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)

# Batching functions ----------------------------------------------------------



@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example.  An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension.  If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`.  The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size will fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input=True,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

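# A minimal pipeline sketch for `batch`, assuming TF1 graph mode and a
# hypothetical single-example (image, label) pair produced by an upstream
# reader; batching stacks examples into `[batch_size, ...]` tensors:
#
#   image_batch, label_batch = tf.compat.v1.train.batch(
#       [image, label], batch_size=32, num_threads=4, capacity=1000)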

@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`.
  """
  return _batch(
      tensors,
      batch_size,
      keep_input,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

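# A minimal sketch of `maybe_batch` as a filter, assuming a hypothetical
# scalar `label` for a single example: only examples whose `keep_input`
# predicate evaluates to `True` are enqueued and can appear in a batch:
#
#   keep = tf.not_equal(label, -1)  # drop examples with a sentinel label
#   image_batch, label_batch = tf.compat.v1.train.maybe_batch(
#       [image, label], keep_input=keep, batch_size=32)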

@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate
  thread for each tensor list.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception; however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch; otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property, will have a first `Dimension` value of `None`, and
  operations that depend on a fixed batch_size will fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a single
      example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input=True,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)

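# A minimal sketch for `batch_join`, assuming a hypothetical `read_example`
# helper that builds one single-example pipeline per reader; the N readers
# run in N enqueuing threads that feed one shared batching queue:
#
#   example_list = [read_example(filename_queue) for _ in range(4)]
#   example_batch, label_batch = tf.compat.v1.train.batch_join(
#       example_list, batch_size=32, capacity=1000)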

@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates to `True`,
      then `tensors` are all added to the queue. If it is a vector and
      `enqueue_many` is `True`, then each example is added to the queue only
      if the corresponding value in `keep_input` is `True`. This tensor
      essentially acts as a filtering mechanism.
1208    batch_size: An integer. The new batch size pulled from the queue.
1209    capacity: An integer. The maximum number of elements in the queue.
1210    enqueue_many: Whether each tensor in `tensor_list_list` is a single
1211      example.
1212    shapes: (Optional) The shapes for each example.  Defaults to the
1213      inferred shapes for `tensor_list_list[i]`.
1214    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
1215      The given dimensions are padded upon dequeue so that tensors within a
1216      batch have the same shapes.
1217    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1218      batch to be smaller if there are insufficient items left in the queue.
1219    shared_name: (Optional) If set, this queue will be shared under the given
1220      name across multiple sessions.
1221    name: (Optional) A name for the operations.
1222
1223  Returns:
1224    A list or dictionary of tensors with the same number and types as
1225    `tensors_list[i]`.
1226
1227  Raises:
1228    ValueError: If the `shapes` are not specified, and cannot be
1229      inferred from the elements of `tensors_list`.
1230  """
1231  return _batch_join(
1232      tensors_list,
1233      batch_size,
1234      keep_input,
1235      capacity=capacity,
1236      enqueue_many=enqueue_many,
1237      shapes=shapes,
1238      dynamic_pad=dynamic_pad,
1239      allow_smaller_final_batch=allow_smaller_final_batch,
1240      shared_name=shared_name,
1241      name=name)
1242
1243
1244@tf_export(v1=["train.shuffle_batch"])
1245@deprecation.deprecated(
1246    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1247    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
1248def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
1249                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
1250                  allow_smaller_final_batch=False, shared_name=None, name=None):
1251  """Creates batches by randomly shuffling tensors.
1252
1253  This function adds the following to the current `Graph`:
1254
1255  * A shuffling queue into which tensors from `tensors` are enqueued.
1256  * A `dequeue_many` operation to create batches from the queue.
1257  * A `QueueRunner` added to the `QUEUE_RUNNER` collection, to enqueue the
1258    tensors from `tensors`.
1259
1260  If `enqueue_many` is `False`, `tensors` is assumed to represent a
1261  single example.  An input tensor with shape `[x, y, z]` will be output
1262  as a tensor with shape `[batch_size, x, y, z]`.
1263
1264  If `enqueue_many` is `True`, `tensors` is assumed to represent a
1265  batch of examples, where the first dimension is indexed by example,
1266  and all members of `tensors` should have the same size in the
1267  first dimension.  If an input tensor has shape `[*, x, y, z]`, the
1268  output will have shape `[batch_size, x, y, z]`.
1269
1270  The `capacity` argument controls how long the prefetching is allowed to
1271  grow the queues.
1272
1273  The returned operation is a dequeue operation and will throw
1274  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
1275  operation is feeding another input queue, its queue runner will catch
1276  this exception; however, if this operation is used in your main thread
1277  you are responsible for catching this yourself.
1278
1279  For example:
1280
1281  ```python
1282  # Creates batches of 32 images and 32 labels.
1283  image_batch, label_batch = tf.compat.v1.train.shuffle_batch(
1284        [single_image, single_label],
1285        batch_size=32,
1286        num_threads=4,
1287        capacity=50000,
1288        min_after_dequeue=10000)
1289  ```
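
  The rough `tf.data` replacement suggested by the deprecation notice (a
  sketch, assuming a `dataset` of `(image, label)` elements):

  ```python
  batched = dataset.shuffle(10000).batch(32)
  image_batch, label_batch = tf.compat.v1.data.make_one_shot_iterator(
      batched).get_next()
  ```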
1290
1291  *N.B.:* You must ensure that either (i) the `shapes` argument is
1292  passed, or (ii) all of the tensors in `tensors` have fully-defined
1293  shapes. `ValueError` will be raised if neither of these conditions
1294  holds.
1295
1296  If `allow_smaller_final_batch` is `True`, a batch smaller than
1297  `batch_size` is returned when the queue is closed and there are not enough
1298  elements to fill the batch; otherwise the pending elements are discarded.
1299  In addition, all output tensors' static shapes, as accessed via the
1300  `shape` property, will have a first `Dimension` value of `None`, and
1301  operations that depend on a fixed `batch_size` will fail.
1302
1303  Args:
1304    tensors: The list or dictionary of tensors to enqueue.
1305    batch_size: The new batch size pulled from the queue.
1306    capacity: An integer. The maximum number of elements in the queue.
1307    min_after_dequeue: Minimum number of elements in the queue after a
1308      dequeue, used to ensure a level of mixing of elements.
1309    num_threads: The number of threads enqueuing `tensors`.
1310    seed: Seed for the random shuffling within the queue.
1311    enqueue_many: Whether each tensor in `tensors` is a single example.
1312    shapes: (Optional) The shapes for each example.  Defaults to the
1313      inferred shapes for `tensors`.
1314    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1315      batch to be smaller if there are insufficient items left in the queue.
1316    shared_name: (Optional) If set, this queue will be shared under the given
1317      name across multiple sessions.
1318    name: (Optional) A name for the operations.
1319
1320  Returns:
1321    A list or dictionary of tensors with the same types as `tensors`.
1322
1323  Raises:
1324    ValueError: If the `shapes` are not specified, and cannot be
1325      inferred from the elements of `tensors`.
1326
1327  @compatibility(eager)
1328  Input pipelines based on Queues are not supported when eager execution is
1329  enabled. Please use the `tf.data` API to ingest data under eager execution.
1330  @end_compatibility
1331  """
1332  return _shuffle_batch(
1333      tensors,
1334      batch_size,
1335      capacity,
1336      min_after_dequeue,
1337      keep_input=True,
1338      num_threads=num_threads,
1339      seed=seed,
1340      enqueue_many=enqueue_many,
1341      shapes=shapes,
1342      allow_smaller_final_batch=allow_smaller_final_batch,
1343      shared_name=shared_name,
1344      name=name)
1345
1346
1347@tf_export(v1=["train.maybe_shuffle_batch"])
1348@deprecation.deprecated(
1349    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1350    "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
1351    ".")
1352def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
1353                        keep_input, num_threads=1, seed=None,
1354                        enqueue_many=False, shapes=None,
1355                        allow_smaller_final_batch=False, shared_name=None,
1356                        name=None):
1357  """Creates batches by randomly shuffling conditionally-enqueued tensors.
1358
1359  See docstring in `shuffle_batch` for more details.
1360
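  For example, a minimal sketch (here `single_image` and a scalar
  `single_label` are hypothetical tensors produced by a reader):

  ```python
  # Enqueue only examples whose label is nonzero; `keep_input` is
  # re-evaluated for every enqueue attempt.
  image_batch, label_batch = tf.compat.v1.train.maybe_shuffle_batch(
      [single_image, single_label],
      batch_size=32,
      capacity=50000,
      min_after_dequeue=10000,
      keep_input=tf.not_equal(single_label, 0))
  ```
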
1361  Args:
1362    tensors: The list or dictionary of tensors to enqueue.
1363    batch_size: The new batch size pulled from the queue.
1364    capacity: An integer. The maximum number of elements in the queue.
1365    min_after_dequeue: Minimum number of elements in the queue after a
1366      dequeue, used to ensure a level of mixing of elements.
1367    keep_input: A `bool` Tensor.  This tensor controls whether the input is
1368      added to the queue or not.  If it is a scalar and evaluates to `True`,
1369      then `tensors` are all added to the queue. If it is a vector and
1370      `enqueue_many` is `True`, then each example is added to the queue only
1371      if the corresponding value in `keep_input` is `True`. This tensor
1372      essentially acts as a filtering mechanism.
1373    num_threads: The number of threads enqueuing `tensors`.
1374    seed: Seed for the random shuffling within the queue.
1375    enqueue_many: Whether each tensor in `tensors` is a single example.
1376    shapes: (Optional) The shapes for each example.  Defaults to the
1377      inferred shapes for `tensors`.
1378    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1379      batch to be smaller if there are insufficient items left in the queue.
1380    shared_name: (Optional) If set, this queue will be shared under the given
1381      name across multiple sessions.
1382    name: (Optional) A name for the operations.
1383
1384  Returns:
1385    A list or dictionary of tensors with the same types as `tensors`.
1386
1387  Raises:
1388    ValueError: If the `shapes` are not specified, and cannot be
1389      inferred from the elements of `tensors`.
1390
1391  @compatibility(eager)
1392  Input pipelines based on Queues are not supported when eager execution is
1393  enabled. Please use the `tf.data` API to ingest data under eager execution.
1394  @end_compatibility
1395  """
1396  return _shuffle_batch(
1397      tensors,
1398      batch_size,
1399      capacity,
1400      min_after_dequeue,
1401      keep_input,
1402      num_threads=num_threads,
1403      seed=seed,
1404      enqueue_many=enqueue_many,
1405      shapes=shapes,
1406      allow_smaller_final_batch=allow_smaller_final_batch,
1407      shared_name=shared_name,
1408      name=name)
1409
1410
1411@tf_export(v1=["train.shuffle_batch_join"])
1412@deprecation.deprecated(
1413    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1414    "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
1415    "(batch_size)`.")
1416def shuffle_batch_join(tensors_list, batch_size, capacity,
1417                       min_after_dequeue, seed=None, enqueue_many=False,
1418                       shapes=None, allow_smaller_final_batch=False,
1419                       shared_name=None, name=None):
1420  """Create batches by randomly shuffling tensors.
1421
1422  The `tensors_list` argument is a list of tuples of tensors, or a list of
1423  dictionaries of tensors.  Each element in the list is treated similarly
1424  to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`.
1425
1426  This version enqueues a different list of tensors in different threads.
1427  It adds the following to the current `Graph`:
1428
1429  * A shuffling queue into which tensors from `tensors_list` are enqueued.
1430  * A `dequeue_many` operation to create batches from the queue.
1431  * A `QueueRunner` added to the `QUEUE_RUNNER` collection, to enqueue the
1432    tensors from `tensors_list`.
1433
1434  `len(tensors_list)` threads will be started, with thread `i` enqueuing
1435  the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
1436  `tensors_list[i2][j]` in type and shape, except in the first dimension if
1437  `enqueue_many` is `True`.
1438
1439  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
1440  to represent a single example.  An input tensor with shape `[x, y, z]`
1441  will be output as a tensor with shape `[batch_size, x, y, z]`.
1442
1443  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
1444  represent a batch of examples, where the first dimension is indexed
1445  by example, and all members of `tensors_list[i]` should have the
1446  same size in the first dimension.  If an input tensor has shape `[*, x,
1447  y, z]`, the output will have shape `[batch_size, x, y, z]`.
1448
1449  The `capacity` argument controls how long the prefetching is allowed to
1450  grow the queues.
1451
1452  The returned operation is a dequeue operation and will throw
1453  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
1454  operation is feeding another input queue, its queue runner will catch
1455  this exception; however, if this operation is used in your main thread
1456  you are responsible for catching this yourself.
1457
1458  If `allow_smaller_final_batch` is `True`, a batch smaller than
1459  `batch_size` is returned when the queue is closed and there are not enough
1460  elements to fill the batch; otherwise the pending elements are discarded.
1461  In addition, all output tensors' static shapes, as accessed via the
1462  `shape` property, will have a first `Dimension` value of `None`, and
1463  operations that depend on a fixed `batch_size` will fail.
1464
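  For example, a minimal sketch (here `read_a` and `read_b` stand for
  hypothetical `(image, label)` tuples produced by two independent
  readers; one enqueue thread is started per list element):

  ```python
  image_batch, label_batch = tf.compat.v1.train.shuffle_batch_join(
      [read_a, read_b],
      batch_size=32,
      capacity=50000,
      min_after_dequeue=10000)
  ```
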
1465  Args:
1466    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
1467    batch_size: An integer. The new batch size pulled from the queue.
1468    capacity: An integer. The maximum number of elements in the queue.
1469    min_after_dequeue: Minimum number of elements in the queue after a
1470      dequeue, used to ensure a level of mixing of elements.
1471    seed: Seed for the random shuffling within the queue.
1472    enqueue_many: Whether each tensor in `tensors_list` is a single
1473      example.
1474    shapes: (Optional) The shapes for each example.  Defaults to the
1475      inferred shapes for `tensors_list[i]`.
1476    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1477      batch to be smaller if there are insufficient items left in the queue.
1478    shared_name: (Optional) If set, this queue will be shared under the given
1479      name across multiple sessions.
1480    name: (Optional) A name for the operations.
1481
1482  Returns:
1483    A list or dictionary of tensors with the same number and types as
1484    `tensors_list[i]`.
1485
1486  Raises:
1487    ValueError: If the `shapes` are not specified, and cannot be
1488      inferred from the elements of `tensors_list`.
1489
1490  @compatibility(eager)
1491  Input pipelines based on Queues are not supported when eager execution is
1492  enabled. Please use the `tf.data` API to ingest data under eager execution.
1493  @end_compatibility
1494  """
1495  return _shuffle_batch_join(
1496      tensors_list,
1497      batch_size,
1498      capacity,
1499      min_after_dequeue,
1500      keep_input=True,
1501      seed=seed,
1502      enqueue_many=enqueue_many,
1503      shapes=shapes,
1504      allow_smaller_final_batch=allow_smaller_final_batch,
1505      shared_name=shared_name,
1506      name=name)
1507
1508
1509@tf_export(v1=["train.maybe_shuffle_batch_join"])
1510@deprecation.deprecated(
1511    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
1512    "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
1513    ".batch(batch_size)`.")
1514def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
1515                             min_after_dequeue, keep_input, seed=None,
1516                             enqueue_many=False, shapes=None,
1517                             allow_smaller_final_batch=False, shared_name=None,
1518                             name=None):
1519  """Create batches by randomly shuffling conditionally-enqueued tensors.
1520
1521  See docstring in `shuffle_batch_join` for more details.
1522
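  For example, a minimal sketch (here `read_a` and `read_b` stand for
  hypothetical `(image, label)` tuples, with enqueue attempts gated on a
  random draw so that roughly half are kept):

  ```python
  keep = tf.random.uniform([]) < 0.5
  image_batch, label_batch = tf.compat.v1.train.maybe_shuffle_batch_join(
      [read_a, read_b],
      batch_size=32,
      capacity=50000,
      min_after_dequeue=10000,
      keep_input=keep)
  ```
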
1523  Args:
1524    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
1525    batch_size: An integer. The new batch size pulled from the queue.
1526    capacity: An integer. The maximum number of elements in the queue.
1527    min_after_dequeue: Minimum number of elements in the queue after a
1528      dequeue, used to ensure a level of mixing of elements.
1529    keep_input: A `bool` Tensor.  This tensor controls whether the input is
1530      added to the queue or not.  If it is a scalar and evaluates to `True`,
1531      then the `tensors_list` elements are all added to the queue. If it is a
1532      vector and `enqueue_many` is `True`, then each example is added to the
1533      queue only if the corresponding value in `keep_input` is `True`. This
1534      tensor essentially acts as a filtering mechanism.
1535    seed: Seed for the random shuffling within the queue.
1536    enqueue_many: Whether each tensor in `tensors_list` is a single
1537      example.
1538    shapes: (Optional) The shapes for each example.  Defaults to the
1539      inferred shapes for `tensors_list[i]`.
1540    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
1541      batch to be smaller if there are insufficient items left in the queue.
1542    shared_name: (Optional) If set, this queue will be shared under the given
1543      name across multiple sessions.
1544    name: (Optional) A name for the operations.
1545
1546  Returns:
1547    A list or dictionary of tensors with the same number and types as
1548    `tensors_list[i]`.
1549
1550  Raises:
1551    ValueError: If the `shapes` are not specified, and cannot be
1552      inferred from the elements of `tensors_list`.
1553
1554  @compatibility(eager)
1555  Input pipelines based on Queues are not supported when eager execution is
1556  enabled. Please use the `tf.data` API to ingest data under eager execution.
1557  @end_compatibility
1558  """
1559  return _shuffle_batch_join(
1560      tensors_list,
1561      batch_size,
1562      capacity,
1563      min_after_dequeue,
1564      keep_input,
1565      seed=seed,
1566      enqueue_many=enqueue_many,
1567      shapes=shapes,
1568      allow_smaller_final_batch=allow_smaller_final_batch,
1569      shared_name=shared_name,
1570      name=name)
1571