Skip to content

opticstream.utils

Filename parsing and manipulation utilities.

These are regular Python functions (not Prefect tasks) for filename operations used across multiple tasks.

complex_to_complex_filename(complex_file, complex_path)

Normalize complex filename and construct full path.

Normalizes image index to 4 digits and ensures filename ends with _complex.nii. Converts mosaic_001_image_000_complex.nii → mosaic_001_image_0000_complex.nii

Parameters:

Name Type Description Default
complex_file str

Path to complex file

required
complex_path Path

Path to complex directory

required

Returns:

Type Description
str

Full path to normalized complex file

Source code in opticstream/utils/filename_utils.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def complex_to_complex_filename(complex_file: str, complex_path: Path) -> str:
    """
    Build the canonical complex-file path inside ``complex_path``.

    The basename of ``complex_file`` is first passed through
    ``normalize_image_index`` (4-digit image index). If the resulting stem
    does not already end in ``_complex``, the name is rebuilt as
    ``mosaic_XXX_image_YYYY_complex.nii``; anything that followed the image
    index, and the original extension, are discarded in that case.
    Example: `mosaic_001_image_000_complex.nii` → `mosaic_001_image_0000_complex.nii`

    Parameters
    ----------
    complex_file : str
        Path (or bare filename) of the complex file
    complex_path : Path
        Directory the returned path should point into

    Returns
    -------
    str
        ``complex_path`` joined with the normalized filename

    Raises
    ------
    Exception
        Whatever ``normalize_image_index`` raises for an unparseable name
        is propagated unchanged.
    """
    target_name = normalize_image_index(complex_file)
    stem = op.splitext(target_name)[0]

    # Already carries the expected suffix: keep name and extension as they are.
    if stem.endswith("_complex"):
        return str(Path(complex_path) / target_name)

    # Otherwise keep only the mosaic/image tokens and force the suffix.
    parts = re.match(r"^mosaic_(\d{3})_image_(\d{4})", stem)
    if parts is not None:
        mosaic_id, image_id = parts.groups()
        target_name = f"mosaic_{mosaic_id}_image_{image_id}_complex" + ".nii"

    return str(Path(complex_path) / target_name)

extract_processed_index_from_filename(file_path)

Extract the numeric index after processed_ in the basename.

Used for mosaics_per_slice == 3 spectral paths such as .../spectral/processed_0123*.nii.

Source code in opticstream/utils/filename_utils.py
136
137
138
139
140
141
142
143
144
145
146
147
def extract_processed_index_from_filename(file_path: str) -> int:
    """
    Return the integer that follows ``processed_`` in the file's basename.

    Only the basename is searched, so a ``processed_<n>`` token appearing in a
    parent directory is ignored. The first occurrence in the basename wins.

    Parameters
    ----------
    file_path : str
        Path or bare filename, e.g. ``.../processed_0123*.nii``

    Returns
    -------
    int
        The digits after ``processed_`` converted to ``int``

    Raises
    ------
    ValueError
        If the basename contains no ``processed_<digits>`` token.
    """
    found = re.search(r"processed_(\d+)", op.basename(file_path))
    if found is None:
        raise ValueError(f"Could not parse processed index from filename: {file_path}")
    return int(found[1])

extract_spectral_index_from_filename(file_path)

Extract the numeric index after spectral_ in the basename.

Used for mosaics_per_slice == 3 paths whose basename contains a spectral_<index> token, such as .../spectral_0123*.nii.

Source code in opticstream/utils/filename_utils.py
123
124
125
126
127
128
129
130
131
132
133
134
def extract_spectral_index_from_filename(file_path: str) -> int:
    """
    Extract the numeric index after ``spectral_`` in the basename.

    Only the basename is searched, so ``spectral_<n>`` in a parent directory
    name is ignored. Used for ``mosaics_per_slice == 3`` paths whose basename
    carries a ``spectral_<index>`` token, e.g.
    ``mosaic_001_image_0000_spectral_0123.nii``.

    Parameters
    ----------
    file_path : str
        Path to file (full path or basename)

    Returns
    -------
    int
        The digits following ``spectral_`` converted to ``int``

    Raises
    ------
    ValueError
        If the basename does not contain a ``spectral_<digits>`` token.
    """
    filename = op.basename(file_path)
    match = re.search(r"spectral_(\d+)", filename)
    if not match:
        # Name the token actually being parsed so failures are not confused
        # with those of extract_processed_index_from_filename.
        raise ValueError(f"Could not parse spectral index from filename: {file_path}")
    return int(match.group(1))

extract_tile_number_from_filename(file_path)

Extract tile number from an image_{number} token in filename.

Parameters:

Name Type Description Default
file_path str

Path to file (full path or basename)

required

Returns:

Type Description
int

Tile number parsed from image_{number}

Raises:

Type Description
ValueError

If filename does not contain an image_{number} token.

Source code in opticstream/utils/filename_utils.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def extract_tile_number_from_filename(file_path: str) -> int:
    """
    Read the tile number from the ``image_{number}`` token of a filename.

    The token must start the basename or follow an underscore, and the digits
    must be followed by an underscore, a dot, or the end of the name.

    Parameters
    ----------
    file_path : str
        Full path or bare filename; only the basename is inspected

    Returns
    -------
    int
        The digits of the ``image_{number}`` token as an integer

    Raises
    ------
    ValueError
        If the basename has no ``image_{number}`` token.
    """
    basename = op.basename(file_path)
    hit = re.search(r"(?:^|_)image_(\d+)(?:_|\.|$)", basename)
    if hit is not None:
        return int(hit.group(1))
    raise ValueError(f"Could not parse tile number from filename: {file_path}")

normalize_image_index(filename)

Normalize image index in filename to 4-digit padding.

Handles mosaic_(\d{3})_image_(\d{3,4}) → mosaic_(\d{3})_image_(\d{4})

Parameters:

Name Type Description Default
filename str

Filename to normalize (can be full path or just basename)

required

Returns:

Type Description
str

Normalized filename with 4-digit image index (basename only)

Raises:

Type Description
ValueError

If filename doesn't match expected pattern

Source code in opticstream/utils/filename_utils.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def normalize_image_index(filename: str) -> str:
    r"""
    Zero-pad the image index of a mosaic filename to four digits.

    Rewrites `mosaic_(\d{3})_image_(\d{3,4})` as `mosaic_(\d{3})_image_(\d{4})`,
    leaving everything after the index (e.g. ``_complex``) and the file
    extension untouched. Any directory component is dropped.

    Parameters
    ----------
    filename : str
        Full path or bare filename to normalize

    Returns
    -------
    str
        Basename with a 4-digit image index

    Raises
    ------
    ValueError
        If the basename (without extension) does not start with
        ``mosaic_DDD_image_`` followed by 3 or 4 digits.
    """
    base = op.basename(filename)
    stem, extension = op.splitext(base)

    parsed = re.match(r"^mosaic_(\d{3})_image_(\d{3,4})(.*)$", stem)
    if parsed is None:
        raise ValueError(f"Invalid filename pattern: {filename}")

    # trailing is whatever followed the index, possibly an empty string.
    mosaic_id, raw_index, trailing = parsed.groups()
    return f"mosaic_{mosaic_id}_image_{int(raw_index):04d}{trailing}{extension}"

parse_lsm_run_folder_name(folder_name)

Parse an LSM run folder name into run_index, strip_index, and channel_index.

Folder names follow the format:

  • `<prefix><N>` → run_index=N, strip_index=1, channel_index=1
  • `<prefix><N>_<s>` → run_index=N, strip_index=s+1, channel_index=1
  • `<prefix><N>_C2` → run_index=N, strip_index=1, channel_index=2
  • `<prefix><N>_C2_<s>` → run_index=N, strip_index=s+1, channel_index=2

Where <prefix> is any single word composed of letters (e.g. "Run", "Scan", "Acq"). The prefix is case-insensitive.

Parameters:

Name Type Description Default
folder_name str

Folder name or path (e.g. "Run1", "Run1_2", "Scan3_C2", "Acq1_C2_3"). Trailing slashes and parent path are ignored; only the basename is parsed.

required

Returns:

Type Description
tuple of (int, int, int)

(run_index, strip_index, channel_index)

Raises:

Type Description
ValueError

If folder_name does not match the expected pattern.

Examples:

>>> parse_lsm_run_folder_name("Run1")
(1, 1, 1)
>>> parse_lsm_run_folder_name("Run1_1")
(1, 2, 1)
>>> parse_lsm_run_folder_name("Run1_4")
(1, 5, 1)
>>> parse_lsm_run_folder_name("Run1_C2")
(1, 1, 2)
>>> parse_lsm_run_folder_name("Run1_C2_3")
(1, 4, 2)
>>> parse_lsm_run_folder_name("Scan2_C2_1")
(2, 2, 2)
Source code in opticstream/utils/filename_utils.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def parse_lsm_run_folder_name(folder_name: str) -> tuple[int, int, int]:
    """
    Split an LSM run folder name into run, strip and channel indices.

    Expected folder name layouts:
    - <prefix><N>           → run_index=N, strip_index=1, channel_index=1
    - <prefix><N>_<s>       → run_index=N, strip_index=s+1, channel_index=1
    - <prefix><N>_C2        → run_index=N, strip_index=1, channel_index=2
    - <prefix><N>_C2_<s>    → run_index=N, strip_index=s+1, channel_index=2

    ``<prefix>`` is a single alphabetic word (e.g. "Run", "Scan", "Acq"),
    matched case-insensitively. The actual pattern is the module-level
    ``_LSM_RUN_FOLDER_RE``; this function reads its groups 2 (run number),
    3 (channel marker) and 5 (numeric strip suffix).

    Parameters
    ----------
    folder_name : str
        Folder name or path (e.g. "Run1", "Run1_2", "Scan3_C2", "Acq1_C2_3").
        Trailing path separators are stripped and only the basename is parsed.

    Returns
    -------
    tuple of (int, int, int)
        (run_index, strip_index, channel_index)

    Raises
    ------
    ValueError
        If the basename does not match the expected pattern.

    Examples
    --------
    >>> parse_lsm_run_folder_name("Run1")
    (1, 1, 1)
    >>> parse_lsm_run_folder_name("Run1_1")
    (1, 2, 1)
    >>> parse_lsm_run_folder_name("Run1_4")
    (1, 5, 1)
    >>> parse_lsm_run_folder_name("Run1_C2")
    (1, 1, 2)
    >>> parse_lsm_run_folder_name("Run1_C2_3")
    (1, 4, 2)
    >>> parse_lsm_run_folder_name("Scan2_C2_1")
    (2, 2, 2)
    """
    base = op.basename(folder_name.rstrip(op.sep))
    parsed = _LSM_RUN_FOLDER_RE.match(base)
    if parsed is None:
        raise ValueError(
            f"Folder name does not match LSM run pattern '<prefix><N>[_C2][_<suffix>]': {folder_name!r}"
        )

    run_index = int(parsed.group(2))
    # A non-empty channel group means channel 2; otherwise channel 1.
    channel_index = 2 if parsed.group(3) else 1
    # The numeric suffix is shifted by one; a missing suffix means strip 1.
    strip_suffix = parsed.group(5)
    if strip_suffix is None:
        strip_index = 1
    else:
        strip_index = int(strip_suffix) + 1
    return (run_index, strip_index, channel_index)

parse_lsm_strip_index(strip_index, channel_index, strips_per_slice)

Parse an LSM strip index into a slice index, strip index, and channel index.

Parameters:

Name Type Description Default
strip_index int

1-based strip index within the entire acquisition.

required
channel_index int

Channel index (e.g. 1 or 2).

required
strips_per_slice int

Number of strips acquired per slice.

required

Returns:

Type Description
Tuple[int, int, int]

(slice_index, strip_index_within_slice, channel_index)

Source code in opticstream/utils/filename_utils.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def parse_lsm_strip_index(
    strip_index: int, channel_index: int, strips_per_slice: int
) -> Tuple[int, int, int]:
    """
    Convert an acquisition-wide strip index into (slice, strip-in-slice, channel).

    Both returned indices are 1-based: the input is shifted to 0-based,
    divided by ``strips_per_slice``, and quotient and remainder are shifted
    back to 1-based.

    Parameters
    ----------
    strip_index : int
        1-based strip index within the entire acquisition.
    channel_index : int
        Channel index (e.g. 1 or 2); returned unchanged.
    strips_per_slice : int
        Number of strips acquired per slice.

    Returns
    -------
    Tuple[int, int, int]
        (slice_index, strip_index_within_slice, channel_index)
    """
    whole_slices, offset_in_slice = divmod(strip_index - 1, strips_per_slice)
    return (whole_slices + 1, offset_in_slice + 1, channel_index)

parse_lsm_strip_index_from_filename(folder_name, strips_per_slice=1)

Parse LSM slice/strip/channel indices from a run folder name.

This combines parse_lsm_run_folder_name and parse_lsm_strip_index:

  • First, the folder name is interpreted as an LSM run folder, e.g.:
  • Run1, Run1_2, Run1_C2, Run1_C2_3.
  • Then the resulting strip index is split into a slice index and a strip index within that slice using strips_per_slice.

Parameters:

Name Type Description Default
folder_name str

Folder name or path; only the basename is parsed.

required
strips_per_slice int

Number of strips acquired per slice. Defaults to 1.

1

Returns:

Type Description
Tuple[int, int, int]

(slice_index, strip_index_within_slice, channel_index)

Source code in opticstream/utils/filename_utils.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def parse_lsm_strip_index_from_filename(
    folder_name: str,
    strips_per_slice: int = 1,
) -> Tuple[int, int, int]:
    """
    Parse LSM slice/strip/channel indices from a run folder name.

    This combines `parse_lsm_run_folder_name` and `parse_lsm_strip_index`:

    - First, the folder name is interpreted as an LSM run folder, e.g.:
      - ``Run1``, ``Run1_2``, ``Run1_C2``, ``Run1_C2_3``.
    - Then the resulting strip index is split into a slice index and a strip index
      within that slice using ``strips_per_slice``.

    Parameters
    ----------
    folder_name : str
        Folder name or path; only the basename is parsed. A trailing path
        separator (e.g. ``"/data/Run1_2/"``) is accepted.
    strips_per_slice : int, optional
        Number of strips acquired per slice. Defaults to 1.

    Returns
    -------
    Tuple[int, int, int]
        (slice_index, strip_index_within_slice, channel_index)

    Raises
    ------
    ValueError
        Propagated from `parse_lsm_run_folder_name` when the folder name does
        not match the LSM run pattern.
    """
    # Pass the path through untouched: parse_lsm_run_folder_name already strips
    # trailing separators before taking the basename. Calling
    # os.path.basename() here first would turn "Run1_2/" into "" and make an
    # otherwise valid folder path fail to parse.
    _run_index, strip_index, channel_index = parse_lsm_run_folder_name(folder_name)
    # The run index is parsed but intentionally not part of the result.
    return parse_lsm_strip_index(strip_index, channel_index, strips_per_slice)

replace_spectral_with_complex_in_path(file_path)

Replace 'spectral' with 'complex' in file path.

Simple string replacement for spectral→complex conversion.

Parameters:

Name Type Description Default
file_path str

File path containing 'spectral'

required

Returns:

Type Description
str

File path with 'spectral' replaced by 'complex'

Source code in opticstream/utils/filename_utils.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def replace_spectral_with_complex_in_path(file_path: str) -> str:
    """
    Swap every occurrence of 'spectral' for 'complex' in a path string.

    This is a plain substring substitution over the whole string, so both
    directory components and the filename are affected. Strings without
    'spectral' are returned unchanged.

    Parameters
    ----------
    file_path : str
        File path containing 'spectral'

    Returns
    -------
    str
        The same path with each 'spectral' replaced by 'complex'
    """
    old_token, new_token = "spectral", "complex"
    return file_path.replace(old_token, new_token)

spectral_to_complex_filename(spectral_file, complex_path)

Convert spectral filename to complex filename.

Converts mosaic_001_image_0000_spectral_0000.nii → mosaic_001_image_0000_complex.nii. Handles spectral→complex replacement and image index normalization to 4 digits.

Parameters:

Name Type Description Default
spectral_file str

Path to spectral file

required
complex_path Path

Path to complex directory

required

Returns:

Type Description
str

Full path to complex file

Source code in opticstream/utils/filename_utils.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def spectral_to_complex_filename(spectral_file: str, complex_path: Path) -> str:
    """
    Derive the complex-file path that corresponds to a spectral file.

    Example: `mosaic_001_image_0000_spectral_0000.nii` → `mosaic_001_image_0000_complex.nii`

    Two steps are applied to the basename of ``spectral_file``:

    1. Everything from the first ``spectral`` onward is replaced by
       ``complex``, keeping the final extension (the equivalent of
       ``regexprep(base_name, 'spectral.*$', 'complex')``). Names without
       ``spectral`` are left as they are.
    2. If the result is exactly ``mosaic_DDD_image_<3-4 digits>_complex``,
       the image index is zero-padded to four digits.

    Parameters
    ----------
    spectral_file : str
        Path to spectral file
    complex_path : Path
        Path to complex directory

    Returns
    -------
    str
        ``complex_path`` joined with the derived complex filename
    """
    base = op.basename(spectral_file)

    # Step 1: truncate at 'spectral' and re-attach the original extension.
    cut = base.find("spectral")
    if cut != -1:
        base = base[:cut] + "complex" + op.splitext(base)[1]

    # Step 2: pad the image index when the stem has the canonical layout.
    stem, extension = op.splitext(base)
    canonical = re.match(r"^mosaic_(\d{3})_image_(\d{3,4})_complex$", stem)
    if canonical is not None:
        mosaic_id = canonical.group(1)
        image_number = int(canonical.group(2))
        base = f"mosaic_{mosaic_id}_image_{image_number:04d}_complex" + extension

    return str(Path(complex_path) / base)

Deployment utilities for Prefect event-driven flows.

This module provides helper functions to create standardized deployment configurations for event-driven flows, reducing boilerplate code.

create_event_deployment(flow, name, event_name, tags=None, concurrency_limit=None)

Create a standard event-driven deployment for a flow.

This helper creates a deployment configuration for a flow that is triggered by a single event. It standardizes the deployment setup and reduces boilerplate.

Parameters:

Name Type Description Default
flow Flow

The Prefect flow to create a deployment for

required
name str

Deployment name

required
event_name str

Event name to trigger on

required
tags List[str]

Tags for the deployment

None
concurrency_limit int

Concurrency limit for the deployment

None

Returns:

Type Description
Flow

The flow with deployment configuration (for use with to_deployment())

Examples:

>>> deployment = create_event_deployment(
...     process_tile_batch_event_flow,
...     name="process_tile_batch_event_flow",
...     event_name=BATCH_READY,
...     tags=["event-driven", "tile-batch"],
... )
Source code in opticstream/utils/deployment_utils.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_event_deployment(
    flow: Flow,
    name: str,
    event_name: str,
    tags: Optional[List[str]] = None,
    concurrency_limit: Optional[int] = None,
) -> Flow:
    """
    Build a deployment for a flow that is triggered by one event.

    Wraps ``flow.to_deployment(...)`` with a single trigger obtained from
    ``get_event_trigger(event_name)`` so callers do not repeat that setup.

    Parameters
    ----------
    flow : Flow
        The Prefect flow to create a deployment for
    name : str
        Deployment name
    event_name : str
        Event name to trigger on
    tags : List[str], optional
        Tags for the deployment; ``["event-driven"]`` is used when omitted
    concurrency_limit : int, optional
        When given, assigned to ``concurrency_limit`` on the deployment
        after it has been created

    Returns
    -------
    Flow
        The object returned by ``flow.to_deployment(...)``.
        NOTE(review): annotated as ``Flow``, but this is the deployment
        object produced by ``to_deployment`` — confirm the annotation.

    Examples
    --------
    >>> deployment = create_event_deployment(
    ...     process_tile_batch_event_flow,
    ...     name="process_tile_batch_event_flow",
    ...     event_name=BATCH_READY,
    ...     tags=["event-driven", "tile-batch"],
    ... )
    """
    effective_tags = ["event-driven"] if tags is None else tags

    deployment = flow.to_deployment(
        name=name,
        tags=effective_tags,
        triggers=[get_event_trigger(event_name)],
    )

    # Leave the deployment's own limit untouched unless one was requested.
    if concurrency_limit is None:
        return deployment
    deployment.concurrency_limit = concurrency_limit
    return deployment

create_unified_deployment(flow, name, event_names, tags=None, concurrency_limit=None)

Create a unified deployment for a flow that handles multiple events.

This helper creates a deployment configuration for flows that need to handle multiple event types (like unified_state_management_event_flow).

Parameters:

Name Type Description Default
flow Flow

The Prefect flow to create a deployment for

required
name str

Deployment name

required
event_names List[str]

List of event names to trigger on

required
tags List[str]

Tags for the deployment

None
concurrency_limit int

Concurrency limit for the deployment

None

Returns:

Type Description
Flow

The flow with deployment configuration (for use with to_deployment())

Examples:

>>> deployment = create_unified_deployment(
...     unified_state_management_event_flow,
...     name="unified_state_management_event_flow",
...     event_names=[BATCH_PROCESSED, BATCH_ARCHIVED, MOSAIC_ENFACE_STITCHED],
...     tags=["event-driven", "state-management", "unified"],
...     concurrency_limit=1,
... )
Source code in opticstream/utils/deployment_utils.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def create_unified_deployment(
    flow: Flow,
    name: str,
    event_names: List[str],
    tags: Optional[List[str]] = None,
    concurrency_limit: Optional[int] = None,
) -> Flow:
    """
    Build a deployment for a flow that reacts to several events.

    One trigger is created per entry of ``event_names`` via
    ``get_event_trigger`` and all of them are passed to
    ``flow.to_deployment(...)`` (e.g. for unified_state_management_event_flow).

    Parameters
    ----------
    flow : Flow
        The Prefect flow to create a deployment for
    name : str
        Deployment name
    event_names : List[str]
        List of event names to trigger on
    tags : List[str], optional
        Tags for the deployment; ``["event-driven", "unified"]`` is used
        when omitted
    concurrency_limit : int, optional
        When given, assigned to ``concurrency_limit`` on the deployment
        after it has been created

    Returns
    -------
    Flow
        The object returned by ``flow.to_deployment(...)``.
        NOTE(review): annotated as ``Flow``, but this is the deployment
        object produced by ``to_deployment`` — confirm the annotation.

    Examples
    --------
    >>> deployment = create_unified_deployment(
    ...     unified_state_management_event_flow,
    ...     name="unified_state_management_event_flow",
    ...     event_names=[BATCH_PROCESSED, BATCH_ARCHIVED, MOSAIC_ENFACE_STITCHED],
    ...     tags=["event-driven", "state-management", "unified"],
    ...     concurrency_limit=1,
    ... )
    """
    effective_tags = tags if tags is not None else ["event-driven", "unified"]

    # One trigger per event name, in the order given.
    event_triggers = list(map(get_event_trigger, event_names))

    deployment = flow.to_deployment(
        name=name,
        tags=effective_tags,
        triggers=event_triggers,
    )

    # Leave the deployment's own limit untouched unless one was requested.
    if concurrency_limit is None:
        return deployment
    deployment.concurrency_limit = concurrency_limit
    return deployment