From 653130cf020627ff3e199d14f6039b3df77843a8 Mon Sep 17 00:00:00 2001 From: ayumi-nishida Date: Thu, 19 Feb 2026 17:31:52 +0900 Subject: [PATCH 1/4] =?UTF-8?q?=E3=83=AF=E3=83=BC=E3=83=8B=E3=83=B3?= =?UTF-8?q?=E3=82=B0=E3=81=8C=E4=B8=8A=E6=9B=B8=E3=81=8D=E3=81=95=E3=82=8C?= =?UTF-8?q?=E3=82=8B=E4=B8=8D=E5=85=B7=E5=90=88=E8=A7=A3=E6=B6=88=E3=80=81?= =?UTF-8?q?=5F=E6=9C=AA=E5=AE=9A=E7=BE=A9=E3=82=A8=E3=83=A9=E3=83=BC?= =?UTF-8?q?=E3=81=AB=E3=81=AA=E3=82=8B=E7=8F=BE=E8=B1=A1=E8=A7=A3=E6=B6=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/weko-search-ui/weko_search_ui/utils.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/weko-search-ui/weko_search_ui/utils.py b/modules/weko-search-ui/weko_search_ui/utils.py index 3c02cde998..c45f1ccf80 100644 --- a/modules/weko-search-ui/weko_search_ui/utils.py +++ b/modules/weko-search-ui/weko_search_ui/utils.py @@ -931,7 +931,7 @@ def check_jsonld_import_items( with open(f"{data_path}/{json_name}", "r") as f: json_ld = json.load(f) mapper.data_path = data_path - item_metadatas, _ = mapper.to_item_metadata(json_ld) + item_metadatas, x = mapper.to_item_metadata(json_ld) list_record = [ { "$schema": f"/items/jsonschema/{item_type.id}", @@ -1447,7 +1447,8 @@ def handle_validate_item_import(list_record, schema) -> list: v2 = Draft4Validator(schema) if schema else None for record in list_record: errors = record.get("errors") or [] - warnings = [] + new_warnings = [] + warnings = record.get("warnings") or [] record_id = record.get("id") if record_id and ( not represents_int(record_id) or re.search(r"([0-9])", record_id) @@ -1472,7 +1473,7 @@ def handle_validate_item_import(list_record, schema) -> list: last_key = path_list[-1] target[last_key] = str(target[last_key]) target_path = ".".join([str(p) for p in path_list[:-2]]) - warnings.append( + new_warnings.append( _("Replace value of %(target_path)s from %(old_value)s to '%(new_value)s'.", target_path=target_path, old_value=target[last_key], new_value=str(target[last_key]) ) @@ -1490,13 +1491,14 @@ def handle_validate_item_import(list_record, schema) -> list: records = dict(**record) records["errors"] = errors if len(errors) else None - if len(warnings) > 0: + if len(new_warnings) > 0: warnings.append( _("Specified %(type)s is different from existing %(existing_type)s.", type="type:integer", existing_type="type:string" ) ) - records["warnings"] = warnings if len(warnings) else None + warnings.extend(new_warnings) + records["warnings"] = warnings result.append(records) return result From 2630acb62d4d02ce2429ce89c8d76a2d4b391dc0 Mon Sep 17 00:00:00 2001 From: ivis-futagami Date: Fri, 27 Feb 2026 13:18:00 +0900 Subject: [PATCH 2/4] change fqdn --- tools/replace_db_fqdn.py | 267 +++++++++++++++++++++++++++++++++++++++ tools/replace_es_fqdn.py | 163 ++++++++++++++++++++++++ 2 files changed, 430 insertions(+) create mode 100644 tools/replace_db_fqdn.py create mode 100644 tools/replace_es_fqdn.py diff --git a/tools/replace_db_fqdn.py b/tools/replace_db_fqdn.py new file mode 100644 index 0000000000..9bcbd87694 --- /dev/null +++ b/tools/replace_db_fqdn.py @@ -0,0 +1,267 @@ +import sys +import os +import time +import traceback +from sqlalchemy import create_engine, text + +if len(sys.argv) < 3: + print("Usage: python replace_db_fqdn.py ") + sys.exit(1) + +ofqdn = sys.argv[1] +nfqdn = sys.argv[2] +ofqdn_underscore = ofqdn.replace(".", "_").replace("-", "_") +nfqdn_underscore = nfqdn.replace(".", "_").replace("-", "_") + +USERNAME = os.getenv("INVENIO_POSTGRESQL_DBUSER") +PASSWORD = os.getenv("INVENIO_POSTGRESQL_DBPASS") +HOST = os.getenv("INVENIO_POSTGRESQL_HOST") +PORT = 5432 +DBNAME = os.getenv("INVENIO_POSTGRESQL_DBNAME") +DATABASE_URL = f"postgresql+psycopg2://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}" + +engine = create_engine(DATABASE_URL) + +def update_records( + select_sql, update_sql, table_name="", column_name="", batch_size=1000 +): + """Executes SQL queries. + + Executes batch updates on a specified table and column using provided SQL queries. + + Args: + select_sql (str): SQL query to select the target record IDs. + update_sql (str): + SQL update statement, using :ids as a parameter for batch IDs. + table_name (str, optional): + Name of the table being updated (for logging). Defaults to "". + column_name (str, optional): + Name of the column being updated (for logging). Defaults to "". + batch_size (int, optional): + Number of records to update per batch. Defaults to 1000. + + Behavior: + - Fetches IDs using select_ids_sql. + - Updates records in batches using update_sql_template. + - Commits the transaction only if all batches succeed; rolls back if any batch fails. + - Logs progress and errors. + """ + with engine.connect() as conn: + trans = conn.begin() + result = conn.execute(text(select_sql)) + total = 0 + batch_num = 1 + total_start = time.time() + success = True + while True: + rows = result.fetchmany(batch_size) + if not rows: + break + batch_ids = [row[0] for row in rows] + try: + conn.execute(text(update_sql), {"ids": tuple(batch_ids)}) + except Exception as e: + print(f"[ERROR] {table_name}.{column_name}: Batch {batch_num} failed") + print(f"Error: {e}") + traceback.print_exc() + success = False + break + elapsed = time.time() - total_start + total += len(batch_ids) + if total % 1000 == 0: + print(f"[INFO] {table_name}.{column_name}: {total} records processed (elapsed time: {elapsed:.2f} seconds)") + batch_num += 1 + total_elapsed = time.time() - total_start + if success: + trans.commit() + print(f"[INFO] {table_name}.{column_name}: {total} records replaced/updated, elapsed time: {total_elapsed:.2f} seconds") + else: + trans.rollback() + print(f"[ERROR] {table_name}.{column_name}: Rolled back due to error") + + +# files_location +select_files_location = """ + SELECT id from files_location; +""" +update_files_location = f""" + UPDATE files_location + SET uri = regexp_replace(uri, '{ofqdn_underscore}', '{nfqdn_underscore}') + WHERE id IN :ids; +""" + +# files_files.uri +select_files_files_uri = """ + SELECT id FROM files_files; +""" +update_files_files_uri = f""" + UPDATE files_files + SET uri = regexp_replace(uri, '{ofqdn_underscore}', '{nfqdn_underscore}') + WHERE id IN :ids; +""" + +# files_files +select_files_files_json = """ + SELECT id FROM files_files WHERE json ? 'url' AND (json->'url') ? 'url' AND json->'url'->>'url' IS NOT NULL; +""" +update_files_files_json = f""" + UPDATE files_files + SET json = jsonb_set( + json, + '{{url,url}}', + to_jsonb( + regexp_replace(json->'url'->>'url', '{ofqdn}', '{nfqdn}') + ) + ) + WHERE id IN :ids; +""" + +# records_metadata +select_records_metadata = """ + SELECT id FROM records_metadata; +""" +update_records_metadata = f""" + UPDATE records_metadata + SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id IN :ids; +""" + +# pidstore_pid +select_pidstore_pid = """ + SELECT id FROM pidstore_pid + WHERE pid_type = 'oai'; +""" +update_pidstore_pid = f""" + UPDATE pidstore_pid + SET pid_value = regexp_replace(pid_value, '{ofqdn}', '{nfqdn}') + WHERE id IN :ids; +""" + +# feedback_email_setting +select_feedback_email_setting = """ + SELECT id FROM feedback_email_setting; +""" +update_feedback_email_setting = f""" + UPDATE feedback_email_setting + SET root_url = regexp_replace(root_url, '{ofqdn}', '{nfqdn}') + WHERE id IN :ids; +""" + +# index +select_index = """ +SELECT id FROM index; +""" +update_index = f""" + UPDATE index + SET index_url = regexp_replace(index_url, '{ofqdn}', '{nfqdn}') + WHERE id in :ids; +""" + +# changelist_indexes +select_changelist_indexes = """ + SELECT id FROM changelist_indexes; +""" +update_changelist_indexes = f""" + UPDATE changelist_indexes + SET url_path = regexp_replace(url_path, '{ofqdn}', '{nfqdn}') + WHERE id in :ids; +""" + +# resourcelist_indexes +select_resourcelist_indexes = """ + SELECT id FROM resourcelist_indexes; +""" +update_resourcelist_indexes = f""" + UPDATE resourcelist_indexes + SET url_path = regexp_replace(url_path, '{ofqdn}', '{nfqdn}') + WHERE id in :ids; +""" + +# widget_multi_lang_data +select_widget_multi_lang_data = """ + SELECT id FROM widget_multi_lang_data; +""" +update_widget_multi_lang_data = f""" + UPDATE widget_multi_lang_data + SET description_data = regexp_replace(description_data::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id IN :ids; +""" + +# widget_design_page +select_widget_design_page = """ + SELECT repository_id FROM widget_design_page; +""" +update_widget_design_page = f""" + UPDATE widget_design_page + SET settings = regexp_replace(settings::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE repository_id IN :ids; +""" + +# widget_design_setting +select_widget_design_setting = """ + SELECT repository_id FROM widget_design_setting; +""" +update_widget_design_setting = f""" + UPDATE widget_design_setting + SET settings = regexp_replace(settings::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE repository_id in :ids;; +""" + +# workflow_activity +select_workflow_activity = """ + SELECT id FROM workflow_activity WHERE temp_data IS NOT NULL; +""" +update_workflow_activity = f""" + UPDATE workflow_activity + SET temp_data = regexp_replace(temp_data::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id in :ids; +""" + +# item_metadata +select_item_metadata = """ + SELECT id from item_metadata; +""" +update_item_metadata = f""" + UPDATE item_metadata + SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id IN :ids; +""" + +# item_metadata_version +select_item_metadata_version = """ + SELECT id from item_metadata_version; +""" +update_item_metadata_version = f""" + UPDATE item_metadata_version + SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id IN :ids; +""" + +# records_metadata_version +select_records_metadata_version = """ + SELECT id from records_metadata_version; +""" +update_records_metadata_version = f""" + UPDATE records_metadata_version + SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + WHERE id IN :ids; +""" + + +if __name__ == "__main__": + update_records(select_files_location, update_files_location, "files_location", "uri") + update_records(select_files_files_uri, update_files_files_uri, "files_files", "uri") + update_records(select_files_files_json, update_files_files_json, "files_files", "json") + update_records(select_pidstore_pid, update_pidstore_pid, "pidstore_pid", "pid_value") + update_records(select_feedback_email_setting, update_feedback_email_setting, "feedback_email_setting", "root_url") + update_records(select_index, update_index, "index", "index_url") + update_records(select_changelist_indexes, update_changelist_indexes, "changelist_indexes", "url_path") + update_records(select_resourcelist_indexes, update_resourcelist_indexes, "resourcelist_indexes", "url_path") + update_records(select_widget_multi_lang_data, update_widget_multi_lang_data, "widget_multi_lang_data", "description_data") + update_records(select_widget_design_setting, update_widget_design_setting, "widget_design_setting", "description_data") + update_records(select_widget_design_page, update_widget_design_page, "widget_design_page", "description_page") + update_records(select_workflow_activity, update_workflow_activity, "workflow_activity", "temp_data") + update_records(select_records_metadata, update_records_metadata, "records_metadata", "json") + update_records(select_item_metadata, update_item_metadata, "item_metadata", "json") + update_records(select_records_metadata_version, update_records_metadata_version, "records_metadata_version", "json") + update_records(select_item_metadata_version, update_item_metadata_version, "item_metadata_version", "json") diff --git a/tools/replace_es_fqdn.py b/tools/replace_es_fqdn.py new file mode 100644 index 0000000000..99afc7cdb4 --- /dev/null +++ b/tools/replace_es_fqdn.py @@ -0,0 +1,163 @@ +import time +import re +import os +import sys +import traceback +from elasticsearch import Elasticsearch + +def update_es_records(ofqdn, nfqdn): + """ + Update Elasticsearch records. + Update Elasticsearch records, replacing all occurrences of `ofqdn` with `nfqdn` + in specific fields such as OAI IDs and URLs, for all documents in the index. + Connects to Elasticsearch using environment variables, iterates all documents, + replaces `ofqdn` in relevant fields, updates documents, and logs progress/errors. + + Args: + ofqdn (str): The original fully qualified domain name to be replaced. + nfqdn (str): The new fully qualified domain name to use as a replacement. + + Behavior: + - Connects to Elasticsearch using environment variables for host and index prefix. + - Iterates through all documents in the target index. + - For each document, replaces `ofqdn` with `nfqdn` in relevant fields. + - Updates each document in Elasticsearch, preserving versioning. + - Logs progress and errors. + - Prints a summary of the operation upon completion. + """ + HOST = os.getenv("INVENIO_ELASTICSEARCH_HOST") + INDEX_PREFIX = os.getenv("SEARCH_INDEX_PREFIX") + PORT = 9200 + index_name = f"{INDEX_PREFIX}-weko-item-v1.0.0" + es = Elasticsearch(f"http://{HOST}:{PORT}") + + query = { + "query": { + "match_all": {} + } + } + + try: + result_count = es.count(index=index_name, body=query) + print(f"[INFO] {index_name}: Total items: {result_count['count']}") + except Exception as e: + print(f"[ERROR] {index_name}: Failed to get count") + print(f"Error: {e}") + traceback.print_exc() + sys.exit(1) + + start_time = time.time() + + try: + scroll = es.search( + index=index_name, + body=query, + params={"version": "true"}, + scroll="2m", + size=1000 + ) + except Exception as e: + print(f"[ERROR] {index_name}: Failed to search") + print(f"Error: {e}") + traceback.print_exc() + sys.exit(1) + + scroll_id = scroll['_scroll_id'] + hits = scroll['hits']['hits'] + + failed_count = 0 + count = 0 + + try: + while hits: + for hit in hits: + doc_id = hit["_id"] + source = hit["_source"] + version = hit.get("_version", 1) + + # _oai -> id + oai = source.get("_oai", {}) + oai_id = oai.get("id") + if oai_id and ofqdn in oai_id: + new_oai_id = re.sub(re.escape(ofqdn), nfqdn, oai_id) + oai["id"] = new_oai_id + + # _item_metadata -> oid -> id + item_metadata = source.get("_item_metadata", {}) + oai_meta = item_metadata.get("_oai", {}) + oai_meta_id = oai_meta.get("id") + if oai_meta_id and ofqdn in oai_meta_id: + new_oai_meta_id = re.sub(re.escape(ofqdn), nfqdn, oai_meta_id) + oai_meta["id"] = new_oai_meta_id + item_metadata["_oai"] = oai_meta + + # file property -> url -> url + for _, meta_val in item_metadata.items(): + if isinstance(meta_val, dict) and meta_val.get("attribute_type") == "file": + for file_obj in meta_val.get("attribute_value_mlt", []): + url_dict = file_obj.get("url", {}) + url_val = url_dict.get("url") + if url_val and ofqdn in url_val: + new_url = re.sub(re.escape(ofqdn), nfqdn, url_val) + file_obj["url"]["url"] = new_url + + # content -> url -> url + if "content" in source and isinstance(source["content"], list): + for content_obj in source["content"]: + url_dict = content_obj.get("url", {}) + url_val = url_dict.get("url") + if url_val and ofqdn in url_val: + new_url = re.sub(re.escape(ofqdn), nfqdn, url_val) + content_obj["url"]["url"] = new_url + + # file -> URI -> value + if "file" in source and isinstance(source["file"], dict): + uri_list = source["file"].get("URI", []) + for uri_obj in uri_list: + uri_val = uri_obj.get("value") + if uri_val and ofqdn in uri_val: + new_uri = re.sub(re.escape(ofqdn), nfqdn, uri_val) + uri_obj["value"] = new_uri + + try: + es.index( + index=index_name, + doc_type="_doc", + id=doc_id, + body={**source, "_oai": oai, "_item_metadata": item_metadata}, + version=version, + version_type="external_gte" + ) + except Exception as e: + print(f"[ERROR] {index_name}: Failed to update document ID: {doc_id}") + print(f"Error: {e}") + traceback.print_exc() + failed_count += 1 + continue + count += 1 + if count % 1000 == 0: + elapsed = time.time() - start_time + print(f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)") + + scroll = es.scroll(scroll_id=scroll_id, scroll="2m") + scroll_id = scroll['_scroll_id'] + hits = scroll['hits']['hits'] + + total_elapsed = time.time() - start_time + + print(f"[INFO] {index_name}: {count} items replaced/updated (elapsed time: {total_elapsed:.2f} seconds)") + print(f"[INFO] count(success, error): ({count}, {failed_count})") + + except Exception as e: + print(f"[ERROR] {index_name}: Unexpected error") + print(f"Error: {e}") + traceback.print_exc() + sys.exit(1) + +if __name__ == "__main__": + if len(sys.argv) < 3: + print("Usage: python replace_es_fqdn.py ") + sys.exit(1) + ofqdn = sys.argv[1] + nfqdn = sys.argv[2] + update_es_records(ofqdn, nfqdn) From 890ee52252f68d27a9158bbb647682065624faa0 Mon Sep 17 00:00:00 2001 From: ivis-futagami Date: Fri, 27 Feb 2026 23:28:09 +0900 Subject: [PATCH 3/4] update change fqdn tool --- tools/replace_db_fqdn.py | 135 ++++++++++++++++----- tools/replace_es_fqdn.py | 250 ++++++++++++++++++++++++--------------- 2 files changed, 256 insertions(+), 129 deletions(-) diff --git a/tools/replace_db_fqdn.py b/tools/replace_db_fqdn.py index 9bcbd87694..420a6a978d 100644 --- a/tools/replace_db_fqdn.py +++ b/tools/replace_db_fqdn.py @@ -1,8 +1,10 @@ -import sys import os +import re +import sys import time import traceback -from sqlalchemy import create_engine, text + +from sqlalchemy import create_engine, inspect, text if len(sys.argv) < 3: print("Usage: python replace_db_fqdn.py ") @@ -12,6 +14,7 @@ nfqdn = sys.argv[2] ofqdn_underscore = ofqdn.replace(".", "_").replace("-", "_") nfqdn_underscore = nfqdn.replace(".", "_").replace("-", "_") +ofqdn_escape = re.escape(ofqdn) USERNAME = os.getenv("INVENIO_POSTGRESQL_DBUSER") PASSWORD = os.getenv("INVENIO_POSTGRESQL_DBPASS") @@ -22,6 +25,7 @@ engine = create_engine(DATABASE_URL) + def update_records( select_sql, update_sql, table_name="", column_name="", batch_size=1000 ): @@ -46,6 +50,9 @@ def update_records( - Commits the transaction only if all batches succeed; rolls back if any batch fails. - Logs progress and errors. """ + if not column_exists(engine, table_name, column_name): + print(f"[INFO] {table_name}.{column_name} does not exist. Skipped.") + return with engine.connect() as conn: trans = conn.begin() result = conn.execute(text(select_sql)) @@ -69,17 +76,30 @@ def update_records( elapsed = time.time() - total_start total += len(batch_ids) if total % 1000 == 0: - print(f"[INFO] {table_name}.{column_name}: {total} records processed (elapsed time: {elapsed:.2f} seconds)") + print( + f"[INFO] {table_name}.{column_name}: {total} records processed (elapsed time: {elapsed:.2f} seconds)" + ) batch_num += 1 total_elapsed = time.time() - total_start if success: trans.commit() - print(f"[INFO] {table_name}.{column_name}: {total} records replaced/updated, elapsed time: {total_elapsed:.2f} seconds") + print( + f"[INFO] {table_name}.{column_name}: {total} records replaced/updated, elapsed time: {total_elapsed:.2f} seconds" + ) else: trans.rollback() print(f"[ERROR] {table_name}.{column_name}: Rolled back due to error") +def column_exists(engine, table_name, column_name): + """ + Check if a column exists in a table. + """ + inspector = inspect(engine) + columns = [col["name"] for col in inspector.get_columns(table_name)] + return column_name in columns + + # files_location select_files_location = """ SELECT id from files_location; @@ -110,7 +130,7 @@ def update_records( json, '{{url,url}}', to_jsonb( - regexp_replace(json->'url'->>'url', '{ofqdn}', '{nfqdn}') + regexp_replace(json->'url'->>'url', '{ofqdn_escape}', '{nfqdn}') ) ) WHERE id IN :ids; @@ -122,7 +142,7 @@ def update_records( """ update_records_metadata = f""" UPDATE records_metadata - SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id IN :ids; """ @@ -133,7 +153,7 @@ def update_records( """ update_pidstore_pid = f""" UPDATE pidstore_pid - SET pid_value = regexp_replace(pid_value, '{ofqdn}', '{nfqdn}') + SET pid_value = regexp_replace(pid_value, '{ofqdn_escape}', '{nfqdn}') WHERE id IN :ids; """ @@ -143,7 +163,7 @@ def update_records( """ update_feedback_email_setting = f""" UPDATE feedback_email_setting - SET root_url = regexp_replace(root_url, '{ofqdn}', '{nfqdn}') + SET root_url = regexp_replace(root_url, '{ofqdn_escape}', '{nfqdn}') WHERE id IN :ids; """ @@ -153,7 +173,7 @@ def update_records( """ update_index = f""" UPDATE index - SET index_url = regexp_replace(index_url, '{ofqdn}', '{nfqdn}') + SET index_url = regexp_replace(index_url, '{ofqdn_escape}', '{nfqdn}') WHERE id in :ids; """ @@ -163,7 +183,7 @@ def update_records( """ update_changelist_indexes = f""" UPDATE changelist_indexes - SET url_path = regexp_replace(url_path, '{ofqdn}', '{nfqdn}') + SET url_path = regexp_replace(url_path, '{ofqdn_escape}', '{nfqdn}') WHERE id in :ids; """ @@ -173,7 +193,7 @@ def update_records( """ update_resourcelist_indexes = f""" UPDATE resourcelist_indexes - SET url_path = regexp_replace(url_path, '{ofqdn}', '{nfqdn}') + SET url_path = regexp_replace(url_path, '{ofqdn_escape}', '{nfqdn}') WHERE id in :ids; """ @@ -183,7 +203,7 @@ def update_records( """ update_widget_multi_lang_data = f""" UPDATE widget_multi_lang_data - SET description_data = regexp_replace(description_data::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET description_data = regexp_replace(description_data::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id IN :ids; """ @@ -193,7 +213,7 @@ def update_records( """ update_widget_design_page = f""" UPDATE widget_design_page - SET settings = regexp_replace(settings::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET settings = regexp_replace(settings::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE repository_id IN :ids; """ @@ -203,7 +223,7 @@ def update_records( """ update_widget_design_setting = f""" UPDATE widget_design_setting - SET settings = regexp_replace(settings::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET settings = regexp_replace(settings::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE repository_id in :ids;; """ @@ -213,7 +233,7 @@ def update_records( """ update_workflow_activity = f""" UPDATE workflow_activity - SET temp_data = regexp_replace(temp_data::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET temp_data = regexp_replace(temp_data::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id in :ids; """ @@ -223,7 +243,7 @@ def update_records( """ update_item_metadata = f""" UPDATE item_metadata - SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id IN :ids; """ @@ -233,7 +253,7 @@ def update_records( """ update_item_metadata_version = f""" UPDATE item_metadata_version - SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id IN :ids; """ @@ -243,25 +263,78 @@ def update_records( """ update_records_metadata_version = f""" UPDATE records_metadata_version - SET json = regexp_replace(json::text, '{ofqdn}', '{nfqdn}', 'g')::jsonb + SET json = regexp_replace(json::text, '{ofqdn_escape}', '{nfqdn}', 'g')::jsonb WHERE id IN :ids; """ if __name__ == "__main__": - update_records(select_files_location, update_files_location, "files_location", "uri") + update_records( + select_files_location, update_files_location, "files_location", "uri" + ) update_records(select_files_files_uri, update_files_files_uri, "files_files", "uri") - update_records(select_files_files_json, update_files_files_json, "files_files", "json") - update_records(select_pidstore_pid, update_pidstore_pid, "pidstore_pid", "pid_value") - update_records(select_feedback_email_setting, update_feedback_email_setting, "feedback_email_setting", "root_url") + update_records( + select_files_files_json, update_files_files_json, "files_files", "json" + ) + update_records( + select_pidstore_pid, update_pidstore_pid, "pidstore_pid", "pid_value" + ) + update_records( + select_feedback_email_setting, + update_feedback_email_setting, + "feedback_email_setting", + "root_url", + ) update_records(select_index, update_index, "index", "index_url") - update_records(select_changelist_indexes, update_changelist_indexes, "changelist_indexes", "url_path") - update_records(select_resourcelist_indexes, update_resourcelist_indexes, "resourcelist_indexes", "url_path") - update_records(select_widget_multi_lang_data, update_widget_multi_lang_data, "widget_multi_lang_data", "description_data") - update_records(select_widget_design_setting, update_widget_design_setting, "widget_design_setting", "description_data") - update_records(select_widget_design_page, update_widget_design_page, "widget_design_page", "description_page") - update_records(select_workflow_activity, update_workflow_activity, "workflow_activity", "temp_data") - update_records(select_records_metadata, update_records_metadata, "records_metadata", "json") + update_records( + select_changelist_indexes, + update_changelist_indexes, + "changelist_indexes", + "url_path", + ) + update_records( + select_resourcelist_indexes, + update_resourcelist_indexes, + "resourcelist_indexes", + "url_path", + ) + update_records( + select_widget_multi_lang_data, + update_widget_multi_lang_data, + "widget_multi_lang_data", + "description_data", + ) + update_records( + select_widget_design_setting, + update_widget_design_setting, + "widget_design_setting", + "settings", + ) + update_records( + select_widget_design_page, + update_widget_design_page, + "widget_design_page", + "settings", + ) + update_records( + select_workflow_activity, + update_workflow_activity, + "workflow_activity", + "temp_data", + ) + update_records( + select_records_metadata, update_records_metadata, "records_metadata", "json" + ) update_records(select_item_metadata, update_item_metadata, "item_metadata", "json") - update_records(select_records_metadata_version, update_records_metadata_version, "records_metadata_version", "json") - update_records(select_item_metadata_version, update_item_metadata_version, "item_metadata_version", "json") + update_records( + select_records_metadata_version, + update_records_metadata_version, + "records_metadata_version", + "json", + ) + update_records( + select_item_metadata_version, + update_item_metadata_version, + "item_metadata_version", + "json", + ) diff --git a/tools/replace_es_fqdn.py b/tools/replace_es_fqdn.py index 99afc7cdb4..d00f17010a 100644 --- a/tools/replace_es_fqdn.py +++ b/tools/replace_es_fqdn.py @@ -1,29 +1,78 @@ -import time -import re import os +import re import sys +import time import traceback + from elasticsearch import Elasticsearch -def update_es_records(ofqdn, nfqdn): + +def replace_fqdn_in_source(source, ofqdn, nfqdn): + """ + Replace ofqdn with nfqdn in specific fields of the source dict. + + Args: + source (dict): Target dictionary. + ofqdn (str): Old FQDN to replace. + nfqdn (str): New FQDN to use. + + Returns: + dict: Updated dictionary. + """ + # _oai -> id + oai = source.get("_oai", {}) + oai_id = oai.get("id") + if oai_id and ofqdn in oai_id: + oai["id"] = re.sub(re.escape(ofqdn), nfqdn, oai_id) + + # _item_metadata -> _oai -> id + item_metadata = source.get("_item_metadata", {}) + oai_meta = item_metadata.get("_oai", {}) + oai_meta_id = oai_meta.get("id") + if oai_meta_id and ofqdn in oai_meta_id: + oai_meta["id"] = re.sub(re.escape(ofqdn), nfqdn, oai_meta_id) + item_metadata["_oai"] = oai_meta + + # file property -> url -> url + for meta_val in item_metadata.values(): + if isinstance(meta_val, dict) and meta_val.get("attribute_type") == "file": + for file_obj in meta_val.get("attribute_value_mlt", []): + url_dict = file_obj.get("url", {}) + url_val = url_dict.get("url") + if url_val and ofqdn in url_val: + file_obj["url"]["url"] = re.sub(re.escape(ofqdn), nfqdn, url_val) + + # content -> url -> url + if "content" in source and isinstance(source["content"], list): + for content_obj in source["content"]: + url_dict = content_obj.get("url", {}) + url_val = url_dict.get("url") + if url_val and ofqdn in url_val: + content_obj["url"]["url"] = re.sub(re.escape(ofqdn), nfqdn, url_val) + + # file -> URI -> value + if "file" in source and isinstance(source["file"], dict): + uri_list = source["file"].get("URI", []) + for uri_obj in uri_list: + uri_val = uri_obj.get("value") + if uri_val and ofqdn in uri_val: + uri_obj["value"] = re.sub(re.escape(ofqdn), nfqdn, uri_val) + + source["_oai"] = oai + source["_item_metadata"] = item_metadata + return source + + +def update_es_records(ofqdn, nfqdn, id_file_path=None): """ Update Elasticsearch records. - Update Elasticsearch records, replacing all occurrences of `ofqdn` with `nfqdn` - in specific fields such as OAI IDs and URLs, for all documents in the index. - Connects to Elasticsearch using environment variables, iterates all documents, - replaces `ofqdn` in relevant fields, updates documents, and logs progress/errors. + If id_file_path is given, only update documents whose IDs are listed in the file. + Otherwise, update all documents in the index. Args: ofqdn (str): The original fully qualified domain name to be replaced. nfqdn (str): The new fully qualified domain name to use as a replacement. - - Behavior: - - Connects to Elasticsearch using environment variables for host and index prefix. - - Iterates through all documents in the target index. - - For each document, replaces `ofqdn` with `nfqdn` in relevant fields. - - Updates each document in Elasticsearch, preserving versioning. - - Logs progress and errors. - - Prints a summary of the operation upon completion. + id_file_path (str, optional): Path to a file containing document IDs (one per line). """ HOST = os.getenv("INVENIO_ELASTICSEARCH_HOST") INDEX_PREFIX = os.getenv("SEARCH_INDEX_PREFIX") @@ -31,15 +80,21 @@ def update_es_records(ofqdn, nfqdn): index_name = f"{INDEX_PREFIX}-weko-item-v1.0.0" es = Elasticsearch(f"http://{HOST}:{PORT}") - query = { - "query": { - "match_all": {} - } - } + if id_file_path: + with open(id_file_path, "r") as f: + id_list = [line.strip() for line in f if line.strip()] + print(f"[INFO] {index_name}: {len(id_list)} IDs loaded from {id_file_path}") + else: + id_list = None + + query = {"query": {"match_all": {}}} try: - result_count = es.count(index=index_name, body=query) - print(f"[INFO] {index_name}: Total items: {result_count['count']}") + if id_list is not None: + result_count = len(id_list) + else: + result_count = es.count(index=index_name, body=query)["count"] + print(f"[INFO] {index_name}: Total items: {result_count}") except Exception as e: print(f"[ERROR] {index_name}: Failed to get count") print(f"Error: {e}") @@ -47,89 +102,38 @@ def update_es_records(ofqdn, nfqdn): sys.exit(1) start_time = time.time() - - try: - scroll = es.search( - index=index_name, - body=query, - params={"version": "true"}, - scroll="2m", - size=1000 - ) - except Exception as e: - print(f"[ERROR] {index_name}: Failed to search") - print(f"Error: {e}") - traceback.print_exc() - sys.exit(1) - - scroll_id = scroll['_scroll_id'] - hits = scroll['hits']['hits'] - failed_count = 0 count = 0 try: - while hits: - for hit in hits: - doc_id = hit["_id"] + if id_list is not None: + for doc_id in id_list: + try: + hit = es.get(index=index_name, doc_type="_doc", id=doc_id) + except Exception as e: + print(f"[ERROR] {index_name}: Failed to get document ID: {doc_id}") + print(f"Error: {e}") + traceback.print_exc() + failed_count += 1 + continue + source = hit["_source"] version = hit.get("_version", 1) - - # _oai -> id - oai = source.get("_oai", {}) - oai_id = oai.get("id") - if oai_id and ofqdn in oai_id: - new_oai_id = re.sub(re.escape(ofqdn), nfqdn, oai_id) - oai["id"] = new_oai_id - - # _item_metadata -> oid -> id - item_metadata = source.get("_item_metadata", {}) - oai_meta = item_metadata.get("_oai", {}) - oai_meta_id = oai_meta.get("id") - if oai_meta_id and ofqdn in oai_meta_id: - new_oai_meta_id = re.sub(re.escape(ofqdn), nfqdn, oai_meta_id) - oai_meta["id"] = new_oai_meta_id - item_metadata["_oai"] = oai_meta - - # file property -> url -> url - for _, meta_val in item_metadata.items(): - if isinstance(meta_val, dict) and meta_val.get("attribute_type") == "file": - for file_obj in meta_val.get("attribute_value_mlt", []): - url_dict = file_obj.get("url", {}) - url_val = url_dict.get("url") - if url_val and ofqdn in url_val: - new_url = re.sub(re.escape(ofqdn), nfqdn, url_val) - file_obj["url"]["url"] = new_url - - # content -> url -> url - if "content" in source and isinstance(source["content"], list): - for content_obj in source["content"]: - url_dict = content_obj.get("url", {}) - url_val = url_dict.get("url") - if url_val and ofqdn in url_val: - new_url = re.sub(re.escape(ofqdn), nfqdn, url_val) - content_obj["url"]["url"] = new_url - - # file -> URI -> value - if "file" in source and isinstance(source["file"], dict): - uri_list = source["file"].get("URI", []) - for uri_obj in uri_list: - uri_val = uri_obj.get("value") - if uri_val and ofqdn in uri_val: - new_uri = re.sub(re.escape(ofqdn), nfqdn, uri_val) - uri_obj["value"] = new_uri + new_source = replace_fqdn_in_source(source, ofqdn, nfqdn) try: es.index( index=index_name, doc_type="_doc", id=doc_id, - body={**source, "_oai": oai, "_item_metadata": item_metadata}, + body=new_source, version=version, - version_type="external_gte" + version_type="external_gte", ) except Exception as e: - print(f"[ERROR] {index_name}: Failed to update document ID: {doc_id}") + print( + f"[ERROR] {index_name}: Failed to update document ID: {doc_id}" + ) print(f"Error: {e}") traceback.print_exc() failed_count += 1 @@ -137,15 +141,63 @@ def update_es_records(ofqdn, nfqdn): count += 1 if count % 1000 == 0: elapsed = time.time() - start_time - print(f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)") - - scroll = es.scroll(scroll_id=scroll_id, scroll="2m") - scroll_id = scroll['_scroll_id'] - hits = scroll['hits']['hits'] + print( + f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)" + ) + else: + scroll = es.search( + index=index_name, + body=query, + params={"version": "true"}, + scroll="2m", + size=1000, + ) + scroll_id = scroll["_scroll_id"] + hits = scroll["hits"]["hits"] + + while hits: + for hit in hits: + doc_id = hit["_id"] + source = hit["_source"] + version = hit.get("_version", 1) + new_source = replace_fqdn_in_source(source, ofqdn, nfqdn) + + try: + if (failed_count + count) % 10 == 5: + raise Exception + es.index( + index=index_name, + doc_type="_doc", + id=doc_id, + body=new_source, + version=version, + version_type="external_gte", + ) + except Exception as e: + print( + f"[ERROR] {index_name}: Failed to update document ID: {doc_id}" + ) + print(f"Error: {e}") + traceback.print_exc() + failed_count += 1 + continue + count += 1 + + if count % 1000 == 0: + elapsed = time.time() - start_time + print( + f"[INFO] {index_name}: {count} items processed (elapsed time: {elapsed:.2f} seconds)" + ) + + scroll = es.scroll(scroll_id=scroll_id, scroll="2m") + scroll_id = scroll["_scroll_id"] + hits = scroll["hits"]["hits"] total_elapsed = time.time() - start_time - print(f"[INFO] {index_name}: {count} items replaced/updated (elapsed time: {total_elapsed:.2f} seconds)") + print( + f"[INFO] {index_name}: {count} items replaced/updated elapsed time: {total_elapsed:.2f} seconds" + ) print(f"[INFO] count(success, error): ({count}, {failed_count})") except Exception as e: @@ -154,10 +206,12 @@ def update_es_records(ofqdn, nfqdn): traceback.print_exc() sys.exit(1) + if __name__ == "__main__": if len(sys.argv) < 3: - print("Usage: python replace_es_fqdn.py ") + print("Usage: python replace_es_fqdn.py [id_file_path]") sys.exit(1) ofqdn = sys.argv[1] nfqdn = sys.argv[2] - update_es_records(ofqdn, nfqdn) + id_file_path = sys.argv[3] if len(sys.argv) > 3 else None + update_es_records(ofqdn, nfqdn, id_file_path) From b14b9caeaa4c57f6977ff3db23b770e575525fc8 Mon Sep 17 00:00:00 2001 From: ivis-futagami Date: Mon, 2 Mar 2026 02:27:39 +0900 Subject: [PATCH 4/4] fix es fqdn change tool --- tools/replace_es_fqdn.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/replace_es_fqdn.py b/tools/replace_es_fqdn.py index d00f17010a..571f11079c 100644 --- a/tools/replace_es_fqdn.py +++ b/tools/replace_es_fqdn.py @@ -163,8 +163,6 @@ def update_es_records(ofqdn, nfqdn, id_file_path=None): new_source = replace_fqdn_in_source(source, ofqdn, nfqdn) try: - if (failed_count + count) % 10 == 5: - raise Exception es.index( index=index_name, doc_type="_doc",