Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions crates/hyperqueue/src/client/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,24 +281,6 @@ pub struct WorkerStartOpts {
/// It should *NOT* be placed on a network filesystem.
#[arg(long)]
pub work_dir: Option<PathBuf>,

/// The maximal parallel downloads for data objects
#[arg(long, default_value = "4")]
pub max_parallel_downloads: u32,

/// The maximal data object download tries
///
/// Specifies how many times the worker tries to download a data object
/// from the remote side before download is considered as failed.
#[arg(long, default_value = "8")]
pub max_download_tries: u32,

#[arg(long,
default_value = "1s", value_parser = parse_hms_or_human_time,
help = duration_doc!("The delay between download attempts\n\nSets how long to wait between failed downloads of data object. This time is multiplied by the number of previous retries. Therefore between 4th and 5th retry it waits 4 * the given duration"),
value_name = "TIME")
]
pub wait_between_download_tries: Duration,
}

pub async fn start_hq_worker(
Expand Down Expand Up @@ -344,24 +326,13 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
hostname,
on_server_lost,
work_dir,
max_parallel_downloads,
max_download_tries,
wait_between_download_tries,
} = opts;

let detect_resources = detect_resources_cli
.clone()
.map(|p| p.into_parsed_arg())
.unwrap_or_default();

if max_download_tries == 0 {
bail!("--max-download-tries cannot be zero");
}

if max_parallel_downloads == 0 {
bail!("--max-parallel-downloads cannot be zero");
}

let hostname = get_hostname(hostname);
let mut resources: Vec<_> = resource.into_iter().map(|x| x.into_parsed_arg()).collect();
let cpu_resources = resources.iter().find(|x| x.name == CPU_RESOURCE_NAME);
Expand Down Expand Up @@ -480,9 +451,6 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
idle_timeout,
overview_configuration,
extra,
max_parallel_downloads,
max_download_tries,
wait_between_download_tries,
})
}

Expand Down
10 changes: 0 additions & 10 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,6 @@ impl Output for CliOutput {
.cell(),
],
vec!["Group".cell().bold(true), configuration.group.cell()],
vec![
"Downloads".cell().bold(true),
format!(
"{} parallel; max {} fails + {} delay",
configuration.max_parallel_downloads,
configuration.max_download_tries,
format_duration(configuration.wait_between_download_tries)
)
.cell(),
],
vec![
"Manager".cell().bold(true),
manager_info
Expand Down
6 changes: 0 additions & 6 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,6 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
time_limit,
on_server_lost,
group,
max_parallel_downloads,
max_download_tries,
wait_between_download_tries,
extra: _,
},
started,
Expand All @@ -579,9 +576,6 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
"listen_address": listen_address,
"resources": format_resource_descriptor(&resources),
"on_server_lost": crate::common::format::server_lost_policy_to_str(&on_server_lost),
"max_parallel_downloads": max_parallel_downloads,
"max_download_tries": max_download_tries,
"wait_between_download_tries": format_duration(wait_between_download_tries),
}),
"allocation": manager_info.map(|info| json!({
"manager": FormattedManagerType(info.manager),
Expand Down
8 changes: 1 addition & 7 deletions crates/pyhq/src/cluster/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use hyperqueue::common::utils::network::get_hostname;
use tokio::task::LocalSet;

use hyperqueue::worker::bootstrap::{finalize_configuration, initialize_worker};
use tako::internal::worker::configuration::{
DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
};
use tako::internal::worker::configuration::OverviewConfiguration;
use tako::resources::{
CPU_RESOURCE_NAME, ResourceDescriptor, ResourceDescriptorItem, ResourceDescriptorKind,
ResourceIndex,
Expand Down Expand Up @@ -55,9 +52,6 @@ impl RunningWorker {
idle_timeout: None,
time_limit: None,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
extra: Default::default(),
};
finalize_configuration(&mut configuration);
Expand Down
3 changes: 0 additions & 3 deletions crates/tako/src/internal/scheduler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ pub(crate) fn compute_new_worker_query(
overview_configuration: Default::default(),
idle_timeout: None,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: 0,
max_download_tries: 0,
wait_between_download_tries: Default::default(),
extra: Default::default(),
};
let worker = Worker::new(worker_id, configuration, &resource_map, now);
Expand Down
8 changes: 1 addition & 7 deletions crates/tako/src/internal/tests/integration/utils/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use std::time::Duration;

use crate::internal::common::error::DsError;
use crate::internal::common::resources::ResourceDescriptor;
use crate::internal::worker::configuration::{
DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
};
use crate::internal::worker::configuration::OverviewConfiguration;
use crate::launcher::{StopReason, TaskBuildContext, TaskResult};
use crate::program::ProgramDefinition;
use crate::worker::WorkerConfiguration;
Expand Down Expand Up @@ -81,9 +78,6 @@ pub fn create_worker_configuration(
},
idle_timeout,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
time_limit: None,
extra,
},
Expand Down
11 changes: 1 addition & 10 deletions crates/tako/src/internal/tests/test_reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use crate::internal::tests::utils::shared::{res_kind_groups, res_kind_sum};
use crate::internal::tests::utils::sorted_vec;
use crate::internal::tests::utils::task::{TaskBuilder, task_running_msg};
use crate::internal::tests::utils::workflows::{submit_example_1, submit_example_3};
use crate::internal::worker::configuration::{
DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
};
use crate::internal::worker::configuration::OverviewConfiguration;
use crate::resources::{ResourceAmount, ResourceDescriptorItem, ResourceIdMap};
use crate::tests::utils::env::{TestComm, TestEnv};
use crate::tests::utils::worker::WorkerBuilder;
Expand Down Expand Up @@ -48,9 +45,6 @@ fn test_worker_add() {
idle_timeout: None,
time_limit: None,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
extra: Default::default(),
group: "default".to_string(),
};
Expand Down Expand Up @@ -108,9 +102,6 @@ fn test_worker_add() {
idle_timeout: None,
time_limit: None,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
extra: Default::default(),
};

Expand Down
8 changes: 1 addition & 7 deletions crates/tako/src/internal/tests/utils/worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::WorkerId;
use crate::internal::server::worker::Worker;
use crate::internal::worker::configuration::{
DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
};
use crate::internal::worker::configuration::OverviewConfiguration;
use crate::resources::{ResourceDescriptor, ResourceDescriptorItem, ResourceIdMap};
use crate::worker::{ServerLostPolicy, WorkerConfiguration};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -74,9 +71,6 @@ impl WorkerBuilder {
idle_timeout: None,
time_limit: self.time_limit,
on_server_lost: ServerLostPolicy::Stop,
max_parallel_downloads: DEFAULT_MAX_PARALLEL_DOWNLOADS,
max_download_tries: DEFAULT_MAX_DOWNLOAD_TRIES,
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
extra: Default::default(),
}
}
Expand Down
8 changes: 0 additions & 8 deletions crates/tako/src/internal/worker/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ impl OverviewConfiguration {
}
}

pub const DEFAULT_MAX_PARALLEL_DOWNLOADS: u32 = 4;
pub const DEFAULT_MAX_DOWNLOAD_TRIES: u32 = 8;
pub const DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES: Duration = Duration::from_secs(1);

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerConfiguration {
pub resources: ResourceDescriptor,
Expand All @@ -52,10 +48,6 @@ pub struct WorkerConfiguration {
pub idle_timeout: Option<Duration>,
pub time_limit: Option<Duration>,
pub on_server_lost: ServerLostPolicy,
pub max_parallel_downloads: u32,
pub max_download_tries: u32,
pub wait_between_download_tries: Duration,

pub extra: Map<String, String>,
}

Expand Down
3 changes: 0 additions & 3 deletions tests/output/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ def test_print_worker_info(hq_env: HqEnv):
"work_dir": str,
"group": str,
"on_server_lost": "stop",
"max_download_tries": int,
"max_parallel_downloads": int,
"wait_between_download_tries": float,
},
"allocation": None,
"started": str,
Expand Down
Loading