astar_collator/parachain/
service.rs

1// This file is part of Astar.
2
3// Copyright (C) Stake Technologies Pte.Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later
5
6// Astar is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// Astar is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with Astar. If not, see <http://www.gnu.org/licenses/>.
18
19//! Parachain Service and ServiceFactory implementation.
20
21use astar_primitives::*;
22use cumulus_client_cli::CollatorOptions;
23use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
24use cumulus_client_consensus_common::ParachainBlockImport;
25use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier;
26use cumulus_client_service::{
27    prepare_node_config, start_relay_chain_tasks, BuildNetworkParams, DARecoveryProfile,
28    StartRelayChainTasksParams,
29};
30use cumulus_primitives_core::{
31    relay_chain::{CollatorPair, ValidationCode},
32    ParaId,
33};
34use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
35use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
36use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
37use fc_consensus::FrontierBlockImport;
38use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
39use fc_storage::StorageOverrideHandler;
40use futures::StreamExt;
41use sc_client_api::BlockchainEvents;
42use sc_consensus::{import_queue::BasicQueue, ImportQueue};
43use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
44use sc_network::{config::NetworkBackendType, NetworkBackend, NetworkBlock};
45use sc_network_sync::SyncingService;
46use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
47use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
48use sp_api::{ApiExt, ProvideRuntimeApi};
49use sp_consensus_aura::{
50    sr25519::AuthorityId as AuraId, sr25519::AuthorityPair as AuraPair, AuraApi,
51};
52use sp_keystore::KeystorePtr;
53use sp_runtime::{traits::Block as BlockT, Percent};
54use std::{collections::BTreeMap, sync::Arc, time::Duration};
55use substrate_prometheus_endpoint::Registry;
56
57use super::shell_upgrade::*;
58
59use crate::{
60    evm_tracing_types::{EthApi as EthApiCmd, FrontierConfig},
61    rpc::tracing,
62};
63
64/// Parachain host functions
65#[cfg(feature = "runtime-benchmarks")]
66pub type HostFunctions = (
67    frame_benchmarking::benchmarking::HostFunctions,
68    cumulus_client_service::ParachainHostFunctions,
69    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
70);
71
72/// Parachain host functions
73#[cfg(not(feature = "runtime-benchmarks"))]
74pub type HostFunctions = (
75    cumulus_client_service::ParachainHostFunctions,
76    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
77);
78
79/// Parachain executor
80pub type ParachainExecutor = WasmExecutor<HostFunctions>;
81
82type FullClient =
83    TFullClient<Block, crate::parachain::fake_runtime_api::RuntimeApi, ParachainExecutor>;
84
85/// Starts a `ServiceBuilder` for a full service.
86///
87/// Use this macro if you don't actually need the full service, but just the builder in order to
88/// be able to perform chain operations.
89pub fn new_partial(
90    config: &Configuration,
91    evm_tracing_config: &FrontierConfig,
92) -> Result<
93    PartialComponents<
94        FullClient,
95        TFullBackend<Block>,
96        (),
97        sc_consensus::DefaultImportQueue<Block>,
98        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
99        (
100            ParachainBlockImport<
101                Block,
102                FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
103                TFullBackend<Block>,
104            >,
105            Option<Telemetry>,
106            Option<TelemetryWorkerHandle>,
107            Arc<fc_db::Backend<Block, FullClient>>,
108        ),
109    >,
110    sc_service::Error,
111> {
112    let telemetry = config
113        .telemetry_endpoints
114        .clone()
115        .filter(|x| !x.is_empty())
116        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
117            let worker = TelemetryWorker::new(16)?;
118            let telemetry = worker.handle().new_telemetry(endpoints);
119            Ok((worker, telemetry))
120        })
121        .transpose()?;
122
123    let heap_pages = config
124        .executor
125        .default_heap_pages
126        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
127            extra_pages: h as _,
128        });
129
130    let executor = ParachainExecutor::builder()
131        .with_execution_method(config.executor.wasm_method)
132        .with_onchain_heap_alloc_strategy(heap_pages)
133        .with_offchain_heap_alloc_strategy(heap_pages)
134        .with_max_runtime_instances(config.executor.max_runtime_instances)
135        .with_runtime_cache_size(config.executor.runtime_cache_size)
136        .build();
137
138    let (client, backend, keystore_container, task_manager) =
139        sc_service::new_full_parts_record_import::<Block, _, _>(
140            config,
141            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
142            executor,
143            true,
144        )?;
145    let client = Arc::new(client);
146
147    let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
148
149    let telemetry = telemetry.map(|(worker, telemetry)| {
150        task_manager
151            .spawn_handle()
152            .spawn("telemetry", None, worker.run());
153        telemetry
154    });
155
156    let transaction_pool = sc_transaction_pool::Builder::new(
157        task_manager.spawn_essential_handle(),
158        client.clone(),
159        config.role.is_authority().into(),
160    )
161    .with_options(config.transaction_pool.clone())
162    .with_prometheus(config.prometheus_registry())
163    .build();
164
165    let frontier_backend = Arc::new(crate::rpc::open_frontier_backend(
166        client.clone(),
167        config,
168        evm_tracing_config,
169    )?);
170    let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
171
172    let parachain_block_import: ParachainBlockImport<_, _, _> =
173        ParachainBlockImport::new(frontier_block_import, backend.clone());
174
175    let import_queue = build_import_queue(
176        client.clone(),
177        parachain_block_import.clone(),
178        config,
179        telemetry.as_ref().map(|telemetry| telemetry.handle()),
180        &task_manager,
181    );
182
183    let params = PartialComponents {
184        backend,
185        client,
186        import_queue,
187        keystore_container,
188        task_manager,
189        transaction_pool: transaction_pool.into(),
190        select_chain: (),
191        other: (
192            parachain_block_import,
193            telemetry,
194            telemetry_worker_handle,
195            frontier_backend,
196        ),
197    };
198
199    Ok(params)
200}
201
202async fn build_relay_chain_interface(
203    polkadot_config: Configuration,
204    parachain_config: &Configuration,
205    telemetry_worker_handle: Option<TelemetryWorkerHandle>,
206    task_manager: &mut TaskManager,
207    collator_options: CollatorOptions,
208    hwbench: Option<sc_sysinfo::HwBench>,
209) -> RelayChainResult<(
210    Arc<(dyn RelayChainInterface + 'static)>,
211    Option<CollatorPair>,
212)> {
213    let result = if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
214        collator_options.relay_chain_mode
215    {
216        build_minimal_relay_chain_node_with_rpc(
217            polkadot_config,
218            parachain_config.prometheus_registry(),
219            task_manager,
220            rpc_target_urls,
221        )
222        .await
223    } else {
224        build_inprocess_relay_chain(
225            polkadot_config,
226            parachain_config,
227            telemetry_worker_handle,
228            task_manager,
229            hwbench,
230        )
231    };
232
233    // Extract only the first two elements from the 4-tuple
234    result
235        .map(|(relay_chain_interface, collator_pair, _, _)| (relay_chain_interface, collator_pair))
236}
237
238#[derive(Clone)]
239/// To add additional config to start_xyz_node functions
240pub struct AdditionalConfig {
241    /// EVM tracing configuration
242    pub evm_tracing_config: FrontierConfig,
243
244    /// Whether EVM RPC be enabled
245    pub enable_evm_rpc: bool,
246
247    /// Maxium allowed block size limit to propose
248    pub proposer_block_size_limit: usize,
249
250    /// Soft deadline limit used by `Proposer`
251    pub proposer_soft_deadline_percent: u8,
252
253    /// Hardware benchmarks score
254    pub hwbench: Option<sc_sysinfo::HwBench>,
255}
256
257/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
258///
259/// This is the actual implementation that is abstract over the executor and the runtime api.
260#[sc_tracing::logging::prefix_logs_with("Parachain")]
261async fn start_node_impl<N>(
262    parachain_config: Configuration,
263    polkadot_config: Configuration,
264    collator_options: CollatorOptions,
265    para_id: ParaId,
266    additional_config: AdditionalConfig,
267) -> sc_service::error::Result<(TaskManager, Arc<FullClient>)>
268where
269    N: NetworkBackend<Block, <Block as BlockT>::Hash>,
270{
271    let parachain_config = prepare_node_config(parachain_config);
272
273    let PartialComponents {
274        client,
275        backend,
276        mut task_manager,
277        keystore_container,
278        select_chain: _,
279        import_queue,
280        transaction_pool,
281        other: (parachain_block_import, mut telemetry, telemetry_worker_handle, frontier_backend),
282    } = new_partial(&parachain_config, &additional_config.evm_tracing_config)?;
283
284    let prometheus_registry = parachain_config.prometheus_registry().cloned();
285    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
286        &parachain_config.network,
287        prometheus_registry.clone(),
288    );
289
290    let metrics = N::register_notification_metrics(
291        parachain_config
292            .prometheus_config
293            .as_ref()
294            .map(|cfg| &cfg.registry),
295    );
296
297    let (relay_chain_interface, collator_key) = build_relay_chain_interface(
298        polkadot_config,
299        &parachain_config,
300        telemetry_worker_handle,
301        &mut task_manager,
302        collator_options.clone(),
303        additional_config.hwbench.clone(),
304    )
305    .await
306    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
307
308    let is_authority = parachain_config.role.is_authority();
309    let import_queue_service = import_queue.service();
310    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
311        cumulus_client_service::build_network(BuildNetworkParams {
312            parachain_config: &parachain_config,
313            net_config,
314            para_id,
315            client: client.clone(),
316            transaction_pool: transaction_pool.clone(),
317            spawn_handle: task_manager.spawn_handle(),
318            import_queue,
319            relay_chain_interface: relay_chain_interface.clone(),
320            sybil_resistance_level: cumulus_client_service::CollatorSybilResistance::Resistant,
321            metrics,
322        })
323        .await?;
324
325    let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
326    let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
327    let storage_override = Arc::new(StorageOverrideHandler::new(client.clone()));
328
329    // Sinks for pubsub notifications.
330    // Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
331    // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel.
332    // This way we avoid race conditions when using native substrate block import notification stream.
333    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
334        fc_mapping_sync::EthereumBlockNotification<Block>,
335    > = Default::default();
336    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
337
338    let ethapi_cmd = additional_config.evm_tracing_config.ethapi.clone();
339    let tracing_requesters =
340        if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
341            tracing::spawn_tracing_tasks(
342                &additional_config.evm_tracing_config,
343                prometheus_registry.clone(),
344                tracing::SpawnTasksParams {
345                    task_manager: &task_manager,
346                    client: client.clone(),
347                    substrate_backend: backend.clone(),
348                    frontier_backend: frontier_backend.clone(),
349                    storage_override: storage_override.clone(),
350                },
351            )
352        } else {
353            tracing::RpcRequesters {
354                debug: None,
355                trace: None,
356            }
357        };
358
359    // Frontier offchain DB task. Essential.
360    // Maps emulated ethereum data to substrate native data.
361    match frontier_backend.as_ref() {
362        fc_db::Backend::KeyValue(ref b) => {
363            task_manager.spawn_essential_handle().spawn(
364                "frontier-mapping-sync-worker",
365                Some("frontier"),
366                fc_mapping_sync::kv::MappingSyncWorker::new(
367                    client.import_notification_stream(),
368                    Duration::new(6, 0),
369                    client.clone(),
370                    backend.clone(),
371                    storage_override.clone(),
372                    b.clone(),
373                    3,
374                    0,
375                    fc_mapping_sync::SyncStrategy::Parachain,
376                    sync_service.clone(),
377                    pubsub_notification_sinks.clone(),
378                )
379                .for_each(|()| futures::future::ready(())),
380            );
381        }
382        fc_db::Backend::Sql(ref b) => {
383            task_manager.spawn_essential_handle().spawn_blocking(
384                "frontier-mapping-sync-worker",
385                Some("frontier"),
386                fc_mapping_sync::sql::SyncWorker::run(
387                    client.clone(),
388                    backend.clone(),
389                    b.clone(),
390                    client.import_notification_stream(),
391                    fc_mapping_sync::sql::SyncWorkerConfig {
392                        read_notification_timeout: Duration::from_secs(10),
393                        check_indexed_blocks_interval: Duration::from_secs(60),
394                    },
395                    fc_mapping_sync::SyncStrategy::Parachain,
396                    sync_service.clone(),
397                    pubsub_notification_sinks.clone(),
398                ),
399            );
400        }
401    }
402
403    // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters.
404    // Each filter is allowed to stay in the pool for 100 blocks.
405    const FILTER_RETAIN_THRESHOLD: u64 = 100;
406    task_manager.spawn_essential_handle().spawn(
407        "frontier-filter-pool",
408        Some("frontier"),
409        fc_rpc::EthTask::filter_pool_task(
410            client.clone(),
411            filter_pool.clone(),
412            FILTER_RETAIN_THRESHOLD,
413        ),
414    );
415
416    const FEE_HISTORY_LIMIT: u64 = 2048;
417    task_manager.spawn_essential_handle().spawn(
418        "frontier-fee-history",
419        Some("frontier"),
420        fc_rpc::EthTask::fee_history_task(
421            client.clone(),
422            storage_override.clone(),
423            fee_history_cache.clone(),
424            FEE_HISTORY_LIMIT,
425        ),
426    );
427
428    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
429        task_manager.spawn_handle(),
430        storage_override.clone(),
431        50,
432        50,
433        prometheus_registry.clone(),
434    ));
435
436    let rpc_extensions_builder = {
437        let client = client.clone();
438        let network = network.clone();
439        let transaction_pool = transaction_pool.clone();
440        let rpc_config = crate::rpc::EvmTracingConfig {
441            tracing_requesters,
442            trace_filter_max_count: additional_config.evm_tracing_config.ethapi_trace_max_count,
443            enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool),
444        };
445        let sync = sync_service.clone();
446        let pubsub_notification_sinks = pubsub_notification_sinks.clone();
447
448        Box::new(move |subscription| {
449            let deps = crate::rpc::FullDeps {
450                client: client.clone(),
451                pool: transaction_pool.clone(),
452                graph: transaction_pool.clone(),
453                network: network.clone(),
454                sync: sync.clone(),
455                is_authority,
456                frontier_backend: match *frontier_backend {
457                    fc_db::Backend::KeyValue(ref b) => b.clone(),
458                    fc_db::Backend::Sql(ref b) => b.clone(),
459                },
460                filter_pool: filter_pool.clone(),
461                fee_history_limit: FEE_HISTORY_LIMIT,
462                fee_history_cache: fee_history_cache.clone(),
463                block_data_cache: block_data_cache.clone(),
464                storage_override: storage_override.clone(),
465                enable_evm_rpc: additional_config.enable_evm_rpc,
466                command_sink: None,
467            };
468
469            crate::rpc::create_full(
470                deps,
471                subscription,
472                pubsub_notification_sinks.clone(),
473                rpc_config.clone(),
474            )
475            .map_err(Into::into)
476        })
477    };
478
479    // Spawn basic services.
480    sc_service::spawn_tasks(sc_service::SpawnTasksParams {
481        rpc_builder: rpc_extensions_builder,
482        client: client.clone(),
483        transaction_pool: transaction_pool.clone(),
484        task_manager: &mut task_manager,
485        config: parachain_config,
486        keystore: keystore_container.keystore(),
487        backend: backend.clone(),
488        network: network.clone(),
489        system_rpc_tx,
490        sync_service: sync_service.clone(),
491        tx_handler_controller,
492        telemetry: telemetry.as_mut(),
493    })?;
494
495    if let Some(hwbench) = additional_config.hwbench.clone() {
496        sc_sysinfo::print_hwbench(&hwbench);
497        if is_authority {
498            warn_if_slow_hardware(&hwbench);
499        }
500
501        if let Some(ref mut telemetry) = telemetry {
502            let telemetry_handle = telemetry.handle();
503            task_manager.spawn_handle().spawn(
504                "telemetry_hwbench",
505                None,
506                sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
507            );
508        }
509    }
510
511    let announce_block = {
512        let sync_service = sync_service.clone();
513        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
514    };
515
516    let overseer_handle = relay_chain_interface
517        .overseer_handle()
518        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
519
520    start_relay_chain_tasks(StartRelayChainTasksParams {
521        client: client.clone(),
522        announce_block: announce_block.clone(),
523        task_manager: &mut task_manager,
524        para_id,
525        relay_chain_interface: relay_chain_interface.clone(),
526        relay_chain_slot_duration: Duration::from_secs(6),
527        import_queue: import_queue_service,
528        recovery_handle: Box::new(overseer_handle.clone()),
529        sync_service: sync_service.clone(),
530        da_recovery_profile: if is_authority {
531            DARecoveryProfile::Collator
532        } else {
533            DARecoveryProfile::FullNode
534        },
535        prometheus_registry: prometheus_registry.as_ref(),
536    })?;
537
538    if is_authority {
539        start_aura_consensus(
540            client.clone(),
541            backend,
542            parachain_block_import,
543            prometheus_registry.as_ref(),
544            telemetry.map(|t| t.handle()),
545            &mut task_manager,
546            relay_chain_interface,
547            transaction_pool,
548            sync_service,
549            keystore_container.keystore(),
550            para_id,
551            collator_key.expect("Command line arguments do not allow this. qed"),
552            additional_config,
553        )?;
554    }
555
556    Ok((task_manager, client))
557}
558
559/// Build aura import queue with fallback to relay-chain verifier.
560/// Starts with relay-chain verifier until aura becomes available.
561pub fn build_import_queue(
562    client: Arc<FullClient>,
563    block_import: ParachainBlockImport<
564        Block,
565        FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
566        TFullBackend<Block>,
567    >,
568    config: &Configuration,
569    telemetry_handle: Option<TelemetryHandle>,
570    task_manager: &TaskManager,
571) -> sc_consensus::DefaultImportQueue<Block> {
572    let verifier_client = client.clone();
573    // CIDP for Aura verifier - requires AuraApi for slot_duration
574    let create_aura_inherent_data_providers = move |parent_hash, _| {
575        let cidp_client = verifier_client.clone();
576        async move {
577            let slot_duration =
578                cumulus_client_consensus_aura::slot_duration_at(&*cidp_client, parent_hash)?;
579            let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
580
581            let slot =
582                sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
583                    *timestamp,
584                    slot_duration,
585                );
586
587            Ok((slot, timestamp))
588        }
589    };
590
591    // CIDP for relay chain verifier - minimal, only timestamp (no AuraApi required)
592    // This is used for pre-AuraApi blocks (Shiden genesis did not start with AuraApi)
593    let create_relay_inherent_data_providers = move |_parent_hash: Hash, _| async move {
594        let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
595        Ok(timestamp)
596    };
597
598    let aura_verifier = Box::new(cumulus_client_consensus_aura::build_verifier::<
599        AuraPair,
600        _,
601        _,
602        _,
603    >(cumulus_client_consensus_aura::BuildVerifierParams {
604        client: client.clone(),
605        create_inherent_data_providers: create_aura_inherent_data_providers,
606        telemetry: telemetry_handle,
607    }));
608
609    let relay_chain_verifier = Box::new(RelayChainVerifier::new(
610        client.clone(),
611        create_relay_inherent_data_providers,
612    )) as Box<_>;
613
614    let verifier = Verifier {
615        client,
616        relay_chain_verifier,
617        aura_verifier,
618    };
619
620    let registry = config.prometheus_registry();
621    let spawner = task_manager.spawn_essential_handle();
622
623    BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry)
624}
625
626/// Start collating with the `shell` runtime while waiting for an upgrade to an Aura compatible runtime.
627fn start_aura_consensus(
628    client: Arc<FullClient>,
629    backend: Arc<TFullBackend<Block>>,
630    parachain_block_import: ParachainBlockImport<
631        Block,
632        FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
633        TFullBackend<Block>,
634    >,
635    prometheus_registry: Option<&Registry>,
636    telemetry: Option<TelemetryHandle>,
637    task_manager: &TaskManager,
638    relay_chain_interface: Arc<dyn RelayChainInterface>,
639    transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, FullClient>>,
640    sync_oracle: Arc<SyncingService<Block>>,
641    keystore: KeystorePtr,
642    para_id: ParaId,
643    collator_key: CollatorPair,
644    additional_config: AdditionalConfig,
645) -> Result<(), sc_service::Error> {
646    let mut proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
647        task_manager.spawn_handle(),
648        client.clone(),
649        transaction_pool,
650        prometheus_registry,
651        telemetry,
652    );
653
654    proposer_factory.set_default_block_size_limit(additional_config.proposer_block_size_limit);
655    proposer_factory.set_soft_deadline(Percent::from_percent(
656        additional_config.proposer_soft_deadline_percent,
657    ));
658
659    let overseer_handle = relay_chain_interface
660        .overseer_handle()
661        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
662
663    let announce_block = {
664        let sync_service = sync_oracle.clone();
665        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
666    };
667
668    let collator_service = cumulus_client_collator::service::CollatorService::new(
669        client.clone(),
670        Arc::new(task_manager.spawn_handle()),
671        announce_block,
672        client.clone(),
673    );
674
675    let params = AuraParams {
676        create_inherent_data_providers: move |_, ()| async move { Ok(()) },
677        block_import: parachain_block_import.clone(),
678        para_client: client.clone(),
679        para_backend: backend,
680        relay_client: relay_chain_interface.clone(),
681        code_hash_provider: {
682            let client = client.clone();
683            move |block_hash| {
684                client
685                    .code_at(block_hash)
686                    .ok()
687                    .map(|c| ValidationCode::from(c).hash())
688            }
689        },
690        keystore,
691        collator_key,
692        para_id,
693        overseer_handle,
694        relay_chain_slot_duration: Duration::from_secs(6),
695        proposer: cumulus_client_consensus_proposer::Proposer::new(proposer_factory),
696        collator_service,
697        authoring_duration: Duration::from_millis(2000),
698        reinitialize: false,
699        // If necessary, AdditionalConfig CLI params could be extend to make it configurable.
700        // However, it will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed.
701        max_pov_percentage: None, // default is 85%
702    };
703
704    let fut = async move {
705        wait_for_aura(client).await;
706        aura::run::<Block, AuraPair, _, _, _, _, _, _, _, _>(params).await
707    };
708
709    task_manager
710        .spawn_essential_handle()
711        .spawn("aura", None, fut);
712    Ok(())
713}
714
715/// Wait for the Aura runtime API to appear on chain.
716/// This is useful for chains that started out without Aura. Components that
717/// are depending on Aura functionality will wait until Aura appears in the runtime.
718async fn wait_for_aura(client: Arc<FullClient>) {
719    let finalized_hash = client.chain_info().finalized_hash;
720    if client
721        .runtime_api()
722        .has_api::<dyn AuraApi<Block, AuraId>>(finalized_hash)
723        .unwrap_or_default()
724    {
725        return;
726    };
727
728    let mut stream = client.finality_notification_stream();
729    while let Some(notification) = stream.next().await {
730        if client
731            .runtime_api()
732            .has_api::<dyn AuraApi<Block, AuraId>>(notification.hash)
733            .unwrap_or_default()
734        {
735            return;
736        }
737    }
738}
739
740/// Checks that the hardware meets the requirements and print a warning otherwise.
741fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) {
742    // Polkadot para-chains should generally use these requirements to ensure that the relay-chain
743    // will not take longer than expected to import its blocks.
744    if let Err(err) =
745        frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false)
746    {
747        log::warn!(
748            "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\
749            https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware",
750            err
751        );
752    }
753}
754
755/// Start a parachain node.
756pub async fn start_node(
757    parachain_config: Configuration,
758    polkadot_config: Configuration,
759    collator_options: CollatorOptions,
760    para_id: ParaId,
761    additional_config: AdditionalConfig,
762) -> sc_service::error::Result<(TaskManager, Arc<FullClient>)> {
763    match parachain_config.network.network_backend {
764        NetworkBackendType::Libp2p => {
765            start_node_impl::<sc_network::NetworkWorker<_, _>>(
766                parachain_config,
767                polkadot_config,
768                collator_options,
769                para_id,
770                additional_config,
771            )
772            .await
773        }
774        NetworkBackendType::Litep2p => {
775            start_node_impl::<sc_network::Litep2pNetworkBackend>(
776                parachain_config,
777                polkadot_config,
778                collator_options,
779                para_id,
780                additional_config,
781            )
782            .await
783        }
784    }
785}