pallet_price_aggregator/lib.rs
1// This file is part of Astar.
2
3// Copyright (C) Stake Technologies Pte.Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later
5
6// Astar is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// Astar is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with Astar. If not, see <http://www.gnu.org/licenses/>.
18
19//! # Price Aggregator Pallet
20//!
21//! ## Overview
22//!
//! The purpose of this pallet is to aggregate price data over a period of time, and then calculate its moving average.
24//!
25//! ## Solution
26//!
27//! The overall solution is broken down into several steps that occur over the course of various time periods.
28//!
29//! ### Block Aggregation
30//!
31//! During each block, the native currency price data is accumulated. This is done 'outside' the pallet, and it's only expected
32//! that 'something' will push this data to the price aggregator pallet. The pallet itself doesn't care about the source of the data, nor who submitted it.
33//!
//! At the end of each block, the accumulated data is processed according to the configured algorithm (e.g. average, median, or something else).
//! If processing succeeds, the result is stored in the intermediate value aggregator.
//! If processing fails, the values accumulated in that block are simply ignored.
37//!
38//! ### Intermediate Value Aggregation
39//!
40//! After a predetermined amount of time (blocks) has passed, the average value is calculated from the intermediate value aggregator.
41//! In case it's a valid value (non-zero), it's pushed into the circular buffer used to calculate the moving average.
42//! In case of an error, the value is simply ignored.
43//!
44//! ### Moving Average Calculation
45//!
46//! The moving average is calculated from the circular buffer, and is used to provide the 'average' price of the native currency, over some time period.
47//! It's important to note that the moving average is not a 'real-time' value, but rather a 'lagging' indicator.
48
49#![cfg_attr(not(feature = "std"), no_std)]
50
51use frame_support::{pallet_prelude::*, DefaultNoBound};
52use frame_system::pallet_prelude::*;
53pub use pallet::*;
54use sp_arithmetic::{
55 fixed_point::FixedU128,
56 traits::{CheckedAdd, SaturatedConversion, Saturating, Zero},
57};
58use sp_std::marker::PhantomData;
59
60use orml_traits::OnNewData;
61
62use astar_primitives::{
63 oracle::{CurrencyAmount, CurrencyId, PriceProvider},
64 BlockNumber,
65};
66
67pub mod weights;
68pub use weights::WeightInfo;
69
70#[cfg(test)]
71mod mock;
72#[cfg(test)]
73mod tests;
74
75#[cfg(feature = "runtime-benchmarks")]
76mod benchmarking;
77
/// Trait for processing accumulated currency values within a single block.
///
/// An implementation reduces all the values reported during one block into a single value,
/// e.g. by taking their average or median, or by applying a more complex calculation.
/// The pallet calls the implementation configured via `Config::ProcessBlockValues` from
/// `Pallet::process_block_aggregated_values`.
pub trait ProcessBlockValues {
    /// Process the accumulated `values` and return the resulting single value.
    ///
    /// # Errors
    ///
    /// In case no result can be produced, a static error message is returned.
    /// The pallet logs the message and does not update the intermediate aggregator.
    fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str>;
}
87
88/// Used to calculate the simple average of the accumulated values.
89pub struct AverageBlockValue;
90impl ProcessBlockValues for AverageBlockValue {
91 fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str> {
92 if values.is_empty() {
93 return Err("No values exist for the current block.");
94 }
95
96 let sum = values.iter().fold(CurrencyAmount::zero(), |acc, &value| {
97 acc.saturating_add(value)
98 });
99
100 Ok(sum.saturating_mul(FixedU128::from_rational(1, values.len() as u128)))
101 }
102}
103
104/// Used to calculate the median of the accumulated values.
105pub struct MedianBlockValue;
106impl ProcessBlockValues for MedianBlockValue {
107 fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str> {
108 if values.is_empty() {
109 return Err("No values exist for the current block.");
110 }
111
112 let mut sorted_values = values.to_vec();
113 sorted_values.sort_unstable();
114
115 let mid = sorted_values.len() / 2;
116
117 if sorted_values.len() % 2 == 0 {
118 Ok(sorted_values[mid.saturating_sub(1)]
119 .saturating_add(sorted_values[mid])
120 .saturating_mul(CurrencyAmount::from_rational(1, 2)))
121 } else {
122 Ok(sorted_values[mid])
123 }
124 }
125}
126
/// Used to aggregate the accumulated values over some time period.
///
/// To avoid having a large memory footprint, values are summed up into a single accumulator.
/// Number of summed up values is tracked in a separate field.
///
/// The pallet keeps a single instance of this struct in the `IntermediateValueAggregator`
/// storage item, with all the fields using the compact encoding.
#[derive(Encode, Decode, MaxEncodedLen, Default, Clone, Copy, Debug, PartialEq, Eq, TypeInfo)]
pub struct ValueAggregator {
    /// Total accumulated value amount, i.e. the sum of all the values added via `try_add`.
    #[codec(compact)]
    pub(crate) total: CurrencyAmount,
    /// Number of values accumulated in `total`.
    #[codec(compact)]
    pub(crate) count: u32,
    /// Block number at which aggregation should reset.
    ///
    /// The pallet hooks process and reset the aggregator once `limit_block <= now`.
    #[codec(compact)]
    pub(crate) limit_block: BlockNumber,
}
143
144impl ValueAggregator {
145 /// New value aggregator, with the given block number as the new limit.
146 pub fn new(limit_block: BlockNumber) -> Self {
147 Self {
148 limit_block,
149 ..Default::default()
150 }
151 }
152
153 /// Attempts to add a value to the aggregator, consuming `self` in the process.
154 ///
155 /// Returns an error if the addition would cause an overflow in the accumulator or the counter.
156 /// Otherwise returns the updated aggregator.
157 pub fn try_add(mut self, value: CurrencyAmount) -> Result<Self, &'static str> {
158 self.total = self
159 .total
160 .checked_add(&value)
161 .ok_or("Failed to add value to the aggregator due to overflow.")?;
162
163 self.count = self
164 .count
165 .checked_add(1)
166 .ok_or("Failed to increment count in the aggregator due to overflow.")?;
167
168 Ok(self)
169 }
170
171 /// Returns the average of the accumulated values.
172 pub fn average(&self) -> CurrencyAmount {
173 if self.count.is_zero() {
174 CurrencyAmount::zero()
175 } else {
176 self.total
177 .saturating_mul(FixedU128::from_rational(1, self.count.into()))
178 }
179 }
180}
181
/// Used to store the aggregated intermediate values into a circular buffer.
///
/// Inserts values sequentially into the buffer, until the buffer has been filled out.
/// After that, the oldest value is always overwritten with the new value.
///
/// The type parameter `L` defines the maximum number of values the buffer can hold.
#[derive(
    Encode,
    Decode,
    MaxEncodedLen,
    RuntimeDebugNoBound,
    PartialEqNoBound,
    EqNoBound,
    CloneNoBound,
    TypeInfo,
    DefaultNoBound,
)]
#[scale_info(skip_type_params(L))]
pub struct CircularBuffer<L: Get<u32>> {
    /// Currency values store, holding at most `L::get()` values.
    pub(crate) buffer: BoundedVec<CurrencyAmount, L>,
    /// Next index to write to.
    ///
    /// Wraps around to zero once it reaches `L::get()`, see `CircularBuffer::add`.
    #[codec(compact)]
    pub(crate) head: u32,
}
205
206impl<L: Get<u32>> CircularBuffer<L> {
207 /// Adds a new value to the circular buffer, possibly overriding the oldest value if capacity is filled.
208 pub fn add(&mut self, value: CurrencyAmount) {
209 // This can never happen, parameters must ensure that.
210 // But we still check it and log an error if it does.
211 if self.head >= L::get() || self.head as usize > self.buffer.len() {
212 log::error!(
213 target: LOG_TARGET,
214 "Failed to push value to the circular buffer due to invalid next index. \
215 Next index: {:?}, Buffer length: {:?}, Buffer capacity: {:?}",
216 self.head,
217 self.buffer.len(),
218 L::get()
219 );
220 return;
221 }
222
223 if self.buffer.len() > self.head as usize {
224 // Vec has been filled out, so we need to override the 'head' value
225 self.buffer[self.head as usize] = value;
226 } else {
227 // Vec is not full yet, so we can just push the value
228 let _ignorable = self.buffer.try_push(value);
229 }
230 self.head = self.head.saturating_add(1) % L::get();
231 }
232
233 /// Returns the average of the accumulated values.
234 pub fn average(&self) -> CurrencyAmount {
235 if self.buffer.is_empty() {
236 return CurrencyAmount::zero();
237 }
238
239 let sum = self
240 .buffer
241 .iter()
242 .fold(CurrencyAmount::zero(), |acc, &value| {
243 acc.saturating_add(value)
244 });
245
246 // At this point, length of the buffer is guaranteed to be greater than zero.
247 sum.saturating_mul(FixedU128::from_rational(1, self.buffer.len() as u128))
248 }
249}
250
/// Target used for all the log messages emitted by this pallet.
const LOG_TARGET: &str = "price-aggregator";
252
253#[frame_support::pallet]
254pub mod pallet {
255 use super::*;
256
/// The current storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

/// The price aggregator pallet type, declared with `STORAGE_VERSION` as its storage version.
#[pallet::pallet]
#[pallet::storage_version(STORAGE_VERSION)]
pub struct Pallet<T>(PhantomData<T>);
263
#[pallet::config]
pub trait Config: frame_system::Config {
    /// Maximum number of distinct currency values we can store during a single block.
    ///
    /// Must be greater than zero, which is checked by the pallet's `integrity_test`.
    #[pallet::constant]
    type MaxValuesPerBlock: Get<u32>;

    /// Used to process accumulated values in the current block.
    type ProcessBlockValues: ProcessBlockValues;

    /// Native currency ID that this pallet is supposed to track.
    ///
    /// Values reported for any other currency ID are ignored.
    type NativeCurrencyId: Get<CurrencyId>;

    /// Maximum length of the circular buffer used to calculate the moving average.
    ///
    /// Must be greater than zero, which is checked by the pallet's `integrity_test`.
    #[pallet::constant]
    type CircularBufferLength: Get<u32>;

    /// Duration of aggregation period expressed in the number of blocks.
    /// During this time, currency values are aggregated, and are then used to calculate the average value.
    ///
    /// Must not be zero, which is checked by the pallet's `integrity_test`.
    #[pallet::constant]
    type AggregationDuration: Get<BlockNumberFor<Self>>;

    /// Weight information for the processing done in the pallet hooks.
    type WeightInfo: WeightInfo;
}
287
288 #[pallet::genesis_config]
289 #[derive(frame_support::DefaultNoBound)]
290 pub struct GenesisConfig<T: Config> {
291 pub circular_buffer: BoundedVec<CurrencyAmount, T::CircularBufferLength>,
292 }
293
294 #[pallet::genesis_build]
295 impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
296 fn build(&self) {
297 ValuesCircularBuffer::<T>::put(CircularBuffer::<T::CircularBufferLength> {
298 buffer: self.circular_buffer.clone(),
299 head: self.circular_buffer.len() as u32 % T::CircularBufferLength::get(),
300 });
301
302 IntermediateValueAggregator::<T>::mutate(|aggregator| {
303 aggregator.limit_block = T::AggregationDuration::get().saturated_into();
304 });
305 }
306 }
307
#[pallet::event]
#[pallet::generate_deposit(pub(crate) fn deposit_event)]
pub enum Event<T: Config> {
    /// New average native currency value has been calculated and pushed into the moving average buffer.
    ///
    /// `value` is the average of the intermediate aggregator at the end of the aggregation period.
    AverageAggregatedValue { value: CurrencyAmount },
}
314
/// Storage for the accumulated native currency price in the current block.
///
/// Filled via `OnNewData::on_new_data`, and taken (cleared) when the block values are processed in `on_finalize`.
#[pallet::storage]
#[pallet::whitelist_storage]
pub type CurrentBlockValues<T: Config> =
    StorageValue<_, BoundedVec<CurrencyAmount, T::MaxValuesPerBlock>, ValueQuery>;

/// Used to store the aggregated processed block values during some time period.
///
/// Reset, with a new limit block, each time the intermediate aggregated values are processed.
#[pallet::storage]
#[pallet::whitelist_storage]
pub type IntermediateValueAggregator<T: Config> = StorageValue<_, ValueAggregator, ValueQuery>;

/// Used to store aggregated intermediate values for some time period.
///
/// The average of the stored values is what this pallet reports as the average price.
#[pallet::storage]
pub type ValuesCircularBuffer<T: Config> =
    StorageValue<_, CircularBuffer<T::CircularBufferLength>, ValueQuery>;
330
331 #[pallet::hooks]
332 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
333 fn on_initialize(now: BlockNumberFor<T>) -> Weight {
334 // Need to account for the reads and writes of:
335 // - CurrentBlockValues
336 // - IntermediateValueAggregator
337 //
338 // Also need to account for the weight of processing block accumulated values.
339 let mut total_weight = T::DbWeight::get()
340 .reads_writes(2, 2)
341 .saturating_add(T::WeightInfo::process_block_aggregated_values());
342
343 if IntermediateValueAggregator::<T>::get().limit_block <= now.saturated_into() {
344 total_weight
345 .saturating_accrue(T::WeightInfo::process_intermediate_aggregated_values());
346 }
347
348 total_weight
349 }
350
351 fn on_finalize(now: BlockNumberFor<T>) {
352 // 1. Process the accumulated native currency values in the current block.
353 Self::process_block_aggregated_values();
354
355 // 2. Check if we need to push the average aggregated value to the storage.
356 if IntermediateValueAggregator::<T>::get().limit_block <= now.saturated_into() {
357 Self::process_intermediate_aggregated_values(now);
358 }
359 }
360
361 fn integrity_test() {
362 assert!(T::MaxValuesPerBlock::get() > 0);
363 assert!(T::CircularBufferLength::get() > 0);
364 assert!(!T::AggregationDuration::get().is_zero());
365 }
366 }
367
368 impl<T: Config> Pallet<T> {
369 /// Used to process the native currency values accumulated in the current block.
370 ///
371 /// Guarantees that the accumulated values are cleared after processing.
372 /// In case of an error during processing, intermediate aggregated value is not updated.
373 pub(crate) fn process_block_aggregated_values() {
374 // 1. Take the accumulated block values, clearing the existing storage.
375 let accumulated_values = CurrentBlockValues::<T>::take();
376
377 // 2. Attempt to process accumulated block values.
378 let processed_value = match T::ProcessBlockValues::process(
379 accumulated_values.as_slice(),
380 ) {
381 Ok(value) => value,
382 Err(message) => {
383 log::trace!(
384 target: LOG_TARGET,
385 "Failed to process the accumulated native currency values in the current block. \
386 Reason: {:?}",
387 message
388 );
389
390 // Nothing to do if we have no valid value to store.
391 return;
392 }
393 };
394
395 // 3. Attempt to store the processed value.
396 // This operation is practically infallible, but we check the results for the additional safety.
397 let intermediate_value = IntermediateValueAggregator::<T>::get();
398 match intermediate_value.try_add(processed_value) {
399 Ok(new_aggregator) => {
400 IntermediateValueAggregator::<T>::put(new_aggregator);
401 }
402 Err(message) => {
403 log::error!(
404 target: LOG_TARGET,
405 "Failed to add the processed native currency value to the intermediate storage. \
406 Reason: {:?}",
407 message
408 );
409 }
410 }
411 }
412
413 /// Used to process the intermediate aggregated values, and push them to the moving average storage.
414 pub(crate) fn process_intermediate_aggregated_values(now: BlockNumberFor<T>) {
415 // 1. Get the average value from the intermediate aggregator.
416 let average_value = IntermediateValueAggregator::<T>::get().average();
417
418 // 2. Reset the aggregator back to zero, and set the new limit block.
419 IntermediateValueAggregator::<T>::put(ValueAggregator::new(
420 now.saturating_add(T::AggregationDuration::get())
421 .saturated_into(),
422 ));
423
424 // 3. In case aggregated value equals 0, it means something has gone wrong since it's extremely unlikely
425 // that price goes to absolute zero. The much more likely case is that there's a problem with the oracle data feed.
426 if average_value.is_zero() {
427 log::error!(
428 target: LOG_TARGET,
429 "The average aggregated price equals zero, which most likely means that oracle data feed is faulty. \
430 Not pushing the 'zero' value to the moving average storage."
431 );
432 return;
433 }
434
435 // 4. Push the 'valid' average aggregated value to the circular buffer.
436 ValuesCircularBuffer::<T>::mutate(|buffer| buffer.add(average_value));
437 Self::deposit_event(Event::AverageAggregatedValue {
438 value: average_value,
439 });
440 }
441 }
442
443 // Make this pallet an 'observer' ('listener') of the new oracle data feed.
444 impl<T: Config> OnNewData<T::AccountId, CurrencyId, CurrencyAmount> for Pallet<T> {
445 fn on_new_data(who: &T::AccountId, key: &CurrencyId, value: &CurrencyAmount) {
446 // Ignore any currency that is not native currency.
447 if T::NativeCurrencyId::get() != *key {
448 return;
449 }
450
451 CurrentBlockValues::<T>::mutate(|v| match v.try_push(*value) {
452 Ok(()) => {}
453 Err(_) => {
454 log::error!(
455 target: LOG_TARGET,
456 "Failed to push native currency value into the ongoing block due to exceeded capacity. \
457 Value was submitted by: {:?}",
458 who
459 );
460 }
461 });
462 }
463 }
464
465 // Make this pallet a `price provider` for the native currency.
466 //
467 // For this particular implementation, a simple moving average is used to calculate the average price.
468 impl<T: Config> PriceProvider for Pallet<T> {
469 fn average_price() -> FixedU128 {
470 ValuesCircularBuffer::<T>::get().average()
471 }
472 }
473}