Bitcoin Core 31.99.0
P2P Digital Currency
dbwrapper.cpp
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <dbwrapper.h>
6#include <compat/byteswap.h>
7#include <random.h>
8#include <sync.h>
10#include <test/fuzz/fuzz.h>
11#include <test/fuzz/util.h>
12#include <test/util/random.h>
14#include <util/byte_units.h>
15#include <util/check.h>
16#include <util/threadpool.h>
17
18#include <leveldb/env.h>
19#include <leveldb/helpers/memenv/memenv.h>
20
21#include <algorithm>
22#include <cassert>
23#include <cstdint>
24#include <deque>
25#include <functional>
26#include <future>
27#include <latch>
28#include <map>
29#include <memory>
30#include <numeric>
31#include <optional>
32#include <set>
33#include <string>
34#include <tuple>
35#include <vector>
36
37namespace {
38
59class DeterministicEnv final : public leveldb::EnvWrapper
60{
61 using WorkFunction = void (*)(void*);
62
63 struct Work {
64 WorkFunction function;
65 void* arg;
66 };
67
68 Mutex m_mutex;
69 std::deque<Work> m_queue GUARDED_BY(m_mutex);
70
71public:
72 explicit DeterministicEnv(leveldb::Env* base) : EnvWrapper(base) {}
73
74 void Schedule(WorkFunction function, void* arg) override EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
75 {
76 LOCK(m_mutex);
77 m_queue.push_back({function, arg});
78 }
79
82 bool RunOne() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
83 {
84 Work work;
85 {
86 LOCK(m_mutex);
87 if (m_queue.empty()) return false;
88 work = m_queue.front();
89 m_queue.pop_front();
90 }
91 work.function(work.arg);
92 return true;
93 }
94
96 void DrainWork() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { while (RunOne()) {} }
97};
98
//! Upper bound for the base length drawn in ConsumeValueSize().
constexpr size_t MAX_VALUE_LEN{4096};
//! Upper bound for the multiplier applied to the base length in ConsumeValueSize().
constexpr uint8_t MAX_VALUE_MULTIPLIER{8};
//! Size reported by an empty write batch.
constexpr size_t WRITE_BATCH_HEADER{12}; // See kHeader in db/write_batch.cc

//! Key of the non-user entry that VerifyIterator() tolerates when obfuscation
//! is enabled: a leading NUL byte followed by "obfuscate_key" (14 bytes total).
//! NOTE(review): presumably identical to the key CDBWrapper stores its
//! obfuscation data under -- confirm against dbwrapper.cpp.
const std::string OBFUSCATION_KEY{"\000obfuscate_key", 14};
106
/**
 * Build the deterministic value stored under `key`.
 *
 * The value is `size` bytes long. Its first byte is the XOR of the key's low
 * and high bytes, and every following byte is the previous one plus one
 * (wrapping at 256). Because the content is a pure function of (key, size),
 * the oracle only needs to remember the size.
 */
std::vector<uint8_t> MakeValue(uint16_t key, uint32_t size)
{
    std::vector<uint8_t> v(size);
    std::iota(v.begin(), v.end(), static_cast<uint8_t>(key ^ (key >> 8)));
    return v;
}
116
119struct LevelDBBytewiseU16Cmp {
120 bool operator()(uint16_t a, uint16_t b) const { return internal_bswap_16(a) < internal_bswap_16(b); }
121};
122
124using Oracle = std::map<uint16_t, uint32_t, LevelDBBytewiseU16Cmp>;
125
//! Type whose deserialization always fails by throwing std::ios_base::failure,
//! regardless of the stream contents. Used to exercise the failed-read path.
struct FailUnserialize {
    template <typename Stream>
    void Unserialize(Stream&) { throw std::ios_base::failure{"always fail"}; }
};
130
131uint16_t ConsumeKey(FuzzedDataProvider& provider) { return provider.ConsumeIntegral<uint16_t>(); }
132uint32_t ConsumeValueSize(FuzzedDataProvider& provider)
133{
134 const uint16_t len{provider.ConsumeIntegralInRange<uint16_t>(0, MAX_VALUE_LEN)};
135 const uint8_t multiplier{provider.ConsumeIntegralInRange<uint8_t>(1, MAX_VALUE_MULTIPLIER)};
136 return static_cast<uint32_t>(len) * multiplier;
137}
138
141void VerifyIterator(CDBWrapper& dbw, const Oracle& oracle,
142 bool obfuscate, std::optional<uint16_t> seek_key = std::nullopt)
143{
144 const std::unique_ptr<CDBIterator> it{dbw.NewIterator()};
145 auto oracle_it{seek_key ? oracle.lower_bound(*seek_key) : oracle.begin()};
146 if (seek_key) {
147 it->Seek(*seek_key);
148 } else {
149 it->SeekToFirst();
150 }
151 for (; it->Valid(); it->Next()) {
152 uint16_t db_key;
153 assert(it->GetKey(db_key));
154 if (oracle_it != oracle.end() && db_key == oracle_it->first) {
155 std::vector<uint8_t> db_value;
156 assert(it->GetValue(db_value));
157 assert(db_value == MakeValue(db_key, oracle_it->second));
158 ++oracle_it;
159 } else {
160 assert(obfuscate);
161 std::string key_str;
162 assert(it->GetKey(key_str));
163 assert(key_str == OBFUSCATION_KEY);
164 }
165 }
166 assert(oracle_it == oracle.end());
167}
168
170constexpr size_t MAX_READ_WORKERS{8};
171
172ThreadPool g_read_pool{"dbfuzz"};
173Mutex g_read_pool_mutex;
174
175void StartReadPoolIfNeeded() EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex)
176{
177 LOCK(g_read_pool_mutex);
178 if (!g_read_pool.WorkersCount()) g_read_pool.Start(MAX_READ_WORKERS);
179}
180
182DBParams ConsumeDBParams(FuzzedDataProvider& provider, leveldb::Env* testing_env,
183 bool obfuscate, DBOptions options = {})
184{
185 return DBParams{
186 .path = "dbwrapper_fuzz",
187 .cache_bytes = provider.ConsumeIntegralInRange<size_t>(64 << 10, 1_MiB),
188 .obfuscate = obfuscate,
189 .options = options,
190 .testing_env = testing_env,
191 .max_file_size = provider.ConsumeBool()
193 : provider.ConsumeIntegralInRange<size_t>(1_MiB, 4_MiB),
194 };
195}
196
197template <typename DrainWorkFn, typename RunOneFn>
198void TestDbWrapper(FuzzedDataProvider& provider,
199 leveldb::Env* testing_env,
200 DrainWorkFn drain_work,
201 RunOneFn run_one,
202 bool allow_force_compact)
203{
205
206 const bool obfuscate{provider.ConsumeBool()};
207
208 const auto make_db{[&](DBOptions options = {}) {
209 return std::make_unique<CDBWrapper>(ConsumeDBParams(provider, testing_env, obfuscate, options));
210 }};
211 std::unique_ptr<CDBWrapper> dbw{make_db()};
212
213 // Oracle: key → value size. Content is reconstructed via MakeValue().
214 Oracle oracle;
215
216 LIMITED_WHILE(provider.ConsumeBool(), 1'000)
217 {
218 CallOneOf(
219 provider,
220 // --- Mutations ---
221 [&] {
222 const auto key{ConsumeKey(provider)};
223 const auto size{ConsumeValueSize(provider)};
224 drain_work();
225 dbw->Write(key, MakeValue(key, size), /*fSync=*/provider.ConsumeBool());
226 oracle[key] = size;
227 },
228 [&] {
229 const auto key{ConsumeKey(provider)};
230 drain_work();
231 dbw->Erase(key, /*fSync=*/provider.ConsumeBool());
232 oracle.erase(key);
233 },
234 [&] {
235 CDBBatch batch{*dbw};
236 std::map<uint16_t, uint32_t> batch_writes;
237 std::set<uint16_t> batch_erases;
238 const auto fill{[&] {
239 LIMITED_WHILE(provider.ConsumeBool(), 20)
240 {
241 const auto key{ConsumeKey(provider)};
242 if (provider.ConsumeBool()) {
243 const auto size{ConsumeValueSize(provider)};
244 batch.Write(key, MakeValue(key, size));
245 batch_writes[key] = size;
246 batch_erases.erase(key);
247 } else {
248 batch.Erase(key);
249 batch_erases.insert(key);
250 batch_writes.erase(key);
251 }
252 }
253 }};
254 fill();
255 if (provider.ConsumeBool()) {
256 assert(batch.ApproximateSize() >= WRITE_BATCH_HEADER);
257 batch.Clear();
258 assert(batch.ApproximateSize() == WRITE_BATCH_HEADER);
259 batch_writes.clear();
260 batch_erases.clear();
261 fill();
262 }
263 drain_work();
264 dbw->WriteBatch(batch, /*fSync=*/provider.ConsumeBool());
265 for (const auto& [k, v] : batch_writes) oracle[k] = v;
266 for (const auto& k : batch_erases) oracle.erase(k);
267 },
268 [&] {
269 drain_work();
270 dbw.reset();
271 DBOptions options{};
272 if (allow_force_compact && provider.ConsumeBool()) {
273 options.force_compact = true;
274 }
275 dbw = make_db(options);
276 VerifyIterator(*dbw, oracle, obfuscate);
277 },
278 // --- Reads ---
279 [&] {
280 const auto key{ConsumeKey(provider)};
281 std::vector<uint8_t> value;
282 const bool found{dbw->Read(key, value)};
283 if (const auto it{oracle.find(key)}; it != oracle.end()) {
284 assert(found && value == MakeValue(key, it->second));
285 } else {
286 assert(!found);
287 }
288 },
289 [&] {
290 const auto key{ConsumeKey(provider)};
291 assert(dbw->Exists(key) == oracle.contains(key));
292 },
293 [&] {
294 uint16_t key{};
295 if (!oracle.empty() && provider.ConsumeBool()) {
296 auto it{oracle.begin()};
297 std::advance(it, provider.ConsumeIntegralInRange<size_t>(0, oracle.size() - 1));
298 key = it->first;
299 } else {
300 key = ConsumeKey(provider);
301 }
302 FailUnserialize wrong_type;
303 assert(!dbw->Read(key, wrong_type));
304 },
305 [&] {
306 const auto seek_key{provider.ConsumeBool()
307 ? std::optional<uint16_t>{ConsumeKey(provider)}
308 : std::nullopt};
309 VerifyIterator(*dbw, oracle, obfuscate, seek_key);
310 },
311 // --- Stats ---
312 [&] {
313 assert(dbw->IsEmpty() == (oracle.empty() && !obfuscate));
314 },
315 [&] {
316 const auto [k1, k2]{std::minmax({ConsumeKey(provider), ConsumeKey(provider)}, LevelDBBytewiseU16Cmp{})};
317 const size_t estimate_size{dbw->EstimateSize(k1, k2)};
318 if (k1 == k2) assert(estimate_size == 0);
319 },
320 [&] {
321 (void)dbw->DynamicMemoryUsage();
322 },
323 // --- Compaction control (no-op when run_one is no-op) ---
324 [&] {
325 run_one();
326 });
327 }
328
329 VerifyIterator(*dbw, oracle, obfuscate);
330 drain_work();
331}
332
333} // namespace
334
335FUZZ_TARGET(dbwrapper, .init = [] { static auto setup{MakeNoLogFileContext<>()}; })
336{
337 FuzzedDataProvider provider{buffer.data(), buffer.size()};
338
339 const auto memenv{std::unique_ptr<leveldb::Env>{leveldb::NewMemEnv(leveldb::Env::Default())}};
340 DeterministicEnv det_env{memenv.get()};
342 provider, &det_env,
343 [&] { det_env.DrainWork(); },
344 [&] { return det_env.RunOne(); },
345 /*allow_force_compact=*/false);
346}
347
348FUZZ_TARGET(dbwrapper_threaded, .init = [] { static auto setup{MakeNoLogFileContext<>()}; })
349{
350 FuzzedDataProvider provider{buffer.data(), buffer.size()};
351
352 const auto memenv{std::unique_ptr<leveldb::Env>{leveldb::NewMemEnv(leveldb::Env::Default())}};
354 provider, memenv.get(),
355 /*drain_work=*/[] {},
356 /*run_one=*/[] { return false; },
357 /*allow_force_compact=*/true);
358}
359
360FUZZ_TARGET(dbwrapper_concurrent_reads, .init = [] { static auto setup{MakeNoLogFileContext<>()}; }) EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex)
361{
362 StartReadPoolIfNeeded();
364
365 FuzzedDataProvider provider{buffer.data(), buffer.size()};
366
367 const auto memenv{std::unique_ptr<leveldb::Env>{leveldb::NewMemEnv(leveldb::Env::Default())}};
368 DeterministicEnv det_env{memenv.get()};
369
370 CDBWrapper db{ConsumeDBParams(provider, &det_env, /*obfuscate=*/provider.ConsumeBool())};
371
372 // Seed the DB. Drain work after small batches so we don't deadlock on a
373 // scheduled compaction.
374 const size_t num_entries{provider.ConsumeIntegralInRange<size_t>(100, 3'000)};
375 std::vector<uint16_t> keys;
376 keys.reserve(num_entries);
377 Oracle oracle;
378 constexpr size_t SEED_BATCH_SIZE{400};
379 for (size_t start{0}; start < num_entries; start += SEED_BATCH_SIZE) {
380 CDBBatch batch{db};
381 const size_t end{std::min(start + SEED_BATCH_SIZE, num_entries)};
382 for (size_t i{start}; i < end; ++i) {
383 const auto k{ConsumeKey(provider)};
384 const auto size{ConsumeValueSize(provider)};
385 batch.Write(k, MakeValue(k, size));
386 keys.push_back(k);
387 oracle[k] = size;
388 }
389 det_env.DrainWork();
390 db.WriteBatch(batch, /*fSync=*/true);
391 }
392
393 while (provider.ConsumeBool() && det_env.RunOne()) {}
394
395 // Build query list from seeded and random keys.
396 const size_t num_queries{provider.ConsumeIntegralInRange<size_t>(1, 2'000)};
397 enum class ReadOp { Read, Exists, IteratorSeek };
398 std::vector<std::tuple<ReadOp, uint16_t>> queries;
399 queries.reserve(num_queries);
400 for (size_t i{0}; i < num_queries; ++i) {
401 const auto op{provider.PickValueInArray({ReadOp::Read, ReadOp::Exists, ReadOp::IteratorSeek})};
402 const uint16_t key{provider.ConsumeBool()
403 ? keys[provider.ConsumeIntegralInRange<size_t>(0, keys.size() - 1)]
404 : ConsumeKey(provider)};
405 queries.emplace_back(op, key);
406 }
407
408
409 // Workers + main thread synchronize on the latch so all reads start together.
410 std::latch start_latch{static_cast<ptrdiff_t>(MAX_READ_WORKERS + 1)};
411 std::vector<std::function<void()>> tasks(MAX_READ_WORKERS);
412 FastRandomContext rng{ConsumeUInt256(provider)};
413 std::ranges::generate(tasks, [&] {
414 return [&, seed = rng.rand256()] {
415 FastRandomContext thread_rng{seed};
416 std::vector<size_t> order(queries.size());
417 std::iota(order.begin(), order.end(), size_t{0});
418 std::ranges::shuffle(order, thread_rng);
419 std::vector<uint8_t> v;
420 std::string key_str;
421 start_latch.arrive_and_wait();
422 const std::unique_ptr<CDBIterator> it{db.NewIterator()};
423 // Every read must agree with the oracle, the source of truth.
424 for (const auto i : order) {
425 const auto& [op, key] = queries[i];
426 switch (op) {
427 case ReadOp::Read:
428 if (const auto oit{oracle.find(key)}; oit != oracle.end()) {
429 assert(db.Read(key, v) && v == MakeValue(key, oit->second));
430 } else {
431 assert(!db.Read(key, v));
432 }
433 break;
434 case ReadOp::Exists:
435 assert(db.Exists(key) == oracle.contains(key));
436 break;
437 case ReadOp::IteratorSeek:
438 it->Seek(key);
439 // Skip the obfuscation metadata entry (a non-uint16_t key) if we land
440 // on it, so the result matches the oracle, which only tracks user keys.
441 if (it->Valid() && it->GetKey(key_str) && key_str == OBFUSCATION_KEY) it->Next();
442 if (const auto oit{oracle.lower_bound(key)}; oit != oracle.end()) {
443 assert(it->Valid());
444 uint16_t actual_key;
445 assert(it->GetKey(actual_key) && actual_key == oit->first);
446 assert(it->GetValue(v) && v == MakeValue(actual_key, oit->second));
447 } else {
448 assert(!it->Valid());
449 }
450 break;
451 }
452 }
453 };
454 });
455 auto futures{*Assert(g_read_pool.Submit(std::move(tasks)))};
456
457 // Release the workers and immediately run the queued compaction on this
458 // thread, so compaction races against the concurrent reads.
459 start_latch.arrive_and_wait();
460 det_env.DrainWork();
461
462 for (auto& fut : futures) fut.get();
463 det_env.DrainWork();
464}
BSWAP_CONSTEXPR uint16_t internal_bswap_16(uint16_t x)
Definition: byteswap.h:44
#define Assert(val)
Identity function.
Definition: check.h:116
Batch of changes queued to be written to a CDBWrapper.
Definition: dbwrapper.h:83
void Seek(const K &key)
Definition: dbwrapper.h:153
size_t DynamicMemoryUsage() const
Definition: dbwrapper.cpp:304
bool Read(const K &key, V &value) const
Definition: dbwrapper.h:215
CDBIterator * NewIterator()
Definition: dbwrapper.cpp:373
bool Exists(const K &key) const
Definition: dbwrapper.h:243
void Erase(const K &key, bool fSync=false)
Definition: dbwrapper.h:252
void WriteBatch(CDBBatch &batch, bool fSync=false)
Definition: dbwrapper.cpp:288
void Write(const K &key, const V &value, bool fSync=false)
Definition: dbwrapper.h:235
bool IsEmpty()
Return true if the database managed by this class contains no entries.
Definition: dbwrapper.cpp:354
size_t EstimateSize(const K &key_begin, const K &key_end) const
Definition: dbwrapper.h:272
Fast randomness source.
Definition: random.h:386
T ConsumeIntegralInRange(T min, T max)
T PickValueInArray(const T(&array)[size])
Fixed-size thread pool for running arbitrary tasks concurrently.
Definition: threadpool.h:48
static const size_t DBWRAPPER_MAX_FILE_SIZE
Definition: dbwrapper.h:29
#define LIMITED_WHILE(condition, limit)
Can be used to limit a theoretically unbounded loop.
Definition: fuzz.h:22
Definition: basic.cpp:8
static RPCMethod generate()
Definition: mining.cpp:288
void Unserialize(Stream &, V)=delete
User-controlled performance and debug options.
Definition: dbwrapper.h:32
bool force_compact
Compact database on startup.
Definition: dbwrapper.h:34
Application-specific storage settings.
Definition: dbwrapper.h:38
fs::path path
Location in the filesystem where leveldb data will be stored.
Definition: dbwrapper.h:40
#define LOCK(cs)
Definition: sync.h:268
FUZZ_TARGET(dbwrapper_concurrent_reads,.init=[] { static auto setup{MakeNoLogFileContext<>()};}) EXCLUSIVE_LOCKS_REQUIRED(!g_read_pool_mutex)
Definition: dbwrapper.cpp:360
TestDbWrapper(provider, &det_env, [&] { det_env.DrainWork();}, [&] { return det_env.RunOne();}, false)
DeterministicEnv det_env
Definition: dbwrapper.cpp:340
const auto memenv
Definition: dbwrapper.cpp:339
uint256 ConsumeUInt256(FuzzedDataProvider &fuzzed_data_provider) noexcept
Definition: util.h:191
size_t CallOneOf(FuzzedDataProvider &fuzzed_data_provider, Callables... callables)
Definition: util.h:37
void SeedRandomStateForTest(SeedRand seedtype)
Seed the global RNG state for testing and log the seed value.
Definition: random.cpp:19
@ ZEROS
Seed with a compile time constant of zeros.
static int setup(void)
Definition: tests.c:8040
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
#define GUARDED_BY(x)
Definition: threadsafety.h:37
assert(!tx.IsCoinBase())