1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "icing/join/join-processor.h"
16
17 #include <algorithm>
18 #include <deque>
19 #include <memory>
20 #include <optional>
21 #include <queue>
22 #include <string>
23 #include <string_view>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <utility>
27 #include <vector>
28
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/join/aggregation-scorer.h"
34 #include "icing/join/document-join-id-pair.h"
35 #include "icing/join/join-children-fetcher-impl-deprecated.h"
36 #include "icing/join/join-children-fetcher-impl-v3.h"
37 #include "icing/join/join-children-fetcher.h"
38 #include "icing/join/qualified-id-join-index.h"
39 #include "icing/join/qualified-id.h"
40 #include "icing/proto/schema.pb.h"
41 #include "icing/proto/scoring.pb.h"
42 #include "icing/proto/search.pb.h"
43 #include "icing/schema/joinable-property.h"
44 #include "icing/scoring/scored-document-hit.h"
45 #include "icing/store/document-filter-data.h"
46 #include "icing/store/document-id.h"
47 #include "icing/store/namespace-id-fingerprint.h"
48 #include "icing/util/logging.h"
49 #include "icing/util/status-macros.h"
50
51 namespace icing {
52 namespace lib {
53
54 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcher(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)55 JoinProcessor::GetChildrenFetcher(
56 const JoinSpecProto& join_spec,
57 std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
58 if (join_spec.parent_property_expression() != kQualifiedIdExpr) {
59 // TODO(b/256022027): So far we only support kQualifiedIdExpr for
60 // parent_property_expression, we could support more.
61 return absl_ports::UnimplementedError(absl_ports::StrCat(
62 "Parent property expression must be ", kQualifiedIdExpr));
63 }
64
65 switch (qualified_id_join_index_->version()) {
66 case QualifiedIdJoinIndex::Version::kV1:
67 return GetChildrenFetcherV1(join_spec,
68 std::move(child_scored_document_hits));
69 case QualifiedIdJoinIndex::Version::kV2:
70 return GetChildrenFetcherV2(join_spec,
71 std::move(child_scored_document_hits));
72 case QualifiedIdJoinIndex::Version::kV3:
73 return JoinChildrenFetcherImplV3::Create(
74 join_spec, schema_store_, doc_store_, qualified_id_join_index_,
75 current_time_ms_, std::move(child_scored_document_hits));
76 }
77 }
78
79 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV1(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)80 JoinProcessor::GetChildrenFetcherV1(
81 const JoinSpecProto& join_spec,
82 std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
83 ScoredDocumentHitComparator score_comparator(
84 /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
85 ScoringSpecProto::Order::DESC);
86 std::sort(child_scored_document_hits.begin(),
87 child_scored_document_hits.end(), score_comparator);
88
89 // Step 1: group child documents by parent documentId. Currently we only
90 // support QualifiedId joining, so fetch the qualified id content of
91 // child_property_expression, break it down into namespace + uri, and
92 // lookup the DocumentId.
93 // The keys of this map are the DocumentIds of the parent docs the child
94 // ScoredDocumentHits refer to. The values in this map are vectors of child
95 // ScoredDocumentHits that refer to a parent DocumentId.
96 std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
97 map_joinable_qualified_id;
98 for (const ScoredDocumentHit& child : child_scored_document_hits) {
99 ICING_ASSIGN_OR_RETURN(
100 DocumentId ref_doc_id,
101 FetchReferencedQualifiedId(child.document_id(),
102 join_spec.child_property_expression()));
103 if (ref_doc_id == kInvalidDocumentId) {
104 continue;
105 }
106
107 map_joinable_qualified_id[ref_doc_id].push_back(child);
108 }
109 return JoinChildrenFetcherImplDeprecated::Create(
110 join_spec, std::move(map_joinable_qualified_id));
111 }
112
113 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV2(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)114 JoinProcessor::GetChildrenFetcherV2(
115 const JoinSpecProto& join_spec,
116 std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
117 // Step 1a: sort child ScoredDocumentHits in document id descending order.
118 std::sort(child_scored_document_hits.begin(),
119 child_scored_document_hits.end(),
120 [](const ScoredDocumentHit& lhs, const ScoredDocumentHit& rhs) {
121 return lhs.document_id() > rhs.document_id();
122 });
123
124 // Step 1b: group all child ScoredDocumentHits by the document's
125 // schema_type_id.
126 std::unordered_map<SchemaTypeId, std::vector<ScoredDocumentHit>>
127 schema_to_child_scored_doc_hits_map;
128 for (const ScoredDocumentHit& child_scored_document_hit :
129 child_scored_document_hits) {
130 std::optional<DocumentFilterData> child_doc_filter_data =
131 doc_store_->GetAliveDocumentFilterData(
132 child_scored_document_hit.document_id(), current_time_ms_);
133 if (!child_doc_filter_data) {
134 continue;
135 }
136
137 schema_to_child_scored_doc_hits_map[child_doc_filter_data->schema_type_id()]
138 .push_back(child_scored_document_hit);
139 }
140
141 // Step 1c: for each schema_type_id, lookup QualifiedIdJoinIndexImplV2 to
142 // fetch all child join data from posting list(s). Convert all
143 // child join data to referenced parent document ids and bucketize
144 // child ScoredDocumentHits by it.
145 std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
146 parent_to_child_docs_map;
147 for (auto& [schema_type_id, grouped_child_scored_doc_hits] :
148 schema_to_child_scored_doc_hits_map) {
149 // Get joinable_property_id of this schema.
150 ICING_ASSIGN_OR_RETURN(
151 const JoinablePropertyMetadata* metadata,
152 schema_store_->GetJoinablePropertyMetadata(
153 schema_type_id, join_spec.child_property_expression()));
154 if (metadata == nullptr ||
155 metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
156 // Currently we only support qualified id, so skip other types.
157 continue;
158 }
159
160 // Lookup QualifiedIdJoinIndexImplV2.
161 ICING_ASSIGN_OR_RETURN(
162 std::unique_ptr<QualifiedIdJoinIndex::JoinDataIteratorBase>
163 join_index_iter,
164 qualified_id_join_index_->GetIterator(
165 schema_type_id, /*joinable_property_id=*/metadata->id));
166
167 // - Join index contains all join data of schema_type_id and
168 // join_index_iter will return all of them in (child) document id
169 // descending order.
170 // - But we only need join data of child document ids which appear in
171 // grouped_child_scored_doc_hits. Also grouped_child_scored_doc_hits
172 // contain ScoredDocumentHits in (child) document id descending order.
173 // - Therefore, we advance 2 iterators to intersect them and get desired
174 // join data.
175 auto child_scored_doc_hits_iter = grouped_child_scored_doc_hits.cbegin();
176 while (join_index_iter->Advance().ok() &&
177 child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend()) {
178 // Advance child_scored_doc_hits_iter until it points to a
179 // ScoredDocumentHit with document id <= the one pointed by
180 // join_index_iter.
181 while (child_scored_doc_hits_iter !=
182 grouped_child_scored_doc_hits.cend() &&
183 child_scored_doc_hits_iter->document_id() >
184 join_index_iter->GetCurrent().document_id()) {
185 ++child_scored_doc_hits_iter;
186 }
187
188 if (child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend() &&
189 child_scored_doc_hits_iter->document_id() ==
190 join_index_iter->GetCurrent().document_id()) {
191 // We get a join data whose child document id exists in both join
192 // index and grouped_child_scored_doc_hits. Convert its join info to
193 // referenced parent document ids and bucketize ScoredDocumentHits by
194 // it (putting into parent_to_child_docs_map).
195 const NamespaceIdFingerprint& ref_doc_nsid_uri_fingerprint =
196 join_index_iter->GetCurrent().join_info();
197 libtextclassifier3::StatusOr<DocumentId> ref_parent_doc_id_or =
198 doc_store_->GetDocumentId(ref_doc_nsid_uri_fingerprint);
199 if (ref_parent_doc_id_or.ok()) {
200 parent_to_child_docs_map[std::move(ref_parent_doc_id_or).ValueOrDie()]
201 .push_back(*child_scored_doc_hits_iter);
202 }
203 }
204 }
205 }
206
207 // Step 1d: finally, sort each parent's joined child ScoredDocumentHits by
208 // score.
209 ScoredDocumentHitComparator score_comparator(
210 /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
211 ScoringSpecProto::Order::DESC);
212 for (auto& [parent_doc_id, bucketized_child_scored_hits] :
213 parent_to_child_docs_map) {
214 std::sort(bucketized_child_scored_hits.begin(),
215 bucketized_child_scored_hits.end(), score_comparator);
216 }
217
218 return JoinChildrenFetcherImplDeprecated::Create(
219 join_spec, std::move(parent_to_child_docs_map));
220 }
221
222 libtextclassifier3::StatusOr<std::vector<JoinedScoredDocumentHit>>
Join(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && parent_scored_document_hits,const JoinChildrenFetcher & join_children_fetcher)223 JoinProcessor::Join(
224 const JoinSpecProto& join_spec,
225 std::vector<ScoredDocumentHit>&& parent_scored_document_hits,
226 const JoinChildrenFetcher& join_children_fetcher) {
227 std::unique_ptr<AggregationScorer> aggregation_scorer =
228 AggregationScorer::Create(join_spec);
229
230 std::vector<JoinedScoredDocumentHit> joined_scored_document_hits;
231 joined_scored_document_hits.reserve(parent_scored_document_hits.size());
232
233 // Step 2: iterate through all parent documentIds and construct
234 // JoinedScoredDocumentHit for each by looking up
235 // join_children_fetcher.
236 for (ScoredDocumentHit& parent : parent_scored_document_hits) {
237 ICING_ASSIGN_OR_RETURN(
238 std::vector<ScoredDocumentHit> children,
239 join_children_fetcher.GetChildren(parent.document_id()));
240
241 double final_score = aggregation_scorer->GetScore(parent, children);
242 joined_scored_document_hits.emplace_back(final_score, std::move(parent),
243 std::move(children));
244 }
245
246 return joined_scored_document_hits;
247 }
248
249 libtextclassifier3::StatusOr<std::unordered_set<DocumentId>>
GetPropagatedChildDocumentsToDelete(const std::unordered_set<DocumentId> & deleted_document_ids)250 JoinProcessor::GetPropagatedChildDocumentsToDelete(
251 const std::unordered_set<DocumentId>& deleted_document_ids) {
252 // Sanity check: join index should be V3.
253 if (qualified_id_join_index_->version() !=
254 QualifiedIdJoinIndex::Version::kV3) {
255 return absl_ports::UnimplementedError(
256 "QualifiedIdJoinIndex version must be V3 to support delete "
257 "propagation.");
258 }
259
260 // BFS traverse to find all child documents to propagate delete.
261 std::queue<DocumentId> que(
262 std::deque(deleted_document_ids.begin(), deleted_document_ids.end()));
263 std::unordered_set<DocumentId> child_documents_to_delete;
264 while (!que.empty()) {
265 DocumentId doc_id_to_expand = que.front();
266 que.pop();
267
268 ICING_ASSIGN_OR_RETURN(std::vector<DocumentJoinIdPair> child_join_id_pairs,
269 qualified_id_join_index_->Get(doc_id_to_expand));
270 for (const DocumentJoinIdPair& child_join_id_pair : child_join_id_pairs) {
271 if (child_documents_to_delete.find(child_join_id_pair.document_id()) !=
272 child_documents_to_delete.end() ||
273 deleted_document_ids.find(child_join_id_pair.document_id()) !=
274 deleted_document_ids.end()) {
275 // Already added into the set to delete or already deleted (happens only
276 // when there is a cycle back to the deleted or traversed document in
277 // the join relation). Skip it.
278 continue;
279 }
280
281 // Get DocumentFilterData of the child document to look up its schema type
282 // id.
283 // - Skip if the child document has been deleted, since delete propagation
284 // should've been done to all its children when deleting it previously.
285 // - Otherwise, we have to handle this child document and propagate delete
286 // to the grandchildren, even if it is expired.
287 std::optional<DocumentFilterData> child_filter_data =
288 doc_store_->GetNonDeletedDocumentFilterData(
289 child_join_id_pair.document_id());
290 if (!child_filter_data) {
291 // The child document has been deleted. Skip.
292 continue;
293 }
294
295 libtextclassifier3::StatusOr<const JoinablePropertyMetadata*>
296 metadata_or = schema_store_->GetJoinablePropertyMetadata(
297 child_filter_data->schema_type_id(),
298 child_join_id_pair.joinable_property_id());
299 if (!metadata_or.ok() || metadata_or.ValueOrDie() == nullptr) {
300 // This shouldn't happen because we've validated it during indexing and
301 // only put valid DocumentJoinIdPair into qualified id join index.
302 // Log and skip it.
303 ICING_LOG(ERROR) << "Failed to get metadata for schema type id "
304 << child_filter_data->schema_type_id()
305 << ", joinable property id "
306 << static_cast<int>(
307 child_join_id_pair.joinable_property_id());
308 continue;
309 }
310 const JoinablePropertyMetadata* metadata = metadata_or.ValueOrDie();
311
312 if (metadata->value_type == JoinableConfig::ValueType::QUALIFIED_ID &&
313 metadata->delete_propagation_type ==
314 JoinableConfig::DeletePropagationType::PROPAGATE_FROM) {
315 child_documents_to_delete.insert(child_join_id_pair.document_id());
316 que.push(child_join_id_pair.document_id());
317 }
318 }
319 }
320
321 return child_documents_to_delete;
322 }
323
324 libtextclassifier3::StatusOr<DocumentId>
FetchReferencedQualifiedId(const DocumentId & child_document_id,const std::string & property_path) const325 JoinProcessor::FetchReferencedQualifiedId(
326 const DocumentId& child_document_id,
327 const std::string& property_path) const {
328 std::optional<DocumentFilterData> child_filter_data =
329 doc_store_->GetAliveDocumentFilterData(child_document_id,
330 current_time_ms_);
331 if (!child_filter_data) {
332 return kInvalidDocumentId;
333 }
334
335 ICING_ASSIGN_OR_RETURN(
336 const JoinablePropertyMetadata* metadata,
337 schema_store_->GetJoinablePropertyMetadata(
338 child_filter_data->schema_type_id(), property_path));
339 if (metadata == nullptr ||
340 metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
341 // Currently we only support qualified id.
342 return kInvalidDocumentId;
343 }
344
345 DocumentJoinIdPair info(child_document_id, metadata->id);
346 libtextclassifier3::StatusOr<std::string_view> ref_qualified_id_str_or =
347 qualified_id_join_index_->Get(info);
348 if (!ref_qualified_id_str_or.ok()) {
349 if (absl_ports::IsNotFound(ref_qualified_id_str_or.status())) {
350 return kInvalidDocumentId;
351 }
352 return std::move(ref_qualified_id_str_or).status();
353 }
354
355 libtextclassifier3::StatusOr<QualifiedId> ref_qualified_id_or =
356 QualifiedId::Parse(std::move(ref_qualified_id_str_or).ValueOrDie());
357 if (!ref_qualified_id_or.ok()) {
358 // This shouldn't happen because we've validated it during indexing and only
359 // put valid qualified id strings into qualified id join index.
360 return kInvalidDocumentId;
361 }
362 QualifiedId qualified_id = std::move(ref_qualified_id_or).ValueOrDie();
363
364 libtextclassifier3::StatusOr<DocumentId> ref_document_id_or =
365 doc_store_->GetDocumentId(qualified_id.name_space(), qualified_id.uri());
366 if (!ref_document_id_or.ok()) {
367 return kInvalidDocumentId;
368 }
369 return std::move(ref_document_id_or).ValueOrDie();
370 }
371
372 } // namespace lib
373 } // namespace icing
374