From 89b07fb1de57d66fb7e506c09c4b1bd0a0cc568f Mon Sep 17 00:00:00 2001 From: Matus Kasak Date: Wed, 10 Jun 2026 14:23:49 +0200 Subject: [PATCH] Possible solution for duplicate bitstreams after import --- src/dspace/_rest.py | 73 +++++++--- src/pump/_bitstream.py | 218 ++++++++++++++++++++++++++-- tests/test_bitstream_recovery.py | 185 +++++++++++++++++++++++ tests/test_issue_665_replication.py | 125 ++++++++++++++++ 4 files changed, 567 insertions(+), 34 deletions(-) create mode 100644 tests/test_bitstream_recovery.py create mode 100644 tests/test_issue_665_replication.py diff --git a/src/dspace/_rest.py b/src/dspace/_rest.py index 86bbe9c..a374a75 100644 --- a/src/dspace/_rest.py +++ b/src/dspace/_rest.py @@ -374,6 +374,24 @@ def fetch_raw_item(self, uuid: str): raise Exception(r) return response_to_json(r) + def fetch_bundle_bitstreams(self, bundle_uuid: str, page_size: int = 100): + """Fetch all bitstreams currently assigned to one bundle.""" + page = 0 + out = [] + url = f'core/bundles/{bundle_uuid}/bitstreams' + while True: + r = self._fetch(url, self.get, '_embedded', params={'page': page, 'size': page_size}) + if r is None: + break + chunk = r.get('bitstreams', []) if isinstance(r, dict) else [] + if not chunk: + break + out.extend(chunk) + if len(chunk) < page_size: + break + page += 1 + return out + # ======= def put_usermetadata(self, params: dict, data: dict): @@ -405,7 +423,9 @@ def add_checksums(self): def put_bitstream(self, param: dict, data: dict): url = 'clarin/import/core/bitstream' _logger.debug(f"Importing [][{param}] using [{url}]") - return list(self._iput(url, [data], [param]))[0] + # Bitstream creation endpoint is non-idempotent in practice. + # Disable transient retries to avoid duplicate rows after read timeouts. + return list(self._iput(url, [data], [param], allow_transient_retry=False))[0] def put_com_logo(self, param: dict): url = 'clarin/import/logo/community' @@ -588,18 +608,33 @@ def _put(self, url: str, arr: list, params: list = None): list(self._iput(url, arr, params)) return len(arr) - def _iput(self, url: str, arr: list, params=None): + def _iput(self, url: str, arr: list, params=None, allow_transient_retry: bool = True): _logger.debug(f"Importing {len(arr)} using [{url}]") if params is not None: assert len(params) == len(arr) for i, data in enumerate(progress_bar(arr)): param = params[i] if params is not None else None - result = self._post_with_retry(url, data, param, i, len(arr)) + result = self._post_with_retry( + url, + data, + param, + i, + len(arr), + allow_transient_retry=allow_transient_retry, + ) yield result _logger.debug(f"Imported [{url}] successfully") - def _post_with_retry(self, url: str, data, param, item_index: int, total_items: int): + def _post_with_retry( + self, + url: str, + data, + param, + item_index: int, + total_items: int, + allow_transient_retry: bool = True, + ): """POST with retry logic for handling temporary server errors""" # Check if circuit breaker is blocking requests due to consecutive errors @@ -619,7 +654,9 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: last_exception = None last_response = None - for attempt in range(HTTP_MAX_RETRIES): + max_attempts = HTTP_MAX_RETRIES if allow_transient_retry else 1 + + for attempt in range(max_attempts): try: self._maybe_reauthenticate() r = self.post(url, params=param, data=data) @@ -640,7 +677,7 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: ) if attempt > 0: _logger.debug( - f"POST [{url}] succeeded on attempt {attempt + 1}/{HTTP_MAX_RETRIES}") + f"POST [{url}] succeeded on attempt {attempt + 1}/{max_attempts}") return js except Exception: return r @@ -648,13 +685,13 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: # Handle auth errors (recoverable via re-auth) elif r.status_code in [401, 403]: last_response = r - if attempt == HTTP_MAX_RETRIES - 1: + if attempt == max_attempts - 1: _logger.warning( - f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - final attempt") + f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - final attempt") break _logger.warning( - f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - re-authenticating") + f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - re-authenticating") if not self._maybe_reauthenticate(force=True): _logger.warning("Re-authentication failed") break @@ -666,19 +703,19 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: self._handle_circuit_breaker(r.status_code) retry_delay = HTTP_RETRY_DELAY * (HTTP_RETRY_BACKOFF ** attempt) - if attempt == HTTP_MAX_RETRIES - 1: + if attempt == max_attempts - 1: # Last attempt - no retry will happen _logger.warning( - f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - final attempt") + f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - final attempt") elif attempt == 0: # First attempt - log with retry info _logger.warning( - f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - retrying in {retry_delay}s") + f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - retrying in {retry_delay}s") else: _logger.debug( - f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES})") + f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts})") - if attempt < HTTP_MAX_RETRIES - 1: + if attempt < max_attempts - 1: time.sleep(retry_delay) # Re-authenticate on certain errors @@ -701,12 +738,12 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: if attempt == 0 or attempt == HTTP_MAX_RETRIES - 1: _logger.warning( - f"POST [{url}] exception (attempt {attempt + 1}/{HTTP_MAX_RETRIES}): {str(e)}") + f"POST [{url}] exception (attempt {attempt + 1}/{max_attempts}): {str(e)}") else: _logger.debug( - f"POST [{url}] exception (attempt {attempt + 1}/{HTTP_MAX_RETRIES}): {str(e)}") + f"POST [{url}] exception (attempt {attempt + 1}/{max_attempts}): {str(e)}") - if attempt < HTTP_MAX_RETRIES - 1: + if attempt < max_attempts - 1: time.sleep(retry_delay) continue @@ -730,7 +767,7 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items: error_detail = sanitize_log_content( str(last_exception) if last_exception else "Unknown error") - msg = f"POST [{url}] for [{ascii_data}] failed after {HTTP_MAX_RETRIES} attempts. Final error: {error_detail}" + msg = f"POST [{url}] for [{ascii_data}] failed after {max_attempts} attempts. Final error: {error_detail}" _logger.error(msg) return None diff --git a/src/pump/_bitstream.py b/src/pump/_bitstream.py index 16735fb..f26c66a 100644 --- a/src/pump/_bitstream.py +++ b/src/pump/_bitstream.py @@ -37,6 +37,34 @@ class bitstreams: "(select metadata_field_id from metadatafieldregistry " "where qualifier = 'redirectToURL')"], "right": ["val", 0] + }, + { + "name": "bitstream_duplicate_original_bundle_sequence", + "left": ["sql", "db7", "one", "select count(*) from (" + " select b2b.bundle_id, b.sequence_id" + " from bundle2bitstream b2b" + " join bitstream b on b.bitstream_id = b2b.bitstream_id" + " join bundle bu on bu.bundle_id = b2b.bundle_id" + " where bu.name = 'ORIGINAL'" + " group by b2b.bundle_id, b.sequence_id" + " having count(*) > 1" + ") dup"], + "right": ["val", 0] + }, + { + "name": "bitstream_duplicate_original_bundle_name_checksum", + "left": ["sql", "db7", "one", "select count(*) from (" + " select b2b.bundle_id, lower(mv.text_value), b.checksum" + " from bundle2bitstream b2b" + " join bitstream b on b.bitstream_id = b2b.bitstream_id" + " join bundle bu on bu.bundle_id = b2b.bundle_id" + " join metadatavalue mv on mv.resource_type_id = 0 and mv.dspace_object_id = b.bitstream_id" + " join metadatafieldregistry mfr on mfr.metadata_field_id = mv.metadata_field_id" + " where bu.name = 'ORIGINAL' and mfr.element = 'title'" + " group by b2b.bundle_id, lower(mv.text_value), b.checksum" + " having count(*) > 1" + ") dup"], + "right": ["val", 0] } ] @@ -73,6 +101,85 @@ def uuid(self, b_id: int): def bitstream_path(internal_id: str): return os.path.join(internal_id[:2], internal_id[2:4], internal_id[4:6], internal_id) + @staticmethod + def _normalize_signature_value(value): + if value is None: + return None + return str(value).strip() + + @staticmethod + def _extract_metadata_title(metadata): + if not isinstance(metadata, dict): + return None + values = metadata.get('dc.title') + if not isinstance(values, list) or len(values) == 0: + return None + first = values[0] + if not isinstance(first, dict): + return None + return bitstreams._normalize_signature_value(first.get('value')) + + @staticmethod + def _extract_remote_name(remote): + if not isinstance(remote, dict): + return None + + name = bitstreams._normalize_signature_value(remote.get('name')) + if name: + return name + + metadata = remote.get('metadata') + if isinstance(metadata, dict): + name = bitstreams._extract_metadata_title(metadata) + if name: + return name + + if isinstance(metadata, list): + for entry in metadata: + if not isinstance(entry, dict): + continue + if entry.get('key') == 'dc.title': + return bitstreams._normalize_signature_value(entry.get('value')) + + return None + + @staticmethod + def _extract_remote_checksum(remote): + if not isinstance(remote, dict): + return None + + checksum = remote.get('checkSum') + if isinstance(checksum, dict): + value = bitstreams._normalize_signature_value(checksum.get('value')) + if value: + return value + + for key in ['checksum', 'checksumValue', 'checkSumValue']: + value = bitstreams._normalize_signature_value(remote.get(key)) + if value: + return value + + return None + + @staticmethod + def _source_signature(params, data): + sequence_id = bitstreams._normalize_signature_value((params or {}).get('sequenceId')) + checksum_value = bitstreams._normalize_signature_value( + ((data or {}).get('checkSum') or {}).get('value')) + name = bitstreams._extract_metadata_title((data or {}).get('metadata')) + return name, sequence_id, checksum_value + + @staticmethod + def _remote_signature(remote): + if not isinstance(remote, dict): + return None, None, None + + sequence_id = bitstreams._normalize_signature_value( + remote.get('sequenceId', remote.get('sequence_id'))) + checksum_value = bitstreams._extract_remote_checksum(remote) + name = bitstreams._extract_remote_name(remote) + return name, sequence_id, checksum_value + @property def imported(self): return self._imported['bitstream'] @@ -183,6 +290,8 @@ def _bitstream_import_to(self, env, cache_file, dspace, metadatas, bitstreamform checkpoint_counter = 0 checkpoints_saved = 0 diagnostic_invalid_response_logs = 0 + recovered_by_lookup = 0 + bundle_bitstreams_cache = {} path_assetstore = env["assetstore"] fallback_rel_path = None @@ -203,14 +312,16 @@ def _bitstream_import_to(self, env, cache_file, dspace, metadatas, bitstreamform def _update_progress(pbar_ref): """Centralized progress-bar postfix update.""" - pbar_ref.set_postfix( - imported=self._imported['bitstream'], - skipped_deleted=skipped_deleted, - resumed=skipped_already_imported, - errored=errored, - checkpoints=checkpoints_saved, - to_checkpoint=checkpoint_every - checkpoint_counter, - ) + if hasattr(pbar_ref, 'set_postfix'): + pbar_ref.set_postfix( + imported=self._imported['bitstream'], + recovered=recovered_by_lookup, + skipped_deleted=skipped_deleted, + resumed=skipped_already_imported, + errored=errored, + checkpoints=checkpoints_saved, + to_checkpoint=checkpoint_every - checkpoint_counter, + ) def _record_error(b_id_val): """Record a failed bitstream id and emit diagnostics for repeated failures in testing mode.""" @@ -230,6 +341,53 @@ def _record_error(b_id_val): f'Verify testing fallback bitstream on server assetstore: ' f'relative_path=[{fallback_rel_path}] full_path=[{fallback_full_path}] exists=[{exists_text}].') + def _mark_imported(b_id_val, uuid_val, recovered=False): + nonlocal checkpoint_counter, checkpoints_saved, subsequent_errors, recovered_by_lookup + self._id2uuid[str(b_id_val)] = str(uuid_val) + self._imported["bitstream"] += 1 + subsequent_errors = 0 + if recovered: + recovered_by_lookup += 1 + checkpoint_counter += 1 + if checkpoint_counter >= checkpoint_every: + checkpoint_counter = 0 + checkpoints_saved += 1 + self.serialize(cache_file) + + def _fetch_bundle_bitstreams(bundle_uuid, force_refresh=False): + if not bundle_uuid: + return [] + if force_refresh or bundle_uuid not in bundle_bitstreams_cache: + try: + bundle_bitstreams_cache[bundle_uuid] = dspace.fetch_bundle_bitstreams(bundle_uuid) or [] + except Exception as e: + _logger.warning( + f'Unable to fetch existing bundle bitstreams for [{bundle_uuid}]: [{str(e)}]') + bundle_bitstreams_cache[bundle_uuid] = [] + return bundle_bitstreams_cache.get(bundle_uuid, []) + + def _find_existing_uuid(bundle_uuid, source_signature, force_refresh=False): + src_name, src_seq, src_checksum = source_signature + if not bundle_uuid: + return None + if src_name is None and src_seq is None and src_checksum is None: + return None + + for remote in _fetch_bundle_bitstreams(bundle_uuid, force_refresh=force_refresh): + remote_name, remote_seq, remote_checksum = self._remote_signature(remote) + + if src_seq is not None and src_seq != remote_seq: + continue + if src_name is not None and src_name != remote_name: + continue + if src_checksum is not None and src_checksum != remote_checksum: + continue + + remote_uuid = remote.get('id', remote.get('uuid')) + if remote_uuid is not None: + return str(remote_uuid) + return None + pbar = progress_bar(self._bs) for i, b in enumerate(pbar): b_id = b['bitstream_id'] @@ -326,9 +484,30 @@ def _record_error(b_id_val): # set primaryBundle_id from None to id if b_id in bundles.primary: params['primaryBundle_id'] = bundles.uuid(bundles.primary[b_id]) + + source_signature = self._source_signature(params, data) + matched_uuid = _find_existing_uuid(params.get('bundle_id'), source_signature) + if matched_uuid is not None: + _mark_imported(b_id, matched_uuid, recovered=True) + _logger.warning( + f'put_bitstream [{b_id}] reused existing UUID by signature match before POST: [{matched_uuid}]') + if (i + 1) % 200 == 0: + _update_progress(pbar) + continue + try: resp = dspace.put_bitstream(params, data) if not isinstance(resp, dict) or 'id' not in resp: + recovered_uuid = _find_existing_uuid( + params.get('bundle_id'), source_signature, force_refresh=True) + if recovered_uuid is not None: + _mark_imported(b_id, recovered_uuid, recovered=True) + _logger.warning( + f'put_bitstream [{b_id}] recovered existing UUID after invalid response: [{recovered_uuid}]') + if (i + 1) % 200 == 0: + _update_progress(pbar) + continue + if b['deleted']: _logger.warning( f'put_bitstream [{b_id}] returned invalid response for deleted bitstream: [{resp}] - skipping') @@ -345,18 +524,22 @@ def _record_error(b_id_val): _record_error(b_id) _update_progress(pbar) continue - self._id2uuid[str(b_id)] = resp['id'] - self._imported["bitstream"] += 1 - subsequent_errors = 0 - checkpoint_counter += 1 - if checkpoint_counter >= checkpoint_every: - checkpoint_counter = 0 - checkpoints_saved += 1 - self.serialize(cache_file) + _mark_imported(b_id, resp['id']) + if checkpoint_counter == 0: _update_progress(pbar) if b['deleted']: _logger.warning(f'Imported bitstream is deleted! UUID: {resp["id"]}') except Exception as e: + recovered_uuid = _find_existing_uuid( + params.get('bundle_id'), source_signature, force_refresh=True) + if recovered_uuid is not None: + _mark_imported(b_id, recovered_uuid, recovered=True) + _logger.warning( + f'put_bitstream [{b_id}] recovered existing UUID after exception: [{recovered_uuid}]') + if (i + 1) % 200 == 0: + _update_progress(pbar) + continue + _logger.error(f'put_bitstream [{b_id}]: failed. Exception: [{str(e)}]') _record_error(b_id) _update_progress(pbar) @@ -378,6 +561,9 @@ def _record_error(b_id_val): if errored: _logger.warning( f'Bitstream import skipped/errored [{errored}] records due to invalid/failed responses.') + if recovered_by_lookup: + _logger.warning( + f'Bitstream import recovered [{recovered_by_lookup}] records by existing bundle lookup.') # do bitstream checksum for the last imported bitstreams # these bitstreams can be less than 500, so it is not calculated in a loop diff --git a/tests/test_bitstream_recovery.py b/tests/test_bitstream_recovery.py new file mode 100644 index 0000000..14d462a --- /dev/null +++ b/tests/test_bitstream_recovery.py @@ -0,0 +1,185 @@ +import json +import os +import tempfile +import unittest +import sys +import types +import importlib.util + + +ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +LIB_DIR = os.path.join(ROOT_DIR, "libs", "dspace-rest-python") +if LIB_DIR not in sys.path: + sys.path.insert(0, LIB_DIR) + + +def _load_module(module_name, file_path, package_name, package_path): + if package_name not in sys.modules: + package = types.ModuleType(package_name) + package.__path__ = [package_path] + sys.modules[package_name] = package + + spec = importlib.util.spec_from_file_location(module_name, file_path) + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + return module + + +rest = _load_module( + "dspace._rest", + os.path.join(ROOT_DIR, "src", "dspace", "_rest.py"), + "dspace", + os.path.join(ROOT_DIR, "src", "dspace"), +).rest + +bitstreams = _load_module( + "pump._bitstream", + os.path.join(ROOT_DIR, "src", "pump", "_bitstream.py"), + "pump", + os.path.join(ROOT_DIR, "src", "pump"), +).bitstreams + + +class _StubMetadatas: + def value(self, _rtype, _rid, log_missing=True): + return { + "dc.title": [ + { + "value": "sample-audio.flac", + "language": None, + "authority": None, + "confidence": -1, + } + ] + } + + def filter_res_d(self, data, _ignored_fields): + return data + + +class _StubBitstreamFormatRegistry: + unknown_format_id = 1 + + def mimetype(self, _format_id): + return "audio/flac" + + +class _StubBundles: + primary = {} + + def uuid(self, _bundle_id): + return "bundle-uuid-1" + + +class _StubCommunities: + logos = {} + + +class _StubCollections: + logos = {} + + +def _write_json(path, data): + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f) + + +class TestBitstreamRecovery(unittest.TestCase): + def test_put_bitstream_disables_transient_retry(self): + r = rest.__new__(rest) + captured = {} + + def _fake_iput(url, arr, params=None, allow_transient_retry=True): + captured["url"] = url + captured["allow_transient_retry"] = allow_transient_retry + return iter([{"id": "uuid-123"}]) + + r._iput = _fake_iput + + resp = r.put_bitstream({"sequenceId": 1}, {"metadata": {}}) + + self.assertEqual(resp["id"], "uuid-123") + self.assertEqual(captured["url"], "clarin/import/core/bitstream") + self.assertFalse(captured["allow_transient_retry"]) + + def test_recovers_uuid_after_invalid_response(self): + with tempfile.TemporaryDirectory() as tmp: + bitstream_json = os.path.join(tmp, "bitstream.json") + bundle2bitstream_json = os.path.join(tmp, "bundle2bitstream.json") + cache_json = os.path.join(tmp, "cache.json") + + _write_json( + bitstream_json, + [ + { + "bitstream_id": 1, + "deleted": False, + "size_bytes": 123, + "checksum_algorithm": "MD5", + "checksum": "abc123", + "bitstream_format_id": 1, + "internal_id": "00112233445566778899", + "store_number": 0, + "sequence_id": 3, + } + ], + ) + _write_json(bundle2bitstream_json, [{"bitstream_id": 1, "bundle_id": 11}]) + + entity = bitstreams(bitstream_json, bundle2bitstream_json) + + class _StubDspace: + def __init__(self): + self.fetch_calls = 0 + self.put_calls = 0 + + def put_bitstream(self, _params, _data): + self.put_calls += 1 + return None + + def fetch_bundle_bitstreams(self, _bundle_uuid): + self.fetch_calls += 1 + if self.fetch_calls == 1: + return [] + return [ + { + "id": "uuid-recovered-1", + "sequenceId": 3, + "name": "sample-audio.flac", + "checkSum": {"value": "abc123"}, + } + ] + + def add_checksums(self): + return None + + dspace = _StubDspace() + + env = { + "backend": { + "ignore_deleted_bitstreams": False, + "testing": False, + }, + "assetstore": "", + } + + entity._bitstream_import_to( + env, + cache_json, + dspace, + _StubMetadatas(), + _StubBitstreamFormatRegistry(), + _StubBundles(), + _StubCommunities(), + _StubCollections(), + ) + + self.assertEqual(entity.uuid(1), "uuid-recovered-1") + self.assertEqual(entity.imported, 1) + self.assertEqual(dspace.put_calls, 1) + self.assertEqual(dspace.fetch_calls, 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_issue_665_replication.py b/tests/test_issue_665_replication.py new file mode 100644 index 0000000..843c9f2 --- /dev/null +++ b/tests/test_issue_665_replication.py @@ -0,0 +1,125 @@ +import unittest + + +class _FakeBackend: + """In-memory backend that simulates timeout/invalid-response behavior.""" + + def __init__(self): + self._next_id = 100 + self.created = [] + self._attempts = {} + + def _new_id(self): + self._next_id += 1 + return str(self._next_id) + + def create_bitstream(self, signature): + attempt = self._attempts.get(signature, 0) + 1 + self._attempts[signature] = attempt + + # Simulate server-side create + client timeout for both signatures on first attempt. + if attempt == 1: + created_id = self._new_id() + self.created.append((signature, created_id)) + raise TimeoutError("Read timed out") + + # Signature A then returns invalid response (legacy importer fails to map it). + if signature[0] == "makon-flac.1" and attempt == 2: + return None + + # Signature B creates duplicate on retry and returns that UUID. + created_id = self._new_id() + self.created.append((signature, created_id)) + return {"id": created_id} + + def find_by_signature(self, signature): + for sig, uuid in self.created: + if sig == signature: + return uuid + return None + + def count_by_signature(self, signature): + return sum(1 for sig, _ in self.created if sig == signature) + + +def _legacy_import_once(source_signatures, backend): + """Replicates old behavior: retries non-idempotent create + no recovery lookup.""" + mapping = {} + for source_id, signature in source_signatures.items(): + resp = None + for _attempt in range(3): + try: + resp = backend.create_bitstream(signature) + except Exception: + continue + break + + if isinstance(resp, dict) and "id" in resp: + mapping[source_id] = resp["id"] + return mapping + + +def _fixed_import_once(source_signatures, backend): + """Replicates new behavior: no transient retry + lookup recovery by signature.""" + mapping = {} + for source_id, signature in source_signatures.items(): + existing = backend.find_by_signature(signature) + if existing is not None: + mapping[source_id] = existing + continue + + resp = None + try: + # Single attempt only for non-idempotent create. + resp = backend.create_bitstream(signature) + except Exception: + pass + + if isinstance(resp, dict) and "id" in resp: + mapping[source_id] = resp["id"] + continue + + recovered = backend.find_by_signature(signature) + if recovered is not None: + mapping[source_id] = recovered + + return mapping + + +class TestIssue665Replication(unittest.TestCase): + def test_replication_with_legacy_behavior(self): + source = { + 863: ("makon-flac.1", "1", "c1"), + 864: ("makon-flac.3", "3", "c3"), + } + backend = _FakeBackend() + + mapping = _legacy_import_once(source, backend) + + # Missing mapping for one source bitstream. + self.assertNotIn(863, mapping) + self.assertIn(864, mapping) + + # Duplicate create for bitstream 864 signature. + self.assertEqual(backend.count_by_signature(source[864]), 2) + + def test_fixed_behavior_prevents_missing_and_duplicate(self): + source = { + 863: ("makon-flac.1", "1", "c1"), + 864: ("makon-flac.3", "3", "c3"), + } + backend = _FakeBackend() + + mapping = _fixed_import_once(source, backend) + + # Both source bitstreams are mapped by response or recovery lookup. + self.assertIn(863, mapping) + self.assertIn(864, mapping) + + # No duplicate create for either signature. + self.assertEqual(backend.count_by_signature(source[863]), 1) + self.assertEqual(backend.count_by_signature(source[864]), 1) + + +if __name__ == "__main__": + unittest.main()