1use crate::{
22 evm_tracing_types::{EthApi as EthApiCmd, FrontierConfig},
23 rpc::tracing,
24};
25use cumulus_client_parachain_inherent::MockValidationDataInherentDataProvider;
26use cumulus_primitives_core::{
27 relay_chain::{well_known_keys, AsyncBackingParams, HeadData, UpgradeGoAhead},
28 AbridgedHostConfiguration, CollectCollationInfo, ParaId,
29};
30use fc_consensus::FrontierBlockImport;
31use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
32use fc_storage::StorageOverrideHandler;
33use futures::{FutureExt, StreamExt};
34use parity_scale_codec::Encode;
35use sc_client_api::{Backend, BlockchainEvents};
36use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
37use sc_network::NetworkBackend;
38use sc_service::{error::Error as ServiceError, Configuration, TaskManager};
39use sc_telemetry::{Telemetry, TelemetryWorker};
40use sc_transaction_pool_api::OffchainTransactionPoolFactory;
41use sp_api::ProvideRuntimeApi;
42use sp_blockchain::HeaderBackend;
43use sp_runtime::traits::{Block as BlockT, Header as HeaderT, UniqueSaturatedInto};
44use std::{collections::BTreeMap, marker::PhantomData, ops::Sub, sync::Arc, time::Duration};
45
46pub use crate::parachain::fake_runtime_api::RuntimeApi;
47
48use astar_primitives::*;
49
50pub struct LocalPendingInherentDataProvider<B, C> {
52 client: Arc<C>,
53 para_id: ParaId,
54 phantom_data: PhantomData<B>,
55}
56
57const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6000;
58
59fn build_local_mock_inherent_data(
60 para_id: ParaId,
61 current_para_block: u32,
62 current_para_block_head: Option<HeadData>,
63 relay_blocks_per_para_block: u32,
64 relay_slot: u64,
65 upgrade_go_ahead: Option<UpgradeGoAhead>,
66) -> (
67 sp_timestamp::InherentDataProvider,
68 MockValidationDataInherentDataProvider<()>,
69) {
70 let relay_offset = (relay_slot as u32)
71 .saturating_sub(relay_blocks_per_para_block.saturating_mul(current_para_block));
72
73 let local_host_config = AbridgedHostConfiguration {
74 max_code_size: 16 * 1024 * 1024, max_head_data_size: 1024 * 1024,
76 max_upward_queue_count: 8,
77 max_upward_queue_size: 1024,
78 max_upward_message_size: 256,
79 max_upward_message_num_per_candidate: 5,
80 hrmp_max_message_num_per_candidate: 5,
81 validation_upgrade_cooldown: 6,
82 validation_upgrade_delay: 6,
83 async_backing_params: AsyncBackingParams {
84 allowed_ancestry_len: 0,
85 max_candidate_depth: 0,
86 },
87 };
88
89 let mocked_parachain = MockValidationDataInherentDataProvider::<()> {
90 current_para_block,
91 para_id,
92 current_para_block_head,
93 relay_blocks_per_para_block,
94 relay_offset,
95 para_blocks_per_relay_epoch: 10,
96 upgrade_go_ahead,
97 additional_key_values: Some(vec![(
98 well_known_keys::ACTIVE_CONFIG.to_vec(),
99 local_host_config.encode(),
100 )]),
101 ..Default::default()
102 };
103
104 let timestamp = relay_slot.saturating_mul(RELAY_CHAIN_SLOT_DURATION_MILLIS);
105 let timestamp_provider = sp_timestamp::InherentDataProvider::new(timestamp.into());
106
107 (timestamp_provider, mocked_parachain)
108}
109
110impl<B, C> LocalPendingInherentDataProvider<B, C> {
111 pub fn new(client: Arc<C>, para_id: ParaId) -> Self {
113 Self {
114 client,
115 para_id,
116 phantom_data: Default::default(),
117 }
118 }
119}
120
121#[async_trait::async_trait]
122impl<B, C> sp_inherents::CreateInherentDataProviders<B, ()>
123 for LocalPendingInherentDataProvider<B, C>
124where
125 B: BlockT,
126 C: HeaderBackend<B> + Send + Sync,
127{
128 type InherentDataProviders = (
129 sp_timestamp::InherentDataProvider,
130 MockValidationDataInherentDataProvider<()>,
131 );
132
133 async fn create_inherent_data_providers(
134 &self,
135 _parent: B::Hash,
136 _extra_args: (),
137 ) -> Result<Self::InherentDataProviders, Box<dyn std::error::Error + Send + Sync>> {
138 let relay_slot = std::time::SystemTime::now()
139 .duration_since(std::time::UNIX_EPOCH)
140 .expect("Current time is always after UNIX_EPOCH; qed")
141 .as_millis() as u64
142 / RELAY_CHAIN_SLOT_DURATION_MILLIS;
143
144 let current_para_block = self
145 .client
146 .header(_parent)?
147 .map(|header| {
148 UniqueSaturatedInto::<u32>::unique_saturated_into(*header.number())
149 .saturating_add(1)
150 })
151 .unwrap_or(1);
152
153 let current_para_block_head = self
154 .client
155 .header(_parent)?
156 .map(|header| header.encode().into());
157
158 let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
159 self.para_id,
160 current_para_block,
161 current_para_block_head,
162 1,
163 relay_slot,
164 None,
165 );
166
167 Ok((timestamp_provider, mocked_parachain))
168 }
169}
170
171#[cfg(feature = "runtime-benchmarks")]
173pub type HostFunctions = (
174 frame_benchmarking::benchmarking::HostFunctions,
175 cumulus_client_service::ParachainHostFunctions,
176 moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
177);
178
179#[cfg(not(feature = "runtime-benchmarks"))]
181pub type HostFunctions = (
182 cumulus_client_service::ParachainHostFunctions,
183 moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
184);
185
186type ParachainExecutor = WasmExecutor<HostFunctions>;
187
188type FullClient = sc_service::TFullClient<Block, RuntimeApi, ParachainExecutor>;
189type FullBackend = sc_service::TFullBackend<Block>;
190type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>;
191
192pub fn new_partial(
194 config: &Configuration,
195 evm_tracing_config: &FrontierConfig,
196) -> Result<
197 sc_service::PartialComponents<
198 FullClient,
199 FullBackend,
200 FullSelectChain,
201 sc_consensus::DefaultImportQueue<Block>,
202 sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
203 (
204 FrontierBlockImport<Block, Arc<FullClient>, FullClient>,
205 Option<Telemetry>,
206 Arc<fc_db::Backend<Block, FullClient>>,
207 ),
208 >,
209 ServiceError,
210> {
211 let telemetry = config
212 .telemetry_endpoints
213 .clone()
214 .filter(|x| !x.is_empty())
215 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
216 let worker = TelemetryWorker::new(16)?;
217 let telemetry = worker.handle().new_telemetry(endpoints);
218 Ok((worker, telemetry))
219 })
220 .transpose()?;
221
222 let heap_pages = config
223 .executor
224 .default_heap_pages
225 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
226 extra_pages: h as _,
227 });
228
229 let executor = ParachainExecutor::builder()
230 .with_execution_method(config.executor.wasm_method)
231 .with_onchain_heap_alloc_strategy(heap_pages)
232 .with_offchain_heap_alloc_strategy(heap_pages)
233 .with_max_runtime_instances(config.executor.max_runtime_instances)
234 .with_runtime_cache_size(config.executor.runtime_cache_size)
235 .build();
236
237 let (client, backend, keystore_container, task_manager) =
238 sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
239 config,
240 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
241 executor,
242 true,
243 )?;
244 let client = Arc::new(client);
245 let telemetry = telemetry.map(|(worker, telemetry)| {
246 task_manager
247 .spawn_handle()
248 .spawn("telemetry", None, worker.run());
249 telemetry
250 });
251 let select_chain = sc_consensus::LongestChain::new(backend.clone());
252 let transaction_pool = sc_transaction_pool::Builder::new(
253 task_manager.spawn_essential_handle(),
254 client.clone(),
255 config.role.is_authority().into(),
256 )
257 .with_options(config.transaction_pool.clone())
258 .with_prometheus(config.prometheus_registry())
259 .build();
260 let frontier_backend = Arc::new(crate::rpc::open_frontier_backend(
261 client.clone(),
262 config,
263 evm_tracing_config,
264 )?);
265 let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());
266
267 let import_queue = sc_consensus_manual_seal::import_queue(
268 Box::new(client.clone()),
269 &task_manager.spawn_essential_handle(),
270 config.prometheus_registry(),
271 );
272
273 Ok(sc_service::PartialComponents {
274 client,
275 backend,
276 task_manager,
277 import_queue,
278 keystore_container,
279 select_chain,
280 transaction_pool: transaction_pool.into(),
281 other: (frontier_block_import, telemetry, frontier_backend),
282 })
283}
284
285pub fn start_node<N>(
287 mut config: Configuration,
288 evm_tracing_config: FrontierConfig,
289) -> Result<TaskManager, ServiceError>
290where
291 N: NetworkBackend<Block, <Block as BlockT>::Hash>,
292{
293 let sc_service::PartialComponents {
294 client,
295 backend,
296 mut task_manager,
297 import_queue,
298 keystore_container,
299 select_chain,
300 transaction_pool,
301 other: (block_import, mut telemetry, frontier_backend),
302 } = new_partial(&config, &evm_tracing_config)?;
303
304 config.network.default_peers_set.in_peers = 0;
306 config.network.default_peers_set.out_peers = 0;
307
308 let net_config = sc_network::config::FullNetworkConfiguration::<_, _, N>::new(
309 &config.network,
310 config.prometheus_registry().cloned(),
311 );
312
313 let metrics = N::register_notification_metrics(
314 config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
315 );
316 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
317 sc_service::build_network(sc_service::BuildNetworkParams {
318 config: &config,
319 net_config,
320 client: client.clone(),
321 transaction_pool: transaction_pool.clone(),
322 spawn_handle: task_manager.spawn_handle(),
323 import_queue,
324 block_announce_validator_builder: None,
325 warp_sync_config: None,
326 block_relay: None,
327 metrics,
328 })?;
329
330 if config.offchain_worker.enabled {
331 task_manager.spawn_handle().spawn(
332 "offchain-workers-runner",
333 "offchain-work",
334 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
335 runtime_api_provider: client.clone(),
336 keystore: Some(keystore_container.keystore()),
337 offchain_db: backend.offchain_storage(),
338 transaction_pool: Some(OffchainTransactionPoolFactory::new(
339 transaction_pool.clone(),
340 )),
341 network_provider: Arc::new(network.clone()),
342 is_validator: config.role.is_authority(),
343 enable_http_requests: true,
344 custom_extensions: move |_| vec![],
345 })?
346 .run(client.clone(), task_manager.spawn_handle())
347 .boxed(),
348 );
349 }
350
351 let filter_pool: FilterPool = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
352 let fee_history_cache: FeeHistoryCache = Arc::new(std::sync::Mutex::new(BTreeMap::new()));
353 let storage_override = Arc::new(StorageOverrideHandler::new(client.clone()));
354
355 let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
360 fc_mapping_sync::EthereumBlockNotification<Block>,
361 > = Default::default();
362 let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
363
364 let ethapi_cmd = evm_tracing_config.ethapi.clone();
365
366 let tracing_requesters =
367 if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
368 tracing::spawn_tracing_tasks(
369 &evm_tracing_config,
370 config.prometheus_registry().cloned(),
371 tracing::SpawnTasksParams {
372 task_manager: &task_manager,
373 client: client.clone(),
374 substrate_backend: backend.clone(),
375 frontier_backend: frontier_backend.clone(),
376 storage_override: storage_override.clone(),
377 },
378 )
379 } else {
380 tracing::RpcRequesters {
381 debug: None,
382 trace: None,
383 }
384 };
385
386 match frontier_backend.as_ref() {
389 fc_db::Backend::KeyValue(ref b) => {
390 task_manager.spawn_essential_handle().spawn(
391 "frontier-mapping-sync-worker",
392 Some("frontier"),
393 fc_mapping_sync::kv::MappingSyncWorker::new(
394 client.import_notification_stream(),
395 Duration::new(6, 0),
396 client.clone(),
397 backend.clone(),
398 storage_override.clone(),
399 b.clone(),
400 3,
401 0,
402 fc_mapping_sync::SyncStrategy::Parachain,
403 sync_service.clone(),
404 pubsub_notification_sinks.clone(),
405 )
406 .for_each(|()| futures::future::ready(())),
407 );
408 }
409 fc_db::Backend::Sql(ref b) => {
410 task_manager.spawn_essential_handle().spawn_blocking(
411 "frontier-mapping-sync-worker",
412 Some("frontier"),
413 fc_mapping_sync::sql::SyncWorker::run(
414 client.clone(),
415 backend.clone(),
416 b.clone(),
417 client.import_notification_stream(),
418 fc_mapping_sync::sql::SyncWorkerConfig {
419 read_notification_timeout: Duration::from_secs(10),
420 check_indexed_blocks_interval: Duration::from_secs(60),
421 },
422 fc_mapping_sync::SyncStrategy::Parachain,
423 sync_service.clone(),
424 pubsub_notification_sinks.clone(),
425 ),
426 );
427 }
428 }
429
430 const FILTER_RETAIN_THRESHOLD: u64 = 100;
433 task_manager.spawn_essential_handle().spawn(
434 "frontier-filter-pool",
435 Some("frontier"),
436 fc_rpc::EthTask::filter_pool_task(
437 client.clone(),
438 filter_pool.clone(),
439 FILTER_RETAIN_THRESHOLD,
440 ),
441 );
442
443 const FEE_HISTORY_LIMIT: u64 = 2048;
444 task_manager.spawn_essential_handle().spawn(
445 "frontier-fee-history",
446 Some("frontier"),
447 fc_rpc::EthTask::fee_history_task(
448 client.clone(),
449 storage_override.clone(),
450 fee_history_cache.clone(),
451 FEE_HISTORY_LIMIT,
452 ),
453 );
454
455 let role = config.role.clone();
456 let prometheus_registry = config.prometheus_registry().cloned();
457 let is_authority = config.role.is_authority();
458
459 let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
460 task_manager.spawn_handle(),
461 storage_override.clone(),
462 50,
463 50,
464 prometheus_registry.clone(),
465 ));
466
467 let (command_sink, commands_stream) = futures::channel::mpsc::channel(1024);
469 let local_para_id = ParaId::from(
470 crate::parachain::chain_spec::Extensions::try_get(&*config.chain_spec)
471 .map(|e| e.para_id)
472 .unwrap_or(2000),
473 );
474
475 let rpc_extensions_builder = {
476 let client = client.clone();
477 let network = network.clone();
478 let transaction_pool = transaction_pool.clone();
479 let sync = sync_service.clone();
480 let pubsub_notification_sinks = pubsub_notification_sinks.clone();
481
482 Box::new(move |subscription| {
483 let deps = crate::rpc::FullDeps {
484 client: client.clone(),
485 pool: transaction_pool.clone(),
486 graph: transaction_pool.clone(),
487 network: network.clone(),
488 sync: sync.clone(),
489 is_authority,
490 frontier_backend: match *frontier_backend {
491 fc_db::Backend::KeyValue(ref b) => b.clone(),
492 fc_db::Backend::Sql(ref b) => b.clone(),
493 },
494 filter_pool: filter_pool.clone(),
495 fee_history_limit: FEE_HISTORY_LIMIT,
496 fee_history_cache: fee_history_cache.clone(),
497 block_data_cache: block_data_cache.clone(),
498 storage_override: storage_override.clone(),
499 enable_evm_rpc: true, command_sink: Some(command_sink.clone()),
501 };
502
503 crate::rpc::create_full_local_dev(
504 deps,
505 subscription,
506 pubsub_notification_sinks.clone(),
507 local_para_id,
508 crate::rpc::EvmTracingConfig {
509 tracing_requesters: tracing_requesters.clone(),
510 trace_filter_max_count: evm_tracing_config.ethapi_trace_max_count,
511 enable_txpool: ethapi_cmd.contains(&EthApiCmd::TxPool),
512 },
513 )
514 .map_err::<ServiceError, _>(Into::into)
515 })
516 };
517
518 let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
519 network: network.clone(),
520 client: client.clone(),
521 keystore: keystore_container.keystore(),
522 task_manager: &mut task_manager,
523 transaction_pool: transaction_pool.clone(),
524 rpc_builder: rpc_extensions_builder,
525 backend,
526 system_rpc_tx,
527 tx_handler_controller,
528 sync_service: sync_service.clone(),
529 config,
530 telemetry: telemetry.as_mut(),
531 })?;
532
533 if role.is_authority() {
534 let proposer_factory = sc_basic_authorship::ProposerFactory::new(
535 task_manager.spawn_handle(),
536 client.clone(),
537 transaction_pool.clone(),
538 prometheus_registry.as_ref(),
539 telemetry.as_ref().map(|x| x.handle()),
540 );
541
542 let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
543
544 let para_id = local_para_id;
545 let initial_relay_slot = std::time::SystemTime::now()
546 .duration_since(std::time::UNIX_EPOCH)
547 .expect("Current time is always after UNIX_EPOCH; qed")
548 .sub(Duration::from_secs(2 * 60 * 60))
549 .as_millis() as u64
550 / RELAY_CHAIN_SLOT_DURATION_MILLIS;
551
552 let aura =
553 sc_consensus_manual_seal::run_manual_seal(sc_consensus_manual_seal::ManualSealParams {
554 block_import,
555 env: proposer_factory,
556 client: client.clone(),
557 pool: transaction_pool.clone(),
558 commands_stream,
559 select_chain,
560 consensus_data_provider: Some(Box::new(
561 sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider::new(
562 client.clone(),
563 ),
564 )),
565 create_inherent_data_providers: move |parent_hash, ()| {
566 let client = client.clone();
567 async move {
568 let current_para_head = client
569 .header(parent_hash)
570 .expect("Header lookup should succeed")
571 .expect("Header passed in as parent should be present in backend.");
572
573 let should_send_go_ahead = client
574 .runtime_api()
575 .collect_collation_info(parent_hash, ¤t_para_head)
576 .map(|info| info.new_validation_code.is_some())
577 .unwrap_or_default();
578
579 let current_para_block = UniqueSaturatedInto::<u32>::unique_saturated_into(
580 *current_para_head.number(),
581 ) + 1;
582
583 let relay_blocks_per_para_block =
584 (slot_duration.as_millis() / RELAY_CHAIN_SLOT_DURATION_MILLIS).max(1)
585 as u32;
586 let current_para_block_u64 = u64::from(current_para_block);
587 let relay_blocks_per_para_block_u64 =
588 u64::from(relay_blocks_per_para_block);
589 let target_relay_slot = initial_relay_slot.saturating_add(
590 current_para_block_u64.saturating_mul(relay_blocks_per_para_block_u64),
591 );
592
593 let current_para_block_head = Some(current_para_head.encode().into());
594
595 let (timestamp_provider, mocked_parachain) = build_local_mock_inherent_data(
596 para_id,
597 current_para_block,
598 current_para_block_head,
599 relay_blocks_per_para_block,
600 target_relay_slot,
601 should_send_go_ahead.then_some(UpgradeGoAhead::GoAhead),
602 );
603
604 Ok((timestamp_provider, mocked_parachain))
605 }
606 },
607 });
608
609 task_manager
610 .spawn_essential_handle()
611 .spawn_blocking("aura", Some("block-authoring"), aura);
612 }
613
614 Ok(task_manager)
615}