1 /*
2 * Copyright 2019 Google LLC.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * https://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "private_join_and_compute/util/process_record_file_util.h"
17
18 #include <gmock/gmock.h>
19 #include <gtest/gtest.h>
20
21 #include <filesystem>
22 #include <memory>
23 #include <string>
24 #include <vector>
25
26 #include "absl/strings/str_cat.h"
27 #include "absl/strings/string_view.h"
28 #include "private_join_and_compute/util/process_record_file_parameters.h"
29 #include "private_join_and_compute/util/proto_util.h"
30 #include "private_join_and_compute/util/status_testing.inc"
31 #include "private_join_and_compute/util/test.pb.h"
32
33 namespace private_join_and_compute::util::process_file_util {
34 namespace {
35
36 using proto::test::IntValueProto;
37 using proto::test::StringValueProto;
38
__anonebe88f780202(IntValueProto proto) 39 auto record_transformer = [](IntValueProto proto) {
40 StringValueProto result;
41 result.set_prefix(proto.prefix());
42 result.set_value(std::to_string(proto.value()).append("_bla"));
43 return result;
44 };
45
writeValues(absl::string_view input_file)46 void writeValues(absl::string_view input_file) {
47 IntValueProto v1;
48 v1.set_prefix(1);
49 v1.set_value(9);
50 IntValueProto v2;
51 v2.set_prefix(2);
52 v2.set_value(4);
53 IntValueProto v3;
54 v3.set_prefix(3);
55 v3.set_value(7);
56 auto writer = std::unique_ptr<RecordWriter>(RecordWriter::Get());
57 ASSERT_OK(writer->Open(input_file));
58 ASSERT_OK(writer->Write(ProtoUtils::ToString(v2)));
59 ASSERT_OK(writer->Write(ProtoUtils::ToString(v1)));
60 ASSERT_OK(writer->Write(ProtoUtils::ToString(v3)));
61 ASSERT_OK(writer->Close());
62 }
63
// ProcessRecordFile must fail with NOT_FOUND when the input file is missing.
TEST(ProcessRecordFileTest, FileDoesNotExist) {
  ProcessRecordFileParameters params;
  std::filesystem::path temp_dir(::testing::TempDir());
  std::string input_file = (temp_dir / "input_1.proto").string();
  std::string output_file = (temp_dir / "output_1.proto").string();

  // Establish the precondition explicitly. Do not assume the shared temp
  // directory has no leftover file with this name.
  std::filesystem::remove(input_file);
  ASSERT_FALSE(std::filesystem::exists(input_file));

  auto status =
      process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
          record_transformer, params, input_file, output_file);

  EXPECT_FALSE(status.ok());
  EXPECT_EQ(status.code(), absl::StatusCode::kNotFound) << status;
}
77
// End-to-end check of ProcessRecordFile with two chunks and two threads:
// - every input record is transformed;
// - the output is ordered by ascending prefix (see expected_result);
// - intermediate chunk files are removed afterwards.
TEST(ProcessRecordFileTest, TestProcessesFile) {
  ProcessRecordFileParameters params;
  // Assuming data_chunk_size counts records, the 3 input records span 2 chunks.
  params.data_chunk_size = 2;
  params.thread_count = 2;
  std::filesystem::path temp_dir(::testing::TempDir());
  std::string input_file = (temp_dir / "input_2.proto").string();
  std::string output_file = (temp_dir / "output_2.proto").string();

  // writeValues reports failures with ASSERT_*, which only return from the
  // helper. Propagate them so the test stops instead of running on bad input.
  ASSERT_NO_FATAL_FAILURE(writeValues(input_file));

  auto status =
      process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
          record_transformer, params, input_file, output_file);
  ASSERT_OK(status);

  ASSERT_TRUE(std::filesystem::exists(output_file));
  // Intermediate files appear to be named `output_file` + chunk index. Check
  // that every chunk's file was deleted, not just the first one.
  ASSERT_FALSE(std::filesystem::exists(output_file + "0"));
  ASSERT_FALSE(std::filesystem::exists(output_file + "1"));

  auto reader = std::unique_ptr<RecordReader>(RecordReader::GetRecordReader());
  ASSERT_OK(reader->Open(output_file));

  StringValueProto s1;
  s1.set_prefix(1);
  s1.set_value("9_bla");
  StringValueProto s2;
  s2.set_prefix(2);
  s2.set_value("4_bla");
  StringValueProto s3;
  s3.set_prefix(3);
  s3.set_value("7_bla");
  std::vector<std::string> expected_result{ProtoUtils::ToString(s1),
                                           ProtoUtils::ToString(s2),
                                           ProtoUtils::ToString(s3)};

  std::vector<std::string> actual_result;
  while (true) {
    // Check HasMore()'s status explicitly. A read error then becomes a test
    // failure instead of surfacing through `.value()` on an error result.
    auto has_more = reader->HasMore();
    ASSERT_TRUE(has_more.ok()) << has_more.status();
    if (!has_more.value()) break;
    std::string raw_record;
    ASSERT_OK(reader->Read(&raw_record));
    actual_result.push_back(raw_record);
  }
  EXPECT_OK(reader->Close());
  ASSERT_EQ(expected_result, actual_result);

  // Remove all files.
  std::filesystem::remove(input_file);
  std::filesystem::remove(output_file);
}
126
// Checks that ProcessRecordFile orders its output by a caller-supplied sort
// key (the transformed `value` string) rather than by prefix.
TEST(ProcessRecordFileTest, TestCustomSortKey) {
  ProcessRecordFileParameters params;
  // Assuming data_chunk_size counts records, each of the 3 records is its own
  // chunk.
  params.data_chunk_size = 1;
  params.thread_count = 1;
  std::filesystem::path temp_dir(::testing::TempDir());
  std::string input_file = (temp_dir / "input_3.proto").string();
  std::string output_file = (temp_dir / "output_3.proto").string();

  // writeValues reports failures with ASSERT_*, which only return from the
  // helper. Propagate them so the test stops instead of running on bad input.
  ASSERT_NO_FATAL_FAILURE(writeValues(input_file));

  // Key each output record by its `value` string. Assuming
  // ProtoUtils::FromString returns the message itself, `.value()` here is
  // StringValueProto's `value` field getter (e.g. "4_bla"), not a StatusOr
  // accessor.
  auto get_sorting_key_function = [](absl::string_view raw_record) {
    return ProtoUtils::FromString<StringValueProto>(raw_record).value();
  };
  auto status =
      process_file_util::ProcessRecordFile<IntValueProto, StringValueProto>(
          record_transformer, params, input_file, output_file,
          get_sorting_key_function);
  ASSERT_OK(status);

  ASSERT_TRUE(std::filesystem::exists(output_file));
  // Intermediate files appear to be named `output_file` + chunk index. None of
  // the three chunks' files should survive.
  ASSERT_FALSE(std::filesystem::exists(output_file + "0"));
  ASSERT_FALSE(std::filesystem::exists(output_file + "1"));
  ASSERT_FALSE(std::filesystem::exists(output_file + "2"));

  StringValueProto s1;
  s1.set_prefix(1);
  s1.set_value("9_bla");
  StringValueProto s2;
  s2.set_prefix(2);
  s2.set_value("4_bla");
  StringValueProto s3;
  s3.set_prefix(3);
  s3.set_value("7_bla");
  // Ascending by value string: "4_bla" < "7_bla" < "9_bla".
  std::vector<std::string> expected_result{ProtoUtils::ToString(s2),
                                           ProtoUtils::ToString(s3),
                                           ProtoUtils::ToString(s1)};

  auto reader = std::unique_ptr<RecordReader>(RecordReader::GetRecordReader());
  ASSERT_OK(reader->Open(output_file));
  std::vector<std::string> actual_result;
  while (true) {
    // Check HasMore()'s status explicitly. A read error then becomes a test
    // failure instead of surfacing through `.value()` on an error result.
    auto has_more = reader->HasMore();
    ASSERT_TRUE(has_more.ok()) << has_more.status();
    if (!has_more.value()) break;
    std::string raw_record;
    ASSERT_OK(reader->Read(&raw_record));
    actual_result.push_back(raw_record);
  }
  EXPECT_OK(reader->Close());
  ASSERT_EQ(expected_result, actual_result);

  // Remove all files.
  std::filesystem::remove(input_file);
  std::filesystem::remove(output_file);
}
176
177 } // namespace
178 } // namespace private_join_and_compute::util::process_file_util
179