xref: /aosp_15_r20/external/icing/icing/join/join-processor.cc (revision 8b6cd535a057e39b3b86660c4aa06c99747c2136)
1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "icing/join/join-processor.h"
16 
17 #include <algorithm>
18 #include <deque>
19 #include <memory>
20 #include <optional>
21 #include <queue>
22 #include <string>
23 #include <string_view>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <utility>
27 #include <vector>
28 
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/join/aggregation-scorer.h"
34 #include "icing/join/document-join-id-pair.h"
35 #include "icing/join/join-children-fetcher-impl-deprecated.h"
36 #include "icing/join/join-children-fetcher-impl-v3.h"
37 #include "icing/join/join-children-fetcher.h"
38 #include "icing/join/qualified-id-join-index.h"
39 #include "icing/join/qualified-id.h"
40 #include "icing/proto/schema.pb.h"
41 #include "icing/proto/scoring.pb.h"
42 #include "icing/proto/search.pb.h"
43 #include "icing/schema/joinable-property.h"
44 #include "icing/scoring/scored-document-hit.h"
45 #include "icing/store/document-filter-data.h"
46 #include "icing/store/document-id.h"
47 #include "icing/store/namespace-id-fingerprint.h"
48 #include "icing/util/logging.h"
49 #include "icing/util/status-macros.h"
50 
51 namespace icing {
52 namespace lib {
53 
54 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcher(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)55 JoinProcessor::GetChildrenFetcher(
56     const JoinSpecProto& join_spec,
57     std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
58   if (join_spec.parent_property_expression() != kQualifiedIdExpr) {
59     // TODO(b/256022027): So far we only support kQualifiedIdExpr for
60     // parent_property_expression, we could support more.
61     return absl_ports::UnimplementedError(absl_ports::StrCat(
62         "Parent property expression must be ", kQualifiedIdExpr));
63   }
64 
65   switch (qualified_id_join_index_->version()) {
66     case QualifiedIdJoinIndex::Version::kV1:
67       return GetChildrenFetcherV1(join_spec,
68                                   std::move(child_scored_document_hits));
69     case QualifiedIdJoinIndex::Version::kV2:
70       return GetChildrenFetcherV2(join_spec,
71                                   std::move(child_scored_document_hits));
72     case QualifiedIdJoinIndex::Version::kV3:
73       return JoinChildrenFetcherImplV3::Create(
74           join_spec, schema_store_, doc_store_, qualified_id_join_index_,
75           current_time_ms_, std::move(child_scored_document_hits));
76   }
77 }
78 
79 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV1(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)80 JoinProcessor::GetChildrenFetcherV1(
81     const JoinSpecProto& join_spec,
82     std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
83   ScoredDocumentHitComparator score_comparator(
84       /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
85       ScoringSpecProto::Order::DESC);
86   std::sort(child_scored_document_hits.begin(),
87             child_scored_document_hits.end(), score_comparator);
88 
89   // Step 1: group child documents by parent documentId. Currently we only
90   //         support QualifiedId joining, so fetch the qualified id content of
91   //         child_property_expression, break it down into namespace + uri, and
92   //         lookup the DocumentId.
93   // The keys of this map are the DocumentIds of the parent docs the child
94   // ScoredDocumentHits refer to. The values in this map are vectors of child
95   // ScoredDocumentHits that refer to a parent DocumentId.
96   std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
97       map_joinable_qualified_id;
98   for (const ScoredDocumentHit& child : child_scored_document_hits) {
99     ICING_ASSIGN_OR_RETURN(
100         DocumentId ref_doc_id,
101         FetchReferencedQualifiedId(child.document_id(),
102                                    join_spec.child_property_expression()));
103     if (ref_doc_id == kInvalidDocumentId) {
104       continue;
105     }
106 
107     map_joinable_qualified_id[ref_doc_id].push_back(child);
108   }
109   return JoinChildrenFetcherImplDeprecated::Create(
110       join_spec, std::move(map_joinable_qualified_id));
111 }
112 
113 libtextclassifier3::StatusOr<std::unique_ptr<JoinChildrenFetcher>>
GetChildrenFetcherV2(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && child_scored_document_hits)114 JoinProcessor::GetChildrenFetcherV2(
115     const JoinSpecProto& join_spec,
116     std::vector<ScoredDocumentHit>&& child_scored_document_hits) {
117   // Step 1a: sort child ScoredDocumentHits in document id descending order.
118   std::sort(child_scored_document_hits.begin(),
119             child_scored_document_hits.end(),
120             [](const ScoredDocumentHit& lhs, const ScoredDocumentHit& rhs) {
121               return lhs.document_id() > rhs.document_id();
122             });
123 
124   // Step 1b: group all child ScoredDocumentHits by the document's
125   //          schema_type_id.
126   std::unordered_map<SchemaTypeId, std::vector<ScoredDocumentHit>>
127       schema_to_child_scored_doc_hits_map;
128   for (const ScoredDocumentHit& child_scored_document_hit :
129        child_scored_document_hits) {
130     std::optional<DocumentFilterData> child_doc_filter_data =
131         doc_store_->GetAliveDocumentFilterData(
132             child_scored_document_hit.document_id(), current_time_ms_);
133     if (!child_doc_filter_data) {
134       continue;
135     }
136 
137     schema_to_child_scored_doc_hits_map[child_doc_filter_data->schema_type_id()]
138         .push_back(child_scored_document_hit);
139   }
140 
141   // Step 1c: for each schema_type_id, lookup QualifiedIdJoinIndexImplV2 to
142   //          fetch all child join data from posting list(s). Convert all
143   //          child join data to referenced parent document ids and bucketize
144   //          child ScoredDocumentHits by it.
145   std::unordered_map<DocumentId, std::vector<ScoredDocumentHit>>
146       parent_to_child_docs_map;
147   for (auto& [schema_type_id, grouped_child_scored_doc_hits] :
148        schema_to_child_scored_doc_hits_map) {
149     // Get joinable_property_id of this schema.
150     ICING_ASSIGN_OR_RETURN(
151         const JoinablePropertyMetadata* metadata,
152         schema_store_->GetJoinablePropertyMetadata(
153             schema_type_id, join_spec.child_property_expression()));
154     if (metadata == nullptr ||
155         metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
156       // Currently we only support qualified id, so skip other types.
157       continue;
158     }
159 
160     // Lookup QualifiedIdJoinIndexImplV2.
161     ICING_ASSIGN_OR_RETURN(
162         std::unique_ptr<QualifiedIdJoinIndex::JoinDataIteratorBase>
163             join_index_iter,
164         qualified_id_join_index_->GetIterator(
165             schema_type_id, /*joinable_property_id=*/metadata->id));
166 
167     // - Join index contains all join data of schema_type_id and
168     //   join_index_iter will return all of them in (child) document id
169     //   descending order.
170     // - But we only need join data of child document ids which appear in
171     //   grouped_child_scored_doc_hits. Also grouped_child_scored_doc_hits
172     //   contain ScoredDocumentHits in (child) document id descending order.
173     // - Therefore, we advance 2 iterators to intersect them and get desired
174     //   join data.
175     auto child_scored_doc_hits_iter = grouped_child_scored_doc_hits.cbegin();
176     while (join_index_iter->Advance().ok() &&
177            child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend()) {
178       // Advance child_scored_doc_hits_iter until it points to a
179       // ScoredDocumentHit with document id <= the one pointed by
180       // join_index_iter.
181       while (child_scored_doc_hits_iter !=
182                  grouped_child_scored_doc_hits.cend() &&
183              child_scored_doc_hits_iter->document_id() >
184                  join_index_iter->GetCurrent().document_id()) {
185         ++child_scored_doc_hits_iter;
186       }
187 
188       if (child_scored_doc_hits_iter != grouped_child_scored_doc_hits.cend() &&
189           child_scored_doc_hits_iter->document_id() ==
190               join_index_iter->GetCurrent().document_id()) {
191         // We get a join data whose child document id exists in both join
192         // index and grouped_child_scored_doc_hits. Convert its join info to
193         // referenced parent document ids and bucketize ScoredDocumentHits by
194         // it (putting into parent_to_child_docs_map).
195         const NamespaceIdFingerprint& ref_doc_nsid_uri_fingerprint =
196             join_index_iter->GetCurrent().join_info();
197         libtextclassifier3::StatusOr<DocumentId> ref_parent_doc_id_or =
198             doc_store_->GetDocumentId(ref_doc_nsid_uri_fingerprint);
199         if (ref_parent_doc_id_or.ok()) {
200           parent_to_child_docs_map[std::move(ref_parent_doc_id_or).ValueOrDie()]
201               .push_back(*child_scored_doc_hits_iter);
202         }
203       }
204     }
205   }
206 
207   // Step 1d: finally, sort each parent's joined child ScoredDocumentHits by
208   //          score.
209   ScoredDocumentHitComparator score_comparator(
210       /*is_descending=*/join_spec.nested_spec().scoring_spec().order_by() ==
211       ScoringSpecProto::Order::DESC);
212   for (auto& [parent_doc_id, bucketized_child_scored_hits] :
213        parent_to_child_docs_map) {
214     std::sort(bucketized_child_scored_hits.begin(),
215               bucketized_child_scored_hits.end(), score_comparator);
216   }
217 
218   return JoinChildrenFetcherImplDeprecated::Create(
219       join_spec, std::move(parent_to_child_docs_map));
220 }
221 
222 libtextclassifier3::StatusOr<std::vector<JoinedScoredDocumentHit>>
Join(const JoinSpecProto & join_spec,std::vector<ScoredDocumentHit> && parent_scored_document_hits,const JoinChildrenFetcher & join_children_fetcher)223 JoinProcessor::Join(
224     const JoinSpecProto& join_spec,
225     std::vector<ScoredDocumentHit>&& parent_scored_document_hits,
226     const JoinChildrenFetcher& join_children_fetcher) {
227   std::unique_ptr<AggregationScorer> aggregation_scorer =
228       AggregationScorer::Create(join_spec);
229 
230   std::vector<JoinedScoredDocumentHit> joined_scored_document_hits;
231   joined_scored_document_hits.reserve(parent_scored_document_hits.size());
232 
233   // Step 2: iterate through all parent documentIds and construct
234   //         JoinedScoredDocumentHit for each by looking up
235   //         join_children_fetcher.
236   for (ScoredDocumentHit& parent : parent_scored_document_hits) {
237     ICING_ASSIGN_OR_RETURN(
238         std::vector<ScoredDocumentHit> children,
239         join_children_fetcher.GetChildren(parent.document_id()));
240 
241     double final_score = aggregation_scorer->GetScore(parent, children);
242     joined_scored_document_hits.emplace_back(final_score, std::move(parent),
243                                              std::move(children));
244   }
245 
246   return joined_scored_document_hits;
247 }
248 
249 libtextclassifier3::StatusOr<std::unordered_set<DocumentId>>
GetPropagatedChildDocumentsToDelete(const std::unordered_set<DocumentId> & deleted_document_ids)250 JoinProcessor::GetPropagatedChildDocumentsToDelete(
251     const std::unordered_set<DocumentId>& deleted_document_ids) {
252   // Sanity check: join index should be V3.
253   if (qualified_id_join_index_->version() !=
254       QualifiedIdJoinIndex::Version::kV3) {
255     return absl_ports::UnimplementedError(
256         "QualifiedIdJoinIndex version must be V3 to support delete "
257         "propagation.");
258   }
259 
260   // BFS traverse to find all child documents to propagate delete.
261   std::queue<DocumentId> que(
262       std::deque(deleted_document_ids.begin(), deleted_document_ids.end()));
263   std::unordered_set<DocumentId> child_documents_to_delete;
264   while (!que.empty()) {
265     DocumentId doc_id_to_expand = que.front();
266     que.pop();
267 
268     ICING_ASSIGN_OR_RETURN(std::vector<DocumentJoinIdPair> child_join_id_pairs,
269                            qualified_id_join_index_->Get(doc_id_to_expand));
270     for (const DocumentJoinIdPair& child_join_id_pair : child_join_id_pairs) {
271       if (child_documents_to_delete.find(child_join_id_pair.document_id()) !=
272               child_documents_to_delete.end() ||
273           deleted_document_ids.find(child_join_id_pair.document_id()) !=
274               deleted_document_ids.end()) {
275         // Already added into the set to delete or already deleted (happens only
276         // when there is a cycle back to the deleted or traversed document in
277         // the join relation). Skip it.
278         continue;
279       }
280 
281       // Get DocumentFilterData of the child document to look up its schema type
282       // id.
283       // - Skip if the child document has been deleted, since delete propagation
284       //   should've been done to all its children when deleting it previously.
285       // - Otherwise, we have to handle this child document and propagate delete
286       //   to the grandchildren, even if it is expired.
287       std::optional<DocumentFilterData> child_filter_data =
288           doc_store_->GetNonDeletedDocumentFilterData(
289               child_join_id_pair.document_id());
290       if (!child_filter_data) {
291         // The child document has been deleted. Skip.
292         continue;
293       }
294 
295       libtextclassifier3::StatusOr<const JoinablePropertyMetadata*>
296           metadata_or = schema_store_->GetJoinablePropertyMetadata(
297               child_filter_data->schema_type_id(),
298               child_join_id_pair.joinable_property_id());
299       if (!metadata_or.ok() || metadata_or.ValueOrDie() == nullptr) {
300         // This shouldn't happen because we've validated it during indexing and
301         // only put valid DocumentJoinIdPair into qualified id join index.
302         // Log and skip it.
303         ICING_LOG(ERROR) << "Failed to get metadata for schema type id "
304                          << child_filter_data->schema_type_id()
305                          << ", joinable property id "
306                          << static_cast<int>(
307                                 child_join_id_pair.joinable_property_id());
308         continue;
309       }
310       const JoinablePropertyMetadata* metadata = metadata_or.ValueOrDie();
311 
312       if (metadata->value_type == JoinableConfig::ValueType::QUALIFIED_ID &&
313           metadata->delete_propagation_type ==
314               JoinableConfig::DeletePropagationType::PROPAGATE_FROM) {
315         child_documents_to_delete.insert(child_join_id_pair.document_id());
316         que.push(child_join_id_pair.document_id());
317       }
318     }
319   }
320 
321   return child_documents_to_delete;
322 }
323 
324 libtextclassifier3::StatusOr<DocumentId>
FetchReferencedQualifiedId(const DocumentId & child_document_id,const std::string & property_path) const325 JoinProcessor::FetchReferencedQualifiedId(
326     const DocumentId& child_document_id,
327     const std::string& property_path) const {
328   std::optional<DocumentFilterData> child_filter_data =
329       doc_store_->GetAliveDocumentFilterData(child_document_id,
330                                              current_time_ms_);
331   if (!child_filter_data) {
332     return kInvalidDocumentId;
333   }
334 
335   ICING_ASSIGN_OR_RETURN(
336       const JoinablePropertyMetadata* metadata,
337       schema_store_->GetJoinablePropertyMetadata(
338           child_filter_data->schema_type_id(), property_path));
339   if (metadata == nullptr ||
340       metadata->value_type != JoinableConfig::ValueType::QUALIFIED_ID) {
341     // Currently we only support qualified id.
342     return kInvalidDocumentId;
343   }
344 
345   DocumentJoinIdPair info(child_document_id, metadata->id);
346   libtextclassifier3::StatusOr<std::string_view> ref_qualified_id_str_or =
347       qualified_id_join_index_->Get(info);
348   if (!ref_qualified_id_str_or.ok()) {
349     if (absl_ports::IsNotFound(ref_qualified_id_str_or.status())) {
350       return kInvalidDocumentId;
351     }
352     return std::move(ref_qualified_id_str_or).status();
353   }
354 
355   libtextclassifier3::StatusOr<QualifiedId> ref_qualified_id_or =
356       QualifiedId::Parse(std::move(ref_qualified_id_str_or).ValueOrDie());
357   if (!ref_qualified_id_or.ok()) {
358     // This shouldn't happen because we've validated it during indexing and only
359     // put valid qualified id strings into qualified id join index.
360     return kInvalidDocumentId;
361   }
362   QualifiedId qualified_id = std::move(ref_qualified_id_or).ValueOrDie();
363 
364   libtextclassifier3::StatusOr<DocumentId> ref_document_id_or =
365       doc_store_->GetDocumentId(qualified_id.name_space(), qualified_id.uri());
366   if (!ref_document_id_or.ok()) {
367     return kInvalidDocumentId;
368   }
369   return std::move(ref_document_id_or).ValueOrDie();
370 }
371 
372 }  // namespace lib
373 }  // namespace icing
374