astar_collator/local/service.rs

// This file is part of Astar.

// Copyright (C) Stake Technologies Pte.Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later

// Astar is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Astar is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Astar. If not, see <http://www.gnu.org/licenses/>.

//! Local Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use crate::{
    evm_tracing_types::{EthApi as EthApiCmd, FrontierConfig},
    rpc::tracing,
};
use fc_consensus::FrontierBlockImport;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use fc_storage::StorageOverrideHandler;
use futures::{FutureExt, StreamExt};
use sc_client_api::{Backend, BlockBackend, BlockchainEvents};
use sc_consensus_grandpa::SharedVoterState;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::NetworkBackend;
use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
use sc_telemetry::{Telemetry, TelemetryWorker};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
#[cfg(not(feature = "manual-seal"))]
use sp_consensus_aura::sr25519::AuthorityPair as AuraPair;
use sp_runtime::traits::Block as BlockT;
use std::{collections::BTreeMap, sync::Arc, time::Duration};

pub use local_runtime::RuntimeApi;

use astar_primitives::*;

/// The minimum period of blocks on which justifications will be
/// imported and generated.
const GRANDPA_JUSTIFICATION_PERIOD: u32 = 512;

/// Parachain host functions
#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
    frame_benchmarking::benchmarking::HostFunctions,
    cumulus_client_service::ParachainHostFunctions,
    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

/// Parachain host functions
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
    cumulus_client_service::ParachainHostFunctions,
    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

type ParachainExecutor = WasmExecutor<HostFunctions>;

type FullClient = sc_service::TFullClient<Block, RuntimeApi, ParachainExecutor>;
type FullBackend = sc_service::TFullBackend<Block>;
type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;

/// Build the partial components of the chain service.
pub fn new_partial(
    config: &Configuration,
    evm_tracing_config: &FrontierConfig,
) -> Result<
    sc_service::PartialComponents<
        FullClient,
        FullBackend,
        FullSelectChain,
        sc_consensus::DefaultImportQueue<Block>,
        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
        (
            FrontierBlockImport<
                Block,
                sc_consensus_grandpa::GrandpaBlockImport<
                    FullBackend,
                    Block,
                    FullClient,
                    FullSelectChain,
                >,
                FullClient,
            >,
            sc_consensus_grandpa::LinkHalf<Block, FullClient, FullSelectChain>,
            Option<Telemetry>,
            Arc<fc_db::Backend<Block, FullClient>>,
        ),
    >,
    ServiceError,
> {
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
            let worker = TelemetryWorker::new(16)?;
            let telemetry = worker.handle().new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

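    // Honour the `--default-heap-pages` override when present; otherwise fall
    // back to the executor's default heap allocation strategy.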
    let heap_pages = config
        .executor
        .default_heap_pages
        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
            extra_pages: h as _,
        });

    let executor = ParachainExecutor::builder()
        .with_execution_method(config.executor.wasm_method)
        .with_onchain_heap_alloc_strategy(heap_pages)
        .with_offchain_heap_alloc_strategy(heap_pages)
        .with_max_runtime_instances(config.executor.max_runtime_instances)
        .with_runtime_cache_size(config.executor.runtime_cache_size)
        .build();

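    // Create the core client parts; the trailing `true` enables proof
    // recording during block import.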
    let (client, backend, keystore_container, task_manager) =
        sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor,
            true,
        )?;
    let client = Arc::new(client);
    let telemetry = telemetry.map(|(worker, telemetry)| {
        task_manager
            .spawn_handle()
            .spawn("telemetry", None, worker.run());
        telemetry
    });
    let select_chain = sc_consensus::LongestChain::new(backend.clone());
    let transaction_pool = sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build();
    let (grandpa_block_import, grandpa_link) = sc_consensus_grandpa::block_import(
        client.clone(),
        GRANDPA_JUSTIFICATION_PERIOD,
        &(client.clone() as Arc<_>),
        select_chain.clone(),
        telemetry.as_ref().map(|x| x.handle()),
    )?;
    let frontier_backend = Arc::new(crate::rpc::open_frontier_backend(
        client.clone(),
        config,
        evm_tracing_config,
    )?);
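    // Wrap the GRANDPA block import with Frontier's block import so that
    // Ethereum-mapped data is handled as blocks are imported.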
    let frontier_block_import =
        FrontierBlockImport::new(grandpa_block_import.clone(), client.clone());

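    // With `manual-seal` the import queue performs no consensus verification;
    // otherwise blocks are verified by Aura, with GRANDPA justification import.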
    #[cfg(feature = "manual-seal")]
    let import_queue = sc_consensus_manual_seal::import_queue(
        Box::new(client.clone()),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    #[cfg(not(feature = "manual-seal"))]
    let import_queue = {
        let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
        sc_consensus_aura::import_queue::<AuraPair, _, _, _, _, _>(
            sc_consensus_aura::ImportQueueParams {
                block_import: frontier_block_import.clone(),
                justification_import: Some(Box::new(grandpa_block_import)),
                client: client.clone(),
                create_inherent_data_providers: move |_, ()| async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
                    let slot =
                        sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                            *timestamp,
                            slot_duration,
                        );
                    Ok((slot, timestamp))
                },
                spawner: &task_manager.spawn_essential_handle(),
                registry: config.prometheus_registry(),
                check_for_equivocation: Default::default(),
                telemetry: telemetry.as_ref().map(|x| x.handle()),
                compatibility_mode: Default::default(),
            },
        )?
    };

    Ok(sc_service::PartialComponents {
        client,
        backend,
        task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool: transaction_pool.into(),
        other: (
            frontier_block_import,
            grandpa_link,
            telemetry,
            frontier_backend,
        ),
    })
}

/// Builds a new service for a full client.
pub fn start_node<N>(
    config: Configuration,
    evm_tracing_config: FrontierConfig,
) -> Result<TaskManager, ServiceError>
where
    N: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    let sc_service::PartialComponents {
        client,
        backend,
        mut task_manager,
        import_queue,
        keystore_container,
        select_chain,
        transaction_pool,
        other: (block_import, grandpa_link, mut telemetry, frontier_backend),
    } = new_partial(&config, &evm_tracing_config)?;

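    // The GRANDPA notification protocol name is derived from the genesis hash
    // and the chain spec.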
    let protocol_name = sc_consensus_grandpa::protocol_standard_name(
        &client
            .block_hash(0)
            .ok()
            .flatten()
            .expect("Genesis block exists; qed"),
        &config.chain_spec,
    );
    let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
        &config.network,
        config.prometheus_registry().cloned(),
    );

    let metrics = N::register_notification_metrics(
        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
    );
    let peer_store_handle = net_config.peer_store_handle();

    let (grandpa_protocol_config, grandpa_notification_service) =
        sc_consensus_grandpa::grandpa_peers_set_config::<_, N>(
            protocol_name.clone(),
            metrics.clone(),
            Arc::clone(&peer_store_handle),
        );
    net_config.add_notification_protocol(grandpa_protocol_config);

    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
        sc_service::build_network(sc_service::BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            warp_sync_config: None,
            block_relay: None,
            metrics,
        })?;

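    // Offchain workers get keystore and transaction-pool access so that
    // runtime pallets can submit transactions from offchain contexts.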
    if config.offchain_worker.enabled {
        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-work",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(
                    transaction_pool.clone(),
                )),
                network_provider: Arc::new(network.clone()),
                is_validator: config.role.is_authority(),
                enable_http_requests: true,
                custom_extensions: move |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }

    let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
    let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
    let storage_override = Arc::new(StorageOverrideHandler::new(client.clone()));

    // Sinks for pubsub notifications.
    // Every time a new subscription is created, a new mpsc channel is added to the sink pool.
    // The MappingSyncWorker sends through the channel on block import, and the subscription
    // emits a notification to the subscriber on receiving a message through this channel.
    // This way we avoid race conditions when using the native Substrate block import
    // notification stream.
    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    let ethapi_cmd = evm_tracing_config.ethapi.clone();

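    // Spawn the EVM tracing task pools only when the `debug` or `trace`
    // ethapi commands are enabled; otherwise no requesters are created.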
    let tracing_requesters =
        if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
            tracing::spawn_tracing_tasks(
                &evm_tracing_config,
                config.prometheus_registry().cloned(),
                tracing::SpawnTasksParams {
                    task_manager: &task_manager,
                    client: client.clone(),
                    substrate_backend: backend.clone(),
                    frontier_backend: frontier_backend.clone(),
                    storage_override: storage_override.clone(),
                },
            )
        } else {
            tracing::RpcRequesters {
                debug: None,
                trace: None,
            }
        };

    // Frontier offchain DB task. Essential.
    // Maps emulated ethereum data to substrate native data.
    match frontier_backend.as_ref() {
        fc_db::Backend::KeyValue(ref b) => {
            task_manager.spawn_essential_handle().spawn(
                "frontier-mapping-sync-worker",
                Some("frontier"),
                fc_mapping_sync::kv::MappingSyncWorker::new(
                    client.import_notification_stream(),
                    Duration::new(6, 0),
                    client.clone(),
                    backend.clone(),
                    storage_override.clone(),
                    b.clone(),
                    3,
                    0,
                    fc_mapping_sync::SyncStrategy::Parachain,
                    sync_service.clone(),
                    pubsub_notification_sinks.clone(),
                )
                .for_each(|()| futures::future::ready(())),
            );
        }
        fc_db::Backend::Sql(ref b) => {
            task_manager.spawn_essential_handle().spawn_blocking(
                "frontier-mapping-sync-worker",
                Some("frontier"),
                fc_mapping_sync::sql::SyncWorker::run(
                    client.clone(),
                    backend.clone(),
                    b.clone(),
                    client.import_notification_stream(),
                    fc_mapping_sync::sql::SyncWorkerConfig {
                        read_notification_timeout: Duration::from_secs(10),
                        check_indexed_blocks_interval: Duration::from_secs(60),
                    },
                    fc_mapping_sync::SyncStrategy::Parachain,
                    sync_service.clone(),
                    pubsub_notification_sinks.clone(),
                ),
            );
        }
    }

    // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters.
    // Each filter is allowed to stay in the pool for 100 blocks.
    const FILTER_RETAIN_THRESHOLD: u64 = 100;
    task_manager.spawn_essential_handle().spawn(
        "frontier-filter-pool",
        Some("frontier"),
        fc_rpc::EthTask::filter_pool_task(
            client.clone(),
            filter_pool.clone(),
            FILTER_RETAIN_THRESHOLD,
        ),
    );

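    // Maximum number of recent blocks retained by the `eth_feeHistory` cache.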
    const FEE_HISTORY_LIMIT: u64 = 2048;
    task_manager.spawn_essential_handle().spawn(
        "frontier-fee-history",
        Some("frontier"),
        fc_rpc::EthTask::fee_history_task(
            client.clone(),
            storage_override.clone(),
            fee_history_cache.clone(),
            FEE_HISTORY_LIMIT,
        ),
    );

    #[cfg(not(feature = "manual-seal"))]
    let force_authoring = config.force_authoring;
    #[cfg(not(feature = "manual-seal"))]
    let backoff_authoring_blocks: Option<()> = None;

    let role = config.role.clone();
    let name = config.network.node_name.clone();
    let enable_grandpa = !config.disable_grandpa;
    let prometheus_registry = config.prometheus_registry().cloned();
    let is_authority = config.role.is_authority();

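    // Cache Ethereum block data served over RPC: 50 entries each for blocks
    // and transaction statuses.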
    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        task_manager.spawn_handle(),
        storage_override.clone(),
        50,
        50,
        prometheus_registry.clone(),
    ));

    // Channel for the rpc handler to communicate with the authorship task.
    #[cfg(feature = "manual-seal")]
    let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024);

    let rpc_extensions_builder = {
        let client = client.clone();
        let network = network.clone();
        let transaction_pool = transaction_pool.clone();
        let sync = sync_service.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks.clone();

        Box::new(move |subscription| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                graph: transaction_pool.clone(),
                network: network.clone(),
                sync: sync.clone(),
                is_authority,
                frontier_backend: match *frontier_backend {
                    fc_db::Backend::KeyValue(ref b) => b.clone(),
                    fc_db::Backend::Sql(ref b) => b.clone(),
                },
                filter_pool: filter_pool.clone(),
                fee_history_limit: FEE_HISTORY_LIMIT,
                fee_history_cache: fee_history_cache.clone(),
                block_data_cache: block_data_cache.clone(),
                storage_override: storage_override.clone(),
                enable_evm_rpc: true, // enable EVM RPC for dev node by default
                #[cfg(feature = "manual-seal")]
                command_sink: Some(command_sink.clone()),
            };

            crate::rpc::create_full(
                deps,
                subscription,
                pubsub_notification_sinks.clone(),
                crate::rpc::EvmTracingConfig {
                    tracing_requesters: tracing_requesters.clone(),
                    trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count,
                    enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool),
                },
            )
            .map_err::<ServiceError, _>(Into::into)
        })
    };

    let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
        network: network.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        task_manager: &mut task_manager,
        transaction_pool: transaction_pool.clone(),
        rpc_builder: rpc_extensions_builder,
        backend,
        system_rpc_tx,
        tx_handler_controller,
        sync_service: sync_service.clone(),
        config,
        telemetry: telemetry.as_mut(),
    })?;

    if role.is_authority() {
        let proposer_factory = sc_basic_authorship::ProposerFactory::new(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let slot_duration = sc_consensus_aura::slot_duration(&*client)?;

        #[cfg(feature = "manual-seal")]
        let aura = sc_consensus_manual_seal::run_manual_seal(
            sc_consensus_manual_seal::ManualSealParams {
                block_import,
                env: proposer_factory,
                client: client.clone(),
                pool: transaction_pool.clone(),
                commands_stream,
                select_chain,
                consensus_data_provider: Some(Box::new(
                    sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new(
                        client.clone(),
                    ),
                )),
                create_inherent_data_providers: move |_, ()| async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
                    let slot =
                        sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                            *timestamp,
                            slot_duration.clone(),
                        );
                    Ok((slot, timestamp))
                },
            },
        );

        #[cfg(not(feature = "manual-seal"))]
        let aura = sc_consensus_aura::start_aura::<AuraPair, _, _, _, _, _, _, _, _, _, _>(
            sc_consensus_aura::StartAuraParams {
                slot_duration,
                client,
                select_chain,
                block_import,
                proposer_factory,
                create_inherent_data_providers: move |_, ()| async move {
                    let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                    let slot =
                        sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                            *timestamp,
                            slot_duration,
                        );

                    Ok((slot, timestamp))
                },
                force_authoring,
                backoff_authoring_blocks,
                keystore: keystore_container.keystore(),
                sync_oracle: sync_service.clone(),
                justification_sync_link: sync_service.clone(),
                block_proposal_slot_portion: sc_consensus_aura::SlotProportion::new(2f32 / 3f32),
                max_block_proposal_slot_portion: None,
                telemetry: telemetry.as_ref().map(|x| x.handle()),
                compatibility_mode: Default::default(),
            },
        )?;

        // the AURA authoring task is considered essential, i.e. if it
        // fails we take down the service with it.
        task_manager
            .spawn_essential_handle()
            .spawn_blocking("aura", Some("block-authoring"), aura);
    }

    // if the node isn't actively participating in consensus then it doesn't
    // need a keystore, regardless of which protocol we use below.
    let keystore = if role.is_authority() {
        Some(keystore_container.keystore())
    } else {
        None
    };

    let grandpa_config = sc_consensus_grandpa::Config {
        // FIXME #1578 make this available through chainspec
        gossip_duration: Duration::from_millis(333),
        justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD,
        name: Some(name),
        observer_enabled: false,
        keystore,
        local_role: role,
        telemetry: telemetry.as_ref().map(|x| x.handle()),
        protocol_name,
    };

    if enable_grandpa {
        // start the full GRANDPA voter
        // NOTE: non-authorities could run the GRANDPA observer protocol, but at
        // this point the full voter should provide better guarantees of block
        // and vote data availability than the observer. The observer has not
        // been tested extensively yet and having most nodes in a network run it
        // could lead to finality stalls.
        let grandpa_config = sc_consensus_grandpa::GrandpaParams {
            config: grandpa_config,
            link: grandpa_link,
            network,
            sync: Arc::new(sync_service),
            notification_service: grandpa_notification_service,
            voting_rule: sc_consensus_grandpa::VotingRulesBuilder::default().build(),
            prometheus_registry,
            shared_voter_state: SharedVoterState::empty(),
            telemetry: telemetry.as_ref().map(|x| x.handle()),
            offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(transaction_pool),
        };

        // the GRANDPA voter task is considered infallible, i.e.
        // if it fails we take down the service with it.
        task_manager.spawn_essential_handle().spawn_blocking(
            "grandpa-voter",
            None,
            sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?,
        );
    }

    Ok(task_manager)
}
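
// Illustrative only: `start_node` is generic over the network backend, so the
// caller (typically the CLI command handler, not shown here) picks a concrete
// implementation. For example, with the libp2p backend:
//
//     start_node::<sc_network::NetworkWorker<Block, <Block as BlockT>::Hash>>(
//         config,
//         evm_tracing_config,
//     )?;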