astar_collator/local/
service.rs

1// This file is part of Astar.
2
3// Copyright (C) Stake Technologies Pte.Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later
5
6// Astar is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// Astar is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with Astar. If not, see <http://www.gnu.org/licenses/>.
18
19//! Local Service and ServiceFactory implementation. Specialized wrapper over substrate service.
20
21pub use crate::parachain::fake_runtime_api::RuntimeApi;
22use crate::{
23    evm_tracing_types::{EthApi as EthApiCmd, FrontierConfig},
24    rpc::tracing,
25};
26use astar_test_utils::RelayStateSproofBuilder;
27use cumulus_client_parachain_inherent::MockXcmConfig;
28use cumulus_primitives_aura::Slot;
29use cumulus_primitives_core::{
30    relay_chain,
31    relay_chain::{well_known_keys, AsyncBackingParams, HeadData, UpgradeGoAhead},
32    AbridgedHostConfiguration, CollectCollationInfo, InboundHrmpMessage, ParaId,
33    RelayParentOffsetApi,
34};
35use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData};
36use fc_consensus::FrontierBlockImport;
37use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
38use fc_storage::StorageOverrideHandler;
39use futures::{FutureExt, StreamExt};
40use parity_scale_codec::Encode;
41use polkadot_core_primitives::InboundDownwardMessage;
42use polkadot_primitives::PersistedValidationData;
43use sc_client_api::{Backend, BlockchainEvents};
44use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
45use sc_network::NetworkBackend;
46use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
47use sc_telemetry::{Telemetry, TelemetryWorker};
48use sc_transaction_pool_api::OffchainTransactionPoolFactory;
49use sp_api::ProvideRuntimeApi;
50use sp_blockchain::HeaderBackend;
51use sp_inherents::{InherentData, InherentDataProvider};
52use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
53use std::{collections::BTreeMap, marker::PhantomData, ops::Sub, sync::Arc, time::Duration};
54
55use astar_primitives::*;
56
57/// Local pending inherent provider for ETH pending RPC in dev mode.
58pub struct LocalPendingInherentDataProvider<B, C> {
59    client: Arc<C>,
60    para_id: ParaId,
61    phantom_data: PhantomData<B>,
62}
63
/// Duration of a single relay chain slot, in milliseconds.
///
/// Used to convert between wall-clock time and mocked relay slot numbers.
const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;
65
66/// Inherent data provider that supplies mocked validation data.
67/// TODO: Use it from PolkadotSDK again after stable2512 uplift
68#[derive(Default)]
69pub struct MockValidationDataInherentDataProvider<R = ()> {
70    /// The current block number of the local block chain (the parachain).
71    pub current_para_block: u32,
72    /// The parachain ID of the parachain for that the inherent data is created.
73    pub para_id: ParaId,
74    /// The current block head data of the local block chain (the parachain).
75    pub current_para_block_head: Option<cumulus_primitives_core::relay_chain::HeadData>,
76    /// The relay block in which this parachain appeared to start. This will be the relay block
77    /// number in para block #P1.
78    pub relay_offset: u32,
79    /// The relay parent offset that determines how many relay parent descendants are required.
80    pub relay_parent_offset: u32,
81    /// The number of relay blocks that elapses between each parablock. Probably set this to 1 or 2
82    /// to simulate optimistic or realistic relay chain behavior.
83    pub relay_blocks_per_para_block: u32,
84    /// Number of parachain blocks per relay chain epoch
85    /// Mock epoch is computed by dividing `current_para_block` by this value.
86    pub para_blocks_per_relay_epoch: u32,
87    /// Function to mock BABE one epoch ago randomness.
88    pub relay_randomness_config: R,
89    /// XCM messages and associated configuration information.
90    pub xcm_config: MockXcmConfig,
91    /// Inbound downward XCM messages to be injected into the block.
92    pub raw_downward_messages: Vec<Vec<u8>>,
93    /// Inbound Horizontal messages sorted by channel.
94    pub raw_horizontal_messages: Vec<(ParaId, Vec<u8>)>,
95    /// Additional key-value pairs that should be injected.
96    pub additional_key_values: Option<Vec<(Vec<u8>, Vec<u8>)>>,
97    /// Whether upgrade go ahead should be set.
98    pub upgrade_go_ahead: Option<UpgradeGoAhead>,
99}
100
101/// Something that can generate randomness.
102pub trait GenerateRandomness<I> {
103    /// Generate the randomness using the given `input`.
104    fn generate_randomness(&self, input: I) -> relay_chain::Hash;
105}
106
107impl GenerateRandomness<u64> for () {
108    /// Default implementation uses relay epoch as randomness value
109    /// A more seemingly random implementation may hash the relay epoch instead
110    fn generate_randomness(&self, input: u64) -> relay_chain::Hash {
111        let mut mock_randomness: [u8; 32] = [0u8; 32];
112        mock_randomness[..8].copy_from_slice(&input.to_be_bytes());
113        mock_randomness.into()
114    }
115}
116
117#[async_trait::async_trait]
118impl<R: Send + Sync + GenerateRandomness<u64>> InherentDataProvider
119    for MockValidationDataInherentDataProvider<R>
120{
121    async fn provide_inherent_data(
122        &self,
123        inherent_data: &mut InherentData,
124    ) -> Result<(), sp_inherents::Error> {
125        // Use the "sproof" (spoof proof) builder to build valid mock state root and proof.
126        let mut sproof_builder = RelayStateSproofBuilder {
127            para_id: self.para_id,
128            ..Default::default()
129        };
130
131        // Calculate the mocked relay block based on the current para block
132        let relay_parent_number =
133            self.relay_offset + self.relay_blocks_per_para_block * self.current_para_block;
134        sproof_builder.current_slot = Slot::from(relay_parent_number as u64);
135
136        sproof_builder.upgrade_go_ahead = self.upgrade_go_ahead;
137        // Process the downward messages and set up the correct head
138        let mut downward_messages = Vec::new();
139        let mut dmq_mqc = MessageQueueChain::new(self.xcm_config.starting_dmq_mqc_head);
140        for msg in &self.raw_downward_messages {
141            let wrapped = InboundDownwardMessage {
142                sent_at: relay_parent_number,
143                msg: msg.clone(),
144            };
145
146            dmq_mqc.extend_downward(&wrapped);
147            downward_messages.push(wrapped);
148        }
149        sproof_builder.dmq_mqc_head = Some(dmq_mqc.head());
150
151        // Process the hrmp messages and set up the correct heads
152        // Begin by collecting them into a Map
153        let mut horizontal_messages = BTreeMap::<ParaId, Vec<InboundHrmpMessage>>::new();
154        for (para_id, msg) in &self.raw_horizontal_messages {
155            let wrapped = InboundHrmpMessage {
156                sent_at: relay_parent_number,
157                data: msg.clone(),
158            };
159
160            horizontal_messages
161                .entry(*para_id)
162                .or_default()
163                .push(wrapped);
164        }
165
166        // Now iterate again, updating the heads as we go
167        for (para_id, messages) in &horizontal_messages {
168            let mut channel_mqc = MessageQueueChain::new(
169                *self
170                    .xcm_config
171                    .starting_hrmp_mqc_heads
172                    .get(para_id)
173                    .unwrap_or(&relay_chain::Hash::default()),
174            );
175            for message in messages {
176                channel_mqc.extend_hrmp(message);
177            }
178            sproof_builder.upsert_inbound_channel(*para_id).mqc_head = Some(channel_mqc.head());
179        }
180
181        // Epoch is set equal to current para block / blocks per epoch
182        sproof_builder.current_epoch = if self.para_blocks_per_relay_epoch == 0 {
183            // do not divide by 0 => set epoch to para block number
184            self.current_para_block.into()
185        } else {
186            (self.current_para_block / self.para_blocks_per_relay_epoch).into()
187        };
188        // Randomness is set by randomness generator
189        sproof_builder.randomness = self
190            .relay_randomness_config
191            .generate_randomness(self.current_para_block.into());
192
193        if let Some(key_values) = &self.additional_key_values {
194            sproof_builder.additional_key_values = key_values.clone()
195        }
196
197        // Inject current para block head, if any
198        sproof_builder.included_para_head = self.current_para_block_head.clone();
199
200        let (relay_parent_storage_root, proof, relay_parent_descendants) =
201            sproof_builder.into_state_root_proof_and_descendants(self.relay_parent_offset.into());
202        let parachain_inherent_data = ParachainInherentData {
203            validation_data: PersistedValidationData {
204                parent_head: Default::default(),
205                relay_parent_storage_root,
206                relay_parent_number,
207                max_pov_size: Default::default(),
208            },
209            downward_messages,
210            horizontal_messages,
211            relay_chain_state: proof,
212            relay_parent_descendants,
213            collator_peer_id: None,
214        };
215
216        parachain_inherent_data
217            .provide_inherent_data(inherent_data)
218            .await
219    }
220
221    // Copied from the real implementation
222    async fn try_handle_error(
223        &self,
224        _: &sp_inherents::InherentIdentifier,
225        _: &[u8],
226    ) -> Option<Result<(), sp_inherents::Error>> {
227        None
228    }
229}
230
231fn build_local_mock_inherent_data(
232    para_id: ParaId,
233    current_para_block: u32,
234    current_para_block_head: Option<HeadData>,
235    relay_blocks_per_para_block: u32,
236    relay_slot: u64,
237    relay_parent_offset: u32,
238    upgrade_go_ahead: Option<UpgradeGoAhead>,
239) -> (
240    sp_timestamp::InherentDataProvider,
241    MockValidationDataInherentDataProvider<()>,
242) {
243    let relay_offset = relay_parent_offset.saturating_add(
244        (relay_slot as u32)
245            .saturating_sub(relay_blocks_per_para_block.saturating_mul(current_para_block)),
246    );
247
248    let local_host_config = AbridgedHostConfiguration {
249        max_code_size: 16 * 1024 * 1024, // 16 MiB (local dev only)
250        max_head_data_size: 1024 * 1024,
251        max_upward_queue_count: 8,
252        max_upward_queue_size: 1024,
253        max_upward_message_size: 256,
254        max_upward_message_num_per_candidate: 5,
255        hrmp_max_message_num_per_candidate: 5,
256        validation_upgrade_cooldown: 6,
257        validation_upgrade_delay: 6,
258        async_backing_params: AsyncBackingParams {
259            allowed_ancestry_len: 0,
260            max_candidate_depth: 0,
261        },
262    };
263
264    let mocked_parachain = MockValidationDataInherentDataProvider::<()> {
265        current_para_block,
266        para_id,
267        current_para_block_head,
268        relay_blocks_per_para_block,
269        relay_offset,
270        relay_parent_offset,
271        para_blocks_per_relay_epoch: 10,
272        upgrade_go_ahead,
273        additional_key_values: Some(vec![(
274            well_known_keys::ACTIVE_CONFIG.to_vec(),
275            local_host_config.encode(),
276        )]),
277        ..Default::default()
278    };
279
280    let timestamp = relay_slot.saturating_mul(RELAY_CHAIN_SLOT_DURATION_MILLIS);
281    let timestamp_provider = sp_timestamp::InherentDataProvider::new(timestamp.into());
282
283    (timestamp_provider, mocked_parachain)
284}
285
286impl<B, C> LocalPendingInherentDataProvider<B, C> {
287    /// Creates a new instance with the given client and parachain ID.
288    pub fn new(client: Arc<C>, para_id: ParaId) -> Self {
289        Self {
290            client,
291            para_id,
292            phantom_data: Default::default(),
293        }
294    }
295}
296
297#[async_trait::async_trait]
298impl<B, C> sp_inherents::CreateInherentDataProviders<B, ()>
299    for LocalPendingInherentDataProvider<B, C>
300where
301    B: BlockT,
302    C: ProvideRuntimeApi<B> + HeaderBackend<B> + Send + Sync,
303    C::Api: RelayParentOffsetApi<B>,
304{
305    type InherentDataProviders = (
306        sp_timestamp::InherentDataProvider,
307        MockValidationDataInherentDataProvider<()>,
308    );
309
310    async fn create_inherent_data_providers(
311        &self,
312        parent: B::Hash,
313        _extra_args: (),
314    ) -> Result<Self::InherentDataProviders, Box<dyn std::error::Error + Send + Sync>> {
315        let relay_slot = std::time::SystemTime::now()
316            .duration_since(std::time::UNIX_EPOCH)
317            .expect("Current time is always after UNIX_EPOCH; qed")
318            .as_millis() as u64
319            / RELAY_CHAIN_SLOT_DURATION_MILLIS;
320
321        let current_para_block = self
322            .client
323            .header(parent)?
324            .map(|header| {
325                UniqueSaturatedInto::<u32>::unique_saturated_into(*header.number())
326                    .saturating_add(1)
327            })
328            .unwrap_or(1);
329
330        let current_para_block_head = self
331            .client
332            .header(parent)?
333            .map(|header| header.encode().into());
334
335        let relay_parent_offset = self.client.runtime_api().relay_parent_offset(parent)?;
336
337        let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
338            self.para_id,
339            current_para_block,
340            current_para_block_head,
341            1,
342            relay_slot,
343            relay_parent_offset,
344            None,
345        );
346
347        Ok((timestamp_provider, mocked_parachain))
348    }
349}
350
351/// Parachain host functions
352#[cfg(feature = "runtime-benchmarks")]
353pub type HostFunctions = (
354    frame_benchmarking::benchmarking::HostFunctions,
355    cumulus_client_service::ParachainHostFunctions,
356    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
357);
358
359/// Parachain host functions
360#[cfg(not(feature = "runtime-benchmarks"))]
361pub type HostFunctions = (
362    cumulus_client_service::ParachainHostFunctions,
363    moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
364);
365
366type ParachainExecutor = WasmExecutor<HostFunctions>;
367
368type FullClient = sc_service::TFullClient<Block, RuntimeApi, ParachainExecutor>;
369type FullBackend = sc_service::TFullBackend<Block>;
370type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
371
372/// Build a partial chain component config
373pub fn new_partial(
374    config: &Configuration,
375    evm_tracing_config: &FrontierConfig,
376) -> Result<
377    sc_service::PartialComponents<
378        FullClient,
379        FullBackend,
380        FullSelectChain,
381        sc_consensus::DefaultImportQueue<Block>,
382        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
383        (
384            FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
385            Option<Telemetry>,
386            Arc<fc_db::Backend<Block, FullClient>>,
387        ),
388    >,
389    ServiceError,
390> {
391    let telemetry = config
392        .telemetry_endpoints
393        .clone()
394        .filter(|x| !x.is_empty())
395        .map(|endpoints| -> Result<_, sc_telemetry::Error> {
396            let worker = TelemetryWorker::new(16)?;
397            let telemetry = worker.handle().new_telemetry(endpoints);
398            Ok((worker, telemetry))
399        })
400        .transpose()?;
401
402    let heap_pages = config
403        .executor
404        .default_heap_pages
405        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
406            extra_pages: h as _,
407        });
408
409    let executor = ParachainExecutor::builder()
410        .with_execution_method(config.executor.wasm_method)
411        .with_onchain_heap_alloc_strategy(heap_pages)
412        .with_offchain_heap_alloc_strategy(heap_pages)
413        .with_max_runtime_instances(config.executor.max_runtime_instances)
414        .with_runtime_cache_size(config.executor.runtime_cache_size)
415        .build();
416
417    let (client, backend, keystore_container, task_manager) =
418        sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
419            config,
420            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
421            executor,
422            true,
423        )?;
424    let client = Arc::new(client);
425    let telemetry = telemetry.map(|(worker, telemetry)| {
426        task_manager
427            .spawn_handle()
428            .spawn("telemetry", None, worker.run());
429        telemetry
430    });
431    let select_chain = sc_consensus::LongestChain::new(backend.clone());
432    let transaction_pool = sc_transaction_pool::Builder::new(
433        task_manager.spawn_essential_handle(),
434        client.clone(),
435        config.role.is_authority().into(),
436    )
437    .with_options(config.transaction_pool.clone())
438    .with_prometheus(config.prometheus_registry())
439    .build();
440    let frontier_backend = Arc::new(crate::rpc::open_frontier_backend(
441        client.clone(),
442        config,
443        evm_tracing_config,
444    )?);
445    let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
446
447    let import_queue = sc_consensus_manual_seal::import_queue(
448        Box::new(client.clone()),
449        &task_manager.spawn_essential_handle(),
450        config.prometheus_registry(),
451    );
452
453    Ok(sc_service::PartialComponents {
454        client,
455        backend,
456        task_manager,
457        import_queue,
458        keystore_container,
459        select_chain,
460        transaction_pool: transaction_pool.into(),
461        other: (frontier_block_import, telemetry, frontier_backend),
462    })
463}
464
465/// Builds a new local development service (parachain-oriented).
466pub fn start_node<N>(
467    mut config: Configuration,
468    evm_tracing_config: FrontierConfig,
469) -> Result<TaskManager, ServiceError>
470where
471    N: NetworkBackend<Block, <Block as BlockT>::Hash>,
472{
473    let sc_service::PartialComponents {
474        client,
475        backend,
476        mut task_manager,
477        import_queue,
478        keystore_container,
479        select_chain,
480        transaction_pool,
481        other: (block_import, mut telemetry, frontier_backend),
482    } = new_partial(&config, &evm_tracing_config)?;
483
484    // Dev node: no peers
485    config.network.default_peers_set.in_peers = 0;
486    config.network.default_peers_set.out_peers = 0;
487
488    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
489        &config.network,
490        config.prometheus_registry().cloned(),
491    );
492
493    let metrics = N::register_notification_metrics(
494        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
495    );
496    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
497        sc_service::build_network(sc_service::BuildNetworkParams {
498            config: &config,
499            net_config,
500            client: client.clone(),
501            transaction_pool: transaction_pool.clone(),
502            spawn_handle: task_manager.spawn_handle(),
503            import_queue,
504            block_announce_validator_builder: None,
505            warp_sync_config: None,
506            block_relay: None,
507            metrics,
508        })?;
509
510    if config.offchain_worker.enabled {
511        task_manager.spawn_handle().spawn(
512            "offchain-workers-runner",
513            "offchain-work",
514            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
515                runtime_api_provider: client.clone(),
516                keystore: Some(keystore_container.keystore()),
517                offchain_db: backend.offchain_storage(),
518                transaction_pool: Some(OffchainTransactionPoolFactory::new(
519                    transaction_pool.clone(),
520                )),
521                network_provider: Arc::new(network.clone()),
522                is_validator: config.role.is_authority(),
523                enable_http_requests: true,
524                custom_extensions: move |_| vec![],
525            })?
526            .run(client.clone(), task_manager.spawn_handle())
527            .boxed(),
528        );
529    }
530
531    let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
532    let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
533    let storage_override = Arc::new(StorageOverrideHandler::new(client.clone()));
534
535    // Sinks for pubsub notifications.
536    // Everytime a new subscription is created, a new mpsc channel is added to the sink pool.
537    // The MappingSyncWorker sends through the channel on block import and the subscription emits a notification to the subscriber on receiving a message through this channel.
538    // This way we avoid race conditions when using native substrate block import notification stream.
539    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
540        fc_mapping_sync::EthereumBlockNotification<Block>,
541    > = Default::default();
542    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
543
544    let ethapi_cmd = evm_tracing_config.ethapi.clone();
545
546    let tracing_requesters =
547        if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
548            tracing::spawn_tracing_tasks(
549                &evm_tracing_config,
550                config.prometheus_registry().cloned(),
551                tracing::SpawnTasksParams {
552                    task_manager: &task_manager,
553                    client: client.clone(),
554                    substrate_backend: backend.clone(),
555                    frontier_backend: frontier_backend.clone(),
556                    storage_override: storage_override.clone(),
557                },
558            )
559        } else {
560            tracing::RpcRequesters {
561                debug: None,
562                trace: None,
563            }
564        };
565
566    // Frontier offchain DB task. Essential.
567    // Maps emulated ethereum data to substrate native data.
568    match frontier_backend.as_ref() {
569        fc_db::Backend::KeyValue(ref b) => {
570            task_manager.spawn_essential_handle().spawn(
571                "frontier-mapping-sync-worker",
572                Some("frontier"),
573                fc_mapping_sync::kv::MappingSyncWorker::new(
574                    client.import_notification_stream(),
575                    Duration::new(6, 0),
576                    client.clone(),
577                    backend.clone(),
578                    storage_override.clone(),
579                    b.clone(),
580                    3,
581                    0,
582                    fc_mapping_sync::SyncStrategy::Parachain,
583                    sync_service.clone(),
584                    pubsub_notification_sinks.clone(),
585                )
586                .for_each(|()| futures::future::ready(())),
587            );
588        }
589        fc_db::Backend::Sql(ref b) => {
590            task_manager.spawn_essential_handle().spawn_blocking(
591                "frontier-mapping-sync-worker",
592                Some("frontier"),
593                fc_mapping_sync::sql::SyncWorker::run(
594                    client.clone(),
595                    backend.clone(),
596                    b.clone(),
597                    client.import_notification_stream(),
598                    fc_mapping_sync::sql::SyncWorkerConfig {
599                        read_notification_timeout: Duration::from_secs(10),
600                        check_indexed_blocks_interval: Duration::from_secs(60),
601                    },
602                    fc_mapping_sync::SyncStrategy::Parachain,
603                    sync_service.clone(),
604                    pubsub_notification_sinks.clone(),
605                ),
606            );
607        }
608    }
609
610    // Frontier `EthFilterApi` maintenance. Manages the pool of user-created Filters.
611    // Each filter is allowed to stay in the pool for 100 blocks.
612    const FILTER_RETAIN_THRESHOLD: u64 = 100;
613    task_manager.spawn_essential_handle().spawn(
614        "frontier-filter-pool",
615        Some("frontier"),
616        fc_rpc::EthTask::filter_pool_task(
617            client.clone(),
618            filter_pool.clone(),
619            FILTER_RETAIN_THRESHOLD,
620        ),
621    );
622
623    const FEE_HISTORY_LIMIT: u64 = 2048;
624    task_manager.spawn_essential_handle().spawn(
625        "frontier-fee-history",
626        Some("frontier"),
627        fc_rpc::EthTask::fee_history_task(
628            client.clone(),
629            storage_override.clone(),
630            fee_history_cache.clone(),
631            FEE_HISTORY_LIMIT,
632        ),
633    );
634
635    let role = config.role.clone();
636    let prometheus_registry = config.prometheus_registry().cloned();
637    let is_authority = config.role.is_authority();
638
639    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
640        task_manager.spawn_handle(),
641        storage_override.clone(),
642        50,
643        50,
644        prometheus_registry.clone(),
645    ));
646
647    // Channel for the rpc handler to communicate with the authorship task.
648    let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024);
649    let local_para_id = ParaId::from(
650        crate::parachain::chain_spec::Extensions::try_get(&*config.chain_spec)
651            .map(|e| e.para_id)
652            .unwrap_or(2000),
653    );
654
655    let rpc_extensions_builder = {
656        let client = client.clone();
657        let network = network.clone();
658        let transaction_pool = transaction_pool.clone();
659        let sync = sync_service.clone();
660        let pubsub_notification_sinks = pubsub_notification_sinks.clone();
661
662        Box::new(move |subscription| {
663            let deps = crate::rpc::FullDeps {
664                client: client.clone(),
665                pool: transaction_pool.clone(),
666                graph: transaction_pool.clone(),
667                network: network.clone(),
668                sync: sync.clone(),
669                is_authority,
670                frontier_backend: match *frontier_backend {
671                    fc_db::Backend::KeyValue(ref b) => b.clone(),
672                    fc_db::Backend::Sql(ref b) => b.clone(),
673                },
674                filter_pool: filter_pool.clone(),
675                fee_history_limit: FEE_HISTORY_LIMIT,
676                fee_history_cache: fee_history_cache.clone(),
677                block_data_cache: block_data_cache.clone(),
678                storage_override: storage_override.clone(),
679                enable_evm_rpc: true, // enable EVM RPC for dev node by default
680                command_sink: Some(command_sink.clone()),
681            };
682
683            crate::rpc::create_full_local_dev(
684                deps,
685                subscription,
686                pubsub_notification_sinks.clone(),
687                local_para_id,
688                crate::rpc::EvmTracingConfig {
689                    tracing_requesters: tracing_requesters.clone(),
690                    trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count,
691                    enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool),
692                },
693            )
694            .map_err::<ServiceError, _>(Into::into)
695        })
696    };
697
698    let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
699        network: network.clone(),
700        client: client.clone(),
701        keystore: keystore_container.keystore(),
702        task_manager: &mut task_manager,
703        transaction_pool: transaction_pool.clone(),
704        rpc_builder: rpc_extensions_builder,
705        backend,
706        system_rpc_tx,
707        tx_handler_controller,
708        sync_service: sync_service.clone(),
709        config,
710        telemetry: telemetry.as_mut(),
711    })?;
712
713    if role.is_authority() {
714        let proposer_factory = sc_basic_authorship::ProposerFactory::new(
715            task_manager.spawn_handle(),
716            client.clone(),
717            transaction_pool.clone(),
718            prometheus_registry.as_ref(),
719            telemetry.as_ref().map(|x| x.handle()),
720        );
721
722        let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
723
724        let para_id = local_para_id;
725        let initial_relay_slot = std::time::SystemTime::now()
726            .duration_since(std::time::UNIX_EPOCH)
727            .expect("Current time is always after UNIX_EPOCH; qed")
728            .sub(Duration::from_secs(2 * 60 * 60))
729            .as_millis() as u64
730            / RELAY_CHAIN_SLOT_DURATION_MILLIS;
731
732        let aura =
733            sc_consensus_manual_seal::run_manual_seal(sc_consensus_manual_seal::ManualSealParams {
734                block_import,
735                env: proposer_factory,
736                client: client.clone(),
737                pool: transaction_pool.clone(),
738                commands_stream,
739                select_chain,
740                consensus_data_provider: Some(Box::new(
741                    sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new(
742                        client.clone(),
743                    ),
744                )),
745                create_inherent_data_providers: move |parent_hash, ()| {
746                    let client = client.clone();
747                    async move {
748                        let current_para_head = client
749                            .header(parent_hash)
750                            .expect("Header lookup should succeed")
751                            .expect("Header passed in as parent should be present in backend.");
752
753                        let should_send_go_ahead = client
754                            .runtime_api()
755                            .collect_collation_info(parent_hash, &current_para_head)
756                            .map(|info| info.new_validation_code.is_some())
757                            .unwrap_or_default();
758
759                        let current_para_block = UniqueSaturatedInto::<u32>::unique_saturated_into(
760                            *current_para_head.number(),
761                        ) + 1;
762
763                        let relay_blocks_per_para_block =
764                            (slot_duration.as_millis() / RELAY_CHAIN_SLOT_DURATION_MILLIS).max(1)
765                                as u32;
766                        let current_para_block_u64 = u64::from(current_para_block);
767                        let relay_blocks_per_para_block_u64 =
768                            u64::from(relay_blocks_per_para_block);
769                        let target_relay_slot = initial_relay_slot.saturating_add(
770                            current_para_block_u64.saturating_mul(relay_blocks_per_para_block_u64),
771                        );
772
773                        let current_para_block_head = Some(current_para_head.encode().into());
774                        let relay_parent_offset =
775                            client.runtime_api().relay_parent_offset(parent_hash)?;
776
777                        let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
778                            para_id,
779                            current_para_block,
780                            current_para_block_head,
781                            relay_blocks_per_para_block,
782                            target_relay_slot,
783                            relay_parent_offset,
784                            should_send_go_ahead.then_some(UpgradeGoAhead::GoAhead),
785                        );
786
787                        Ok((timestamp_provider, mocked_parachain))
788                    }
789                },
790            });
791
792        task_manager
793            .spawn_essential_handle()
794            .spawn_blocking("aura", Some("block-authoring"), aura);
795    }
796
797    Ok(task_manager)
798}