// astar_collator/local/service.rs

1// This file is part of Astar.
2
3// Copyright (C) Stake Technologies Pte.Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later
5
6// Astar is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// Astar is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with Astar. If not, see <http://www.gnu.org/licenses/>.
18
19//! Local Service and ServiceFactory implementation. Specialized wrapper over substrate service.
20
21use crate::{
22    evm_tracing_types::{EthApi as EthApiCmd, FrontierConfig},
23    rpc::tracing,
24};
25use cumulus_client_parachain_inherent::MockValidationDataInherentDataProvider;
26use cumulus_primitives_core::{
27    relay_chain::{well_known_keys, AsyncBackingParams, HeadData, UpgradeGoAhead},
28    AbridgedHostConfiguration, CollectCollationInfo, ParaId,
29};
30use fc_consensus::FrontierBlockImport;
31use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
32use fc_storage::StorageOverrideHandler;
33use futures::{FutureExt, StreamExt};
34use parity_scale_codec::Encode;
35use sc_client_api::{Backend, BlockchainEvents};
36use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
37use sc_network::NetworkBackend;
38use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
39use sc_telemetry::{Telemetry, TelemetryWorker};
40use sc_transaction_pool_api::OffchainTransactionPoolFactory;
41use sp_api::ProvideRuntimeApi;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
44use std::{collections::BTreeMap, marker::PhantomData, ops::Sub, sync::Arc, time::Duration};
45
46pub use crate::parachain::fake_runtime_api::RuntimeApi;
47
48use astar_primitives::*;
49
50/// Local pending inherent provider for ETH pending RPC in dev mode.
/// Local pending inherent provider for ETH pending RPC in dev mode.
///
/// Supplies mocked (timestamp, validation-data) inherents when building a
/// "pending" block on top of a given parent; see the
/// `CreateInherentDataProviders` impl below.
pub struct LocalPendingInherentDataProvider<B, C> {
    // Client used to look up the parent header when constructing inherents.
    client: Arc<C>,
    // Parachain ID injected into the mocked validation data.
    para_id: ParaId,
    // Carries the block type `B` without storing a value of it.
    phantom_data: PhantomData<B>,
}
56
/// Relay-chain slot duration (6 s) used to derive mocked relay slots and timestamps.
const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6000;
58
59fn build_local_mock_inherent_data(
60    para_id: ParaId,
61    current_para_block: u32,
62    current_para_block_head: Option<HeadData>,
63    relay_blocks_per_para_block: u32,
64    relay_slot: u64,
65    upgrade_go_ahead: Option<UpgradeGoAhead>,
66) -> (
67    sp_timestamp::InherentDataProvider,
68    MockValidationDataInherentDataProvider<()>,
69) {
70    let relay_offset = (relay_slot as u32)
71        .saturating_sub(relay_blocks_per_para_block.saturating_mul(current_para_block));
72
73    let local_host_config = AbridgedHostConfiguration {
74        max_code_size: 16 * 1024 * 1024, // 16 MiB (local dev only)
75        max_head_data_size: 1024 * 1024,
76        max_upward_queue_count: 8,
77        max_upward_queue_size: 1024,
78        max_upward_message_size: 256,
79        max_upward_message_num_per_candidate: 5,
80        hrmp_max_message_num_per_candidate: 5,
81        validation_upgrade_cooldown: 6,
82        validation_upgrade_delay: 6,
83        async_backing_params: AsyncBackingParams {
84            allowed_ancestry_len: 0,
85            max_candidate_depth: 0,
86        },
87    };
88
89    let mocked_parachain = MockValidationDataInherentDataProvider::<()> {
90        current_para_block,
91        para_id,
92        current_para_block_head,
93        relay_blocks_per_para_block,
94        relay_offset,
95        para_blocks_per_relay_epoch: 10,
96        upgrade_go_ahead,
97        additional_key_values: Some(vec![(
98            well_known_keys::ACTIVE_CONFIG.to_vec(),
99            local_host_config.encode(),
100        )]),
101        ..Default::default()
102    };
103
104    let timestamp = relay_slot.saturating_mul(RELAY_CHAIN_SLOT_DURATION_MILLIS);
105    let timestamp_provider = sp_timestamp::InherentDataProvider::new(timestamp.into());
106
107    (timestamp_provider, mocked_parachain)
108}
109
110impl<B, C> LocalPendingInherentDataProvider<B, C> {
111    /// Creates a new instance with the given client and parachain ID.
112    pub fn new(client: Arc<C>, para_id: ParaId) -> Self {
113        Self {
114            client,
115            para_id,
116            phantom_data: Default::default(),
117        }
118    }
119}
120
121#[async_trait::async_trait]
122impl<B, C> sp_inherents::CreateInherentDataProviders<B, ()>
123    for LocalPendingInherentDataProvider<B, C>
124where
125    B: BlockT,
126    C: HeaderBackend<B> + Send + Sync,
127{
128    type InherentDataProviders = (
129        sp_timestamp::InherentDataProvider,
130        MockValidationDataInherentDataProvider<()>,
131    );
132
133    async fn create_inherent_data_providers(
134        &self,
135        _parent: B::Hash,
136        _extra_args: (),
137    ) -> Result<Self::InherentDataProviders, Box<dyn std::error::Error + Send + Sync>> {
138        let relay_slot = std::time::SystemTime::now()
139            .duration_since(std::time::UNIX_EPOCH)
140            .expect("Current time is always after UNIX_EPOCH; qed")
141            .as_millis() as u64
142            / RELAY_CHAIN_SLOT_DURATION_MILLIS;
143
144        let current_para_block = self
145            .client
146            .header(_parent)?
147            .map(|header| {
148                UniqueSaturatedInto::<u32>::unique_saturated_into(*header.number())
149                    .saturating_add(1)
150            })
151            .unwrap_or(1);
152
153        let current_para_block_head = self
154            .client
155            .header(_parent)?
156            .map(|header| header.encode().into());
157
158        let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
159            self.para_id,
160            current_para_block,
161            current_para_block_head,
162            1,
163            relay_slot,
164            None,
165        );
166
167        Ok((timestamp_provider, mocked_parachain))
168    }
169}
170
/// Parachain host functions
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    frame_benchmarking::benchmarking::HostFunctions,
    cumulus_client_service::ParachainHostFunctions,
    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

/// Parachain host functions
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    cumulus_client_service::ParachainHostFunctions,
    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

/// Wasm executor configured with the host functions above.
type ParachainExecutor = WasmExecutor<HostFunctions>;

/// Full client over the parachain fake runtime API (see `crate::parachain::fake_runtime_api`).
type FullClient = sc_service::TFullClient<Block, RuntimeApi, ParachainExecutor>;
/// Full backend type for the dev node.
type FullBackend = sc_service::TFullBackend<Block>;
/// Longest-chain fork choice — sufficient for a single-node dev chain.
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
191
/// Build a partial chain component config
///
/// Assembles the client, backend, select chain, transaction pool, a
/// manual-seal import queue, plus the Frontier (EVM) block import and
/// backend used by the local dev node.
pub fn new_partial(
    config: &Configuration,
    evm_tracing_config: &FrontierConfig,
) -> Result<
    sc_service::PartialComponents<
        FullClient,
        FullBackend,
        FullSelectChain,
        sc_consensus::DefaultImportQueue<Block>,
        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
        (
            FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
            Option<Telemetry>,
            Arc<fc_db::Backend<Block, FullClient>>,
        ),
    >,
    ServiceError,
> {
    // Set up telemetry only when non-empty endpoints are configured; the
    // worker itself is spawned below once the task manager exists.
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
            let worker = TelemetryWorker::new(16)?;
            let telemetry = worker.handle().new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    // Honour an explicit heap-pages override, otherwise use the default
    // allocation strategy.
    let heap_pages = config
        .executor
        .default_heap_pages
        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
            extra_pages: h as _,
        });

    let executor = ParachainExecutor::builder()
        .with_execution_method(config.executor.wasm_method)
        .with_onchain_heap_alloc_strategy(heap_pages)
        .with_offchain_heap_alloc_strategy(heap_pages)
        .with_max_runtime_instances(config.executor.max_runtime_instances)
        .with_runtime_cache_size(config.executor.runtime_cache_size)
        .build();

    // NOTE(review): the trailing `true` presumably enables import proof
    // recording (per the `_record_import` variant) — confirm against
    // `sc_service::new_full_parts_record_import`.
    let (client, backend, keystore_container, task_manager) =
        sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor,
            true,
        )?;
    let client = Arc::new(client);
    // Spawn the telemetry worker and keep only the handle.
    let telemetry = telemetry.map(|(worker, telemetry)| {
        task_manager
            .spawn_handle()
            .spawn("telemetry", None, worker.run());
        telemetry
    });
    let select_chain = sc_consensus::LongestChain::new(backend.clone());
    let transaction_pool = sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build();
    // Frontier database mapping Ethereum data to Substrate blocks.
    let frontier_backend = Arc::new(crate::rpc::open_frontier_backend(
        client.clone(),
        config,
        evm_tracing_config,
    )?);
    let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());

    // Manual-seal import queue: blocks are authored locally, so no external
    // verification pipeline is needed.
    let import_queue = sc_consensus_manual_seal::import_queue(
        Box::new(client.clone()),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    Ok(sc_service::PartialComponents {
        client,
        backend,
        task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool: transaction_pool.into(),
        other: (frontier_block_import, telemetry, frontier_backend),
    })
}
284
/// Builds a new local development service (parachain-oriented).
///
/// Wires up networking (peerless), offchain workers, the Frontier/EVM task
/// set (mapping sync, filter pool, fee history, optional tracing), the RPC
/// stack, and — when running as an authority — a manual-seal authorship task
/// fed with mocked parachain inherents.
pub fn start_node<N>(
    mut config: Configuration,
    evm_tracing_config: FrontierConfig,
) -> Result<TaskManager, ServiceError>
where
    N: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    let sc_service::PartialComponents {
        client,
        backend,
        mut task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other: (block_import, mut telemetry, frontier_backend),
    } = new_partial(&config, &evm_tracing_config)?;

    // Dev node: no peers
    config.network.default_peers_set.in_peers = 0;
    config.network.default_peers_set.out_peers = 0;

    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
        &config.network,
        config.prometheus_registry().cloned(),
    );

    let metrics = N::register_notification_metrics(
        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
    );
    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
        sc_service::build_network(sc_service::BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            // No relay chain here, so no warp sync or custom block relay.
            warp_sync_config: None,
            block_relay: None,
            metrics,
        })?;

    // Optional offchain worker runner (honours the node configuration).
    if config.offchain_worker.enabled {
        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-work",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(
                    transaction_pool.clone(),
                )),
                network_provider: Arc::new(network.clone()),
                is_validator: config.role.is_authority(),
                enable_http_requests: true,
                custom_extensions: move |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }

    // Shared state for the Frontier ETH RPC layer.
    let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
    let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
    let storage_override = Arc::new(StorageOverrideHandler::new(client.clone()));

    // Sinks for pubsub notifications.
    // Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
    // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel.
    // This way we avoid race conditions when using native substrate block import notification stream.
    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    let ethapi_cmd = evm_tracing_config.ethapi.clone();

    // Spawn EVM tracing workers only when `debug` or `trace` RPC namespaces
    // were requested; otherwise hand out empty requesters.
    let tracing_requesters =
        if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
            tracing::spawn_tracing_tasks(
                &evm_tracing_config,
                config.prometheus_registry().cloned(),
                tracing::SpawnTasksParams {
                    task_manager: &task_manager,
                    client: client.clone(),
                    substrate_backend: backend.clone(),
                    frontier_backend: frontier_backend.clone(),
                    storage_override: storage_override.clone(),
                },
            )
        } else {
            tracing::RpcRequesters {
                debug: None,
                trace: None,
            }
        };

    // Frontier offchain DB task. Essential.
    // Maps emulated ethereum data to substrate native data.
    match frontier_backend.as_ref() {
        fc_db::Backend::KeyValue(ref b) => {
            task_manager.spawn_essential_handle().spawn(
                "frontier-mapping-sync-worker",
                Some("frontier"),
                fc_mapping_sync::kv::MappingSyncWorker::new(
                    client.import_notification_stream(),
                    Duration::new(6, 0),
                    client.clone(),
                    backend.clone(),
                    storage_override.clone(),
                    b.clone(),
                    3,
                    0,
                    fc_mapping_sync::SyncStrategy::Parachain,
                    sync_service.clone(),
                    pubsub_notification_sinks.clone(),
                )
                .for_each(|()| futures::future::ready(())),
            );
        }
        fc_db::Backend::Sql(ref b) => {
            // SQL backend sync is blocking work, hence `spawn_blocking`.
            task_manager.spawn_essential_handle().spawn_blocking(
                "frontier-mapping-sync-worker",
                Some("frontier"),
                fc_mapping_sync::sql::SyncWorker::run(
                    client.clone(),
                    backend.clone(),
                    b.clone(),
                    client.import_notification_stream(),
                    fc_mapping_sync::sql::SyncWorkerConfig {
                        read_notification_timeout: Duration::from_secs(10),
                        check_indexed_blocks_interval: Duration::from_secs(60),
                    },
                    fc_mapping_sync::SyncStrategy::Parachain,
                    sync_service.clone(),
                    pubsub_notification_sinks.clone(),
                ),
            );
        }
    }

    // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters.
    // Each filter is allowed to stay in the pool for 100 blocks.
    const FILTER_RETAIN_THRESHOLD: u64 = 100;
    task_manager.spawn_essential_handle().spawn(
        "frontier-filter-pool",
        Some("frontier"),
        fc_rpc::EthTask::filter_pool_task(
            client.clone(),
            filter_pool.clone(),
            FILTER_RETAIN_THRESHOLD,
        ),
    );

    // Keep up to 2048 blocks of fee history for `eth_feeHistory`.
    const FEE_HISTORY_LIMIT: u64 = 2048;
    task_manager.spawn_essential_handle().spawn(
        "frontier-fee-history",
        Some("frontier"),
        fc_rpc::EthTask::fee_history_task(
            client.clone(),
            storage_override.clone(),
            fee_history_cache.clone(),
            FEE_HISTORY_LIMIT,
        ),
    );

    // Snapshot config-derived values before `config` is moved into
    // `spawn_tasks` below.
    let role = config.role.clone();
    let prometheus_registry = config.prometheus_registry().cloned();
    let is_authority = config.role.is_authority();

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        task_manager.spawn_handle(),
        storage_override.clone(),
        50,
        50,
        prometheus_registry.clone(),
    ));

    // Channel for the rpc handler to communicate with the authorship task.
    let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024);
    // Para ID from the chain spec extensions, falling back to 2000.
    let local_para_id = ParaId::from(
        crate::parachain::chain_spec::Extensions::try_get(&*config.chain_spec)
            .map(|e| e.para_id)
            .unwrap_or(2000),
    );

    let rpc_extensions_builder = {
        let client = client.clone();
        let network = network.clone();
        let transaction_pool = transaction_pool.clone();
        let sync = sync_service.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks.clone();

        Box::new(move |subscription| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                graph: transaction_pool.clone(),
                network: network.clone(),
                sync: sync.clone(),
                is_authority,
                frontier_backend: match *frontier_backend {
                    fc_db::Backend::KeyValue(ref b) => b.clone(),
                    fc_db::Backend::Sql(ref b) => b.clone(),
                },
                filter_pool: filter_pool.clone(),
                fee_history_limit: FEE_HISTORY_LIMIT,
                fee_history_cache: fee_history_cache.clone(),
                block_data_cache: block_data_cache.clone(),
                storage_override: storage_override.clone(),
                enable_evm_rpc: true, // enable EVM RPC for dev node by default
                command_sink: Some(command_sink.clone()),
            };

            crate::rpc::create_full_local_dev(
                deps,
                subscription,
                pubsub_notification_sinks.clone(),
                local_para_id,
                crate::rpc::EvmTracingConfig {
                    tracing_requesters: tracing_requesters.clone(),
                    trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count,
                    enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool),
                },
            )
            .map_err::<ServiceError, _>(Into::into)
        })
    };

    // Spawns RPC, system, and telemetry tasks; consumes `config`.
    let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
        network: network.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        task_manager: &mut task_manager,
        transaction_pool: transaction_pool.clone(),
        rpc_builder: rpc_extensions_builder,
        backend,
        system_rpc_tx,
        tx_handler_controller,
        sync_service: sync_service.clone(),
        config,
        telemetry: telemetry.as_mut(),
    })?;

    // Authority role: run manual-seal block authoring driven by the RPC
    // command stream, with Aura consensus data and mocked relay inherents.
    if role.is_authority() {
        let proposer_factory = sc_basic_authorship::ProposerFactory::new(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let slot_duration = sc_consensus_aura::slot_duration(&*client)?;

        let para_id = local_para_id;
        // NOTE(review): the mocked relay slot clock starts two hours in the
        // past — presumably so block timestamps derived from it stay behind
        // wall clock as blocks advance; confirm the intended margin.
        let initial_relay_slot = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("Current time is always after UNIX_EPOCH; qed")
            .sub(Duration::from_secs(2 * 60 * 60))
            .as_millis() as u64
            / RELAY_CHAIN_SLOT_DURATION_MILLIS;

        let aura =
            sc_consensus_manual_seal::run_manual_seal(sc_consensus_manual_seal::ManualSealParams {
                block_import,
                env: proposer_factory,
                client: client.clone(),
                pool: transaction_pool.clone(),
                commands_stream,
                select_chain,
                consensus_data_provider: Some(Box::new(
                    sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new(
                        client.clone(),
                    ),
                )),
                create_inherent_data_providers: move |parent_hash, ()| {
                    let client = client.clone();
                    async move {
                        let current_para_head = client
                            .header(parent_hash)
                            .expect("Header lookup should succeed")
                            .expect("Header passed in as parent should be present in backend.");

                        // Signal GoAhead when the parent's collation info
                        // carries new validation code, so the pending runtime
                        // upgrade can be enacted.
                        let should_send_go_ahead = client
                            .runtime_api()
                            .collect_collation_info(parent_hash, &current_para_head)
                            .map(|info| info.new_validation_code.is_some())
                            .unwrap_or_default();

                        let current_para_block = UniqueSaturatedInto::<u32>::unique_saturated_into(
                            *current_para_head.number(),
                        ) + 1;

                        // How many 6 s relay slots fit in one para slot (>= 1).
                        let relay_blocks_per_para_block =
                            (slot_duration.as_millis() / RELAY_CHAIN_SLOT_DURATION_MILLIS).max(1)
                                as u32;
                        let current_para_block_u64 = u64::from(current_para_block);
                        let relay_blocks_per_para_block_u64 =
                            u64::from(relay_blocks_per_para_block);
                        // Advance the mocked relay clock proportionally to the
                        // para height.
                        let target_relay_slot = initial_relay_slot.saturating_add(
                            current_para_block_u64.saturating_mul(relay_blocks_per_para_block_u64),
                        );

                        let current_para_block_head = Some(current_para_head.encode().into());

                        let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
                            para_id,
                            current_para_block,
                            current_para_block_head,
                            relay_blocks_per_para_block,
                            target_relay_slot,
                            should_send_go_ahead.then_some(UpgradeGoAhead::GoAhead),
                        );

                        Ok((timestamp_provider, mocked_parachain))
                    }
                },
            });

        task_manager
            .spawn_essential_handle()
            .spawn_blocking("aura", Some("block-authoring"), aura);
    }

    Ok(task_manager)
}
615}