moonbeam_rpc_trace/
lib.rs

1// Copyright 2019-2025 PureStake Inc.
2// This file is part of Moonbeam.
3
4// Moonbeam is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Moonbeam is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.
16
17//! `trace_filter` RPC handler and its associated service task.
//! The RPC handler relies on `CacheTask`, which provides a future that must be run inside a
//! tokio executor.
//!
//! The implementation is composed of multiple tasks:
//! - Many calls to the RPC handler `Trace::filter`, communicating with the main task.
//! - A main `CacheTask` managing the cache and the communication between tasks.
//! - For each traced block, an async task responsible for waiting for a permit, spawning a
//!   blocking task, waiting for its result and then sending it to the main `CacheTask`.
26
27use futures::{select, stream::FuturesUnordered, FutureExt, StreamExt};
28use std::{collections::BTreeMap, future::Future, marker::PhantomData, sync::Arc, time::Duration};
29use tokio::{
30    sync::{mpsc, oneshot, Semaphore},
31    time::sleep,
32};
33use tracing::{instrument, Instrument};
34
35use sc_client_api::backend::{Backend, StateBackend, StorageProvider};
36use sc_utils::mpsc::TracingUnboundedSender;
37use sp_api::{ApiExt, Core, ProvideRuntimeApi};
38use sp_block_builder::BlockBuilder;
39use sp_blockchain::{
40    Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
41};
42use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT};
43use substrate_prometheus_endpoint::{
44    register, Counter, PrometheusError, Registry as PrometheusRegistry, U64,
45};
46
47use ethereum_types::H256;
48use fc_storage::StorageOverride;
49use fp_rpc::EthereumRuntimeRPCApi;
50
51use moonbeam_client_evm_tracing::{
52    formatters::ResponseFormatter,
53    types::block::{self, TransactionTrace},
54};
55pub use moonbeam_rpc_core_trace::{FilterRequest, TraceServer};
56use moonbeam_rpc_core_types::{RequestBlockId, RequestBlockTag};
57use moonbeam_rpc_primitives_debug::DebugRuntimeApi;
58
59type TxsTraceRes = Result<Vec<TransactionTrace>, String>;
60
61/// RPC handler. Will communicate with a `CacheTask` through a `CacheRequester`.
62pub struct Trace<B, C> {
63    _phantom: PhantomData<B>,
64    client: Arc<C>,
65    requester: CacheRequester,
66    max_count: u32,
67}
68
69impl<B, C> Clone for Trace<B, C> {
70    fn clone(&self) -> Self {
71        Self {
72            _phantom: PhantomData,
73            client: Arc::clone(&self.client),
74            requester: self.requester.clone(),
75            max_count: self.max_count,
76        }
77    }
78}
79
80impl<B, C> Trace<B, C>
81where
82    B: BlockT<Hash = H256> + Send + Sync + 'static,
83    B::Header: HeaderT<Number = u32>,
84    C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
85    C: Send + Sync + 'static,
86{
87    /// Create a new RPC handler.
88    pub fn new(client: Arc<C>, requester: CacheRequester, max_count: u32) -> Self {
89        Self {
90            client,
91            requester,
92            max_count,
93            _phantom: PhantomData,
94        }
95    }
96
97    /// Convert an optional block ID (number or tag) to a block height.
98    fn block_id(&self, id: Option<RequestBlockId>) -> Result<u32, &'static str> {
99        match id {
100            Some(RequestBlockId::Number(n)) => Ok(n),
101            None | Some(RequestBlockId::Tag(RequestBlockTag::Latest)) => {
102                Ok(self.client.info().best_number)
103            }
104            Some(RequestBlockId::Tag(RequestBlockTag::Earliest)) => Ok(0),
105            Some(RequestBlockId::Tag(RequestBlockTag::Finalized)) => {
106                Ok(self.client.info().finalized_number)
107            }
108            Some(RequestBlockId::Tag(RequestBlockTag::Pending)) => {
109                Err("'pending' is not supported")
110            }
111            Some(RequestBlockId::Hash(_)) => Err("Block hash not supported"),
112        }
113    }
114
115    /// `trace_filter` endpoint (wrapped in the trait implementation with futures compatibility)
116    async fn filter(self, req: FilterRequest) -> TxsTraceRes {
117        let from_block = self.block_id(req.from_block)?;
118        let to_block = self.block_id(req.to_block)?;
119        let block_heights = from_block..=to_block;
120
121        let count = req.count.unwrap_or(self.max_count);
122        if count > self.max_count {
123            return Err(format!(
124                "count ({}) can't be greater than maximum ({})",
125                count, self.max_count
126            ));
127        }
128
129        // Build a list of all the Substrate block hashes that need to be traced.
130        let mut block_hashes = vec![];
131        for block_height in block_heights {
132            if block_height == 0 {
133                continue; // no traces for genesis block.
134            }
135
136            let block_hash = self
137                .client
138                .hash(block_height)
139                .map_err(|e| {
140                    format!(
141                        "Error when fetching block {} header : {:?}",
142                        block_height, e
143                    )
144                })?
145                .ok_or_else(|| format!("Block with height {} don't exist", block_height))?;
146
147            block_hashes.push(block_hash);
148        }
149
150        // Start a batch with these blocks.
151        let batch_id = self.requester.start_batch(block_hashes.clone()).await?;
152        // Fetch all the traces. It is done in another function to simplify error handling and allow
153        // to call the following `stop_batch` regardless of the result. This is important for the
154        // cache cleanup to work properly.
155        let res = self.fetch_traces(req, &block_hashes, count as usize).await;
156        // Stop the batch, allowing the cache task to remove useless non-started block traces and
157        // start the expiration delay.
158        self.requester.stop_batch(batch_id).await;
159
160        res
161    }
162
163    async fn fetch_traces(
164        &self,
165        req: FilterRequest,
166        block_hashes: &[H256],
167        count: usize,
168    ) -> TxsTraceRes {
169        let from_address = req.from_address.unwrap_or_default();
170        let to_address = req.to_address.unwrap_or_default();
171
172        let mut traces_amount: i64 = -(req.after.unwrap_or(0) as i64);
173        let mut traces = vec![];
174
175        for &block_hash in block_hashes {
176            // Request the traces of this block to the cache service.
177            // This will resolve quickly if the block is already cached, or wait until the block
178            // has finished tracing.
179            let block_traces = self.requester.get_traces(block_hash).await?;
180
181            // Filter addresses.
182            let mut block_traces: Vec<_> = block_traces
183                .iter()
184                .filter(|trace| match trace.action {
185                    block::TransactionTraceAction::Call { from, to, .. } => {
186                        (from_address.is_empty() || from_address.contains(&from))
187                            && (to_address.is_empty() || to_address.contains(&to))
188                    }
189                    block::TransactionTraceAction::Create { from, .. } => {
190                        (from_address.is_empty() || from_address.contains(&from))
191                            && to_address.is_empty()
192                    }
193                    block::TransactionTraceAction::Suicide { address, .. } => {
194                        (from_address.is_empty() || from_address.contains(&address))
195                            && to_address.is_empty()
196                    }
197                })
198                .cloned()
199                .collect();
200
201            // Don't insert anything if we're still before "after"
202            traces_amount += block_traces.len() as i64;
203            if traces_amount > 0 {
204                let traces_amount = traces_amount as usize;
205                // If the current Vec of traces is across the "after" marker,
206                // we skip some elements of it.
207                if traces_amount < block_traces.len() {
208                    let skip = block_traces.len() - traces_amount;
209                    block_traces = block_traces.into_iter().skip(skip).collect();
210                }
211
212                traces.append(&mut block_traces);
213
214                // If we go over "count" (the limit), we trim and exit the loop,
215                // unless we used the default maximum, in which case we return an error.
216                if traces_amount >= count {
217                    if req.count.is_none() {
218                        return Err(format!(
219                            "the amount of traces goes over the maximum ({}), please use 'after' \
220							and 'count' in your request",
221                            self.max_count
222                        ));
223                    }
224
225                    traces = traces.into_iter().take(count).collect();
226                    break;
227                }
228            }
229        }
230
231        Ok(traces)
232    }
233}
234
235#[jsonrpsee::core::async_trait]
236impl<B, C> TraceServer for Trace<B, C>
237where
238    B: BlockT<Hash = H256> + Send + Sync + 'static,
239    B::Header: HeaderT<Number = u32>,
240    C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
241    C: Send + Sync + 'static,
242{
243    async fn filter(
244        &self,
245        filter: FilterRequest,
246    ) -> jsonrpsee::core::RpcResult<Vec<TransactionTrace>> {
247        self.clone()
248            .filter(filter)
249            .await
250            .map_err(fc_rpc::internal_err)
251    }
252}
253
/// Opaque identifier of a batch: returned by the cache task when a batch is started, and
/// handed back to it to stop that batch.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct CacheBatchId(u64);
257
258/// Requests the cache task can accept.
259enum CacheRequest {
260    /// Request to start caching the provided range of blocks.
261    /// The task will add to blocks to its pool and immediately return a new batch ID.
262    StartBatch {
263        /// Returns the ID of the batch for cancellation.
264        sender: oneshot::Sender<CacheBatchId>,
265        /// List of block hash to trace.
266        blocks: Vec<H256>,
267    },
268    /// Fetch the traces for given block hash.
269    /// The task will answer only when it has processed this block.
270    GetTraces {
271        /// Returns the array of traces or an error.
272        sender: oneshot::Sender<TxsTraceRes>,
273        /// Hash of the block.
274        block: H256,
275    },
276    /// Notify the cache that it can stop the batch with that ID. Any block contained only in
277    /// this batch and still not started will be discarded.
278    StopBatch { batch_id: CacheBatchId },
279}
280
281/// Allows to interact with the cache task.
282#[derive(Clone)]
283pub struct CacheRequester(TracingUnboundedSender<CacheRequest>);
284
285impl CacheRequester {
286    /// Request to start caching the provided range of blocks.
287    /// The task will add to blocks to its pool and immediately return the batch ID.
288    #[instrument(skip(self))]
289    pub async fn start_batch(&self, blocks: Vec<H256>) -> Result<CacheBatchId, String> {
290        let (response_tx, response_rx) = oneshot::channel();
291        let sender = self.0.clone();
292
293        sender
294            .unbounded_send(CacheRequest::StartBatch {
295                sender: response_tx,
296                blocks,
297            })
298            .map_err(|e| {
299                format!(
300                    "Failed to send request to the trace cache task. Error : {:?}",
301                    e
302                )
303            })?;
304
305        response_rx.await.map_err(|e| {
306            format!(
307                "Trace cache task closed the response channel. Error : {:?}",
308                e
309            )
310        })
311    }
312
313    /// Fetch the traces for given block hash.
314    /// The task will answer only when it has processed this block.
315    /// The block should be part of a batch first. If no batch has requested the block it will
316    /// return an error.
317    #[instrument(skip(self))]
318    pub async fn get_traces(&self, block: H256) -> TxsTraceRes {
319        let (response_tx, response_rx) = oneshot::channel();
320        let sender = self.0.clone();
321
322        sender
323            .unbounded_send(CacheRequest::GetTraces {
324                sender: response_tx,
325                block,
326            })
327            .map_err(|e| {
328                format!(
329                    "Failed to send request to the trace cache task. Error : {:?}",
330                    e
331                )
332            })?;
333
334        response_rx
335            .await
336            .map_err(|e| {
337                format!(
338                    "Trace cache task closed the response channel. Error : {:?}",
339                    e
340                )
341            })?
342            .map_err(|e| format!("Failed to replay block. Error : {:?}", e))
343    }
344
345    /// Notify the cache that it can stop the batch with that ID. Any block contained only in
346    /// this batch and still in the waiting pool will be discarded.
347    #[instrument(skip(self))]
348    pub async fn stop_batch(&self, batch_id: CacheBatchId) {
349        let sender = self.0.clone();
350
351        // Here we don't care if the request has been accepted or refused, the caller can't
352        // do anything with it.
353        let _ = sender
354            .unbounded_send(CacheRequest::StopBatch { batch_id })
355            .map_err(|e| {
356                format!(
357                    "Failed to send request to the trace cache task. Error : {:?}",
358                    e
359                )
360            });
361    }
362}
363
364/// Data stored for each block in the cache.
365/// `active_batch_count` represents the number of batches using this
366/// block. It will increase immediately when a batch is created, but will be
367/// decrease only after the batch ends and its expiration delay passes.
368/// It allows to keep the data in the cache for following requests that would use
369/// this block, which is important to handle pagination efficiently.
370struct CacheBlock {
371    active_batch_count: usize,
372    state: CacheBlockState,
373}
374
375/// State of a cached block. It can either be polled to be traced or cached.
376enum CacheBlockState {
377    /// Block has been added to the pool blocks to be replayed.
378    /// It may be currently waiting to be replayed or being replayed.
379    Pooled {
380        started: bool,
381        /// Multiple requests might query the same block while it is pooled to be
382        /// traced. They response channel is stored here, and the result will be
383        /// sent in all of them when the tracing is finished.
384        waiting_requests: Vec<oneshot::Sender<TxsTraceRes>>,
385        /// Channel used to unqueue a tracing that has not yet started.
386        /// A tracing will be unqueued if it has not yet been started and the last batch
387        /// needing this block is ended (ignoring the expiration delay).
388        /// It is not used directly, but dropping will wake up the receiver.
389        #[allow(dead_code)]
390        unqueue_sender: oneshot::Sender<()>,
391    },
392    /// Tracing has been completed and the result is available. No Runtime API call
393    /// will be needed until this block cache is removed.
394    Cached { traces: TxsTraceRes },
395}
396
397/// Tracing a block is done in a separate tokio blocking task to avoid clogging the async threads.
398/// For this reason a channel using this type is used by the blocking task to communicate with the
399/// main cache task.
400enum BlockingTaskMessage {
401    /// Notify the tracing for this block has started as the blocking task got a permit from
402    /// the semaphore. This is used to prevent the deletion of a cache entry for a block that has
403    /// started being traced.
404    Started { block_hash: H256 },
405    /// The tracing is finished and the result is sent to the main task.
406    Finished {
407        block_hash: H256,
408        result: TxsTraceRes,
409    },
410}
411
412/// Type wrapper for the cache task, generic over the Client, Block and Backend types.
413pub struct CacheTask<B, C, BE> {
414    client: Arc<C>,
415    backend: Arc<BE>,
416    blocking_permits: Arc<Semaphore>,
417    cached_blocks: BTreeMap<H256, CacheBlock>,
418    batches: BTreeMap<u64, Vec<H256>>,
419    next_batch_id: u64,
420    metrics: Option<Metrics>,
421    _phantom: PhantomData<B>,
422}
423
424impl<B, C, BE> CacheTask<B, C, BE>
425where
426    BE: Backend<B> + 'static,
427    BE::State: StateBackend<BlakeTwo256>,
428    C: ProvideRuntimeApi<B>,
429    C: StorageProvider<B, BE>,
430    C: HeaderMetadata<B, Error = BlockChainError> + HeaderBackend<B>,
431    C: Send + Sync + 'static,
432    B: BlockT<Hash = H256> + Send + Sync + 'static,
433    B::Header: HeaderT<Number = u32>,
434    C::Api: BlockBuilder<B>,
435    C::Api: DebugRuntimeApi<B>,
436    C::Api: EthereumRuntimeRPCApi<B>,
437    C::Api: ApiExt<B>,
438{
439    /// Create a new cache task.
440    ///
441    /// Returns a Future that needs to be added to a tokio executor, and a handle allowing to
442    /// send requests to the task.
443    pub fn create(
444        client: Arc<C>,
445        backend: Arc<BE>,
446        cache_duration: Duration,
447        blocking_permits: Arc<Semaphore>,
448        overrides: Arc<dyn StorageOverride<B>>,
449        prometheus: Option<PrometheusRegistry>,
450    ) -> (impl Future<Output = ()>, CacheRequester) {
451        // Communication with the outside world :
452        let (requester_tx, mut requester_rx) =
453            sc_utils::mpsc::tracing_unbounded("trace-filter-cache", 100_000);
454
455        // Task running in the service.
456        let task = async move {
457			// The following variables are polled by the select! macro, and thus cannot be
458			// part of Self without introducing borrowing issues.
459			let mut batch_expirations = FuturesUnordered::new();
460			let (blocking_tx, mut blocking_rx) =
461				mpsc::channel(blocking_permits.available_permits() * 2);
462			let metrics = if let Some(registry) = prometheus {
463				match Metrics::register(&registry) {
464					Ok(metrics) => Some(metrics),
465					Err(err) => {
466						log::error!(target: "tracing", "Failed to register metrics {err:?}");
467						None
468					}
469				}
470			} else {
471				None
472			};
473			// Contains the inner state of the cache task, excluding the pooled futures/channels.
474			// Having this object allows to refactor each event into its own function, simplifying
475			// the main loop.
476			let mut inner = Self {
477				client,
478				backend,
479				blocking_permits,
480				cached_blocks: BTreeMap::new(),
481				batches: BTreeMap::new(),
482				next_batch_id: 0,
483				metrics,
484				_phantom: Default::default(),
485			};
486
487			// Main event loop. This loop must not contain any direct .await, as we want to
488			// react to events as fast as possible.
489			loop {
490				select! {
491					request = requester_rx.next() => {
492						match request {
493							None => break,
494							Some(CacheRequest::StartBatch {sender, blocks})
495								=> inner.request_start_batch(&blocking_tx, sender, blocks, overrides.clone()),
496							Some(CacheRequest::GetTraces {sender, block})
497								=> inner.request_get_traces(sender, block),
498							Some(CacheRequest::StopBatch {batch_id}) => {
499								// Cannot be refactored inside `request_stop_batch` because
500								// it has an unnamable type :C
501								batch_expirations.push(async move {
502									sleep(cache_duration).await;
503									batch_id
504								});
505
506								inner.request_stop_batch(batch_id);
507							},
508						}
509					},
510					message = blocking_rx.recv().fuse() => {
511						match message {
512							None => (),
513							Some(BlockingTaskMessage::Started { block_hash })
514								=> inner.blocking_started(block_hash),
515							Some(BlockingTaskMessage::Finished { block_hash, result })
516								=> inner.blocking_finished(block_hash, result),
517						}
518					},
519					batch_id = batch_expirations.next() => {
520						match batch_id {
521							None => (),
522							Some(batch_id) => inner.expired_batch(batch_id),
523						}
524					}
525				}
526			}
527		}
528		.instrument(tracing::debug_span!("trace_filter_cache"));
529
530        (task, CacheRequester(requester_tx))
531    }
532
533    /// Handle the creation of a batch.
534    /// Will start the tracing process for blocks that are not already in the cache.
535    #[instrument(skip(self, blocking_tx, sender, blocks, overrides))]
536    fn request_start_batch(
537        &mut self,
538        blocking_tx: &mpsc::Sender<BlockingTaskMessage>,
539        sender: oneshot::Sender<CacheBatchId>,
540        blocks: Vec<H256>,
541        overrides: Arc<dyn StorageOverride<B>>,
542    ) {
543        tracing::trace!("Starting batch {}", self.next_batch_id);
544        self.batches.insert(self.next_batch_id, blocks.clone());
545
546        for block in blocks {
547            // The block is already in the cache, awesome!
548            if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
549                block_cache.active_batch_count += 1;
550                tracing::trace!(
551                    "Cache hit for block {}, now used by {} batches.",
552                    block,
553                    block_cache.active_batch_count
554                );
555            }
556            // Otherwise we need to queue this block for tracing.
557            else {
558                tracing::trace!("Cache miss for block {}, pooling it for tracing.", block);
559
560                let blocking_permits = Arc::clone(&self.blocking_permits);
561                let (unqueue_sender, unqueue_receiver) = oneshot::channel();
562                let client = Arc::clone(&self.client);
563                let backend = Arc::clone(&self.backend);
564                let blocking_tx = blocking_tx.clone();
565                let overrides = overrides.clone();
566
567                // Spawn all block caching asynchronously.
568                // It will wait to obtain a permit, then spawn a blocking task.
569                // When the blocking task returns its result, it is sent
570                // thought a channel to the main task loop.
571                tokio::spawn(
572                    async move {
573                        tracing::trace!("Waiting for blocking permit or task cancellation");
574                        let _permit = select!(
575                            _ = unqueue_receiver.fuse() => {
576                            tracing::trace!("Tracing of the block has been cancelled.");
577                                return;
578                            },
579                            permit = blocking_permits.acquire().fuse() => permit,
580                        );
581
582                        // Warn the main task that block tracing as started, and
583                        // this block cache entry should not be removed.
584                        let _ = blocking_tx
585                            .send(BlockingTaskMessage::Started { block_hash: block })
586                            .await;
587
588                        tracing::trace!("Start block tracing in a blocking task.");
589
590                        // Perform block tracing in a tokio blocking task.
591                        let result = async {
592                            tokio::task::spawn_blocking(move || {
593                                Self::cache_block(client, backend, block, overrides.clone())
594                            })
595                            .await
596                            .map_err(|e| {
597                                format!("Tracing Substrate block {} panicked : {:?}", block, e)
598                            })?
599                        }
600                        .await
601                        .map_err(|e| e.to_string());
602
603                        tracing::trace!("Block tracing finished, sending result to main task.");
604
605                        // Send a response to the main task.
606                        let _ = blocking_tx
607                            .send(BlockingTaskMessage::Finished {
608                                block_hash: block,
609                                result,
610                            })
611                            .await;
612                    }
613                    .instrument(tracing::trace_span!("Block tracing", block = %block)),
614                );
615
616                // Insert the block in the cache.
617                self.cached_blocks.insert(
618                    block,
619                    CacheBlock {
620                        active_batch_count: 1,
621                        state: CacheBlockState::Pooled {
622                            started: false,
623                            waiting_requests: vec![],
624                            unqueue_sender,
625                        },
626                    },
627                );
628            }
629        }
630
631        // Respond with the batch ID.
632        let _ = sender.send(CacheBatchId(self.next_batch_id));
633
634        // Increase batch ID for the next request.
635        self.next_batch_id = self.next_batch_id.overflowing_add(1).0;
636    }
637
638    /// Handle a request to get the traces of the provided block.
639    /// - If the result is stored in the cache, it sends it immediately.
640    /// - If the block is currently being pooled, it is added to this block cache waiting list,
641    ///   and all requests concerning this block will be satisfied when the tracing for this block
642    ///   is finished.
643    /// - If this block is missing from the cache, it means no batch asked for it. All requested
644    ///   blocks should be contained in a batch beforehand, and thus an error is returned.
645    #[instrument(skip(self))]
646    fn request_get_traces(&mut self, sender: oneshot::Sender<TxsTraceRes>, block: H256) {
647        if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
648            match &mut block_cache.state {
649                CacheBlockState::Pooled {
650                    ref mut waiting_requests,
651                    ..
652                } => {
653                    tracing::warn!(
654                        "A request asked a pooled block ({}), adding it to the list of \
655						waiting requests.",
656                        block
657                    );
658                    waiting_requests.push(sender);
659                    if let Some(metrics) = &self.metrics {
660                        metrics.tracing_cache_misses.inc();
661                    }
662                }
663                CacheBlockState::Cached { traces, .. } => {
664                    tracing::warn!(
665                        "A request asked a cached block ({}), sending the traces directly.",
666                        block
667                    );
668                    let _ = sender.send(traces.clone());
669                    if let Some(metrics) = &self.metrics {
670                        metrics.tracing_cache_hits.inc();
671                    }
672                }
673            }
674        } else {
675            tracing::warn!(
676                "An RPC request asked to get a block ({}) which was not batched.",
677                block
678            );
679            let _ = sender.send(Err(format!(
680                "RPC request asked a block ({}) that was not batched",
681                block
682            )));
683        }
684    }
685
686    /// Handle a request to stop a batch.
687    /// For all blocks that needed to be traced, are only in this batch and not yet started, their
688    /// tracing is cancelled to save CPU-time and avoid attacks requesting large amount of blocks.
689    /// This batch data is not yet removed however. Instead a expiration delay timer is started
690    /// after which the data will indeed be cleared. (the code for that is in the main loop code
691    /// as it involved an unnamable type :C)
692    #[instrument(skip(self))]
693    fn request_stop_batch(&mut self, batch_id: CacheBatchId) {
694        tracing::trace!("Stopping batch {}", batch_id.0);
695        if let Some(blocks) = self.batches.get(&batch_id.0) {
696            for block in blocks {
697                let mut remove = false;
698
699                // We remove early the block cache if this batch is the last
700                // pooling this block.
701                if let Some(block_cache) = self.cached_blocks.get_mut(block) {
702                    if block_cache.active_batch_count == 1
703                        && matches!(
704                            block_cache.state,
705                            CacheBlockState::Pooled { started: false, .. }
706                        )
707                    {
708                        remove = true;
709                    }
710                }
711
712                if remove {
713                    tracing::trace!("Pooled block {} is no longer requested.", block);
714                    // Remove block from the cache. Drops the value,
715                    // closing all the channels contained in it.
716                    let _ = self.cached_blocks.remove(block);
717                }
718            }
719        }
720    }
721
722    /// A tracing blocking task notifies it got a permit and is starting the tracing.
723    /// This started status is stored to avoid removing this block entry.
724    #[instrument(skip(self))]
725    fn blocking_started(&mut self, block_hash: H256) {
726        if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
727            if let CacheBlockState::Pooled {
728                ref mut started, ..
729            } = block_cache.state
730            {
731                *started = true;
732            }
733        }
734    }
735
736    /// A tracing blocking task notifies it has finished the tracing and provide the result.
737    #[instrument(skip(self, result))]
738    fn blocking_finished(&mut self, block_hash: H256, result: TxsTraceRes) {
739        // In some cases it might be possible to receive traces of a block
740        // that has no entry in the cache because it was removed of the pool
741        // and received a permit concurrently. We just ignore it.
742        //
743        // TODO : Should we add it back ? Should it have an active_batch_count
744        // of 1 then ?
745        if let Some(block_cache) = self.cached_blocks.get_mut(&block_hash) {
746            if let CacheBlockState::Pooled {
747                ref mut waiting_requests,
748                ..
749            } = block_cache.state
750            {
751                tracing::trace!(
752                    "A new block ({}) has been traced, adding it to the cache and responding to \
753					{} waiting requests.",
754                    block_hash,
755                    waiting_requests.len()
756                );
757                // Send result in waiting channels
758                while let Some(channel) = waiting_requests.pop() {
759                    let _ = channel.send(result.clone());
760                }
761
762                // Update cache entry
763                block_cache.state = CacheBlockState::Cached { traces: result };
764            }
765        }
766    }
767
768    /// A batch expiration delay timer has completed. It performs the cache cleaning for blocks
769    /// not longer used by other batches.
770    #[instrument(skip(self))]
771    fn expired_batch(&mut self, batch_id: CacheBatchId) {
772        if let Some(batch) = self.batches.remove(&batch_id.0) {
773            for block in batch {
774                // For each block of the batch, we remove it if it was the
775                // last batch containing it.
776                let mut remove = false;
777                if let Some(block_cache) = self.cached_blocks.get_mut(&block) {
778                    block_cache.active_batch_count -= 1;
779
780                    if block_cache.active_batch_count == 0 {
781                        remove = true;
782                    }
783                }
784
785                if remove {
786                    let _ = self.cached_blocks.remove(&block);
787                }
788            }
789        }
790    }
791
    /// (In blocking task) Use the Runtime API to trace the block.
    ///
    /// Replays the Substrate block `substrate_hash` through the runtime debug API and returns
    /// the Ethereum call traces of its transactions, formatted for `trace_filter`.
    ///
    /// Steps:
    /// 1. Load the Substrate header (block number and parent hash).
    /// 2. Load the Ethereum block and its transaction statuses from `overrides`.
    /// 3. Load the block body (extrinsics) from `backend`.
    /// 4. Replay the block on top of its parent's state, choosing the `DebugRuntimeApi` entry
    ///    point according to the runtime's API version. The replay runs inside
    ///    `CallList::using`, and the collected traces are read back from that `CallList`.
    /// 5. Format the collected traces. Each one receives the Ethereum block hash, the block
    ///    number and its transaction hash. Traces that match no known transaction index are
    ///    logged and dropped.
    ///
    /// # Errors
    /// Returns `Err(String)` with a human-readable message when:
    /// - the header, the Ethereum block data or the block body is missing or cannot be fetched;
    /// - a runtime API version query fails;
    /// - the replay fails, either at the runtime API level or inside the runtime;
    /// - the formatter produces no output.
    #[instrument(skip(client, backend, overrides))]
    fn cache_block(
        client: Arc<C>,
        backend: Arc<BE>,
        substrate_hash: H256,
        overrides: Arc<dyn StorageOverride<B>>,
    ) -> TxsTraceRes {
        // Get Substrate block data.
        let api = client.runtime_api();
        let block_header = client
            .header(substrate_hash)
            .map_err(|e| {
                format!(
                    "Error when fetching substrate block {} header : {:?}",
                    substrate_hash, e
                )
            })?
            .ok_or_else(|| format!("Substrate block {} don't exist", substrate_hash))?;

        // `height` is stamped on every produced trace. `substrate_parent_hash` is the state the
        // block is replayed on top of: every runtime API call below is made at that hash.
        let height = *block_header.number();
        let substrate_parent_hash = *block_header.parent_hash();

        // Get Ethereum block data.
        // Both the block and its transaction statuses are required; a missing one is an error.
        let (eth_block, eth_transactions) = match (
            overrides.current_block(substrate_hash),
            overrides.current_transaction_statuses(substrate_hash),
        ) {
            (Some(a), Some(b)) => (a, b),
            _ => {
                return Err(format!(
                    "Failed to get Ethereum block data for Substrate block {}",
                    substrate_hash
                ))
            }
        };

        // Traces report the Ethereum block hash, not the Substrate one.
        let eth_block_hash = eth_block.header.hash();
        // Hashes of the Ethereum transactions of the block, passed to the runtime trace call.
        let eth_tx_hashes = eth_transactions
            .iter()
            .map(|t| t.transaction_hash)
            .collect();

        // Get extrinsics (containing Ethereum ones)
        let extrinsics = backend
            .blockchain()
            .body(substrate_hash)
            .map_err(|e| {
                format!(
                    "Blockchain error when fetching extrinsics of block {} : {:?}",
                    height, e
                )
            })?
            .ok_or_else(|| format!("Could not find block {} when fetching extrinsics.", height))?;

        // Get DebugRuntimeApi version
        // A failed query and an absent API (`Ok(None)`) are both reported as errors.
        let trace_api_version = if let Ok(Some(api_version)) =
            api.api_version::<dyn DebugRuntimeApi<B>>(substrate_parent_hash)
        {
            api_version
        } else {
            return Err("Runtime api version call failed (trace)".to_string());
        };

        // Trace the block.
        // `f` is not called here: it is run later through `proxy.using(f)`.
        let f = || -> Result<_, String> {
            let result = if trace_api_version >= 5 {
                // v5+: a single call that also receives the block header. No separate
                // `initialize_block` call is made on this path; presumably the runtime
                // handles initialization itself (TODO confirm in the runtime implementation).
                api.trace_block(
                    substrate_parent_hash,
                    extrinsics,
                    eth_tx_hashes,
                    &block_header,
                )
            } else {
                // Get core runtime api version
                let core_api_version = if let Ok(Some(api_version)) =
                    api.api_version::<dyn Core<B>>(substrate_parent_hash)
                {
                    api_version
                } else {
                    return Err("Runtime api version call failed (core)".to_string());
                };

                // Initialize block: calls the "on_initialize" hook on every pallet
                // in AllPalletsWithSystem
                // This was fine before pallet-message-queue because the XCM messages
                // were processed by the "setValidationData" inherent call and not on an
                // "on_initialize" hook, which runs before enabling XCM tracing
                if core_api_version >= 5 {
                    api.initialize_block(substrate_parent_hash, &block_header)
                        .map_err(|e| format!("Runtime api access error: {:?}", e))?;
                } else {
                    #[allow(deprecated)]
                    api.initialize_block_before_version_5(substrate_parent_hash, &block_header)
                        .map_err(|e| format!("Runtime api access error: {:?}", e))?;
                }

                #[allow(deprecated)]
                api.trace_block_before_version_5(substrate_parent_hash, extrinsics, eth_tx_hashes)
            };

            // Two error layers: the outer one is the runtime API call itself failing. The inner
            // one is an error reported by the runtime while replaying, which is also logged as
            // a warning before being returned.
            result
                .map_err(|e| format!("Blockchain error when replaying block {} : {:?}", height, e))?
                .map_err(|e| {
                    tracing::warn!(
                        target: "tracing",
                        "Internal runtime error when replaying block {} : {:?}",
                        height,
                        e
                    );
                    format!(
                        "Internal runtime error when replaying block {} : {:?}",
                        height, e
                    )
                })?;

            // The runtime call's output is not used as the trace data. The traces are read
            // from `proxy` once `using` returns.
            Ok(moonbeam_rpc_primitives_debug::Response::Block)
        };

        // Maps a trace's `transaction_position` back to the Ethereum transaction hash.
        let eth_transactions_by_index: BTreeMap<u32, H256> = eth_transactions
            .iter()
            .map(|t| (t.transaction_index, t.transaction_hash))
            .collect();

        // Run the replay through `proxy` (presumably installing it as the EVM tracing
        // listener for the duration of `f` — confirm in moonbeam_client_evm_tracing).
        // Any error returned by `f` is propagated.
        let mut proxy = moonbeam_client_evm_tracing::listeners::CallList::default();
        proxy.using(f)?;

        // Format the collected calls. For each trace: fill in block and transaction metadata,
        // rename "execution reverted" errors to "Reverted", and drop (with a warning) traces
        // whose position matches no Ethereum transaction of the block.
        let traces: Vec<TransactionTrace> =
			moonbeam_client_evm_tracing::formatters::TraceFilter::format(proxy)
				.ok_or("Fail to format proxy")?
				.into_iter()
				.filter_map(|mut trace| {
					match eth_transactions_by_index.get(&trace.transaction_position) {
						Some(transaction_hash) => {
							trace.block_hash = eth_block_hash;
							trace.block_number = height;
							trace.transaction_hash = *transaction_hash;

							// Reformat error messages.
							if let block::TransactionTraceOutput::Error(ref mut error) =
								trace.output
							{
								if error.as_slice() == b"execution reverted" {
									*error = b"Reverted".to_vec();
								}
							}

							Some(trace)
						}
						None => {
							log::warn!(
								target: "tracing",
								"A trace in block {} does not map to any known ethereum transaction. Trace: {:?}",
								height,
								trace,
							);
							None
						}
					}
				})
				.collect();

        Ok(traces)
    }
956}
957
958/// Prometheus metrics for tracing.
959#[derive(Clone)]
960pub(crate) struct Metrics {
961    tracing_cache_hits: Counter<U64>,
962    tracing_cache_misses: Counter<U64>,
963}
964
965impl Metrics {
966    pub(crate) fn register(registry: &PrometheusRegistry) -> Result<Self, PrometheusError> {
967        Ok(Self {
968            tracing_cache_hits: register(
969                Counter::new("tracing_cache_hits", "Number of tracing cache hits.")?,
970                registry,
971            )?,
972            tracing_cache_misses: register(
973                Counter::new("tracing_cache_misses", "Number of tracing cache misses.")?,
974                registry,
975            )?,
976        })
977    }
978}