Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 55 additions & 18 deletions src/dspace/_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,24 @@ def fetch_raw_item(self, uuid: str):
raise Exception(r)
return response_to_json(r)

def fetch_bundle_bitstreams(self, bundle_uuid: str, page_size: int = 100):
    """Collect the bitstreams listed under one bundle, page by page.

    Requests ``core/bundles/<bundle_uuid>/bitstreams`` through
    ``self._fetch`` (asking for the ``'_embedded'`` key) with ``page`` and
    ``size`` query parameters, starting at page 0.

    Args:
        bundle_uuid: Identifier interpolated into the bundle URL.
        page_size: Value sent as ``size`` and used to detect the last page.

    Returns:
        A list with the ``'bitstreams'`` entries of every page fetched,
        in the order they were received.
    """
    endpoint = f'core/bundles/{bundle_uuid}/bitstreams'
    collected = []
    page_no = 0
    while True:
        embedded = self._fetch(
            endpoint,
            self.get,
            '_embedded',
            params={'page': page_no, 'size': page_size},
        )
        # Anything that is not a dict (including None) counts as "no data".
        batch = embedded.get('bitstreams', []) if isinstance(embedded, dict) else []
        if not batch:
            return collected
        collected.extend(batch)
        # NOTE(review): a short page is taken to be the last one; this
        # presumes the server honours the requested size - confirm.
        if len(batch) < page_size:
            return collected
        page_no += 1

# =======

def put_usermetadata(self, params: dict, data: dict):
Expand Down Expand Up @@ -405,7 +423,9 @@ def add_checksums(self):
def put_bitstream(self, param: dict, data: dict):
url = 'clarin/import/core/bitstream'
_logger.debug(f"Importing [][{param}] using [{url}]")
return list(self._iput(url, [data], [param]))[0]
# Bitstream creation endpoint is non-idempotent in practice.
# Disable transient retries to avoid duplicate rows after read timeouts.
return list(self._iput(url, [data], [param], allow_transient_retry=False))[0]

def put_com_logo(self, param: dict):
url = 'clarin/import/logo/community'
Expand Down Expand Up @@ -588,18 +608,33 @@ def _put(self, url: str, arr: list, params: list = None):
list(self._iput(url, arr, params))
return len(arr)

def _iput(self, url: str, arr: list, params=None):
def _iput(self, url: str, arr: list, params=None, allow_transient_retry: bool = True):
_logger.debug(f"Importing {len(arr)} using [{url}]")
if params is not None:
assert len(params) == len(arr)

for i, data in enumerate(progress_bar(arr)):
param = params[i] if params is not None else None
result = self._post_with_retry(url, data, param, i, len(arr))
result = self._post_with_retry(
url,
data,
param,
i,
len(arr),
allow_transient_retry=allow_transient_retry,
)
yield result
_logger.debug(f"Imported [{url}] successfully")

def _post_with_retry(self, url: str, data, param, item_index: int, total_items: int):
def _post_with_retry(
self,
url: str,
data,
param,
item_index: int,
total_items: int,
allow_transient_retry: bool = True,
):
"""POST with retry logic for handling temporary server errors"""

# Check if circuit breaker is blocking requests due to consecutive errors
Expand All @@ -619,7 +654,9 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items:
last_exception = None
last_response = None

for attempt in range(HTTP_MAX_RETRIES):
max_attempts = HTTP_MAX_RETRIES if allow_transient_retry else 1

for attempt in range(max_attempts):
try:
self._maybe_reauthenticate()
r = self.post(url, params=param, data=data)
Expand All @@ -640,21 +677,21 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items:
)
if attempt > 0:
_logger.debug(
f"POST [{url}] succeeded on attempt {attempt + 1}/{HTTP_MAX_RETRIES}")
f"POST [{url}] succeeded on attempt {attempt + 1}/{max_attempts}")
return js
except Exception:
return r

# Handle auth errors (recoverable via re-auth)
elif r.status_code in [401, 403]:
last_response = r
if attempt == HTTP_MAX_RETRIES - 1:
if attempt == max_attempts - 1:
_logger.warning(
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - final attempt")
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - final attempt")
break

_logger.warning(
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - re-authenticating")
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - re-authenticating")
if not self._maybe_reauthenticate(force=True):
_logger.warning("Re-authentication failed")
break
Expand All @@ -666,19 +703,19 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items:
self._handle_circuit_breaker(r.status_code)
retry_delay = HTTP_RETRY_DELAY * (HTTP_RETRY_BACKOFF ** attempt)

if attempt == HTTP_MAX_RETRIES - 1:
if attempt == max_attempts - 1:
# Last attempt - no retry will happen
_logger.warning(
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - final attempt")
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - final attempt")
elif attempt == 0:
# First attempt - log with retry info
_logger.warning(
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES}) - retrying in {retry_delay}s")
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts}) - retrying in {retry_delay}s")
else:
_logger.debug(
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{HTTP_MAX_RETRIES})")
f"POST [{url}] HTTP {r.status_code} (attempt {attempt + 1}/{max_attempts})")

if attempt < HTTP_MAX_RETRIES - 1:
if attempt < max_attempts - 1:
time.sleep(retry_delay)

# Re-authenticate on certain errors
Expand All @@ -701,12 +738,12 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items:

if attempt == 0 or attempt == HTTP_MAX_RETRIES - 1:
_logger.warning(
f"POST [{url}] exception (attempt {attempt + 1}/{HTTP_MAX_RETRIES}): {str(e)}")
f"POST [{url}] exception (attempt {attempt + 1}/{max_attempts}): {str(e)}")
else:
_logger.debug(
f"POST [{url}] exception (attempt {attempt + 1}/{HTTP_MAX_RETRIES}): {str(e)}")
f"POST [{url}] exception (attempt {attempt + 1}/{max_attempts}): {str(e)}")

if attempt < HTTP_MAX_RETRIES - 1:
if attempt < max_attempts - 1:
time.sleep(retry_delay)
continue

Expand All @@ -730,7 +767,7 @@ def _post_with_retry(self, url: str, data, param, item_index: int, total_items:
error_detail = sanitize_log_content(
str(last_exception) if last_exception else "Unknown error")

msg = f"POST [{url}] for [{ascii_data}] failed after {HTTP_MAX_RETRIES} attempts. Final error: {error_detail}"
msg = f"POST [{url}] for [{ascii_data}] failed after {max_attempts} attempts. Final error: {error_detail}"
_logger.error(msg)
return None

Expand Down
Loading