Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 57 additions & 61 deletions src/murfey/workflows/clem/register_preprocessing_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,20 @@ def _register_clem_imaging_site(
result: CLEMPreprocessingResult,
murfey_db: Session,
):
def _register(
"""
Creates an ImagingSite database entry for the current CLEM preprocessing result
if one doesn't already exist, or modifies the existing one if it does. Each entry
corresponds to a unique site on the sample grid, and results containing denoised
data will supersede existing rows for the same position that contain only raw
data. Returns the created/queried entry.
"""

def _populate(
entry: MurfeyDB.ImagingSite,
result: CLEMPreprocessingResult,
):
"""
Helper function to update the ImagingSite column values with.
Helper function to populate the ImagingSite column values.
"""

# Is this an atlas or grid square
Expand Down Expand Up @@ -174,20 +182,20 @@ def _register(
session_id=session_id,
site_name=result.site_name,
)
clem_img_site = _register(clem_img_site, result)
clem_img_site = _populate(clem_img_site, result)

# Prepare to overwrite existing entry if current result is a denoised dataset
if result.is_denoised:
# Proceed with overwrite if current result is different from existing entry
output_file = list(result.output_files.values())[0]
if str(output_file.parent / "*.tiff") != clem_img_site.image_path:
clem_img_site = _register(clem_img_site, result)
clem_img_site = _populate(clem_img_site, result)

# Commit changes and return entry
murfey_db.add(clem_img_site)
murfey_db.commit()
murfey_db.close()

logger.info(f"CLEM preprocessing results registered for {result.series_name!r} ")
return clem_img_site


def _determine_collection_mode(
Expand Down Expand Up @@ -219,9 +227,15 @@ def _register_dcg_and_atlas(
session_id: int,
instrument_name: str,
visit_name: str,
result: CLEMPreprocessingResult,
imaging_site: MurfeyDB.ImagingSite,
murfey_db: Session,
):
"""
Takes an ImagingSite entry and uses it to create and register DataCollectionGroup
entries in ISPyB if they don't already exist, or to populate existing entries.
After doing so, it will register the DataCollectionGroup ID in Murfey and add it
to the ImagingSite entry.
"""
# Determine variables to register data collection group and atlas with
proposal_code = "".join(char for char in visit_name.split("-")[0] if char.isalpha())
proposal_number = "".join(
Expand All @@ -230,36 +244,29 @@ def _register_dcg_and_atlas(
visit_number = visit_name.split("-")[-1]

# Generate name/tag for data colleciton group based on series name
dcg_name = result.site_name.split("--")[0]
if result.site_name.split("--")[1].isdigit():
dcg_name += f"--{result.site_name.split('--')[1]}"
dcg_name = imaging_site.site_name.split("--")[0]
if imaging_site.site_name.split("--")[1].isdigit():
dcg_name += f"--{imaging_site.site_name.split('--')[1]}"

# Determine values for atlas
if result.is_atlas:
output_file = list(result.output_files.values())[0]
# Register the thumbnail entries if they are provided
if result.thumbnails and result.thumbnail_size is not None:
# Glob path to the thumbnail files
thumbnail = list(result.thumbnails.values())[0]
atlas_name = str(thumbnail.parent / "*.png")

# Work out the scaling factor used
thumbnail_height, thumbnail_width = result.thumbnail_size
scaling_factor = min(
thumbnail_width / result.pixels_x,
thumbnail_height / result.pixels_y,
)
atlas_pixel_size = result.pixel_size / scaling_factor
if is_atlas := imaging_site.data_type == "atlas":
# Register using thumbnail values if they are provided
if (
imaging_site.thumbnail_path is not None
and imaging_site.thumbnail_pixel_size is not None
):
atlas_name: str | None = imaging_site.thumbnail_path
atlas_pixel_size: float | None = imaging_site.thumbnail_pixel_size
# Otherwise, register the TIFF files themselves
else:
atlas_name = str(output_file.parent / "*.tiff")
atlas_pixel_size = result.pixel_size
atlas_name = imaging_site.image_path
atlas_pixel_size = imaging_site.image_pixel_size
# Translate colour flags into ISPyB convention
color_flags = {
COLOR_FLAGS_MURFEY_TO_ISPYB[key]: int(value)
for key, value in _get_color_flags(result.output_files.keys()).items()
COLOR_FLAGS_MURFEY_TO_ISPYB[key]: getattr(imaging_site, key, 0)
for key in COLOR_FLAGS_MURFEY_TO_ISPYB.keys()
}
collection_mode = _determine_collection_mode(result.output_files.keys())
collection_mode = imaging_site.collection_mode
else:
atlas_name = ""
atlas_pixel_size = 0.0
Expand All @@ -272,9 +279,8 @@ def _register_dcg_and_atlas(
.where(MurfeyDB.DataCollectionGroup.tag == dcg_name)
).all():
dcg_entry = dcg_search[0]
# Update atlas if registering atlas dataset
# and data collection group already exists
if result.is_atlas:
# Update if current dataset is atlas and data collection group exists
if is_atlas:
atlas_message = {
"session_id": session_id,
"dcgid": dcg_entry.id,
Expand Down Expand Up @@ -330,51 +336,41 @@ def _register_dcg_and_atlas(
.where(MurfeyDB.DataCollectionGroup.tag == dcg_name)
).one()

if not (
clem_img_site := murfey_db.exec(
select(MurfeyDB.ImagingSite)
.where(MurfeyDB.ImagingSite.session_id == session_id)
.where(MurfeyDB.ImagingSite.site_name == result.site_name)
).one_or_none()
):
clem_img_site = MurfeyDB.ImagingSite(
session_id=session_id, site_name=result.site_name
)

clem_img_site.dcg_id = dcg_entry.id
clem_img_site.dcg_name = dcg_entry.tag
murfey_db.add(clem_img_site)
imaging_site.dcg_id = dcg_entry.id
imaging_site.dcg_name = dcg_entry.tag
murfey_db.add(imaging_site)
murfey_db.commit()
murfey_db.close()


def _register_grid_square(
session_id: int,
result: CLEMPreprocessingResult,
imaging_site: MurfeyDB.ImagingSite,
murfey_db: Session,
):
# Skip this step if no transport manager object is configured
if _transport_object is None:
logger.error("Unable to find transport manager")
return
# Load all entries for the current data collection group
dcg_name = result.site_name.split("--")[0]
if result.site_name.split("--")[1].isdigit():
dcg_name += f"--{result.site_name.split('--')[1]}"
if (dcg_name := imaging_site.dcg_name) is None:
logger.warning("Current imaging site has no data collection group name")
return

# Check if an atlas has been registered
if not (
atlas_entry := murfey_db.exec(
# Sort by ascending insertion order
atlas_results := murfey_db.exec(
select(MurfeyDB.ImagingSite)
.where(MurfeyDB.ImagingSite.session_id == session_id)
.where(MurfeyDB.ImagingSite.dcg_name == dcg_name)
.where(MurfeyDB.ImagingSite.data_type == "atlas")
).one_or_none()
.order_by(MurfeyDB.ImagingSite.id)
).all()
):
logger.info(
f"No atlas has been registered for data collection group {dcg_name!r} yet"
)
return
atlas_entry = atlas_results[-1] # Use the latest registered atlas

# Check if there are CLEM entries to register
if clem_img_site_to_register := murfey_db.exec(
Expand Down Expand Up @@ -531,17 +527,17 @@ def _register_grid_square(
image=grid_square_params.image,
)
murfey_db.add(grid_square_entry)
murfey_db.commit()

# Add grid square ID to existing CLEM image series entry
clem_img_site.grid_square_id = grid_square_entry.id
murfey_db.add(clem_img_site)
murfey_db.commit()

# Do one commit at the end
murfey_db.commit()
else:
logger.info(
f"No grid squares to register for data collection group {dcg_name!r} yet"
)
murfey_db.close()
return


Expand Down Expand Up @@ -584,7 +580,7 @@ def run(message: dict, murfey_db: Session) -> dict[str, bool]:
return {"success": False, "requeue": False}
try:
# Register items in Murfey database
_register_clem_imaging_site(
clem_img_site = _register_clem_imaging_site(
session_id=session_id,
result=result,
murfey_db=murfey_db,
Expand All @@ -602,7 +598,7 @@ def run(message: dict, murfey_db: Session) -> dict[str, bool]:
session_id=session_id,
instrument_name=murfey_session.instrument_name,
visit_name=murfey_session.visit,
result=result,
imaging_site=clem_img_site,
murfey_db=murfey_db,
)
except Exception:
Expand All @@ -617,7 +613,7 @@ def run(message: dict, murfey_db: Session) -> dict[str, bool]:
# Register CLEM image series as grid squares
_register_grid_square(
session_id=session_id,
result=result,
imaging_site=clem_img_site,
murfey_db=murfey_db,
)
except Exception:
Expand Down
18 changes: 16 additions & 2 deletions tests/workflows/clem/test_register_preprocessing_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,20 @@ def generate_preprocessing_messages(
for n in range(3)
]
)
# Add a second atlas dataset
datasets.extend(
[
(
grid_dir / "Overview 2" / "Image 1",
False,
True,
(2400, 2400),
1e-6,
[0.002, 0.0044, 0.002, 0.0044],
)
]
)
# Add on metadata for denoised datasets
datasets.extend(
[
(
Expand Down Expand Up @@ -398,7 +412,7 @@ def test_run_with_db(
MurfeyDB.GridSquare.session_id == murfey_session.id
)
).all()
assert len(murfey_gs_search) == (len(preprocessing_messages) - 1) // 2
assert len(murfey_gs_search) == (len(preprocessing_messages) - 2) // 2

# ISPyB's DataCollectionGroup should have an entry
murfey_dcg = murfey_dcg_search[0]
Expand Down Expand Up @@ -449,7 +463,7 @@ def test_run_with_db(
.scalars()
.all()
)
assert len(ispyb_gs_search) == (len(preprocessing_messages) - 1) // 2
assert len(ispyb_gs_search) == (len(preprocessing_messages) - 2) // 2
for gs in ispyb_gs_search:
# Check that all entries point to the denoised images ("_Lng_LVCC")
assert gs.gridSquareImage is not None and "_Lng_LVCC" in gs.gridSquareImage
Expand Down
Loading