1# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15 16"""Input pipeline. 17 18Please see the [reading data 19how-to](https://tensorflow.org/api_guides/python/reading_data) 20for context. 21""" 22 23 24from tensorflow.python.eager import context 25from tensorflow.python.framework import constant_op 26from tensorflow.python.framework import dtypes 27from tensorflow.python.framework import ops 28from tensorflow.python.framework import sparse_tensor 29from tensorflow.python.framework import tensor_shape 30from tensorflow.python.layers import utils 31from tensorflow.python.ops import array_ops 32from tensorflow.python.ops import control_flow_ops 33from tensorflow.python.ops import data_flow_ops 34from tensorflow.python.ops import io_ops 35from tensorflow.python.ops import math_ops 36from tensorflow.python.ops import random_ops 37from tensorflow.python.ops import sparse_ops 38from tensorflow.python.ops import variable_scope as vs 39from tensorflow.python.summary import summary 40from tensorflow.python.training import queue_runner 41from tensorflow.python.util import deprecation 42from tensorflow.python.util.compat import collections_abc 43from tensorflow.python.util.tf_export import tf_export 44 45 46# pylint: disable=protected-access 47_store_sparse = sparse_ops._add_sparse_to_tensors_map 48_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map 49_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map 50# pylint: enable=protected-access 51 52 53@tf_export( 54 "io.match_filenames_once", 55 v1=["io.match_filenames_once", "train.match_filenames_once"]) 56@deprecation.deprecated_endpoints("train.match_filenames_once") 57def match_filenames_once(pattern, name=None): 58 """Save the list of files matching pattern, so it is only computed once. 59 60 NOTE: The order of the files returned is deterministic. 61 62 Args: 63 pattern: A file pattern (glob), or 1D tensor of file patterns. 64 name: A name for the operations (optional). 65 66 Returns: 67 A variable that is initialized to the list of files matching the pattern(s). 68 """ 69 with ops.name_scope(name, "matching_filenames", [pattern]) as name: 70 return vs.variable( 71 name=name, initial_value=io_ops.matching_files(pattern), 72 trainable=False, validate_shape=False, 73 collections=[ops.GraphKeys.LOCAL_VARIABLES]) 74 75 76@tf_export(v1=["train.limit_epochs"]) 77@deprecation.deprecated( 78 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 79 "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.") 80def limit_epochs(tensor, num_epochs=None, name=None): 81 """Returns tensor `num_epochs` times and then raises an `OutOfRange` error. 82 83 Note: creates local counter `epochs`. Use `local_variables_initializer()` to 84 initialize local variables. 85 86 Args: 87 tensor: Any `Tensor`. 88 num_epochs: A positive integer (optional). If specified, limits the number 89 of steps the output tensor may be evaluated. 90 name: A name for the operations (optional). 91 92 Returns: 93 tensor or `OutOfRange`. 94 95 Raises: 96 ValueError: if `num_epochs` is invalid. 97 """ 98 if num_epochs is None: 99 return tensor 100 if num_epochs <= 0: 101 raise ValueError("num_epochs must be > 0 not %d." % num_epochs) 102 with ops.name_scope(name, "limit_epochs", [tensor]) as name: 103 zero64 = constant_op.constant(0, dtype=dtypes.int64) 104 epochs = vs.variable( 105 zero64, name="epochs", trainable=False, 106 collections=[ops.GraphKeys.LOCAL_VARIABLES]) 107 counter = epochs.count_up_to(num_epochs) 108 with ops.control_dependencies([counter]): 109 return array_ops.identity(tensor, name=name) 110 111 112@tf_export(v1=["train.input_producer"]) 113@deprecation.deprecated( 114 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 115 "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle" 116 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 117 "`shuffle=False`, omit the `.shuffle(...)`.") 118def input_producer(input_tensor, 119 element_shape=None, 120 num_epochs=None, 121 shuffle=True, 122 seed=None, 123 capacity=32, 124 shared_name=None, 125 summary_name=None, 126 name=None, 127 cancel_op=None): 128 """Output the rows of `input_tensor` to a queue for an input pipeline. 129 130 Note: if `num_epochs` is not `None`, this function creates local counter 131 `epochs`. Use `local_variables_initializer()` to initialize local variables. 132 133 Args: 134 input_tensor: A tensor with the rows to produce. Must be at least 135 one-dimensional. Must either have a fully-defined shape, or 136 `element_shape` must be defined. 137 element_shape: (Optional.) A `TensorShape` representing the shape of a 138 row of `input_tensor`, if it cannot be inferred. 139 num_epochs: (Optional.) An integer. If specified `input_producer` produces 140 each row of `input_tensor` `num_epochs` times before generating an 141 `OutOfRange` error. If not specified, `input_producer` can cycle through 142 the rows of `input_tensor` an unlimited number of times. 143 shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled 144 within each epoch. 145 seed: (Optional.) An integer. The seed to use if `shuffle` is true. 146 capacity: (Optional.) The capacity of the queue to be used for buffering 147 the input. 148 shared_name: (Optional.) If set, this queue will be shared under the given 149 name across multiple sessions. 150 summary_name: (Optional.) If set, a scalar summary for the current queue 151 size will be generated, using this name as part of the tag. 152 name: (Optional.) A name for queue. 153 cancel_op: (Optional.) Cancel op for the queue 154 155 Returns: 156 A queue with the output rows. A `QueueRunner` for the queue is 157 added to the current `QUEUE_RUNNER` collection of the current 158 graph. 159 160 Raises: 161 ValueError: If the shape of the input cannot be inferred from the arguments. 162 RuntimeError: If called with eager execution enabled. 163 164 @compatibility(eager) 165 Input pipelines based on Queues are not supported when eager execution is 166 enabled. Please use the `tf.data` API to ingest data under eager execution. 167 @end_compatibility 168 """ 169 if context.executing_eagerly(): 170 raise RuntimeError( 171 "Input pipelines based on Queues are not supported when eager execution" 172 " is enabled. Please use tf.data to ingest data into your model" 173 " instead.") 174 with ops.name_scope(name, "input_producer", [input_tensor]): 175 input_tensor = ops.convert_to_tensor(input_tensor, name="input_tensor") 176 element_shape = input_tensor.shape[1:].merge_with(element_shape) 177 if not element_shape.is_fully_defined(): 178 raise ValueError("Either `input_tensor` must have a fully defined shape " 179 "or `element_shape` must be specified") 180 181 if shuffle: 182 input_tensor = random_ops.random_shuffle(input_tensor, seed=seed) 183 184 input_tensor = limit_epochs(input_tensor, num_epochs) 185 186 q = data_flow_ops.FIFOQueue(capacity=capacity, 187 dtypes=[input_tensor.dtype.base_dtype], 188 shapes=[element_shape], 189 shared_name=shared_name, name=name) 190 enq = q.enqueue_many([input_tensor]) 191 queue_runner.add_queue_runner( 192 queue_runner.QueueRunner( 193 q, [enq], cancel_op=cancel_op)) 194 if summary_name is not None: 195 summary.scalar(summary_name, 196 math_ops.cast(q.size(), dtypes.float32) * (1. / capacity)) 197 return q 198 199 200@tf_export(v1=["train.string_input_producer"]) 201@deprecation.deprecated( 202 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 203 "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle" 204 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 205 "`shuffle=False`, omit the `.shuffle(...)`.") 206def string_input_producer(string_tensor, 207 num_epochs=None, 208 shuffle=True, 209 seed=None, 210 capacity=32, 211 shared_name=None, 212 name=None, 213 cancel_op=None): 214 """Output strings (e.g. filenames) to a queue for an input pipeline. 215 216 Note: if `num_epochs` is not `None`, this function creates local counter 217 `epochs`. Use `local_variables_initializer()` to initialize local variables. 218 219 Args: 220 string_tensor: A 1-D string tensor with the strings to produce. 221 num_epochs: An integer (optional). If specified, `string_input_producer` 222 produces each string from `string_tensor` `num_epochs` times before 223 generating an `OutOfRange` error. If not specified, 224 `string_input_producer` can cycle through the strings in `string_tensor` 225 an unlimited number of times. 226 shuffle: Boolean. If true, the strings are randomly shuffled within each 227 epoch. 228 seed: An integer (optional). Seed used if shuffle == True. 229 capacity: An integer. Sets the queue capacity. 230 shared_name: (optional). If set, this queue will be shared under the given 231 name across multiple sessions. All sessions open to the device which has 232 this queue will be able to access it via the shared_name. Using this in 233 a distributed setting means each name will only be seen by one of the 234 sessions which has access to this operation. 235 name: A name for the operations (optional). 236 cancel_op: Cancel op for the queue (optional). 237 238 Returns: 239 A queue with the output strings. A `QueueRunner` for the Queue 240 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 241 242 Raises: 243 ValueError: If the string_tensor is a null Python list. At runtime, 244 will fail with an assertion if string_tensor becomes a null tensor. 245 246 @compatibility(eager) 247 Input pipelines based on Queues are not supported when eager execution is 248 enabled. Please use the `tf.data` API to ingest data under eager execution. 249 @end_compatibility 250 """ 251 not_null_err = "string_input_producer requires a non-null input tensor" 252 if not isinstance(string_tensor, ops.Tensor) and not string_tensor: 253 raise ValueError(not_null_err) 254 255 with ops.name_scope(name, "input_producer", [string_tensor]) as name: 256 string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string) 257 with ops.control_dependencies([ 258 control_flow_ops.Assert( 259 math_ops.greater(array_ops.size(string_tensor), 0), 260 [not_null_err])]): 261 string_tensor = array_ops.identity(string_tensor) 262 return input_producer( 263 input_tensor=string_tensor, 264 element_shape=[], 265 num_epochs=num_epochs, 266 shuffle=shuffle, 267 seed=seed, 268 capacity=capacity, 269 shared_name=shared_name, 270 name=name, 271 summary_name="fraction_of_%d_full" % capacity, 272 cancel_op=cancel_op) 273 274 275@tf_export(v1=["train.range_input_producer"]) 276@deprecation.deprecated( 277 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 278 "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If " 279 "`shuffle=False`, omit the `.shuffle(...)`.") 280def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None, 281 capacity=32, shared_name=None, name=None): 282 """Produces the integers from 0 to limit-1 in a queue. 283 284 Note: if `num_epochs` is not `None`, this function creates local counter 285 `epochs`. Use `local_variables_initializer()` to initialize local variables. 286 287 Args: 288 limit: An int32 scalar tensor. 289 num_epochs: An integer (optional). If specified, `range_input_producer` 290 produces each integer `num_epochs` times before generating an 291 OutOfRange error. If not specified, `range_input_producer` can cycle 292 through the integers an unlimited number of times. 293 shuffle: Boolean. If true, the integers are randomly shuffled within each 294 epoch. 295 seed: An integer (optional). Seed used if shuffle == True. 296 capacity: An integer. Sets the queue capacity. 297 shared_name: (optional). If set, this queue will be shared under the given 298 name across multiple sessions. 299 name: A name for the operations (optional). 300 301 Returns: 302 A Queue with the output integers. A `QueueRunner` for the Queue 303 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 304 305 @compatibility(eager) 306 Input pipelines based on Queues are not supported when eager execution is 307 enabled. Please use the `tf.data` API to ingest data under eager execution. 308 @end_compatibility 309 """ 310 with ops.name_scope(name, "input_producer", [limit]) as name: 311 range_tensor = math_ops.range(limit) 312 return input_producer( 313 range_tensor, [], num_epochs, shuffle, seed, capacity, 314 shared_name, "fraction_of_%d_full" % capacity, name) 315 316 317@tf_export(v1=["train.slice_input_producer"]) 318@deprecation.deprecated( 319 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 320 "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle" 321 "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If " 322 "`shuffle=False`, omit the `.shuffle(...)`.") 323def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None, 324 capacity=32, shared_name=None, name=None): 325 """Produces a slice of each `Tensor` in `tensor_list`. 326 327 Implemented using a Queue -- a `QueueRunner` for the Queue 328 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 329 330 Args: 331 tensor_list: A list of `Tensor` objects. Every `Tensor` in 332 `tensor_list` must have the same size in the first dimension. 333 num_epochs: An integer (optional). If specified, `slice_input_producer` 334 produces each slice `num_epochs` times before generating 335 an `OutOfRange` error. If not specified, `slice_input_producer` can cycle 336 through the slices an unlimited number of times. 337 shuffle: Boolean. If true, the integers are randomly shuffled within each 338 epoch. 339 seed: An integer (optional). Seed used if shuffle == True. 340 capacity: An integer. Sets the queue capacity. 341 shared_name: (optional). If set, this queue will be shared under the given 342 name across multiple sessions. 343 name: A name for the operations (optional). 344 345 Returns: 346 A list of tensors, one for each element of `tensor_list`. If the tensor 347 in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output 348 tensor will have shape `[a, b, ..., z]`. 349 350 Raises: 351 ValueError: if `slice_input_producer` produces nothing from `tensor_list`. 352 353 @compatibility(eager) 354 Input pipelines based on Queues are not supported when eager execution is 355 enabled. Please use the `tf.data` API to ingest data under eager execution. 356 @end_compatibility 357 """ 358 with ops.name_scope(name, "input_producer", tensor_list): 359 tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list) 360 if not tensor_list: 361 raise ValueError( 362 "Expected at least one tensor in slice_input_producer().") 363 range_size = array_ops.shape(tensor_list[0])[0] 364 # TODO(josh11b): Add an assertion that the first dimension of 365 # everything in TensorList matches. Maybe just check the inferred shapes? 366 queue = range_input_producer(range_size, num_epochs=num_epochs, 367 shuffle=shuffle, seed=seed, capacity=capacity, 368 shared_name=shared_name) 369 index = queue.dequeue() 370 output = [array_ops.gather(t, index) for t in tensor_list] 371 return output 372 373 374# Helpers for the batching functions ------------------------------------------ 375 376 377def _flatten(tensor_list_list): 378 return [tensor for tensor_list in tensor_list_list for tensor in tensor_list] 379 380 381class _SparseMetaData: 382 """Store information about the Tensor: Is it sparse?, map_op, and rank.""" 383 384 def __init__(self, sparse, map_op, rank): 385 """Create the metadata. 386 387 Args: 388 sparse: Python boolean. 389 map_op: The `Operation` that created the `SparseTensorsMap` in question. 390 This Op contains information about the underlying Map object and the 391 dtype of the original data. 392 rank: The statically known rank of the `SparseTensor`. 393 """ 394 self._sparse = sparse 395 self._map_op = map_op 396 self._rank = tensor_shape.as_dimension(rank) 397 398 def __eq__(self, other): 399 if self.sparse != other.sparse: 400 return False 401 if not self.sparse: 402 return True 403 # If map_ops are not the same, the data source is not the same. 404 if (self.map_op is not None) != (other.map_op is not None): 405 return False 406 if self.map_op != other.map_op: 407 return False 408 if not self.rank.is_compatible_with(other.rank): 409 return False 410 return True 411 412 def __ne__(self, other): 413 return not self.__eq__(other) 414 415 def __str__(self): 416 return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, self.map_op.name, 417 self.rank) 418 419 def merge_with(self, other): 420 if self != other: 421 raise ValueError("SparseMetaData objects are incompatible: %s vs. %s" 422 % (self, other)) 423 if self.sparse: 424 self.rank.merge_with(other.rank) 425 return self 426 427 @property 428 def map_op(self): 429 return self._map_op 430 431 @property 432 def sparse(self): 433 return self._sparse 434 435 @property 436 def rank(self): 437 return self._rank 438 439 440def _as_tensor_list(tensors): 441 if isinstance(tensors, dict): 442 return [tensors[k] for k in sorted(tensors, key=str)] 443 else: 444 return tensors 445 446 447def _as_tensor_list_list(tensors_list): 448 if not tensors_list: 449 raise ValueError("Expected at least one set of tensors") 450 if isinstance(tensors_list[0], dict): 451 expected_keys = set(tensors_list[0].keys()) 452 for tensors in tensors_list[1:]: 453 if set(tensors.keys()) != expected_keys: 454 raise ValueError("All dictionaries in tensors_list must have " 455 "the same keys") 456 return [_as_tensor_list(tensors) for tensors in tensors_list] 457 else: 458 return tensors_list 459 460 461def _as_original_type(original_tensors, tensor_list): 462 if isinstance(original_tensors, dict): 463 if len(original_tensors) == 1: 464 # tensor_list is bogusly returned as a single tensor if only one tensor 465 # was enqueued. Make it a list again. See b/28117485. 466 tensor_list = [tensor_list] 467 return {k: tensor_list[i] 468 for i, k in enumerate(sorted(original_tensors, key=str))} 469 else: 470 return tensor_list 471 472 473def _store_sparse_tensors(tensor_list, enqueue_many, keep_input, 474 shared_map_ops=None): 475 """Store SparseTensors for feeding into batch, etc. 476 477 If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects 478 are reused (shared). This argument is useful for, e.g., `batch_join` 479 where multiple enqueue operations write to the same Queue component, 480 and another (dequeue) thread reads from that same location and must then 481 restore the associated `SparseTensor` objects. In this case, the sparse 482 restore must have a single `SparseTensorMap` from which to read out the 483 handles; so a single `SparseTensorMap` must be shared for storing 484 across the multiple enqueue operations. This sharing is performed by 485 calling `_store_sparse_tensors` the first time with `shared_map_ops=None`, 486 and then in subsequent times with this value set to the list of `Operation` 487 objects created in the first call. 488 489 Args: 490 tensor_list: List of `Tensor` and `SparseTensor` objects. 491 enqueue_many: Python `Boolean`. 492 keep_input: Must be a scalar bool Tensor (not a Python bool). If False, 493 don't store. 494 shared_map_ops: (optional) List of `Operation` objects from a previous 495 call to `_store_sparse_tensors`. If not `None`, the op types should be 496 one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the 497 locations corresponding to `SparseTensors` in `tensor_list`. 498 499 Returns: 500 A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list 501 of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list` 502 is a list of the same length of `_SparseMetaData` objects. 503 """ 504 maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list) 505 506 def _sparse_meta_data(t, storing_op, map_op): 507 if not isinstance(t, sparse_tensor.SparseTensor): 508 return _SparseMetaData(False, None, None) 509 rank = t.dense_shape.shape.with_rank(1).dims[0] 510 if enqueue_many: 511 rank -= 1 512 # If a shared map_op was provided, use that. Otherwise use the name of 513 # the operation used to store the SparseTensor. 514 return _SparseMetaData( 515 sparse=True, map_op=map_op or storing_op, rank=rank) 516 517 def _maybe_store(t, shared_map_op): 518 """Store Sparse tensor, if necessary.""" 519 if not isinstance(t, sparse_tensor.SparseTensor): 520 return t 521 map_op_name = shared_map_op.name if shared_map_op else None 522 def _maybe_store_sparse(t, map_op_name, keep_input): 523 """Conditionally store a single sparse Tensor.""" 524 return utils.smart_cond( 525 keep_input, 526 lambda: _store_sparse(t, shared_name=map_op_name), 527 lambda: constant_op.constant(-1, dtypes.int64)) 528 def _maybe_store_many_sparse(t, map_op_name, keep_input): 529 """Conditionally store multiple sparse Tensors.""" 530 out_tensor = utils.smart_cond( 531 keep_input, 532 lambda: _store_many_sparse(t, shared_name=map_op_name), 533 lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64)) 534 out_tensor.set_shape([None]) # necessary when t.ndims is unknown 535 return out_tensor 536 def _sparse_values_to_keep(t, keep_input): 537 """Convert a per-row `keep_input` vector to a per-value one.""" 538 # Get the rows of every value in the sparse Tensor. 539 row_values = t.indices[:, 0] 540 # The value should be kept iff the row should be kept. 541 return array_ops.gather(keep_input, row_values) 542 if keep_input.shape.ndims == 1: 543 t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input)) 544 store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name) 545 elif enqueue_many: 546 store_f = _maybe_store_many_sparse 547 else: 548 store_f = _maybe_store_sparse 549 return store_f(t, map_op_name, keep_input) 550 551 stored_list = [ 552 _maybe_store(t, shared_map_op) for t, shared_map_op 553 in zip(tensor_list, maybe_shared_map_ops)] 554 # Since the output of `_store{_many}_sparse is wrapped in a tf.cond `Merge`, 555 # we can't just get the Op of the resulting tensor. 556 def _sparse_op(stored): 557 for input_tensor in stored.op.inputs: 558 if input_tensor.op.type in ("AddSparseToTensorsMap", 559 "AddManySparseToTensorsMap"): 560 return input_tensor.op 561 # If there was no sparse input, then the original stored Tensor wasn't 562 # sparse and we can just return the original Tensor's Op. 563 return stored.op 564 sparse_info_list = [ 565 _sparse_meta_data(t, _sparse_op(stored), shared_map_op) 566 for t, stored, shared_map_op 567 in zip(tensor_list, stored_list, maybe_shared_map_ops)] 568 # Expand dims of stored tensors by 1 for proper enqueue shape 569 stored_list = [ 570 array_ops.expand_dims(s, [-1]) if s_info.sparse else s 571 for s, s_info in zip(stored_list, sparse_info_list)] 572 return stored_list, sparse_info_list 573 574 575def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input): 576 """Store SparseTensors for feeding into batch_join, etc.""" 577 (s0, sparse_info_list) = _store_sparse_tensors( 578 tensor_list_list[0], enqueue_many, keep_input) 579 stored_list_list = [s0] 580 for tensor_list in tensor_list_list[1:]: 581 s, sparse_info_candidate = _store_sparse_tensors( 582 tensor_list, enqueue_many, keep_input, 583 [st.map_op for st in sparse_info_list]) 584 if sparse_info_list != sparse_info_candidate: 585 raise ValueError("Inconsistent SparseTensors list: %s vs. %s" 586 % (tensor_list_list[0], tensor_list)) 587 sparse_info_list = [ 588 info.merge_with(candidate) 589 for (info, candidate) in zip(sparse_info_list, sparse_info_candidate)] 590 stored_list_list.append(s) 591 592 return (stored_list_list, sparse_info_list) 593 594 595def _restore_sparse_tensors(stored_list, sparse_info_list): 596 """Restore SparseTensors after dequeue in batch, batch_join, etc.""" 597 received_sequence = isinstance(stored_list, collections_abc.Sequence) 598 if not received_sequence: 599 stored_list = (stored_list,) 600 tensors = [ 601 _restore_sparse(sparse_map_op=info.map_op, 602 sparse_handles=array_ops.squeeze(s, [1]), 603 rank=tensor_shape.dimension_value(info.rank + 1)) 604 if info.sparse else s 605 for (s, info) in zip(stored_list, sparse_info_list)] 606 has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors) 607 if has_st: 608 t_values = [ 609 x.values if isinstance(x, sparse_tensor.SparseTensor) 610 else x 611 for x in tensors] 612 with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x) 613 ensure_restore_tensors = [ 614 sparse_tensor.SparseTensor(indices=with_deps(x.indices), 615 values=with_deps(x.values), 616 dense_shape=with_deps(x.dense_shape)) 617 if isinstance(x, sparse_tensor.SparseTensor) 618 else with_deps(x) 619 for x in tensors] 620 else: 621 ensure_restore_tensors = tensors 622 return ensure_restore_tensors if received_sequence else tensors[0] 623 624 625def _validate(tensor_list): 626 tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list) 627 if not tensor_list: 628 raise ValueError("Expected at least one tensor in batch().") 629 return tensor_list 630 631 632def _validate_join(tensor_list_list): 633 tensor_list_list = [ops.convert_n_to_tensor_or_indexed_slices(tl) 634 for tl in tensor_list_list] 635 if not tensor_list_list: 636 raise ValueError("Expected at least one input in batch_join().") 637 return tensor_list_list 638 639 640def _validate_keep_input(keep_input, enqueue_many): 641 """Validate `keep_input` argument to conditional batching functions.""" 642 keep_input = ops.convert_to_tensor(keep_input) 643 if keep_input.shape.ndims is None: 644 raise ValueError( 645 "`keep_input` dimensions must be known at graph construction.") 646 if not enqueue_many and keep_input.shape.ndims == 1: 647 raise ValueError( 648 "`keep_input` cannot be a vector when `enqueue_many=False`.") 649 if keep_input.shape.ndims > 1: 650 raise ValueError("`keep_input` must be 0 or 1 dimensions.") 651 return keep_input 652 653 654def _dtypes(tensor_list_list): 655 all_types = [[t.dtype for t in tl] for tl in tensor_list_list] 656 types = all_types[0] 657 for other_types in all_types[1:]: 658 if other_types != types: 659 raise TypeError("Expected types to be consistent: %s vs. %s." % 660 (", ".join(x.name for x in types), 661 ", ".join(x.name for x in other_types))) 662 return types 663 664 665def _merge_shapes(shape_list, enqueue_many): 666 shape_list = [tensor_shape.as_shape(s) for s in shape_list] 667 if enqueue_many: 668 # We want the shapes without the leading batch dimension. 669 shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list] 670 merged_shape = shape_list[0] 671 for s in shape_list[1:]: 672 merged_shape.merge_with(s) 673 return merged_shape.as_list() 674 675 676def _shapes(tensor_list_list, shapes, enqueue_many): 677 """Calculate and merge the shapes of incoming tensors. 678 679 Args: 680 tensor_list_list: List of tensor lists. 681 shapes: List of shape tuples corresponding to tensors within the lists. 682 enqueue_many: Boolean describing whether shapes will be enqueued as 683 batches or individual entries. 684 685 Returns: 686 A list of shapes aggregating shape inference info from `tensor_list_list`, 687 or returning `shapes` if it is not `None`. 688 689 Raises: 690 ValueError: If any of the inferred shapes in `tensor_list_list` lack a 691 well defined rank. 692 """ 693 if shapes is None: 694 len0 = len(tensor_list_list[0]) 695 696 for tl in tensor_list_list: 697 for i in range(len0): 698 if tl[i].shape.ndims is None: 699 raise ValueError("Cannot infer Tensor's rank: %s" % tl[i]) 700 701 shapes = [ 702 _merge_shapes([tl[i].shape.as_list() 703 for tl in tensor_list_list], enqueue_many) 704 for i in range(len0) 705 ] 706 return shapes 707 708 709def _select_which_to_enqueue(tensor_list, keep_input): 710 """Select which examples to enqueue based on vector `keep_input`.""" 711 select_i = math_ops.cast(keep_input, dtypes.int32) 712 tensor_list = [ 713 data_flow_ops.dynamic_partition(x, select_i, num_partitions=2)[1] 714 for x in tensor_list] 715 return tensor_list 716 717 718def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input): 719 """Enqueue `tensor_list_list` in `queue`.""" 720 if enqueue_many: 721 enqueue_fn = queue.enqueue_many 722 else: 723 enqueue_fn = queue.enqueue 724 if keep_input.shape.ndims == 1: 725 enqueue_ops = [enqueue_fn(_select_which_to_enqueue(x, keep_input)) 726 for x in tensor_list_list] 727 else: 728 enqueue_ops = [utils.smart_cond( 729 keep_input, 730 lambda: enqueue_fn(tl), # pylint:disable=cell-var-from-loop 731 control_flow_ops.no_op) for tl in tensor_list_list] 732 queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops)) 733 734 735def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input): 736 """Enqueue `tensor_list` in `queue`.""" 737 if enqueue_many: 738 enqueue_fn = queue.enqueue_many 739 else: 740 enqueue_fn = queue.enqueue 741 if keep_input.shape.ndims == 1: 742 enqueue_ops = [ 743 enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))] * threads 744 else: 745 enqueue_ops = [utils.smart_cond( 746 keep_input, 747 lambda: enqueue_fn(tensor_list), 748 control_flow_ops.no_op)] * threads 749 queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops)) 750 751 752def _which_queue(dynamic_pad): 753 return (data_flow_ops.PaddingFIFOQueue if dynamic_pad 754 else data_flow_ops.FIFOQueue) 755 756 757def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32, 758 enqueue_many=False, shapes=None, dynamic_pad=False, 759 allow_smaller_final_batch=False, shared_name=None, 760 name=None): 761 """Helper function for `batch` and `maybe_batch`.""" 762 if context.executing_eagerly(): 763 raise ValueError( 764 "Input pipelines based on Queues are not supported when eager execution" 765 " is enabled. Please use tf.data to ingest data into your model" 766 " instead.") 767 tensor_list = _as_tensor_list(tensors) 768 with ops.name_scope(name, "batch", list(tensor_list) + [keep_input]) as name: 769 tensor_list = _validate(tensor_list) 770 keep_input = _validate_keep_input(keep_input, enqueue_many) 771 (tensor_list, sparse_info) = _store_sparse_tensors( 772 tensor_list, enqueue_many, keep_input) 773 types = _dtypes([tensor_list]) 774 shapes = _shapes([tensor_list], shapes, enqueue_many) 775 # TODO(josh11b,mrry): Switch to BatchQueue once it is written. 776 queue = _which_queue(dynamic_pad)( 777 capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name) 778 _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input) 779 summary.scalar( 780 "fraction_of_%d_full" % capacity, 781 math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity)) 782 783 if allow_smaller_final_batch: 784 dequeued = queue.dequeue_up_to(batch_size, name=name) 785 else: 786 dequeued = queue.dequeue_many(batch_size, name=name) 787 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 788 return _as_original_type(tensors, dequeued) 789 790 791# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be 792# a multiple of len(tensor_list_list)?) parameter, to address the use 793# case where you want more parallelism than you can support different 794# readers (either because you don't have that many files or can't 795# read that many files in parallel due to the number of seeks required). 796# Once this is done, batch() can be written as a call to batch_join(). 797def _batch_join(tensors_list, batch_size, keep_input, capacity=32, 798 enqueue_many=False, shapes=None, dynamic_pad=False, 799 allow_smaller_final_batch=False, shared_name=None, name=None): 800 """Helper function for `batch_join` and `maybe_batch_join`.""" 801 if context.executing_eagerly(): 802 raise ValueError( 803 "Input pipelines based on Queues are not supported when eager execution" 804 " is enabled. Please use tf.data to ingest data into your model" 805 " instead.") 806 tensor_list_list = _as_tensor_list_list(tensors_list) 807 with ops.name_scope(name, "batch_join", 808 _flatten(tensor_list_list) + [keep_input]) as name: 809 tensor_list_list = _validate_join(tensor_list_list) 810 keep_input = _validate_keep_input(keep_input, enqueue_many) 811 tensor_list_list, sparse_info = _store_sparse_tensors_join( 812 tensor_list_list, enqueue_many, keep_input) 813 types = _dtypes(tensor_list_list) 814 shapes = _shapes(tensor_list_list, shapes, enqueue_many) 815 # TODO(josh11b,mrry): Switch to BatchQueue once it is written. 816 queue = _which_queue(dynamic_pad)( 817 capacity=capacity, dtypes=types, shapes=shapes, shared_name=shared_name) 818 _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input) 819 summary.scalar( 820 "fraction_of_%d_full" % capacity, 821 math_ops.cast(queue.size(), dtypes.float32) * (1. / capacity)) 822 823 if allow_smaller_final_batch: 824 dequeued = queue.dequeue_up_to(batch_size, name=name) 825 else: 826 dequeued = queue.dequeue_many(batch_size, name=name) 827 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 828 # tensors_list was validated to not be empty. 829 return _as_original_type(tensors_list[0], dequeued) 830 831 832def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 833 keep_input, num_threads=1, seed=None, enqueue_many=False, 834 shapes=None, allow_smaller_final_batch=False, 835 shared_name=None, name=None): 836 """Helper function for `shuffle_batch` and `maybe_shuffle_batch`.""" 837 if context.executing_eagerly(): 838 raise ValueError( 839 "Input pipelines based on Queues are not supported when eager execution" 840 " is enabled. Please use tf.data to ingest data into your model" 841 " instead.") 842 tensor_list = _as_tensor_list(tensors) 843 with ops.name_scope(name, "shuffle_batch", 844 list(tensor_list) + [keep_input]) as name: 845 if capacity <= min_after_dequeue: 846 raise ValueError("capacity %d must be bigger than min_after_dequeue %d." 847 % (capacity, min_after_dequeue)) 848 tensor_list = _validate(tensor_list) 849 keep_input = _validate_keep_input(keep_input, enqueue_many) 850 tensor_list, sparse_info = _store_sparse_tensors( 851 tensor_list, enqueue_many, keep_input) 852 types = _dtypes([tensor_list]) 853 shapes = _shapes([tensor_list], shapes, enqueue_many) 854 queue = data_flow_ops.RandomShuffleQueue( 855 capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed, 856 dtypes=types, shapes=shapes, shared_name=shared_name) 857 _enqueue(queue, tensor_list, num_threads, enqueue_many, keep_input) 858 full = (math_ops.cast( 859 math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) * 860 (1. / (capacity - min_after_dequeue))) 861 # Note that name contains a '/' at the end so we intentionally do not place 862 # a '/' after %s below. 863 summary_name = ( 864 "fraction_over_%d_of_%d_full" % 865 (min_after_dequeue, capacity - min_after_dequeue)) 866 summary.scalar(summary_name, full) 867 868 if allow_smaller_final_batch: 869 dequeued = queue.dequeue_up_to(batch_size, name=name) 870 else: 871 dequeued = queue.dequeue_many(batch_size, name=name) 872 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 873 return _as_original_type(tensors, dequeued) 874 875 876def _shuffle_batch_join(tensors_list, batch_size, capacity, 877 min_after_dequeue, keep_input, seed=None, 878 enqueue_many=False, shapes=None, 879 allow_smaller_final_batch=False, shared_name=None, 880 name=None): 881 """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`.""" 882 if context.executing_eagerly(): 883 raise ValueError( 884 "Input pipelines based on Queues are not supported when eager execution" 885 " is enabled. Please use tf.data to ingest data into your model" 886 " instead.") 887 tensor_list_list = _as_tensor_list_list(tensors_list) 888 with ops.name_scope(name, "shuffle_batch_join", 889 _flatten(tensor_list_list) + [keep_input]) as name: 890 tensor_list_list = _validate_join(tensor_list_list) 891 keep_input = _validate_keep_input(keep_input, enqueue_many) 892 tensor_list_list, sparse_info = _store_sparse_tensors_join( 893 tensor_list_list, enqueue_many, keep_input) 894 types = _dtypes(tensor_list_list) 895 shapes = _shapes(tensor_list_list, shapes, enqueue_many) 896 queue = data_flow_ops.RandomShuffleQueue( 897 capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed, 898 dtypes=types, shapes=shapes, shared_name=shared_name) 899 _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input) 900 full = (math_ops.cast( 901 math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) * 902 (1. / (capacity - min_after_dequeue))) 903 # Note that name contains a '/' at the end so we intentionally do not place 904 # a '/' after %s below. 905 summary_name = ( 906 "fraction_over_%d_of_%d_full" % 907 (min_after_dequeue, capacity - min_after_dequeue)) 908 summary.scalar(summary_name, full) 909 910 if allow_smaller_final_batch: 911 dequeued = queue.dequeue_up_to(batch_size, name=name) 912 else: 913 dequeued = queue.dequeue_many(batch_size, name=name) 914 dequeued = _restore_sparse_tensors(dequeued, sparse_info) 915 # tensors_list was validated to not be empty. 916 return _as_original_type(tensors_list[0], dequeued) 917 918# Batching functions ---------------------------------------------------------- 919 920 921@tf_export(v1=["train.batch"]) 922@deprecation.deprecated( 923 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 924 "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if " 925 "`dynamic_pad=True`).") 926def batch(tensors, batch_size, num_threads=1, capacity=32, 927 enqueue_many=False, shapes=None, dynamic_pad=False, 928 allow_smaller_final_batch=False, shared_name=None, name=None): 929 """Creates batches of tensors in `tensors`. 930 931 The argument `tensors` can be a list or a dictionary of tensors. 932 The value returned by the function will be of the same type 933 as `tensors`. 934 935 This function is implemented using a queue. A `QueueRunner` for the 936 queue is added to the current `Graph`'s `QUEUE_RUNNER` collection. 937 938 If `enqueue_many` is `False`, `tensors` is assumed to represent a single 939 example. An input tensor with shape `[x, y, z]` will be output as a tensor 940 with shape `[batch_size, x, y, z]`. 941 942 If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of 943 examples, where the first dimension is indexed by example, and all members of 944 `tensors` should have the same size in the first dimension. If an input 945 tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x, 946 y, z]`. The `capacity` argument controls the how long the prefetching is 947 allowed to grow the queues. 948 949 The returned operation is a dequeue operation and will throw 950 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 951 operation is feeding another input queue, its queue runner will catch 952 this exception, however, if this operation is used in your main thread 953 you are responsible for catching this yourself. 954 955 *N.B.:* If `dynamic_pad` is `False`, you must ensure that either 956 (i) the `shapes` argument is passed, or (ii) all of the tensors in 957 `tensors` must have fully-defined shapes. `ValueError` will be 958 raised if neither of these conditions holds. 959 960 If `dynamic_pad` is `True`, it is sufficient that the *rank* of the 961 tensors is known, but individual dimensions may have shape `None`. 962 In this case, for each enqueue the dimensions with value `None` 963 may have a variable length; upon dequeue, the output tensors will be padded 964 on the right to the maximum shape of the tensors in the current minibatch. 965 For numbers, this padding takes value 0. For strings, this padding is 966 the empty string. See `PaddingFIFOQueue` for more info. 967 968 If `allow_smaller_final_batch` is `True`, a smaller batch value than 969 `batch_size` is returned when the queue is closed and there are not enough 970 elements to fill the batch, otherwise the pending elements are discarded. 971 In addition, all output tensors' static shapes, as accessed via the 972 `shape` property will have a first `Dimension` value of `None`, and 973 operations that depend on fixed batch_size would fail. 974 975 Args: 976 tensors: The list or dictionary of tensors to enqueue. 977 batch_size: The new batch size pulled from the queue. 978 num_threads: The number of threads enqueuing `tensors`. The batching will 979 be nondeterministic if `num_threads > 1`. 980 capacity: An integer. The maximum number of elements in the queue. 981 enqueue_many: Whether each tensor in `tensors` is a single example. 982 shapes: (Optional) The shapes for each example. Defaults to the 983 inferred shapes for `tensors`. 984 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 985 The given dimensions are padded upon dequeue so that tensors within a 986 batch have the same shapes. 987 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 988 batch to be smaller if there are insufficient items left in the queue. 989 shared_name: (Optional). If set, this queue will be shared under the given 990 name across multiple sessions. 991 name: (Optional) A name for the operations. 992 993 Returns: 994 A list or dictionary of tensors with the same types as `tensors` (except if 995 the input is a list of one element, then it returns a tensor, not a list). 996 997 Raises: 998 ValueError: If the `shapes` are not specified, and cannot be 999 inferred from the elements of `tensors`. 1000 1001 @compatibility(eager) 1002 Input pipelines based on Queues are not supported when eager execution is 1003 enabled. Please use the `tf.data` API to ingest data under eager execution. 1004 @end_compatibility 1005 """ 1006 return _batch( 1007 tensors, 1008 batch_size, 1009 keep_input=True, 1010 num_threads=num_threads, 1011 capacity=capacity, 1012 enqueue_many=enqueue_many, 1013 shapes=shapes, 1014 dynamic_pad=dynamic_pad, 1015 allow_smaller_final_batch=allow_smaller_final_batch, 1016 shared_name=shared_name, 1017 name=name) 1018 1019 1020@tf_export(v1=["train.maybe_batch"]) 1021@deprecation.deprecated( 1022 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1023 "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`" 1024 " if `dynamic_pad=True`).") 1025def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32, 1026 enqueue_many=False, shapes=None, dynamic_pad=False, 1027 allow_smaller_final_batch=False, shared_name=None, name=None): 1028 """Conditionally creates batches of tensors based on `keep_input`. 1029 1030 See docstring in `batch` for more details. 1031 1032 Args: 1033 tensors: The list or dictionary of tensors to enqueue. 1034 keep_input: A `bool` Tensor. This tensor controls whether the input is 1035 added to the queue or not. If it is a scalar and evaluates `True`, then 1036 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1037 is `True`, then each example is added to the queue only if the 1038 corresponding value in `keep_input` is `True`. This tensor essentially 1039 acts as a filtering mechanism. 1040 batch_size: The new batch size pulled from the queue. 1041 num_threads: The number of threads enqueuing `tensors`. The batching will 1042 be nondeterministic if `num_threads > 1`. 1043 capacity: An integer. The maximum number of elements in the queue. 1044 enqueue_many: Whether each tensor in `tensors` is a single example. 1045 shapes: (Optional) The shapes for each example. Defaults to the 1046 inferred shapes for `tensors`. 1047 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1048 The given dimensions are padded upon dequeue so that tensors within a 1049 batch have the same shapes. 1050 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1051 batch to be smaller if there are insufficient items left in the queue. 1052 shared_name: (Optional). If set, this queue will be shared under the given 1053 name across multiple sessions. 1054 name: (Optional) A name for the operations. 1055 1056 Returns: 1057 A list or dictionary of tensors with the same types as `tensors`. 1058 1059 Raises: 1060 ValueError: If the `shapes` are not specified, and cannot be 1061 inferred from the elements of `tensors`. 1062 """ 1063 return _batch( 1064 tensors, 1065 batch_size, 1066 keep_input, 1067 num_threads=num_threads, 1068 capacity=capacity, 1069 enqueue_many=enqueue_many, 1070 shapes=shapes, 1071 dynamic_pad=dynamic_pad, 1072 allow_smaller_final_batch=allow_smaller_final_batch, 1073 shared_name=shared_name, 1074 name=name) 1075 1076 1077@tf_export(v1=["train.batch_join"]) 1078@deprecation.deprecated( 1079 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1080 "`tf.data.Dataset.interleave(...).batch(batch_size)` (or " 1081 "`padded_batch(...)` if `dynamic_pad=True`).") 1082def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False, 1083 shapes=None, dynamic_pad=False, allow_smaller_final_batch=False, 1084 shared_name=None, name=None): 1085 """Runs a list of tensors to fill a queue to create batches of examples. 1086 1087 The `tensors_list` argument is a list of tuples of tensors, or a list of 1088 dictionaries of tensors. Each element in the list is treated similarly 1089 to the `tensors` argument of `tf.compat.v1.train.batch()`. 1090 1091 WARNING: This function is nondeterministic, since it starts a separate thread 1092 for each tensor. 1093 1094 Enqueues a different list of tensors in different threads. 1095 Implemented using a queue -- a `QueueRunner` for the queue 1096 is added to the current `Graph`'s `QUEUE_RUNNER` collection. 1097 1098 `len(tensors_list)` threads will be started, 1099 with thread `i` enqueuing the tensors from 1100 `tensors_list[i]`. `tensors_list[i1][j]` must match 1101 `tensors_list[i2][j]` in type and shape, except in the first 1102 dimension if `enqueue_many` is true. 1103 1104 If `enqueue_many` is `False`, each `tensors_list[i]` is assumed 1105 to represent a single example. An input tensor `x` will be output as a 1106 tensor with shape `[batch_size] + x.shape`. 1107 1108 If `enqueue_many` is `True`, `tensors_list[i]` is assumed to 1109 represent a batch of examples, where the first dimension is indexed 1110 by example, and all members of `tensors_list[i]` should have the 1111 same size in the first dimension. The slices of any input tensor 1112 `x` are treated as examples, and the output tensors will have shape 1113 `[batch_size] + x.shape[1:]`. 1114 1115 The `capacity` argument controls the how long the prefetching is allowed to 1116 grow the queues. 1117 1118 The returned operation is a dequeue operation and will throw 1119 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1120 operation is feeding another input queue, its queue runner will catch 1121 this exception, however, if this operation is used in your main thread 1122 you are responsible for catching this yourself. 1123 1124 *N.B.:* If `dynamic_pad` is `False`, you must ensure that either 1125 (i) the `shapes` argument is passed, or (ii) all of the tensors in 1126 `tensors_list` must have fully-defined shapes. `ValueError` will be 1127 raised if neither of these conditions holds. 1128 1129 If `dynamic_pad` is `True`, it is sufficient that the *rank* of the 1130 tensors is known, but individual dimensions may have value `None`. 1131 In this case, for each enqueue the dimensions with value `None` 1132 may have a variable length; upon dequeue, the output tensors will be padded 1133 on the right to the maximum shape of the tensors in the current minibatch. 1134 For numbers, this padding takes value 0. For strings, this padding is 1135 the empty string. See `PaddingFIFOQueue` for more info. 1136 1137 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1138 `batch_size` is returned when the queue is closed and there are not enough 1139 elements to fill the batch, otherwise the pending elements are discarded. 1140 In addition, all output tensors' static shapes, as accessed via the 1141 `shape` property will have a first `Dimension` value of `None`, and 1142 operations that depend on fixed batch_size would fail. 1143 1144 Args: 1145 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1146 batch_size: An integer. The new batch size pulled from the queue. 1147 capacity: An integer. The maximum number of elements in the queue. 1148 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1149 example. 1150 shapes: (Optional) The shapes for each example. Defaults to the 1151 inferred shapes for `tensor_list_list[i]`. 1152 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1153 The given dimensions are padded upon dequeue so that tensors within a 1154 batch have the same shapes. 1155 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1156 batch to be smaller if there are insufficient items left in the queue. 1157 shared_name: (Optional) If set, this queue will be shared under the given 1158 name across multiple sessions. 1159 name: (Optional) A name for the operations. 1160 1161 Returns: 1162 A list or dictionary of tensors with the same number and types as 1163 `tensors_list[i]`. 1164 1165 Raises: 1166 ValueError: If the `shapes` are not specified, and cannot be 1167 inferred from the elements of `tensor_list_list`. 1168 1169 @compatibility(eager) 1170 Input pipelines based on Queues are not supported when eager execution is 1171 enabled. Please use the `tf.data` API to ingest data under eager execution. 1172 @end_compatibility 1173 """ 1174 return _batch_join( 1175 tensors_list, 1176 batch_size, 1177 keep_input=True, 1178 capacity=capacity, 1179 enqueue_many=enqueue_many, 1180 shapes=shapes, 1181 dynamic_pad=dynamic_pad, 1182 allow_smaller_final_batch=allow_smaller_final_batch, 1183 shared_name=shared_name, 1184 name=name) 1185 1186 1187@tf_export(v1=["train.maybe_batch_join"]) 1188@deprecation.deprecated( 1189 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1190 "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or " 1191 "`padded_batch(...)` if `dynamic_pad=True`).") 1192def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32, 1193 enqueue_many=False, shapes=None, dynamic_pad=False, 1194 allow_smaller_final_batch=False, shared_name=None, 1195 name=None): 1196 """Runs a list of tensors to conditionally fill a queue to create batches. 1197 1198 See docstring in `batch_join` for more details. 1199 1200 Args: 1201 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1202 keep_input: A `bool` Tensor. This tensor controls whether the input is 1203 added to the queue or not. If it is a scalar and evaluates `True`, then 1204 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1205 is `True`, then each example is added to the queue only if the 1206 corresponding value in `keep_input` is `True`. This tensor essentially 1207 acts as a filtering mechanism. 1208 batch_size: An integer. The new batch size pulled from the queue. 1209 capacity: An integer. The maximum number of elements in the queue. 1210 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1211 example. 1212 shapes: (Optional) The shapes for each example. Defaults to the 1213 inferred shapes for `tensor_list_list[i]`. 1214 dynamic_pad: Boolean. Allow variable dimensions in input shapes. 1215 The given dimensions are padded upon dequeue so that tensors within a 1216 batch have the same shapes. 1217 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1218 batch to be smaller if there are insufficient items left in the queue. 1219 shared_name: (Optional) If set, this queue will be shared under the given 1220 name across multiple sessions. 1221 name: (Optional) A name for the operations. 1222 1223 Returns: 1224 A list or dictionary of tensors with the same number and types as 1225 `tensors_list[i]`. 1226 1227 Raises: 1228 ValueError: If the `shapes` are not specified, and cannot be 1229 inferred from the elements of `tensor_list_list`. 1230 """ 1231 return _batch_join( 1232 tensors_list, 1233 batch_size, 1234 keep_input, 1235 capacity=capacity, 1236 enqueue_many=enqueue_many, 1237 shapes=shapes, 1238 dynamic_pad=dynamic_pad, 1239 allow_smaller_final_batch=allow_smaller_final_batch, 1240 shared_name=shared_name, 1241 name=name) 1242 1243 1244@tf_export(v1=["train.shuffle_batch"]) 1245@deprecation.deprecated( 1246 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1247 "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.") 1248def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 1249 num_threads=1, seed=None, enqueue_many=False, shapes=None, 1250 allow_smaller_final_batch=False, shared_name=None, name=None): 1251 """Creates batches by randomly shuffling tensors. 1252 1253 This function adds the following to the current `Graph`: 1254 1255 * A shuffling queue into which tensors from `tensors` are enqueued. 1256 * A `dequeue_many` operation to create batches from the queue. 1257 * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors 1258 from `tensors`. 1259 1260 If `enqueue_many` is `False`, `tensors` is assumed to represent a 1261 single example. An input tensor with shape `[x, y, z]` will be output 1262 as a tensor with shape `[batch_size, x, y, z]`. 1263 1264 If `enqueue_many` is `True`, `tensors` is assumed to represent a 1265 batch of examples, where the first dimension is indexed by example, 1266 and all members of `tensors` should have the same size in the 1267 first dimension. If an input tensor has shape `[*, x, y, z]`, the 1268 output will have shape `[batch_size, x, y, z]`. 1269 1270 The `capacity` argument controls the how long the prefetching is allowed to 1271 grow the queues. 1272 1273 The returned operation is a dequeue operation and will throw 1274 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1275 operation is feeding another input queue, its queue runner will catch 1276 this exception, however, if this operation is used in your main thread 1277 you are responsible for catching this yourself. 1278 1279 For example: 1280 1281 ```python 1282 # Creates batches of 32 images and 32 labels. 1283 image_batch, label_batch = tf.compat.v1.train.shuffle_batch( 1284 [single_image, single_label], 1285 batch_size=32, 1286 num_threads=4, 1287 capacity=50000, 1288 min_after_dequeue=10000) 1289 ``` 1290 1291 *N.B.:* You must ensure that either (i) the `shapes` argument is 1292 passed, or (ii) all of the tensors in `tensors` must have 1293 fully-defined shapes. `ValueError` will be raised if neither of 1294 these conditions holds. 1295 1296 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1297 `batch_size` is returned when the queue is closed and there are not enough 1298 elements to fill the batch, otherwise the pending elements are discarded. 1299 In addition, all output tensors' static shapes, as accessed via the 1300 `shape` property will have a first `Dimension` value of `None`, and 1301 operations that depend on fixed batch_size would fail. 1302 1303 Args: 1304 tensors: The list or dictionary of tensors to enqueue. 1305 batch_size: The new batch size pulled from the queue. 1306 capacity: An integer. The maximum number of elements in the queue. 1307 min_after_dequeue: Minimum number elements in the queue after a 1308 dequeue, used to ensure a level of mixing of elements. 1309 num_threads: The number of threads enqueuing `tensor_list`. 1310 seed: Seed for the random shuffling within the queue. 1311 enqueue_many: Whether each tensor in `tensor_list` is a single example. 1312 shapes: (Optional) The shapes for each example. Defaults to the 1313 inferred shapes for `tensor_list`. 1314 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1315 batch to be smaller if there are insufficient items left in the queue. 1316 shared_name: (Optional) If set, this queue will be shared under the given 1317 name across multiple sessions. 1318 name: (Optional) A name for the operations. 1319 1320 Returns: 1321 A list or dictionary of tensors with the types as `tensors`. 1322 1323 Raises: 1324 ValueError: If the `shapes` are not specified, and cannot be 1325 inferred from the elements of `tensors`. 1326 1327 @compatibility(eager) 1328 Input pipelines based on Queues are not supported when eager execution is 1329 enabled. Please use the `tf.data` API to ingest data under eager execution. 1330 @end_compatibility 1331 """ 1332 return _shuffle_batch( 1333 tensors, 1334 batch_size, 1335 capacity, 1336 min_after_dequeue, 1337 keep_input=True, 1338 num_threads=num_threads, 1339 seed=seed, 1340 enqueue_many=enqueue_many, 1341 shapes=shapes, 1342 allow_smaller_final_batch=allow_smaller_final_batch, 1343 shared_name=shared_name, 1344 name=name) 1345 1346 1347@tf_export(v1=["train.maybe_shuffle_batch"]) 1348@deprecation.deprecated( 1349 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1350 "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`" 1351 ".") 1352def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue, 1353 keep_input, num_threads=1, seed=None, 1354 enqueue_many=False, shapes=None, 1355 allow_smaller_final_batch=False, shared_name=None, 1356 name=None): 1357 """Creates batches by randomly shuffling conditionally-enqueued tensors. 1358 1359 See docstring in `shuffle_batch` for more details. 1360 1361 Args: 1362 tensors: The list or dictionary of tensors to enqueue. 1363 batch_size: The new batch size pulled from the queue. 1364 capacity: An integer. The maximum number of elements in the queue. 1365 min_after_dequeue: Minimum number elements in the queue after a 1366 dequeue, used to ensure a level of mixing of elements. 1367 keep_input: A `bool` Tensor. This tensor controls whether the input is 1368 added to the queue or not. If it is a scalar and evaluates `True`, then 1369 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1370 is `True`, then each example is added to the queue only if the 1371 corresponding value in `keep_input` is `True`. This tensor essentially 1372 acts as a filtering mechanism. 1373 num_threads: The number of threads enqueuing `tensor_list`. 1374 seed: Seed for the random shuffling within the queue. 1375 enqueue_many: Whether each tensor in `tensor_list` is a single example. 1376 shapes: (Optional) The shapes for each example. Defaults to the 1377 inferred shapes for `tensor_list`. 1378 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1379 batch to be smaller if there are insufficient items left in the queue. 1380 shared_name: (Optional) If set, this queue will be shared under the given 1381 name across multiple sessions. 1382 name: (Optional) A name for the operations. 1383 1384 Returns: 1385 A list or dictionary of tensors with the types as `tensors`. 1386 1387 Raises: 1388 ValueError: If the `shapes` are not specified, and cannot be 1389 inferred from the elements of `tensors`. 1390 1391 @compatibility(eager) 1392 Input pipelines based on Queues are not supported when eager execution is 1393 enabled. Please use the `tf.data` API to ingest data under eager execution. 1394 @end_compatibility 1395 """ 1396 return _shuffle_batch( 1397 tensors, 1398 batch_size, 1399 capacity, 1400 min_after_dequeue, 1401 keep_input, 1402 num_threads=num_threads, 1403 seed=seed, 1404 enqueue_many=enqueue_many, 1405 shapes=shapes, 1406 allow_smaller_final_batch=allow_smaller_final_batch, 1407 shared_name=shared_name, 1408 name=name) 1409 1410 1411@tf_export(v1=["train.shuffle_batch_join"]) 1412@deprecation.deprecated( 1413 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1414 "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch" 1415 "(batch_size)`.") 1416def shuffle_batch_join(tensors_list, batch_size, capacity, 1417 min_after_dequeue, seed=None, enqueue_many=False, 1418 shapes=None, allow_smaller_final_batch=False, 1419 shared_name=None, name=None): 1420 """Create batches by randomly shuffling tensors. 1421 1422 The `tensors_list` argument is a list of tuples of tensors, or a list of 1423 dictionaries of tensors. Each element in the list is treated similarly 1424 to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`. 1425 1426 This version enqueues a different list of tensors in different threads. 1427 It adds the following to the current `Graph`: 1428 1429 * A shuffling queue into which tensors from `tensors_list` are enqueued. 1430 * A `dequeue_many` operation to create batches from the queue. 1431 * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors 1432 from `tensors_list`. 1433 1434 `len(tensors_list)` threads will be started, with thread `i` enqueuing 1435 the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match 1436 `tensors_list[i2][j]` in type and shape, except in the first dimension if 1437 `enqueue_many` is true. 1438 1439 If `enqueue_many` is `False`, each `tensors_list[i]` is assumed 1440 to represent a single example. An input tensor with shape `[x, y, z]` 1441 will be output as a tensor with shape `[batch_size, x, y, z]`. 1442 1443 If `enqueue_many` is `True`, `tensors_list[i]` is assumed to 1444 represent a batch of examples, where the first dimension is indexed 1445 by example, and all members of `tensors_list[i]` should have the 1446 same size in the first dimension. If an input tensor has shape `[*, x, 1447 y, z]`, the output will have shape `[batch_size, x, y, z]`. 1448 1449 The `capacity` argument controls the how long the prefetching is allowed to 1450 grow the queues. 1451 1452 The returned operation is a dequeue operation and will throw 1453 `tf.errors.OutOfRangeError` if the input queue is exhausted. If this 1454 operation is feeding another input queue, its queue runner will catch 1455 this exception, however, if this operation is used in your main thread 1456 you are responsible for catching this yourself. 1457 1458 If `allow_smaller_final_batch` is `True`, a smaller batch value than 1459 `batch_size` is returned when the queue is closed and there are not enough 1460 elements to fill the batch, otherwise the pending elements are discarded. 1461 In addition, all output tensors' static shapes, as accessed via the 1462 `shape` property will have a first `Dimension` value of `None`, and 1463 operations that depend on fixed batch_size would fail. 1464 1465 Args: 1466 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1467 batch_size: An integer. The new batch size pulled from the queue. 1468 capacity: An integer. The maximum number of elements in the queue. 1469 min_after_dequeue: Minimum number elements in the queue after a 1470 dequeue, used to ensure a level of mixing of elements. 1471 seed: Seed for the random shuffling within the queue. 1472 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1473 example. 1474 shapes: (Optional) The shapes for each example. Defaults to the 1475 inferred shapes for `tensors_list[i]`. 1476 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1477 batch to be smaller if there are insufficient items left in the queue. 1478 shared_name: (optional). If set, this queue will be shared under the given 1479 name across multiple sessions. 1480 name: (Optional) A name for the operations. 1481 1482 Returns: 1483 A list or dictionary of tensors with the same number and types as 1484 `tensors_list[i]`. 1485 1486 Raises: 1487 ValueError: If the `shapes` are not specified, and cannot be 1488 inferred from the elements of `tensors_list`. 1489 1490 @compatibility(eager) 1491 Input pipelines based on Queues are not supported when eager execution is 1492 enabled. Please use the `tf.data` API to ingest data under eager execution. 1493 @end_compatibility 1494 """ 1495 return _shuffle_batch_join( 1496 tensors_list, 1497 batch_size, 1498 capacity, 1499 min_after_dequeue, 1500 keep_input=True, 1501 seed=seed, 1502 enqueue_many=enqueue_many, 1503 shapes=shapes, 1504 allow_smaller_final_batch=allow_smaller_final_batch, 1505 shared_name=shared_name, 1506 name=name) 1507 1508 1509@tf_export(v1=["train.maybe_shuffle_batch_join"]) 1510@deprecation.deprecated( 1511 None, "Queue-based input pipelines have been replaced by `tf.data`. Use " 1512 "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)" 1513 ".batch(batch_size)`.") 1514def maybe_shuffle_batch_join(tensors_list, batch_size, capacity, 1515 min_after_dequeue, keep_input, seed=None, 1516 enqueue_many=False, shapes=None, 1517 allow_smaller_final_batch=False, shared_name=None, 1518 name=None): 1519 """Create batches by randomly shuffling conditionally-enqueued tensors. 1520 1521 See docstring in `shuffle_batch_join` for more details. 1522 1523 Args: 1524 tensors_list: A list of tuples or dictionaries of tensors to enqueue. 1525 batch_size: An integer. The new batch size pulled from the queue. 1526 capacity: An integer. The maximum number of elements in the queue. 1527 min_after_dequeue: Minimum number elements in the queue after a 1528 dequeue, used to ensure a level of mixing of elements. 1529 keep_input: A `bool` Tensor. This tensor controls whether the input is 1530 added to the queue or not. If it is a scalar and evaluates `True`, then 1531 `tensors` are all added to the queue. If it is a vector and `enqueue_many` 1532 is `True`, then each example is added to the queue only if the 1533 corresponding value in `keep_input` is `True`. This tensor essentially 1534 acts as a filtering mechanism. 1535 seed: Seed for the random shuffling within the queue. 1536 enqueue_many: Whether each tensor in `tensor_list_list` is a single 1537 example. 1538 shapes: (Optional) The shapes for each example. Defaults to the 1539 inferred shapes for `tensors_list[i]`. 1540 allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final 1541 batch to be smaller if there are insufficient items left in the queue. 1542 shared_name: (optional). If set, this queue will be shared under the given 1543 name across multiple sessions. 1544 name: (Optional) A name for the operations. 1545 1546 Returns: 1547 A list or dictionary of tensors with the same number and types as 1548 `tensors_list[i]`. 1549 1550 Raises: 1551 ValueError: If the `shapes` are not specified, and cannot be 1552 inferred from the elements of `tensors_list`. 1553 1554 @compatibility(eager) 1555 Input pipelines based on Queues are not supported when eager execution is 1556 enabled. Please use the `tf.data` API to ingest data under eager execution. 1557 @end_compatibility 1558 """ 1559 return _shuffle_batch_join( 1560 tensors_list, 1561 batch_size, 1562 capacity, 1563 min_after_dequeue, 1564 keep_input, 1565 seed=seed, 1566 enqueue_many=enqueue_many, 1567 shapes=shapes, 1568 allow_smaller_final_batch=allow_smaller_final_batch, 1569 shared_name=shared_name, 1570 name=name) 1571