7. Custom Workflow
This example shows how to run the introductory T20S tutorial workflow from the CryoSPARC Guide as a fully automated script. It includes the following steps:
Import Movies
Motion Correction
CTF Estimation
Curate Exposures
Blob Picker
Template Picker
Inspect Picks
Extract Particles
2D Classification for Blob Picks
2D Classification for Template Picks
Select 2D Classes
Ab-Initio Reconstruction
Homogeneous Refinement
Use this example as a template for writing automated cryo-EM workflows that may be repeated with different datasets.
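One way to make the workflow repeatable across datasets is to gather the dataset-specific values in one place and derive the Import Movies parameters from them. The following is a minimal sketch: the DatasetConfig class and its field names are hypothetical and not part of cryosparc-tools, while the parameter keys and values are the ones used for Import Movies in section 7.1.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DatasetConfig:
    """Hypothetical container for dataset-specific settings (not part of cryosparc-tools)."""

    movies_glob: str
    pixel_size_A: float
    accel_kv: float
    cs_mm: float
    total_dose_e_per_A2: float
    gainref_path: Optional[str] = None

    def import_movies_params(self) -> dict:
        """Build the params dict passed to an Import Movies job."""
        params = {
            "blob_paths": self.movies_glob,
            "psize_A": self.pixel_size_A,
            "accel_kv": self.accel_kv,
            "cs_mm": self.cs_mm,
            "total_dose_e_per_A2": self.total_dose_e_per_A2,
        }
        # Only pass a gain reference when the dataset has one.
        if self.gainref_path is not None:
            params["gainref_path"] = self.gainref_path
        return params


t20s = DatasetConfig(
    movies_glob="/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/*.tif",
    gainref_path="/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/norm-amibox05-0.mrc",
    pixel_size_A=0.6575,
    accel_kv=300,
    cs_mm=2.7,
    total_dose_e_per_A2=53,
)
params = t20s.import_movies_params()
```

The resulting dictionary could then be passed as the params argument when creating the Import Movies job, so that switching datasets only requires a different DatasetConfig.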
7.1. Import Movies
First initialize a connection to CryoSPARC, find the target project and workspace where the workflow will run, and choose the scheduler lane to which jobs will be queued.
from cryosparc.tools import CryoSPARC
cs = CryoSPARC(host="cryoem0.sbi", base_port=40000)
assert cs.test_connection()
project = cs.find_project("P251")
workspace = project.find_workspace("W10")
lane = "cryoem3"
Connection succeeded to CryoSPARC command_core at http://cryoem0.sbi:40002
Connection succeeded to CryoSPARC command_vis at http://cryoem0.sbi:40003
Connection succeeded to CryoSPARC command_rtp at http://cryoem0.sbi:40005
Import the movies with an Import Movies job. Note that you may use the CryoSPARC.get_job_sections method to inspect the job type keys available for use with Workspace.create_job.
job_sections = cs.get_job_sections() # [{'contains': ['import_movies', 'import_micrographs', ...] ... }, ...]
import_movies_job = workspace.create_job(
"import_movies",
params={
"blob_paths": "/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/*.tif",
"gainref_path": "/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/norm-amibox05-0.mrc",
"psize_A": 0.6575,
"accel_kv": 300,
"cs_mm": 2.7,
"total_dose_e_per_A2": 53,
},
)
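To check that a job type key exists before creating a job, the sections returned by get_job_sections can be flattened into a single list. This is a sketch that relies only on the 'contains' key shown in the comment above; the job_type_keys helper and the sample data are illustrative assumptions, and real output contains more sections and fields.

```python
def job_type_keys(job_sections):
    """Flatten a list of job sections into a sorted list of job type keys."""
    keys = set()
    for section in job_sections:
        # Each section lists its job type keys under 'contains'.
        keys.update(section.get("contains", []))
    return sorted(keys)


# Illustrative stand-in for cs.get_job_sections(); not real server output.
sample_sections = [
    {"contains": ["import_movies", "import_micrographs"]},
    {"contains": ["patch_motion_correction_multi"]},
]
available = job_type_keys(sample_sections)
```

With a live connection, the same helper could be called as job_type_keys(cs.get_job_sections()).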
You may inspect any job’s internal document to view available parameter keys, their standard titles, type and default values:
import_movies_job.print_param_spec()
Param | Title | Type | Default
=============================================================================
accel_kv | Accelerating Voltage (kV) | number | None
blob_paths | Movies data path | path | None
cs_mm | Spherical Aberration (mm) | number | None
defect_path | Defect file path | path | None
eer_num_fractions | EER Number of Fractions | number | 40
eer_upsamp_factor | EER Upsampling Factor | number | 2
gainref_flip_x | Flip gain ref & defect file in X? | boolean | False
gainref_flip_y | Flip gain ref & defect file in Y? | boolean | False
gainref_path | Gain reference path | path | None
gainref_rotate_num | Rotate gain ref? | number | 0
negative_stain_data | Negative Stain Data | boolean | False
output_constant_ctf | Output Constant CTF | boolean | False
override_exp_group_id | Override Exposure Group ID | number | None
phase_plate_data | Phase Plate Data | boolean | False
psize_A | Raw pixel size (A) | number | None
skip_header_check | Skip Header Check | boolean | False
total_dose_e_per_A2 | Total exposure dose (e/A^2) | number | None
Set further parameter values with Job.set_param while the job is in ‘building’ status.
import_movies_job.set_param("skip_header_check", True)
True
Queue and run the job. Wait until it completes.
import_movies_job.queue(lane)
import_movies_job.wait_for_done()
'completed'
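In an unattended script it can help to stop as soon as a job ends in any state other than 'completed'. The sketch below wraps the queue and wait calls, assuming that wait_for_done returns the final status string as the output above suggests. The run_to_completion helper and the FakeJob class are hypothetical; FakeJob only stands in for a real job so the example runs without a CryoSPARC server.

```python
def run_to_completion(job, lane=None):
    """Queue a job, block until it finishes, and raise if it did not complete."""
    if lane is None:
        job.queue()
    else:
        job.queue(lane)
    status = job.wait_for_done()
    if status != "completed":
        raise RuntimeError(f"Job {job.uid} ended with status {status!r}")
    return status


class FakeJob:
    """Hypothetical stand-in for a cryosparc-tools job, used only to exercise the helper."""

    def __init__(self, uid, final_status):
        self.uid = uid
        self.final_status = final_status
        self.queued_lane = None

    def queue(self, lane=None):
        self.queued_lane = lane

    def wait_for_done(self):
        return self.final_status


fake_job = FakeJob("J1", "completed")
ok_status = run_to_completion(fake_job, "cryoem3")
```

With a real job, the call would be run_to_completion(import_movies_job, lane).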
7.2. Motion Correction and CTF Estimation
Repeat with Patch Motion Correction and Patch CTF Estimation jobs. Use the connections parameter to connect the jobs to the Import Movies job and to each other.
Both jobs may be queued at the same time. The CryoSPARC scheduler runs them in dependency order, so both run to completion.
motion_correction_job = workspace.create_job(
"patch_motion_correction_multi",
connections={"movies": (import_movies_job.uid, "imported_movies")},
params={"compute_num_gpus": 2},
)
ctf_estimation_job = workspace.create_job(
"patch_ctf_estimation_multi",
connections={"exposures": (motion_correction_job.uid, "micrographs")},
params={"compute_num_gpus": 2},
)
motion_correction_job.queue(lane)
ctf_estimation_job.queue(lane)
motion_correction_job.wait_for_done(), ctf_estimation_job.wait_for_done()
('completed', 'completed')
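When several jobs form a simple linear chain, the connections dictionaries can be generated in a loop rather than written by hand. The following sketch assumes only the create_job signature used above (job type, connections, params). The create_chain helper and the FakeWorkspace class are hypothetical; FakeWorkspace records the calls so the example runs without a server.

```python
from types import SimpleNamespace


def create_chain(workspace, first_uid, first_output, steps):
    """Create jobs in order, connecting each input to the previous job's output.

    Each step is a tuple (job_type, input_name, output_name, params).
    """
    jobs = []
    prev_uid, prev_output = first_uid, first_output
    for job_type, input_name, output_name, params in steps:
        job = workspace.create_job(
            job_type,
            connections={input_name: (prev_uid, prev_output)},
            params=params,
        )
        jobs.append(job)
        prev_uid, prev_output = job.uid, output_name
    return jobs


class FakeWorkspace:
    """Hypothetical stand-in for a workspace; it only records created jobs."""

    def __init__(self):
        self.created = []

    def create_job(self, job_type, connections=None, params=None):
        uid = f"J{len(self.created) + 2}"  # pretend J1 is the Import Movies job
        job = SimpleNamespace(uid=uid, type=job_type, connections=connections, params=params)
        self.created.append(job)
        return job


jobs = create_chain(
    FakeWorkspace(),
    "J1",
    "imported_movies",
    [
        ("patch_motion_correction_multi", "movies", "micrographs", {"compute_num_gpus": 2}),
        ("patch_ctf_estimation_multi", "exposures", "exposures", {"compute_num_gpus": 2}),
    ],
)
```

This only covers chains with a single input per job; jobs with several inputs, such as Inspect Picks, still need explicit connections.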
7.3. Curate Exposures
Use half the micrographs to pick particles with the Blob picker. These will be used to generate more precise template-based picks on the full dataset. This requires running a Curate Exposures interactive job.
Note
Interactive jobs are special jobs that allow visual adjustment of data curation parameters from the CryoSPARC web interface. The following interactive jobs are used in this workflow:
Curate Exposures
Inspect Picks
Select 2D Classes
When queued, interactive jobs soon enter status “waiting” (unlike regular jobs which get status “running”). This means they are ready for interaction from the CryoSPARC interface.
After the job enters “waiting” status, either interact with the job from the CryoSPARC interface or use the Job.interact method to programmatically invoke the same interactive actions.
Example interactive invocation for a Curate Exposures job:
data = job.interact("get_fields_and_thresholds")
This returns a curation data structure which may be mutated in Python and written back with the following:
job.interact("set_thresholds", data)
An interactive job has a shutdown function that may be invoked when interaction is complete. Example shutdown invocations for different interactive job types:
Job Type | Shutdown Function |
---|---|
Manual Picker | |
Curate Exposures | job.interact("shutdown_interactive") |
Inspect Picks | job.interact("shutdown_interactive") |
Select 2D Classes | job.interact("finish") |
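The shutdown calls used later in this workflow can be collected in a small lookup so that a script picks the right action for each interactive job type. This is a sketch: the SHUTDOWN_ACTIONS table and the shutdown_action helper are hypothetical names, and the table only covers the job types and actions that appear in this example.

```python
# Shutdown actions as used in this workflow, keyed by job type key.
SHUTDOWN_ACTIONS = {
    "curate_exposures_v2": "shutdown_interactive",
    "inspect_picks_v2": "shutdown_interactive",
    "select_2D": "finish",
}


def shutdown_action(job_type):
    """Return the interact action that ends an interactive job of the given type."""
    try:
        return SHUTDOWN_ACTIONS[job_type]
    except KeyError:
        raise ValueError(f"No known shutdown action for job type {job_type!r}") from None


action = shutdown_action("select_2D")
```

A script could then call job.interact(shutdown_action(job_type)) once interaction is finished.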
Build and queue a Curate Exposures job and wait for “waiting” status.
curate_exposures_job = workspace.create_job(
"curate_exposures_v2",
connections={"exposures": (ctf_estimation_job.uid, "exposures")},
)
curate_exposures_job.queue()
curate_exposures_job.wait_for_status("waiting")
'waiting'
Either curate exposures from the CryoSPARC interface or use the Job.interact method to perform interactive job actions, as follows:
from cryosparc.util import first
data = curate_exposures_job.interact("get_fields_and_thresholds")
idx_field = first(field for field in data["fields"] if field["name"] == "idx")
assert idx_field
idx_field["thresholds"] = [5, 14]
idx_field["active"] = True
curate_exposures_job.interact("set_thresholds", data)
curate_exposures_job.interact("shutdown_interactive")
curate_exposures_job.wait_for_done()
'completed'
Detailed explanation of the previous code block:
Call get_fields_and_thresholds to get a dictionary with a fields key. The value is a list of adjustable curation fields and thresholds. Each item has this format: { 'name': str, 'title': str, 'short': str, 'active': bool, 'range': [number, number], 'thresholds': [number, number], }
For each field to threshold (just the Index field in this case):
    Modify the thresholds list to [MIN, MAX], where MIN is a number greater than or equal to the first item in range and MAX is a number less than or equal to the second item in range
    Set active to True to enable the threshold
Call set_thresholds with the modified dictionary
Call shutdown_interactive to finish curating and wait until the job is Completed.
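The threshold rules described above can be enforced in a small helper before the data is written back. This is a minimal sketch operating on the dictionary format shown above. The set_field_thresholds function is a hypothetical helper and the sample data is made up for illustration.

```python
def set_field_thresholds(data, name, low, high):
    """Set and activate [low, high] thresholds on the named curation field, in place."""
    field = next((f for f in data["fields"] if f["name"] == name), None)
    if field is None:
        raise KeyError(f"No curation field named {name!r}")
    range_min, range_max = field["range"]
    # MIN must not fall below the range start, MAX must not exceed the range end.
    if not (range_min <= low <= high <= range_max):
        raise ValueError(
            f"Thresholds [{low}, {high}] fall outside range [{range_min}, {range_max}]"
        )
    field["thresholds"] = [low, high]
    field["active"] = True
    return data


# Illustrative data in the format described above; the values are made up.
data = {
    "fields": [
        {
            "name": "idx",
            "title": "Index",
            "short": "Idx",
            "active": False,
            "range": [0, 19],
            "thresholds": [0, 19],
        },
    ]
}
set_field_thresholds(data, "idx", 5, 14)
```

With a real job, data would come from curate_exposures_job.interact("get_fields_and_thresholds") and be written back with the set_thresholds action.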
7.4. Blob Picker
The completed curation job will have 10 accepted and 10 rejected exposures. Provide the accepted ones as input to the Blob Picker.
blob_picker_job = workspace.create_job(
"blob_picker_gpu",
connections={"micrographs": (curate_exposures_job.uid, "exposures_accepted")},
params={"diameter": 100, "diameter_max": 200},
)
blob_picker_job.queue(lane)
blob_picker_job.wait_for_done()
'completed'
7.5. Inspect Picks
Create an Inspect Picks job and interact with it similarly to Curate Exposures.
inspect_blob_picks_job = workspace.create_job(
"inspect_picks_v2",
connections={
"micrographs": (blob_picker_job.uid, "micrographs"),
"particles": (blob_picker_job.uid, "particles"),
},
)
inspect_blob_picks_job.queue()
inspect_blob_picks_job.wait_for_status("waiting")
inspect_blob_picks_job.interact(
"set_thresholds",
{"ncc_score_thresh": 0.3, "lpower_thresh_min": 600, "lpower_thresh_max": 1000},
)
inspect_blob_picks_job.interact("shutdown_interactive")
inspect_blob_picks_job.wait_for_done()
'completed'
7.6. 2D Classification
Extract the selected particles and classify them with a 2D Classification job.
extract_blob_picks_job = workspace.create_job(
"extract_micrographs_cpu_parallel",
connections={
"micrographs": (inspect_blob_picks_job.uid, "micrographs"),
"particles": (inspect_blob_picks_job.uid, "particles"),
},
params={"box_size_pix": 448},
)
classify_blob_picks_job = workspace.create_job(
"class_2D",
connections={"particles": (extract_blob_picks_job.uid, "particles")},
params={"class2D_K": 10},
)
extract_blob_picks_job.queue(lane)
classify_blob_picks_job.queue(lane)
extract_blob_picks_job.wait_for_done(), classify_blob_picks_job.wait_for_done()
('completed', 'completed')
7.7. Select 2D Classes
Create a Select 2D Classes job and either select templates from the CryoSPARC interface or interact with the job as follows:
select_blob_templates_job = workspace.create_job(
"select_2D",
connections={
"particles": (classify_blob_picks_job.uid, "particles"),
"templates": (classify_blob_picks_job.uid, "class_averages"),
},
)
select_blob_templates_job.queue()
select_blob_templates_job.wait_for_status("waiting")
# Auto-interact
class_info = select_blob_templates_job.interact("get_class_info")
for c in class_info:
    if 1.0 < c["res_A"] < 19.0 and c["num_particles_total"] > 900:
        select_blob_templates_job.interact(
            "set_class_selected",
            {
                "class_idx": c["class_idx"],
                "selected": True,
            },
        )
select_blob_templates_job.interact("finish")
select_blob_templates_job.wait_for_done()
'completed'
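The selection rule in the loop above can be separated from the interact calls, which makes it easier to test the criteria on their own. This is a sketch that uses the same keys as the loop (class_idx, res_A, num_particles_total). The choose_classes helper and the sample class info are hypothetical.

```python
def choose_classes(class_info, min_particles, min_res_A=1.0, max_res_A=19.0):
    """Return indices of classes whose resolution and particle count pass the filters."""
    return [
        c["class_idx"]
        for c in class_info
        if min_res_A < c["res_A"] < max_res_A and c["num_particles_total"] > min_particles
    ]


# Made-up class info in the shape used by the loop above.
sample_class_info = [
    {"class_idx": 0, "res_A": 8.5, "num_particles_total": 1500},
    {"class_idx": 1, "res_A": 25.0, "num_particles_total": 4000},
    {"class_idx": 2, "res_A": 9.1, "num_particles_total": 300},
]
selected = choose_classes(sample_class_info, min_particles=900)
```

Each returned index could then be passed to the set_class_selected action, as in the loop above.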
7.8. Template Picker
Create and run a Template Picker job with all micrographs.
template_picker_job = workspace.create_job(
"template_picker_gpu",
connections={
"micrographs": (ctf_estimation_job.uid, "exposures"),
"templates": (select_blob_templates_job.uid, "templates_selected"),
},
params={"diameter": 200},
)
template_picker_job.queue(lane)
template_picker_job.wait_for_done()
'completed'
Repeat all previous steps from Inspect Picks to Select 2D, using the template picks as input. Note that when queuing a series of connected jobs, only interactive jobs and the last job in the chain need to be waited on.
For example, given the following job chain:
Inspect Picks -> Extract -> 2D Classification -> Select 2D Classes
Queue all the jobs
Wait for Inspect Picks to be interactive
Invoke shutdown_interactive when finished interacting
Wait for Select 2D Classes to be interactive (occurs after Extraction and 2D Classification complete)
Invoke finish when finished interacting
Wait for Select 2D Classes to be done
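The waiting rule described above (wait on every interactive job, plus the last job in the chain) can be expressed as a short function. This is only a sketch of that rule: the jobs_to_wait_on helper and its input format are hypothetical, and job names stand in for real job objects.

```python
def jobs_to_wait_on(chain):
    """Given [(name, is_interactive), ...] in queue order, list the waits a script needs.

    Returns (name, status) pairs: 'waiting' for each interactive job and
    'completed' for the last job in the chain.
    """
    waits = []
    last_position = len(chain) - 1
    for position, (name, is_interactive) in enumerate(chain):
        if is_interactive:
            waits.append((name, "waiting"))
        if position == last_position:
            waits.append((name, "completed"))
    return waits


chain = [
    ("Inspect Picks", True),
    ("Extract", False),
    ("2D Classification", False),
    ("Select 2D Classes", True),
]
waits = jobs_to_wait_on(chain)
```

For the chain above, this yields a wait on Inspect Picks, a wait on Select 2D Classes becoming interactive, and a final wait for Select 2D Classes to complete. Extract and 2D Classification need no explicit wait.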
# Create and connect jobs
inspect_template_picks_job = workspace.create_job(
"inspect_picks_v2",
connections={
"micrographs": (template_picker_job.uid, "micrographs"),
"particles": (template_picker_job.uid, "particles"),
},
)
extract_template_picks_job = workspace.create_job(
"extract_micrographs_cpu_parallel",
connections={
"micrographs": (inspect_template_picks_job.uid, "micrographs"),
"particles": (inspect_template_picks_job.uid, "particles"),
},
params={"box_size_pix": 448},
)
classify_template_picks_job = workspace.create_job(
"class_2D",
connections={"particles": (extract_template_picks_job.uid, "particles")},
params={"class2D_K": 50},
)
select_templates_job = workspace.create_job(
"select_2D",
connections={
"particles": (classify_template_picks_job.uid, "particles"),
"templates": (classify_template_picks_job.uid, "class_averages"),
},
)
# Queue Jobs
inspect_template_picks_job.queue()
extract_template_picks_job.queue(lane)
classify_template_picks_job.queue(lane)
select_templates_job.queue()
# Inspect template picks
inspect_template_picks_job.wait_for_status("waiting")
inspect_template_picks_job.interact(
"set_thresholds",
{"ncc_score_thresh": 0.3, "lpower_thresh_min": 900.0, "lpower_thresh_max": 1800.0},
)
inspect_template_picks_job.interact("shutdown_interactive")
# Select 2D Classes
select_templates_job.wait_for_status("waiting")
class_info = select_templates_job.interact("get_class_info")
for c in class_info:
    if 1.0 < c["res_A"] < 19.0 and c["num_particles_total"] > 100:
        select_templates_job.interact(
            "set_class_selected",
            {
                "class_idx": c["class_idx"],
                "selected": True,
            },
        )
select_templates_job.interact("finish")
select_templates_job.wait_for_done()
'completed'
7.9. Reconstruction and Refinement
Finally, queue and run Ab-Initio Reconstruction and Homogeneous Refinement jobs.
abinit_job = workspace.create_job(
"homo_abinit",
connections={"particles": (select_templates_job.uid, "particles_selected")},
)
refine_job = workspace.create_job(
"homo_refine_new",
connections={
"particles": (abinit_job.uid, "particles_all_classes"),
"volume": (abinit_job.uid, "volume_class_0"),
},
params={
"refine_symmetry": "D7",
"refine_defocus_refine": True,
"refine_ctf_global_refine": True,
},
)
abinit_job.queue(lane)
refine_job.queue(lane)
abinit_job.wait_for_done(), refine_job.wait_for_done()
('completed', 'completed')