diff --git a/modules/weko-search-ui/weko_search_ui/utils.py b/modules/weko-search-ui/weko_search_ui/utils.py index 3c02cde998..c45f1ccf80 100644 --- a/modules/weko-search-ui/weko_search_ui/utils.py +++ b/modules/weko-search-ui/weko_search_ui/utils.py @@ -931,7 +931,7 @@ def check_jsonld_import_items( with open(f"{data_path}/{json_name}", "r") as f: json_ld = json.load(f) mapper.data_path = data_path - item_metadatas, _ = mapper.to_item_metadata(json_ld) + item_metadatas, x = mapper.to_item_metadata(json_ld) list_record = [ { "$schema": f"/items/jsonschema/{item_type.id}", @@ -1447,7 +1447,8 @@ def handle_validate_item_import(list_record, schema) -> list: v2 = Draft4Validator(schema) if schema else None for record in list_record: errors = record.get("errors") or [] - warnings = [] + new_warnings = [] + warnings = record.get("warnings") or [] record_id = record.get("id") if record_id and ( not represents_int(record_id) or re.search(r"([0-9])", record_id) @@ -1472,7 +1473,7 @@ def handle_validate_item_import(list_record, schema) -> list: last_key = path_list[-1] target[last_key] = str(target[last_key]) target_path = ".".join([str(p) for p in path_list[:-2]]) - warnings.append( + new_warnings.append( _("Replace value of %(target_path)s from %(old_value)s to '%(new_value)s'.", target_path=target_path, old_value=target[last_key], new_value=str(target[last_key]) ) @@ -1490,13 +1491,14 @@ def handle_validate_item_import(list_record, schema) -> list: records = dict(**record) records["errors"] = errors if len(errors) else None - if len(warnings) > 0: + if len(new_warnings) > 0: warnings.append( _("Specified %(type)s is different from existing %(existing_type)s.", type="type:integer", existing_type="type:string" ) ) - records["warnings"] = warnings if len(warnings) else None + warnings.extend(new_warnings) + records["warnings"] = warnings result.append(records) return result diff --git a/tools/replace_db_fqdn.py b/tools/replace_db_fqdn.py new file mode 100644 index 
# =====================================================================
# File: tools/replace_db_fqdn.py  (new file in this patch)
#
# One-shot maintenance script: rewrite an old FQDN to a new FQDN in
# every WEKO database table/column that stores host-dependent values
# (file URIs, OAI identifiers, index URLs, widget settings, item JSON).
#
# NOTE(review): the old/new FQDNs are interpolated directly into the
# SQL text below (f-strings), not bound as parameters. This is safe
# only because the values come from a trusted operator's command line;
# do not reuse this pattern for untrusted input.
# =====================================================================
import os
import re
import sys
import time
import traceback

from sqlalchemy import create_engine, inspect, text

if len(sys.argv) < 3:
    # FIX: the usage line had lost its argument placeholders
    # (angle-bracket text stripped somewhere upstream); restored.
    print("Usage: python replace_db_fqdn.py <old_fqdn> <new_fqdn>")
    sys.exit(1)

ofqdn = sys.argv[1]
nfqdn = sys.argv[2]
# Filesystem-style variants ("." and "-" -> "_") used inside storage URIs.
# These contain only [A-Za-z0-9_], so they are regex-safe without escaping.
ofqdn_underscore = ofqdn.replace(".", "_").replace("-", "_")
nfqdn_underscore = nfqdn.replace(".", "_").replace("-", "_")
# Regex-escaped form for use as a regexp_replace() pattern.
ofqdn_escape = re.escape(ofqdn)

USERNAME = os.getenv("INVENIO_POSTGRESQL_DBUSER")
PASSWORD = os.getenv("INVENIO_POSTGRESQL_DBPASS")
HOST = os.getenv("INVENIO_POSTGRESQL_HOST")
PORT = 5432
DBNAME = os.getenv("INVENIO_POSTGRESQL_DBNAME")
DATABASE_URL = f"postgresql+psycopg2://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"

engine = create_engine(DATABASE_URL)


def update_records(
    select_sql, update_sql, table_name="", column_name="", batch_size=1000
):
    """Execute a batched UPDATE over the IDs returned by a SELECT.

    Args:
        select_sql (str): SQL query selecting the target record IDs
            (first column of each row is used as the ID).
        update_sql (str): SQL UPDATE statement using :ids as the
            bound parameter for the current batch of IDs.
        table_name (str, optional): Table being updated (logging only).
            Defaults to "".
        column_name (str, optional): Column being updated (logging only).
            Defaults to "".
        batch_size (int, optional): Records updated per batch.
            Defaults to 1000.

    Behavior:
        - Skips entirely when table_name.column_name does not exist.
        - Fetches IDs with select_sql, then updates in batches with
          update_sql.
        - Commits only if every batch succeeds; rolls back on the
          first failure.
        - Logs progress and errors to stdout.
    """
    if not column_exists(engine, table_name, column_name):
        print(f"[INFO] {table_name}.{column_name} does not exist. Skipped.")
        return
    with engine.connect() as conn:
        trans = conn.begin()
        result = conn.execute(text(select_sql))
        total = 0
        batch_num = 1
        total_start = time.time()
        success = True
        while True:
            rows = result.fetchmany(batch_size)
            if not rows:
                break
            batch_ids = [row[0] for row in rows]
            try:
                conn.execute(text(update_sql), {"ids": tuple(batch_ids)})
            except Exception as e:
                print(f"[ERROR] {table_name}.{column_name}: Batch {batch_num} failed")
                print(f"Error: {e}")
                traceback.print_exc()
                success = False
                break
            elapsed = time.time() - total_start
            total += len(batch_ids)
            # NOTE(review): this only fires when `total` lands exactly on a
            # multiple of 1000, i.e. when batch_size divides 1000 evenly.
            if total % 1000 == 0:
                print(
                    f"[INFO] {table_name}.{column_name}: {total} records processed (elapsed time: {elapsed:.2f} seconds)"
                )
            batch_num += 1
        total_elapsed = time.time() - total_start
        if success:
            trans.commit()
            print(
                f"[INFO] {table_name}.{column_name}: {total} records replaced/updated, elapsed time: {total_elapsed:.2f} seconds"
            )
        else:
            trans.rollback()
            print(f"[ERROR] {table_name}.{column_name}: Rolled back due to error")


def column_exists(engine, table_name, column_name):
    """Return True if `column_name` exists on `table_name`."""
    inspector = inspect(engine)
    columns = [col["name"] for col in inspector.get_columns(table_name)]
    return column_name in columns


# files_location — storage URIs use the underscored FQDN form.
select_files_location = """
    SELECT id from files_location;
"""
update_files_location = f"""
    UPDATE files_location
    SET uri = regexp_replace(uri, '{ofqdn_underscore}', '{nfqdn_underscore}')
    WHERE id IN :ids;
"""

# files_files.uri — same underscored form as files_location.
select_files_files_uri = """
    SELECT id FROM files_files;
"""
update_files_files_uri = f"""
    UPDATE files_files
    SET uri = regexp_replace(uri, '{ofqdn_underscore}', '{nfqdn_underscore}')
    WHERE id IN :ids;
"""

# files_files.json — only rows whose json has a non-null url.url.
select_files_files_json = """
    SELECT id FROM files_files WHERE json ? 'url' AND (json->'url') ? 'url' AND json->'url'->>'url' IS NOT NULL;
"""
update_files_files_json = f"""
    UPDATE files_files
    SET json = jsonb_set(
        json,
        '{{url,url}}',
        to_jsonb(
            regexp_replace(json->'url'->>'url', '{ofqdn_escape}', '{nfqdn}')
        )
    )
    WHERE id IN :ids;
"""

# records_metadata — global replace over the whole JSON document.
select_records_metadata = """
    SELECT id FROM records_metadata;
"""
update_records_metadata = f"""
    UPDATE records_metadata
    SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id IN :ids;
"""

# pidstore_pid — only OAI identifiers embed the FQDN.
select_pidstore_pid = """
    SELECT id FROM pidstore_pid
    WHERE pid_type = 'oai';
"""
update_pidstore_pid = f"""
    UPDATE pidstore_pid
    SET pid_value = regexp_replace(pid_value, '{ofqdn_escape}', '{nfqdn}')
    WHERE id IN :ids;
"""

# feedback_email_setting
select_feedback_email_setting = """
    SELECT id FROM feedback_email_setting;
"""
update_feedback_email_setting = f"""
    UPDATE feedback_email_setting
    SET root_url = regexp_replace(root_url, '{ofqdn_escape}', '{nfqdn}')
    WHERE id IN :ids;
"""

# index
select_index = """
SELECT id FROM index;
"""
update_index = f"""
    UPDATE index
    SET index_url = regexp_replace(index_url, '{ofqdn_escape}', '{nfqdn}')
    WHERE id in :ids;
"""

# changelist_indexes
select_changelist_indexes = """
    SELECT id FROM changelist_indexes;
"""
update_changelist_indexes = f"""
    UPDATE changelist_indexes
    SET url_path = regexp_replace(url_path, '{ofqdn_escape}', '{nfqdn}')
    WHERE id in :ids;
"""

# resourcelist_indexes
select_resourcelist_indexes = """
    SELECT id FROM resourcelist_indexes;
"""
update_resourcelist_indexes = f"""
    UPDATE resourcelist_indexes
    SET url_path = regexp_replace(url_path, '{ofqdn_escape}', '{nfqdn}')
    WHERE id in :ids;
"""

# widget_multi_lang_data
select_widget_multi_lang_data = """
    SELECT id FROM widget_multi_lang_data;
"""
update_widget_multi_lang_data = f"""
    UPDATE widget_multi_lang_data
    SET description_data = regexp_replace(description_data::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id IN :ids;
"""

# widget_design_page — keyed by repository_id, not id.
select_widget_design_page = """
    SELECT repository_id FROM widget_design_page;
"""
update_widget_design_page = f"""
    UPDATE widget_design_page
    SET settings = regexp_replace(settings::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE repository_id IN :ids;
"""

# widget_design_setting — keyed by repository_id, not id.
select_widget_design_setting = """
    SELECT repository_id FROM widget_design_setting;
"""
update_widget_design_setting = f"""
    UPDATE widget_design_setting
    SET settings = regexp_replace(settings::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE repository_id in :ids;
"""
# FIX: statement above originally ended ";;" (stray empty statement).

# workflow_activity — only rows that actually carry temp_data.
select_workflow_activity = """
    SELECT id FROM workflow_activity WHERE temp_data IS NOT NULL;
"""
update_workflow_activity = f"""
    UPDATE workflow_activity
    SET temp_data = regexp_replace(temp_data::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id in :ids;
"""

# item_metadata
select_item_metadata = """
    SELECT id from item_metadata;
"""
update_item_metadata = f"""
    UPDATE item_metadata
    SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id IN :ids;
"""

# item_metadata_version
select_item_metadata_version = """
    SELECT id from item_metadata_version;
"""
update_item_metadata_version = f"""
    UPDATE item_metadata_version
    SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id IN :ids;
"""

# records_metadata_version
select_records_metadata_version = """
    SELECT id from records_metadata_version;
"""
update_records_metadata_version = f"""
    UPDATE records_metadata_version
    SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb
    WHERE id IN :ids;
"""


if __name__ == "__main__":
    update_records(
        select_files_location, update_files_location, "files_location", "uri"
    )
    update_records(select_files_files_uri, update_files_files_uri, "files_files", "uri")
    update_records(
        select_files_files_json, update_files_files_json, "files_files", "json"
    )
    update_records(
        select_pidstore_pid, update_pidstore_pid, "pidstore_pid", "pid_value"
    )
    update_records(
        select_feedback_email_setting,
        update_feedback_email_setting,
        "feedback_email_setting",
        "root_url",
    )
    update_records(select_index, update_index, "index", "index_url")
    update_records(
        select_changelist_indexes,
        update_changelist_indexes,
        "changelist_indexes",
        "url_path",
    )
    update_records(
        select_resourcelist_indexes,
        update_resourcelist_indexes,
        "resourcelist_indexes",
        "url_path",
    )
    update_records(
        select_widget_multi_lang_data,
        update_widget_multi_lang_data,
        "widget_multi_lang_data",
        "description_data",
    )
    update_records(
        select_widget_design_setting,
        update_widget_design_setting,
        "widget_design_setting",
        "settings",
    )
    update_records(
        select_widget_design_page,
        update_widget_design_page,
        "widget_design_page",
        "settings",
    )
    update_records(
        select_workflow_activity,
        update_workflow_activity,
        "workflow_activity",
        "temp_data",
    )
    update_records(
        select_records_metadata, update_records_metadata, "records_metadata", "json"
    )
    update_records(select_item_metadata, update_item_metadata, "item_metadata", "json")
    update_records(
        select_records_metadata_version,
        update_records_metadata_version,
        "records_metadata_version",
        "json",
    )
    update_records(
        select_item_metadata_version,
        update_item_metadata_version,
        "item_metadata_version",
        "json",
    )


# =====================================================================
# File: tools/replace_es_fqdn.py  (new file in this patch)
#
# Companion script: rewrite the old FQDN to the new FQDN in the
# Elasticsearch item index, either for an explicit list of document
# IDs or for every document (via scroll).
# =====================================================================
import os
import re
import sys
import time
import traceback

from elasticsearch import Elasticsearch


def replace_fqdn_in_source(source, ofqdn, nfqdn):
    """
    Replace ofqdn with nfqdn in specific fields of the source dict.

    Only the fields known to embed the host name are touched:
    _oai.id, _item_metadata._oai.id, file-property url.url entries,
    content[].url.url, and file.URI[].value. The dict is mutated in
    place and also returned.

    Args:
        source (dict): Target dictionary.
        ofqdn (str): Old FQDN to replace.
        nfqdn (str): New FQDN to use.

    Returns:
        dict: Updated dictionary.
    """
    # _oai -> id
    oai = source.get("_oai", {})
    oai_id = oai.get("id")
    if oai_id and ofqdn in oai_id:
        oai["id"] = re.sub(re.escape(ofqdn), nfqdn, oai_id)

    # _item_metadata -> _oai -> id
    item_metadata = source.get("_item_metadata", {})
    oai_meta = item_metadata.get("_oai", {})
    oai_meta_id = oai_meta.get("id")
    if oai_meta_id and ofqdn in oai_meta_id:
        oai_meta["id"] = re.sub(re.escape(ofqdn), nfqdn, oai_meta_id)
        item_metadata["_oai"] = oai_meta

    # file property -> url -> url
    for meta_val in item_metadata.values():
        if isinstance(meta_val, dict) and meta_val.get("attribute_type") == "file":
            for file_obj in meta_val.get("attribute_value_mlt", []):
                url_dict = file_obj.get("url", {})
                url_val = url_dict.get("url")
                if url_val and ofqdn in url_val:
                    file_obj["url"]["url"] = re.sub(re.escape(ofqdn), nfqdn, url_val)

    # content -> url -> url
    if "content" in source and isinstance(source["content"], list):
        for content_obj in source["content"]:
            url_dict = content_obj.get("url", {})
            url_val = url_dict.get("url")
            if url_val and ofqdn in url_val:
                content_obj["url"]["url"] = re.sub(re.escape(ofqdn), nfqdn, url_val)

    # file -> URI -> value
    if "file" in source and isinstance(source["file"], dict):
        uri_list = source["file"].get("URI", [])
        for uri_obj in uri_list:
            uri_val = uri_obj.get("value")
            if uri_val and ofqdn in uri_val:
                uri_obj["value"] = re.sub(re.escape(ofqdn), nfqdn, uri_val)

    source["_oai"] = oai
    source["_item_metadata"] = item_metadata
    return source


def update_es_records(ofqdn, nfqdn, id_file_path=None):
    """
    Update Elasticsearch records.
    If id_file_path is given, only update documents whose IDs are listed in the file.
    Otherwise, update all documents in the index.

    Args:
        ofqdn (str): The original fully qualified domain name to be replaced.
        nfqdn (str): The new fully qualified domain name to use as a replacement.
        id_file_path (str, optional): Path to a file containing document IDs (one per line).
    """
    HOST = os.getenv("INVENIO_ELASTICSEARCH_HOST")
    INDEX_PREFIX = os.getenv("SEARCH_INDEX_PREFIX")
    PORT = 9200
    index_name = f"{INDEX_PREFIX}-weko-item-v1.0.0"
    es = Elasticsearch(f"http://{HOST}:{PORT}")

    if id_file_path:
        with open(id_file_path, "r") as f:
            id_list = [line.strip() for line in f if line.strip()]
        print(f"[INFO] {index_name}: {len(id_list)} IDs loaded from {id_file_path}")
    else:
        id_list = None

    query = {"query": {"match_all": {}}}

    try:
        if id_list is not None:
            result_count = len(id_list)
        else:
            result_count = es.count(index=index_name, body=query)["count"]
        print(f"[INFO] {index_name}: Total items: {result_count}")
    except Exception as e:
        print(f"[ERROR] {index_name}: Failed to get count")
        print(f"Error: {e}")
        traceback.print_exc()
        sys.exit(1)

    start_time = time.time()
    failed_count = 0
    count = 0

    try:
        if id_list is not None:
            for doc_id in id_list:
                try:
                    hit = es.get(index=index_name, doc_type="_doc", id=doc_id)
                except Exception as e:
                    print(f"[ERROR] {index_name}: Failed to get document ID: {doc_id}")
                    print(f"Error: {e}")
                    traceback.print_exc()
                    failed_count += 1
                    continue

                source = hit["_source"]
                version = hit.get("_version", 1)
                new_source = replace_fqdn_in_source(source, ofqdn, nfqdn)

                try:
                    # external_gte keeps the stored version; a concurrent
                    # writer bumping the version makes this call fail
                    # instead of silently clobbering newer data.
                    es.index(
                        index=index_name,
                        doc_type="_doc",
                        id=doc_id,
                        body=new_source,
                        version=version,
                        version_type="external_gte",
                    )
                except Exception as e:
                    print(
                        f"[ERROR] {index_name}: Failed to update document ID: {doc_id}"
                    )
                    print(f"Error: {e}")
                    traceback.print_exc()
                    failed_count += 1
                    continue
                count += 1
                if count % 1000 == 0:
                    elapsed = time.time() - start_time
                    print(
                        f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)"
                    )
        else:
            scroll = es.search(
                index=index_name,
                body=query,
                params={"version": "true"},
                scroll="2m",
                size=1000,
            )
            scroll_id = scroll["_scroll_id"]
            hits = scroll["hits"]["hits"]

            # TODO(review): the scroll context is never released with
            # es.clear_scroll(); it only expires after the 2m keep-alive.
            while hits:
                for hit in hits:
                    doc_id = hit["_id"]
                    source = hit["_source"]
                    version = hit.get("_version", 1)
                    new_source = replace_fqdn_in_source(source, ofqdn, nfqdn)

                    try:
                        es.index(
                            index=index_name,
                            doc_type="_doc",
                            id=doc_id,
                            body=new_source,
                            version=version,
                            version_type="external_gte",
                        )
                    except Exception as e:
                        print(
                            f"[ERROR] {index_name}: Failed to update document ID: {doc_id}"
                        )
                        print(f"Error: {e}")
                        traceback.print_exc()
                        failed_count += 1
                        continue
                    count += 1

                    if count % 1000 == 0:
                        elapsed = time.time() - start_time
                        print(
                            f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)"
                        )

                scroll = es.scroll(scroll_id=scroll_id, scroll="2m")
                scroll_id = scroll["_scroll_id"]
                hits = scroll["hits"]["hits"]

        total_elapsed = time.time() - start_time

        print(
            f"[INFO] {index_name}: {count} items replaced/updated elapsed time: {total_elapsed:.2f} seconds"
        )
        print(f"[INFO] count(success, error): ({count}, {failed_count})")

    except Exception as e:
        print(f"[ERROR] {index_name}: Unexpected error")
        print(f"Error: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    if len(sys.argv) < 3:
        # FIX: the usage line had lost its argument placeholders; restored.
        print("Usage: python replace_es_fqdn.py <old_fqdn> <new_fqdn> [id_file_path]")
        sys.exit(1)
    ofqdn = sys.argv[1]
    nfqdn = sys.argv[2]
    id_file_path = sys.argv[3] if len(sys.argv) > 3 else None
    update_es_records(ofqdn, nfqdn, id_file_path)