pallet_price_aggregator/
lib.rs

1// This file is part of Astar.
2
3// Copyright (C) Stake Technologies Pte.Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later
5
6// Astar is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// Astar is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with Astar. If not, see <http://www.gnu.org/licenses/>.
18
19//! # Price Aggregator Pallet
20//!
21//! ## Overview
22//!
23//! Purpose of this pallet is to aggregate price data over some time, and then calculate the moving average.
24//!
25//! ## Solution
26//!
27//! The overall solution is broken down into several steps that occur over the course of various time periods.
28//!
29//! ### Block Aggregation
30//!
31//! During each block, the native currency price data is accumulated. This is done 'outside' the pallet, and it's only expected
32//! that 'something' will push this data to the price aggregator pallet. The pallet itself doesn't care about the source of the data, nor who submitted it.
33//!
34//! At the end of each block, accumulated data is processed according to the specified algorithm (e.g. can be average, median, or something else).
35//! In case processing was successful, the result is stored in the intermediate value aggregator.
36//! In case processing fails, value is simply ignored.
37//!
38//! ### Intermediate Value Aggregation
39//!
40//! After a predetermined amount of time (blocks) has passed, the average value is calculated from the intermediate value aggregator.
41//! In case it's a valid value (non-zero), it's pushed into the circular buffer used to calculate the moving average.
42//! In case of an error, the value is simply ignored.
43//!
44//! ### Moving Average Calculation
45//!
46//! The moving average is calculated from the circular buffer, and is used to provide the 'average' price of the native currency, over some time period.
47//! It's important to note that the moving average is not a 'real-time' value, but rather a 'lagging' indicator.
48
49#![cfg_attr(not(feature = "std"), no_std)]
50
51use frame_support::{pallet_prelude::*, DefaultNoBound};
52use frame_system::pallet_prelude::*;
53pub use pallet::*;
54use sp_arithmetic::{
55    fixed_point::FixedU128,
56    traits::{CheckedAdd, SaturatedConversion, Saturating, Zero},
57};
58use sp_std::marker::PhantomData;
59
60use orml_traits::OnNewData;
61
62use astar_primitives::{
63    oracle::{CurrencyAmount, CurrencyId, PriceProvider},
64    BlockNumber,
65};
66
67pub mod weights;
68pub use weights::WeightInfo;
69
70#[cfg(test)]
71mod mock;
72#[cfg(test)]
73mod tests;
74
75#[cfg(feature = "runtime-benchmarks")]
76mod benchmarking;
77
78/// Trait for processing accumulated currency values within a single block.
79///
80/// This can be anything from median, average, or more complex calculation.
81pub trait ProcessBlockValues {
82    /// Process the accumulated values and return the result.
83    ///
84    /// In case of an error, return an error message.
85    fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str>;
86}
87
88/// Used to calculate the simple average of the accumulated values.
89pub struct AverageBlockValue;
90impl ProcessBlockValues for AverageBlockValue {
91    fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str> {
92        if values.is_empty() {
93            return Err("No values exist for the current block.");
94        }
95
96        let sum = values.iter().fold(CurrencyAmount::zero(), |acc, &value| {
97            acc.saturating_add(value)
98        });
99
100        Ok(sum.saturating_mul(FixedU128::from_rational(1, values.len() as u128)))
101    }
102}
103
104/// Used to calculate the median of the accumulated values.
105pub struct MedianBlockValue;
106impl ProcessBlockValues for MedianBlockValue {
107    fn process(values: &[CurrencyAmount]) -> Result<CurrencyAmount, &'static str> {
108        if values.is_empty() {
109            return Err("No values exist for the current block.");
110        }
111
112        let mut sorted_values = values.to_vec();
113        sorted_values.sort_unstable();
114
115        let mid = sorted_values.len() / 2;
116
117        if sorted_values.len() % 2 == 0 {
118            Ok(sorted_values[mid.saturating_sub(1)]
119                .saturating_add(sorted_values[mid])
120                .saturating_mul(CurrencyAmount::from_rational(1, 2)))
121        } else {
122            Ok(sorted_values[mid])
123        }
124    }
125}
126
127/// Used to aggregate the accumulated values over some time period.
128///
129/// To avoid having a large memory footprint, values are summed up into a single accumulator.
130/// Number of summed up values is tracked in a separate field.
131#[derive(Encode, Decode, MaxEncodedLen, Default, Clone, Copy, Debug, PartialEq, Eq, TypeInfo)]
132pub struct ValueAggregator {
133    /// Total accumulated value amount.
134    #[codec(compact)]
135    pub(crate) total: CurrencyAmount,
136    /// Number of values accumulated.
137    #[codec(compact)]
138    pub(crate) count: u32,
139    /// Block number at which aggregation should reset.
140    #[codec(compact)]
141    pub(crate) limit_block: BlockNumber,
142}
143
144impl ValueAggregator {
145    /// New value aggregator, with the given block number as the new limit.
146    pub fn new(limit_block: BlockNumber) -> Self {
147        Self {
148            limit_block,
149            ..Default::default()
150        }
151    }
152
153    /// Attempts to add a value to the aggregator, consuming `self` in the process.
154    ///
155    /// Returns an error if the addition would cause an overflow in the accumulator or the counter.
156    /// Otherwise returns the updated aggregator.
157    pub fn try_add(mut self, value: CurrencyAmount) -> Result<Self, &'static str> {
158        self.total = self
159            .total
160            .checked_add(&value)
161            .ok_or("Failed to add value to the aggregator due to overflow.")?;
162
163        self.count = self
164            .count
165            .checked_add(1)
166            .ok_or("Failed to increment count in the aggregator due to overflow.")?;
167
168        Ok(self)
169    }
170
171    /// Returns the average of the accumulated values.
172    pub fn average(&self) -> CurrencyAmount {
173        if self.count.is_zero() {
174            CurrencyAmount::zero()
175        } else {
176            self.total
177                .saturating_mul(FixedU128::from_rational(1, self.count.into()))
178        }
179    }
180}
181
182/// Used to store the aggregated intermediate values into a circular buffer.
183///
184/// Inserts values sequentially into the buffer, until the buffer has been filled out.
185/// After that, the oldest value is always overwritten with the new value.
186#[derive(
187    Encode,
188    Decode,
189    MaxEncodedLen,
190    RuntimeDebugNoBound,
191    PartialEqNoBound,
192    EqNoBound,
193    CloneNoBound,
194    TypeInfo,
195    DefaultNoBound,
196)]
197#[scale_info(skip_type_params(L))]
198pub struct CircularBuffer<L: Get<u32>> {
199    /// Currency values store.
200    pub(crate) buffer: BoundedVec<CurrencyAmount, L>,
201    /// Next index to write to.
202    #[codec(compact)]
203    pub(crate) head: u32,
204}
205
206impl<L: Get<u32>> CircularBuffer<L> {
207    /// Adds a new value to the circular buffer, possibly overriding the oldest value if capacity is filled.
208    pub fn add(&mut self, value: CurrencyAmount) {
209        // This can never happen, parameters must ensure that.
210        // But we still check it and log an error if it does.
211        if self.head >= L::get() || self.head as usize > self.buffer.len() {
212            log::error!(
213                target: LOG_TARGET,
214                "Failed to push value to the circular buffer due to invalid next index. \
215                Next index: {:?}, Buffer length: {:?}, Buffer capacity: {:?}",
216                self.head,
217                self.buffer.len(),
218                L::get()
219            );
220            return;
221        }
222
223        if self.buffer.len() > self.head as usize {
224            // Vec has been filled out, so we need to override the 'head' value
225            self.buffer[self.head as usize] = value;
226        } else {
227            // Vec is not full yet, so we can just push the value
228            let _ignorable = self.buffer.try_push(value);
229        }
230        self.head = self.head.saturating_add(1) % L::get();
231    }
232
233    /// Returns the average of the accumulated values.
234    pub fn average(&self) -> CurrencyAmount {
235        if self.buffer.is_empty() {
236            return CurrencyAmount::zero();
237        }
238
239        let sum = self
240            .buffer
241            .iter()
242            .fold(CurrencyAmount::zero(), |acc, &value| {
243                acc.saturating_add(value)
244            });
245
246        // At this point, length of the buffer is guaranteed to be greater than zero.
247        sum.saturating_mul(FixedU128::from_rational(1, self.buffer.len() as u128))
248    }
249}
250
251const LOG_TARGET: &str = "price-aggregator";
252
253#[frame_support::pallet]
254pub mod pallet {
255    use super::*;
256
257    /// The current storage version.
258    pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
259
260    #[pallet::pallet]
261    #[pallet::storage_version(STORAGE_VERSION)]
262    pub struct Pallet<T>(PhantomData<T>);
263
264    #[pallet::config]
265    pub trait Config: frame_system::Config {
266        /// Maximum number of distinct currency values we can store during a single block.
267        #[pallet::constant]
268        type MaxValuesPerBlock: Get<u32>;
269
270        /// Used to process accumulated values in the current block.
271        type ProcessBlockValues: ProcessBlockValues;
272
273        /// Native currency ID that this pallet is supposed to track.
274        type NativeCurrencyId: Get<CurrencyId>;
275
276        /// Maximum length of the circular buffer used to calculate the moving average.
277        #[pallet::constant]
278        type CircularBufferLength: Get<u32>;
279
280        /// Duration of aggregation period expressed in the number of blocks.
281        /// During this time, currency values are aggregated, and are then used to calculate the average value.
282        #[pallet::constant]
283        type AggregationDuration: Get<BlockNumberFor<Self>>;
284
285        type WeightInfo: WeightInfo;
286    }
287
288    #[pallet::genesis_config]
289    #[derive(frame_support::DefaultNoBound)]
290    pub struct GenesisConfig<T: Config> {
291        pub circular_buffer: BoundedVec<CurrencyAmount, T::CircularBufferLength>,
292    }
293
294    #[pallet::genesis_build]
295    impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
296        fn build(&self) {
297            ValuesCircularBuffer::<T>::put(CircularBuffer::<T::CircularBufferLength> {
298                buffer: self.circular_buffer.clone(),
299                head: self.circular_buffer.len() as u32 % T::CircularBufferLength::get(),
300            });
301
302            IntermediateValueAggregator::<T>::mutate(|aggregator| {
303                aggregator.limit_block = T::AggregationDuration::get().saturated_into();
304            });
305        }
306    }
307
308    #[pallet::event]
309    #[pallet::generate_deposit(pub(crate) fn deposit_event)]
310    pub enum Event<T: Config> {
311        /// New average native currency value has been calculated and pushed into the moving average buffer.
312        AverageAggregatedValue { value: CurrencyAmount },
313    }
314
315    /// Storage for the accumulated native currency price in the current block.
316    #[pallet::storage]
317    #[pallet::whitelist_storage]
318    pub type CurrentBlockValues<T: Config> =
319        StorageValue<_, BoundedVec<CurrencyAmount, T::MaxValuesPerBlock>, ValueQuery>;
320
321    /// Used to store the aggregated processed block values during some time period.
322    #[pallet::storage]
323    #[pallet::whitelist_storage]
324    pub type IntermediateValueAggregator<T: Config> = StorageValue<_, ValueAggregator, ValueQuery>;
325
326    /// Used to store aggregated intermediate values for some time period.
327    #[pallet::storage]
328    pub type ValuesCircularBuffer<T: Config> =
329        StorageValue<_, CircularBuffer<T::CircularBufferLength>, ValueQuery>;
330
331    #[pallet::hooks]
332    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
333        fn on_initialize(now: BlockNumberFor<T>) -> Weight {
334            // Need to account for the reads and writes of:
335            // - CurrentBlockValues
336            // - IntermediateValueAggregator
337            //
338            // Also need to account for the weight of processing block accumulated values.
339            let mut total_weight = T::DbWeight::get()
340                .reads_writes(2, 2)
341                .saturating_add(T::WeightInfo::process_block_aggregated_values());
342
343            if IntermediateValueAggregator::<T>::get().limit_block <= now.saturated_into() {
344                total_weight
345                    .saturating_accrue(T::WeightInfo::process_intermediate_aggregated_values());
346            }
347
348            total_weight
349        }
350
351        fn on_finalize(now: BlockNumberFor<T>) {
352            // 1. Process the accumulated native currency values in the current block.
353            Self::process_block_aggregated_values();
354
355            // 2. Check if we need to push the average aggregated value to the storage.
356            if IntermediateValueAggregator::<T>::get().limit_block <= now.saturated_into() {
357                Self::process_intermediate_aggregated_values(now);
358            }
359        }
360
361        fn integrity_test() {
362            assert!(T::MaxValuesPerBlock::get() > 0);
363            assert!(T::CircularBufferLength::get() > 0);
364            assert!(!T::AggregationDuration::get().is_zero());
365        }
366    }
367
368    impl<T: Config> Pallet<T> {
369        /// Used to process the native currency values accumulated in the current block.
370        ///
371        /// Guarantees that the accumulated values are cleared after processing.
372        /// In case of an error during processing, intermediate aggregated value is not updated.
373        pub(crate) fn process_block_aggregated_values() {
374            // 1. Take the accumulated block values, clearing the existing storage.
375            let accumulated_values = CurrentBlockValues::<T>::take();
376
377            // 2. Attempt to process accumulated block values.
378            let processed_value = match T::ProcessBlockValues::process(
379                accumulated_values.as_slice(),
380            ) {
381                Ok(value) => value,
382                Err(message) => {
383                    log::trace!(
384                        target: LOG_TARGET,
385                        "Failed to process the accumulated native currency values in the current block. \
386                        Reason: {:?}",
387                        message
388                    );
389
390                    // Nothing to do if we have no valid value to store.
391                    return;
392                }
393            };
394
395            // 3. Attempt to store the processed value.
396            // This operation is practically infallible, but we check the results for the additional safety.
397            let intermediate_value = IntermediateValueAggregator::<T>::get();
398            match intermediate_value.try_add(processed_value) {
399                Ok(new_aggregator) => {
400                    IntermediateValueAggregator::<T>::put(new_aggregator);
401                }
402                Err(message) => {
403                    log::error!(
404                        target: LOG_TARGET,
405                        "Failed to add the processed native currency value to the intermediate storage. \
406                        Reason: {:?}",
407                        message
408                    );
409                }
410            }
411        }
412
413        /// Used to process the intermediate aggregated values, and push them to the moving average storage.
414        pub(crate) fn process_intermediate_aggregated_values(now: BlockNumberFor<T>) {
415            // 1. Get the average value from the intermediate aggregator.
416            let average_value = IntermediateValueAggregator::<T>::get().average();
417
418            // 2. Reset the aggregator back to zero, and set the new limit block.
419            IntermediateValueAggregator::<T>::put(ValueAggregator::new(
420                now.saturating_add(T::AggregationDuration::get())
421                    .saturated_into(),
422            ));
423
424            // 3. In case aggregated value equals 0, it means something has gone wrong since it's extremely unlikely
425            // that price goes to absolute zero. The much more likely case is that there's a problem with the oracle data feed.
426            if average_value.is_zero() {
427                log::error!(
428                    target: LOG_TARGET,
429                    "The average aggregated price equals zero, which most likely means that oracle data feed is faulty. \
430                    Not pushing the 'zero' value to the moving average storage."
431                );
432                return;
433            }
434
435            // 4. Push the 'valid' average aggregated value to the circular buffer.
436            ValuesCircularBuffer::<T>::mutate(|buffer| buffer.add(average_value));
437            Self::deposit_event(Event::AverageAggregatedValue {
438                value: average_value,
439            });
440        }
441    }
442
443    // Make this pallet an 'observer' ('listener') of the new oracle data feed.
444    impl<T: Config> OnNewData<T::AccountId, CurrencyId, CurrencyAmount> for Pallet<T> {
445        fn on_new_data(who: &T::AccountId, key: &CurrencyId, value: &CurrencyAmount) {
446            // Ignore any currency that is not native currency.
447            if T::NativeCurrencyId::get() != *key {
448                return;
449            }
450
451            CurrentBlockValues::<T>::mutate(|v| match v.try_push(*value) {
452                Ok(()) => {}
453                Err(_) => {
454                    log::error!(
455                    target: LOG_TARGET,
456                        "Failed to push native currency value into the ongoing block due to exceeded capacity. \
457                        Value was submitted by: {:?}",
458                        who
459                    );
460                }
461            });
462        }
463    }
464
465    // Make this pallet a `price provider` for the native currency.
466    //
467    // For this particular implementation, a simple moving average is used to calculate the average price.
468    impl<T: Config> PriceProvider for Pallet<T> {
469        fn average_price() -> FixedU128 {
470            ValuesCircularBuffer::<T>::get().average()
471        }
472    }
473}