1 /*
2  * Copyright 2019 Google LLC.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     https://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "private_join_and_compute/util/process_record_file_util.h"
17 
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <filesystem>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "private_join_and_compute/util/process_record_file_parameters.h"
#include "private_join_and_compute/util/proto_util.h"
#include "private_join_and_compute/util/status_testing.inc"
#include "private_join_and_compute/util/test.pb.h"
32 
33 namespace private_join_and_compute::util::process_file_util {
34 namespace {
35 
36 using proto::test::IntValueProto;
37 using proto::test::StringValueProto;
38 
__anonebe88f780202(IntValueProto proto) 39 auto record_transformer = [](IntValueProto proto) {
40   StringValueProto result;
41   result.set_prefix(proto.prefix());
42   result.set_value(std::to_string(proto.value()).append("_bla"));
43   return result;
44 };
45 
writeValues(absl::string_view input_file)46 void writeValues(absl::string_view input_file) {
47   IntValueProto v1;
48   v1.set_prefix(1);
49   v1.set_value(9);
50   IntValueProto v2;
51   v2.set_prefix(2);
52   v2.set_value(4);
53   IntValueProto v3;
54   v3.set_prefix(3);
55   v3.set_value(7);
56   auto writer = std::unique_ptr<RecordWriter>(RecordWriter::Get());
57   ASSERT_OK(writer->Open(input_file));
58   ASSERT_OK(writer->Write(ProtoUtils::ToString(v2)));
59   ASSERT_OK(writer->Write(ProtoUtils::ToString(v1)));
60   ASSERT_OK(writer->Write(ProtoUtils::ToString(v3)));
61   ASSERT_OK(writer->Close());
62 }
63 
TEST(ProcessRecordFileTest,FileDoesNotExist)64 TEST(ProcessRecordFileTest, FileDoesNotExist) {
65   ProcessRecordFileParameters params;
66   std::filesystem::path temp_dir(::testing::TempDir());
67   std::string input_file = (temp_dir / "input_1.proto").string();
68   std::string output_file = (temp_dir / "output_1.proto").string();
69 
70   auto status =
71       process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
72           record_transformer, params, input_file, output_file);
73 
74   EXPECT_FALSE(status.ok());
75   EXPECT_EQ(status.code(), absl::StatusCode::kNotFound);
76 }
77 
TEST(ProcessRecordFileTest,TestProcessesFile)78 TEST(ProcessRecordFileTest, TestProcessesFile) {
79   ProcessRecordFileParameters params;
80   params.data_chunk_size = 2;
81   params.thread_count = 2;
82   std::filesystem::path temp_dir(::testing::TempDir());
83   std::string input_file = (temp_dir / "input_2.proto").string();
84   std::string output_file = (temp_dir / "output_2.proto").string();
85 
86   writeValues(input_file);
87 
88   auto status =
89       process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
90           record_transformer, params, input_file, output_file);
91   ASSERT_OK(status);
92 
93   ASSERT_TRUE(std::filesystem::exists(output_file));
94   // Check intermediate file was deleted.
95   ASSERT_FALSE(std::filesystem::exists(output_file + "0"));
96 
97   auto reader = std::unique_ptr<RecordReader>(RecordReader::GetRecordReader());
98   ASSERT_OK(reader->Open(output_file));
99 
100   StringValueProto s1;
101   s1.set_prefix(1);
102   s1.set_value("9_bla");
103   StringValueProto s2;
104   s2.set_prefix(2);
105   s2.set_value("4_bla");
106   StringValueProto s3;
107   s3.set_prefix(3);
108   s3.set_value("7_bla");
109   std::vector<std::string> expected_result{ProtoUtils::ToString(s1),
110                                            ProtoUtils::ToString(s2),
111                                            ProtoUtils::ToString(s3)};
112 
113   std::vector<std::string> actual_result;
114   while (reader->HasMore().value()) {
115     std::string raw_record;
116     ASSERT_OK(reader->Read(&raw_record));
117     actual_result.push_back(raw_record);
118   }
119   EXPECT_OK(reader->Close());
120   ASSERT_EQ(expected_result, actual_result);
121 
122   // Remove all files.
123   std::filesystem::remove(input_file);
124   std::filesystem::remove(output_file);
125 }
126 
TEST(ProcessRecordFileTest,TestCustomSortKey)127 TEST(ProcessRecordFileTest, TestCustomSortKey) {
128   ProcessRecordFileParameters params;
129   params.data_chunk_size = 1;
130   params.thread_count = 1;
131   std::filesystem::path temp_dir(::testing::TempDir());
132   std::string input_file = (temp_dir / "input_3.proto").string();
133   std::string output_file = (temp_dir / "output_3.proto").string();
134 
135   writeValues(input_file);
136 
137   auto get_sorting_key_function = [](absl::string_view raw_record) {
138     return ProtoUtils::FromString<StringValueProto>(raw_record).value();
139   };
140   auto status =
141       process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
142           record_transformer, params, input_file, output_file,
143           get_sorting_key_function);
144   ASSERT_OK(status);
145 
146   ASSERT_TRUE(std::filesystem::exists(output_file));
147 
148   StringValueProto s1;
149   s1.set_prefix(1);
150   s1.set_value("9_bla");
151   StringValueProto s2;
152   s2.set_prefix(2);
153   s2.set_value("4_bla");
154   StringValueProto s3;
155   s3.set_prefix(3);
156   s3.set_value("7_bla");
157   std::vector<std::string> expected_result{ProtoUtils::ToString(s2),
158                                            ProtoUtils::ToString(s3),
159                                            ProtoUtils::ToString(s1)};
160 
161   auto reader = std::unique_ptr<RecordReader>(RecordReader::GetRecordReader());
162   ASSERT_OK(reader->Open(output_file));
163   std::vector<std::string> actual_result;
164   while (reader->HasMore().value()) {
165     std::string raw_record;
166     ASSERT_OK(reader->Read(&raw_record));
167     actual_result.push_back(raw_record);
168   }
169   EXPECT_OK(reader->Close());
170   ASSERT_EQ(expected_result, actual_result);
171 
172   // Remove all files.
173   std::filesystem::remove(input_file);
174   std::filesystem::remove(output_file);
175 }
176 
177 }  // namespace
178 }  // namespace private_join_and_compute::util::process_file_util
179