7. Custom Workflow#

This example shows how to do a fully-automated run of the introductory T20S tutorial workflow from in the CryoSPARC Guide. It includes the following steps:

  • Import Movies

  • Motion Correction

  • CTF Estimation

  • Curate Exposures

  • Blob Picker

  • Template Picker

  • Inspect Picks

  • Extract Particles

  • 2D Classification for Blob Picks

  • 2D Classification for Template Picks

  • Select 2D Classes

  • Ab-Initio Reconstruction

  • Homogeneous Refinement

Use this example as a template for writing automated cryo-EM workflows that may be repeated with different datasets.

7.1. Import Movies#

First initialize a connection to CryoSPARC, find the target project and workspace where the workflow will run, and set a scheduler lane where jobs will be queued to.

from cryosparc.tools import CryoSPARC

cs = CryoSPARC(host="cryoem0", base_port=40000)
assert cs.test_connection()

project = cs.find_project("P251")
workspace = project.find_workspace("W10")
lane = "cryoem3"
Connection succeeded to CryoSPARC command_core at http://cryoem0:40002
Connection succeeded to CryoSPARC command_vis at http://cryoem0:40003
Connection succeeded to CryoSPARC command_rtp at http://cryoem0:40005

Import the movies with an Import Movies job. Note that you may use the CryoSPARC.get_job_sections method to inspect available job type keys to use with Workspace.create_job.

job_sections = cs.get_job_sections()  # [{'contains': ['import_movies', 'import_micrographs', ...] ... }, ...]
import_movies_job = workspace.create_job(
    "import_movies",
    params={
        "blob_paths": "/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/*.tif",
        "gainref_path": "/bulk5/data/EMPIAR/10025/data/empiar_10025_subset/norm-amibox05-0.mrc",
        "psize_A": 0.6575,
        "accel_kv": 300,
        "cs_mm": 2.7,
        "total_dose_e_per_A2": 53,
    },
)

You may inspect any job’s internal document to view available parameter keys, their standard titles, type and default values:

print(f"{'Param key':21s} | {'Title':33s} | {'Type':7s} | {'Default'}")
print("=" * 80)
for key, details in import_movies_job.doc["params_base"].items():
    print(f"{key:21s} | {details['title']:33s} | {details['type']:7s} | {details['value']}")
Param key             | Title                             | Type    | Default
================================================================================
accel_kv              | Accelerating Voltage (kV)         | number  | None
blob_paths            | Movies data path                  | path    | None
compute_num_cpus      | Number of CPUs to parallelize     | number  | 4
cs_mm                 | Spherical Aberration (mm)         | number  | None
defect_path           | Defect file path                  | path    | None
eer_num_fractions     | EER Number of Fractions           | number  | 40
eer_upsamp_factor     | EER Upsampling Factor             | number  | 2
gainref_flip_x        | Flip gain ref & defect file in X? | boolean | False
gainref_flip_y        | Flip gain ref & defect file in Y? | boolean | False
gainref_path          | Gain reference path               | path    | None
gainref_rotate_num    | Rotate gain ref?                  | number  | 0
negative_stain_data   | Negative Stain Data               | boolean | False
output_constant_ctf   | Output Constant CTF               | boolean | False
override_exp_group_id | Override Exposure Group ID        | number  | None
phase_plate_data      | Phase Plate Data                  | boolean | False
psize_A               | Raw pixel size (A)                | number  | None
skip_header_check     | Skip Header Check                 | boolean | True
total_dose_e_per_A2   | Total exposure dose (e/A^2)       | number  | None

Make further parameter values with Job.set_param while the job is in ‘building’ status.

import_movies_job.set_param("compute_num_cpus", 8)
True

Queue and run the job. Wait until it completes.

import_movies_job.queue(lane)
import_movies_job.wait_for_done()
'completed'

7.2. Motion Correction and CTF Estimation#

Repeat with Patch Motion Correction and Patch CTF Estimation jobs. Use the connections parameter to connect the jobs to the Import Movies job and to each other.

Both jobs may be queued at the same time. The CryoSPARC scheduler ensures both run to completion.

motion_correction_job = workspace.create_job(
    "patch_motion_correction_multi",
    connections={"movies": (import_movies_job.uid, "imported_movies")},
    params={"compute_num_gpus": 2},
)
ctf_estimation_job = workspace.create_job(
    "patch_ctf_estimation_multi",
    connections={"exposures": (motion_correction_job.uid, "micrographs")},
    params={"compute_num_gpus": 2},
)

motion_correction_job.queue(lane)
ctf_estimation_job.queue(lane)

motion_correction_job.wait_for_done(), ctf_estimation_job.wait_for_done()
('completed', 'completed')

7.3. Curate Exposures#

Use half the micrographs to pick particles with the Blob picker. These will be used to generate more precise template-based picks on the full dataset. This requires running a Curate Exposures interactive job.

Note

Interactive jobs are special jobs that allow visual adjustment of data curation parameters from the CryoSPARC web interface. The following interactive jobs are used in this workflow:

  • Curate Exposures

  • Inspect Picks

  • Select 2D Classes

When queued, interactive jobs soon enter status “waiting” (unlike regular jobs which get status “running”). This means they are ready for interaction from the CryoSPARC interface.

After the job enters “waiting” status, either interact with the job from the CryoSPARC interface or use the Job.interact method to programmatically invoke the same interactive actions.

Example interactive invocation for a Curate Exposures job:

data = job.interact("get_fields_and_thresholds")

This returns a curation data structure which may be mutated in Python and written back with the following:

job.interact("set_thresholds", data)

An interactive job has a shutdown function that may be invoked when interaction is complete. Example shutdown invocations for different interactive job types:

Job Type

Shutdown Function

Manual Picker

job.interact("begin_extract", {"box_size_pix": ..., "bin_size_pix": ...})

Curate Exposures

job.interact("shutdown_interactive")

Inspect Picks

job.interact("shutdown_interactive")

Select 2D Classes

job.interact("finish")

Build and queue a Curate Exposures job and wait for “waiting” status.

curate_exposures_job = workspace.create_job(
    "curate_exposures_v2",
    connections={"exposures": (ctf_estimation_job.uid, "exposures")},
)

curate_exposures_job.queue()
curate_exposures_job.wait_for_status("waiting")
'waiting'

Either curate exposures from the CryoSPARC interface or use Job.interact method to perform interactive job actions, as follows:

from cryosparc.util import first

data = curate_exposures_job.interact("get_fields_and_thresholds")

idx_field = first(field for field in data["fields"] if field["name"] == "idx")
assert idx_field
idx_field["thresholds"] = [5, 14]
idx_field["active"] = True

curate_exposures_job.interact("set_thresholds", data)
curate_exposures_job.interact("shutdown_interactive")
curate_exposures_job.wait_for_done()
'completed'

Detailed explanation of the previous code block:

  1. Call get_fields_and_thresholds to get a dictionary with a fields key. The value is a list of adjustable curation fields end thresholds. Each item has this format:

    {
       'name': str,
       'title': str
       'short': str,
       'active': bool,
       'range': [number, number],
       'thresholds': [number, number],
    }
    
  2. For each field to threshold (just the Index field in this case):

    1. Modify the thresholds list to [MIN, MAX], where

      • MIN is a number greater than or equal to the first item in range

      • MAX is a number less than or equal to the second item in range

    2. Set active to True to enable the threshold

  3. Call set_thresholds with the modified dictionary

  4. Call shutdown_interactive to finish curating and wait until the job is Completed.

7.4. Blob Picker#

The complated curation job will have 10 accepted and 10 rejected exposures. Provide the accepted ones as input to the Blob Picker.

blob_picker_job = workspace.create_job(
    "blob_picker_gpu",
    connections={"micrographs": (curate_exposures_job.uid, "exposures_accepted")},
    params={"diameter": 100, "diameter_max": 200},
)
blob_picker_job.queue(lane)
blob_picker_job.wait_for_done()
'completed'

7.5. Inspect Picks#

Create an Inspect Picks job and interact with it similarly to Curate Exposures.

inspect_blob_picks_job = workspace.create_job(
    "inspect_picks_v2",
    connections={
        "micrographs": (blob_picker_job.uid, "micrographs"),
        "particles": (blob_picker_job.uid, "particles"),
    },
)
inspect_blob_picks_job.queue()
inspect_blob_picks_job.wait_for_status("waiting")
inspect_blob_picks_job.interact(
    "set_thresholds",
    {"ncc_score_thresh": 0.3, "lpower_thresh_min": 600, "lpower_thresh_max": 1000},
)
inspect_blob_picks_job.interact("shutdown_interactive")
inspect_blob_picks_job.wait_for_done()
'completed'

7.6. 2D Classification#

Extract the selected particles and classify them with a 2D Classification job.

extract_blob_picks_job = workspace.create_job(
    "extract_micrographs_cpu_parallel",
    connections={
        "micrographs": (inspect_blob_picks_job.uid, "micrographs"),
        "particles": (inspect_blob_picks_job.uid, "particles"),
    },
    params={"box_size_pix": 448},
)

classify_blob_picks_job = workspace.create_job(
    "class_2D",
    connections={"particles": (extract_blob_picks_job.uid, "particles")},
    params={"class2D_K": 10},
)

extract_blob_picks_job.queue(lane)
classify_blob_picks_job.queue(lane)

extract_blob_picks_job.wait_for_done(), classify_blob_picks_job.wait_for_done()
('completed', 'completed')

7.7. Select 2D Classes#

Create a Select 2D Classes job and either select templates from the CryoSPARC interface or interact with the job as follows:

select_blob_templates_job = workspace.create_job(
    "select_2D",
    connections={
        "particles": (classify_blob_picks_job.uid, "particles"),
        "templates": (classify_blob_picks_job.uid, "class_averages"),
    },
)

select_blob_templates_job.queue()
select_blob_templates_job.wait_for_status("waiting")

# Auto-interact
class_info = select_blob_templates_job.interact("get_class_info")
for c in class_info:
    if 1.0 < c["res_A"] < 19.0 and c["num_particles_total"] > 900:
        select_blob_templates_job.interact(
            "set_class_selected",
            {
                "class_idx": c["class_idx"],
                "selected": True,
            },
        )
select_blob_templates_job.interact("finish")
select_blob_templates_job.wait_for_done()
'completed'

7.8. Template Picker#

Create and run a Template Picker job with all micrographs.

template_picker_job = workspace.create_job(
    "template_picker_gpu",
    connections={
        "micrographs": (ctf_estimation_job.uid, "exposures"),
        "templates": (select_blob_templates_job.uid, "templates_selected"),
    },
    params={"diameter": 200},
)
template_picker_job.queue(lane)
template_picker_job.wait_for_done()
'completed'

Repeat all previous steps from Inspect Picks to Select 2D, using the template picks as input. Note that when queuing a series of connected jobs, only interactive jobs and the last job in the chain need to be waited on.

For example, given the following job chain:

Inspect Picks -> Extract -> 2D Classification -> Select 2D Classes
  1. Queue all the jobs

  2. Wait for Inspect Picks to be interactive

  3. Invoke shutdown_interactive when finished interacting

  4. Wait for Select 2D Classes to be interactive (occurs after Extraction and 2D Classification complete)

  5. Shutdown when finished interacting

  6. Wait for Select 2D to be done

# Create and connect jobs
inspect_template_picks_job = workspace.create_job(
    "inspect_picks_v2",
    connections={
        "micrographs": (template_picker_job.uid, "micrographs"),
        "particles": (template_picker_job.uid, "particles"),
    },
)

extract_template_picks_job = workspace.create_job(
    "extract_micrographs_cpu_parallel",
    connections={
        "micrographs": (inspect_template_picks_job.uid, "micrographs"),
        "particles": (inspect_template_picks_job.uid, "particles"),
    },
    params={"box_size_pix": 448},
)

classify_template_picks_job = workspace.create_job(
    "class_2D",
    connections={"particles": (extract_template_picks_job.uid, "particles")},
    params={"class2D_K": 50},
)

select_templates_job = workspace.create_job(
    "select_2D",
    connections={
        "particles": (classify_template_picks_job.uid, "particles"),
        "templates": (classify_template_picks_job.uid, "class_averages"),
    },
)

# Queue Jobs
inspect_template_picks_job.queue()
extract_template_picks_job.queue(lane)
classify_template_picks_job.queue(lane)
select_templates_job.queue()

# Inspect template picks
inspect_template_picks_job.wait_for_status("waiting")
inspect_template_picks_job.interact(
    "set_thresholds",
    {"ncc_score_thresh": 0.3, "lpower_thresh_min": 900.0, "lpower_thresh_max": 1800.0},
)
inspect_template_picks_job.interact("shutdown_interactive")

# Select 2D Classes
select_templates_job.wait_for_status("waiting")
class_info = select_templates_job.interact("get_class_info")
for c in class_info:
    if 1.0 < c["res_A"] < 19.0 and c["num_particles_total"] > 100:
        select_templates_job.interact(
            "set_class_selected",
            {
                "class_idx": c["class_idx"],
                "selected": True,
            },
        )
select_templates_job.interact("finish")
select_templates_job.wait_for_done()
'completed'

7.9. Reconstruction and Refinement#

Finally, queue and run Ab-Initio Reconstruction and Homogeneous Refinement jobs.

abinit_job = workspace.create_job(
    "homo_abinit",
    connections={"particles": (select_templates_job.uid, "particles_selected")},
)

refine_job = workspace.create_job(
    "homo_refine_new",
    connections={
        "particles": (abinit_job.uid, "particles_all_classes"),
        "volume": (abinit_job.uid, "volume_class_0"),
    },
    params={
        "refine_symmetry": "D7",
        "refine_defocus_refine": True,
        "refine_ctf_global_refine": True,
    },
)

abinit_job.queue(lane)
refine_job.queue(lane)

abinit_job.wait_for_done(), refine_job.wait_for_done()
('completed', 'completed')