122 lines
3.6 KiB
C++
122 lines
3.6 KiB
C++
/*
|
|
* Copyright 2021 Google LLC
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* https://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

#include "random_generator.h"
#include "sampler.h"
|
|
|
|
namespace dist_proc {
|
|
namespace aggregation {
|
|
namespace internal {
|
|
|
|
// Forward declaration: within this header KllSampler is only referenced
// through the std::unique_ptr<KllSampler> member of CompactorStack.
class KllSampler;
|
|
|
|
// Hierarchy of compactors, which store items from the stream and 'compact'
|
|
// them when necessary (i.e., keep every second item in a sorted compactor)
|
|
// and add them to the compactor one level up.
|
|
class CompactorStack {
|
|
public:
|
|
CompactorStack(int64_t inv_eps, int64_t inv_delta, RandomGenerator* random);
|
|
CompactorStack(int64_t inv_eps, int64_t inv_delta, int k, RandomGenerator* random);
|
|
~CompactorStack();
|
|
|
|
// Initialize or reset the compactor stack and all counters and thresholds.
|
|
void Reset();
|
|
|
|
void Add(const int64_t value);
|
|
|
|
// Adds an item to the compactor stack with weight >= 1.
|
|
// Does nothing if weight <= 0.
|
|
void AddWithWeight(int64_t value, int weight);
|
|
|
|
// Ensures that the contents of each compactor are sorted.
|
|
void SortCompactorContents();
|
|
|
|
// Target capacity of compactor with index h. If this capacity is exceeded,
|
|
// the compactor will be lazily compacted in one of the next CompactStack()
|
|
// runs. I.e., this capacity can be temorarily exceeded.
|
|
int TargetCapacityAtLevel(int h) const;
|
|
|
|
void DoubleSamplerCapacity();
|
|
|
|
int num_stored_items() const;
|
|
|
|
std::optional<std::pair<const int64_t, int64_t>> sampled_item_and_weight() const;
|
|
|
|
// Returns the lowest active level in the compactor stack, which is identical
|
|
// with the number of replaced levels, or log2(sampler_capacity()).
|
|
int lowest_active_level() const;
|
|
|
|
int64_t sampler_capacity() const;
|
|
|
|
// For testing
|
|
bool IsSamplerOn() const {
|
|
return sampler_ != nullptr;
|
|
}
|
|
|
|
const std::vector<std::vector<int64_t>>& compactors() const {
|
|
return compactors_;
|
|
}
|
|
|
|
RandomGenerator* random() {
|
|
return random_;
|
|
}
|
|
|
|
int k() const {
|
|
return k_;
|
|
}
|
|
|
|
private:
|
|
void ClearCompactors();
|
|
|
|
// Adds a new compactor at the highest level. To be called when the currently
|
|
// topmost compactor is full.
|
|
void AddLevel();
|
|
|
|
// Called when at least one level in the compactor stack is above capacity.
|
|
// Iterates from bottom to top through the compactors and compacts the
|
|
// first one that is over its capacity by halving its contents and adding
|
|
// them to the compactor one level higher.
|
|
void CompactStack();
|
|
|
|
void CompactLevel(int level);
|
|
|
|
// To compact the items in a compactor to roughly half the size,
|
|
// sorts the items and adds every even or odd item (determined randomly)
|
|
// to the up_compactor.
|
|
void Halve(std::vector<int64_t>* down_compactor, std::vector<int64_t>* up_compactor);
|
|
|
|
std::vector<std::vector<int64_t>> compactors_;
|
|
int k_;
|
|
const double c_ = 2.0 / 3.0;
|
|
int overall_capacity_;
|
|
int num_items_in_compactors_;
|
|
RandomGenerator* random_;
|
|
std::unique_ptr<KllSampler> sampler_;
|
|
};
|
|
|
|
} // namespace internal
|
|
} // namespace aggregation
|
|
} // namespace dist_proc
|