diff --git a/spp_programs/models/managers/deduplication_manager.py b/spp_programs/models/managers/deduplication_manager.py index cc9cb561..1581ab06 100644 --- a/spp_programs/models/managers/deduplication_manager.py +++ b/spp_programs/models/managers/deduplication_manager.py @@ -131,12 +131,15 @@ def _check_duplicate_by_individual_ids(self, beneficiaries): group_of_duplicates[group_membership.individual.id].append(group_membership.group.id) _logger.debug("Found %s duplicate group(s)", len(group_of_duplicates)) - for _individual, group_ids in group_of_duplicates.items(): + for individual_id, group_ids in group_of_duplicates.items(): group_ids_set = set(group_ids) duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids) duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id") - self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate individuals") + individual_rec = self.env["res.partner"].browse(individual_id) + group_names = duplicate_beneficiaries.mapped("partner_id.name") + reason = f"Shared member: {individual_rec.name} found in {len(group_ids)} groups ({', '.join(group_names)})" + self._record_duplicate(self, duplicate_beneficiariy_ids, reason) duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled") if len(duplicated_enrolled) == 1: @@ -158,7 +161,11 @@ class IDDocumentDeduplication(models.Model): _inherit = ["spp.base.deduplication.manager", "spp.manager.source.mixin"] _description = "ID Deduplication Manager" - supported_id_document_type_ids = fields.Many2many("spp.id.type", string="Supported ID Document Types") + supported_id_document_type_ids = fields.Many2many( + "spp.vocabulary.code", + string="Supported ID Document Types", + domain=[("vocabulary_id.namespace_uri", "=", "urn:openspp:vocab:id-type")], + ) def deduplicate_beneficiaries(self, states): for rec in self: @@ -233,7 +240,7 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): if x.id_type_id in self.supported_id_document_type_ids and ( (not x.expiry_date) or x.expiry_date > date.today() ): - id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value} + id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.display + "-" + x.value} individual_id_docs.update(id_doc_id_with_id_type_and_value) # Check ID Docs of each group @@ -242,7 +249,7 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): if x.id_type_id in self.supported_id_document_type_ids and ( (not x.expiry_date) or x.expiry_date > date.today() ): - id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value} + id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.display + "-" + x.value} individual_id_docs.update(id_doc_id_with_id_type_and_value) _logger.debug("Found %s ID document(s) to check", len(individual_id_docs)) @@ -264,6 +271,11 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): ) _logger.debug("Found %s group(s) with duplicates", len(group_with_duplicates)) + # Build mapping: individual_id -> list of duplicate ID doc descriptions + individual_dup_docs = {} + for doc in duplicated_doc_ids: + individual_dup_docs.setdefault(doc.partner_id.id, []).append(f"{doc.id_type_id.display}: {doc.value}") + group_of_duplicates = {} for group_membership in group_with_duplicates: _logger.debug("Processing group membership duplicate") @@ -272,12 +284,15 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): group_of_duplicates[group_membership.individual.id].append(group_membership.group.id) _logger.debug("Found %s duplicate group(s)", len(group_of_duplicates)) - for _individual, group_ids in group_of_duplicates.items(): + for individual_id, group_ids in group_of_duplicates.items(): group_ids_set = set(group_ids) duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids) duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id") - self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate ID Documents") + individual_rec = self.env["res.partner"].browse(individual_id) + doc_info = ", ".join(individual_dup_docs.get(individual_id, [])) + reason = f"Duplicate ID document ({doc_info}) on member: {individual_rec.name}" + self._record_duplicate(self, duplicate_beneficiariy_ids, reason) duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled") if len(duplicated_enrolled) == 1: @@ -295,55 +310,65 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): def _check_duplicate_by_individual(self, beneficiaries): """ - This method is used to check if there are any duplicates - among the individuals id docs. - :param beneficiary_ids: The beneficiaries. - :return: + Check for duplicate ID documents among individual beneficiaries. + Groups duplicates by shared ID value and applies keep-one-enrolled logic. """ _logger.debug("-" * 100) individual_ids = beneficiaries.mapped("partner_id.id") individuals = self.env["res.partner"].search([("id", "in", individual_ids)]) _logger.debug("Checking ID Document Duplicates for %s individual(s)", len(individuals)) + # Map: reg_id.id -> "IDType-Value", also track reg_id -> partner_id individual_id_docs = {} - # Check ID Documents of each individual + reg_id_to_partner = {} for i in individuals: for x in i.reg_ids: if x.id_type_id in self.supported_id_document_type_ids and ( (not x.expiry_date) or x.expiry_date > date.today() ): - id_doc_id_with_id_type_and_value = {x.id: x.id_type_id.name + "-" + x.value} - individual_id_docs.update(id_doc_id_with_id_type_and_value) + doc_key = x.id_type_id.display + "-" + x.value + individual_id_docs[x.id] = doc_key + reg_id_to_partner[x.id] = i.id _logger.debug("Found %s ID document(s) to check", len(individual_id_docs)) rev_dict = {} for key, value in individual_id_docs.items(): rev_dict.setdefault(value, set()).add(key) - duplicate_ids = filter(lambda x: len(x) > 1, rev_dict.values()) - duplicate_ids = list(duplicate_ids) - duplicate_ids = list(itertools.chain.from_iterable(duplicate_ids)) - _logger.debug("Found %s duplicate ID document(s)", len(duplicate_ids)) + all_duplicated_memberships = self.env["spp.program.membership"] - duplicated_doc_ids = self.env["spp.registry.id"].search([("id", "in", duplicate_ids)]) - individual_ids = [x.partner_id.id for x in duplicated_doc_ids] - individual_ids = list(dict.fromkeys(individual_ids)) - _logger.debug("Found %s individual(s) with duplicated ID documents", len(individual_ids)) - individual_program_membership = self.env["spp.program.membership"].search( - [ - ("partner_id", "in", individual_ids), - ("program_id", "=", self.program_id.id), - ] - ) + # Process each group of duplicates sharing the same ID value + for doc_key, reg_ids in rev_dict.items(): + if len(reg_ids) <= 1: + continue - for duplicates in individual_program_membership: - duplicate_individuals = [duplicates.id] - self._record_duplicate(self, duplicate_individuals, "Duplicate ID Documents") + partner_ids = list({reg_id_to_partner[rid] for rid in reg_ids}) + dup_memberships = self.env["spp.program.membership"].search( + [ + ("partner_id", "in", partner_ids), + ("program_id", "=", self.program_id.id), + ] + ) + if not dup_memberships: + continue - if duplicates.state == "enrolled": - duplicates.write({"state": "duplicated"}) + names = dup_memberships.mapped("partner_id.name") + reason = f"Duplicate ID document ({doc_key}) shared by: {', '.join(names)}" + self._record_duplicate(self, dup_memberships.ids, reason) - return individual_program_membership + # Keep-one-enrolled logic + duplicated_enrolled = dup_memberships.filtered(lambda rec: rec.state == "enrolled") + if len(duplicated_enrolled) == 1: + to_mark = dup_memberships.filtered(lambda rec: rec.state != "enrolled") + else: + to_mark = dup_memberships + to_mark.filtered(lambda rec: rec.state not in ["exited", "not_eligible", "duplicated"]).write( + {"state": "duplicated"} + ) + + all_duplicated_memberships |= dup_memberships + + return all_duplicated_memberships class PhoneNumberDeduplication(models.Model): @@ -462,6 +487,10 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): ) _logger.debug("Found %s group(s) with duplicates", len(group_with_duplicates)) + # Build mapping: individual_id -> list of duplicate phone numbers + individual_dup_phones = {} + for phone_rec in duplicate_individuals_ids: + individual_dup_phones.setdefault(phone_rec.partner_id.id, []).append(phone_rec.phone_no) group_of_duplicates = {} for group_membership in group_with_duplicates: @@ -471,12 +500,15 @@ def _check_duplicate_by_group_with_individual(self, beneficiaries): group_of_duplicates[group_membership.individual.id].append(group_membership.group.id) _logger.debug("Found %s duplicate group(s)", len(group_of_duplicates)) - for _individual, group_ids in group_of_duplicates.items(): + for individual_id, group_ids in group_of_duplicates.items(): group_ids_set = set(group_ids) duplicate_beneficiaries = beneficiaries.filtered(lambda rec, gids=group_ids_set: rec.partner_id.id in gids) duplicate_beneficiariy_ids = duplicate_beneficiaries.mapped("id") - self._record_duplicate(self, duplicate_beneficiariy_ids, "Duplicate Phone Numbers") + individual_rec = self.env["res.partner"].browse(individual_id) + phone_info = ", ".join(individual_dup_phones.get(individual_id, [])) + reason = f"Duplicate phone number ({phone_info}) on member: {individual_rec.name}" + self._record_duplicate(self, duplicate_beneficiariy_ids, reason) duplicated_enrolled = duplicate_beneficiaries.filtered(lambda rec: rec.state == "enrolled") if len(duplicated_enrolled) == 1: @@ -524,21 +556,44 @@ def _check_duplicate_by_individual(self, beneficiaries): individual_ids = [x.partner_id.id for x in duplicated_phone_ids] individual_ids = list(dict.fromkeys(individual_ids)) _logger.debug("Individual IDS with Duplicated Phone Numbers: %s", individual_ids) - individual_program_membership = self.env["spp.program.membership"].search( - [ - ("partner_id", "in", individual_ids), - ("program_id", "=", self.program_id.id), - ] - ) - for duplicates in individual_program_membership: - duplicate_individuals = [duplicates.id] - self._record_duplicate(self, duplicate_individuals, "Duplicate Phone Numbers") + all_duplicated_memberships = self.env["spp.program.membership"] + + # Build reverse map: phone_sanitized -> list of partner_ids + phone_to_partners = {} + for phone_rec in duplicated_phone_ids: + phone_to_partners.setdefault(phone_rec.phone_sanitized, set()).add(phone_rec.partner_id.id) + + for phone_val, partner_id_set in phone_to_partners.items(): + if len(partner_id_set) <= 1: + continue + + dup_memberships = self.env["spp.program.membership"].search( + [ + ("partner_id", "in", list(partner_id_set)), + ("program_id", "=", self.program_id.id), + ] + ) + if not dup_memberships: + continue + + names = dup_memberships.mapped("partner_id.name") + reason = f"Duplicate phone number ({phone_val}) shared by: {', '.join(names)}" + self._record_duplicate(self, dup_memberships.ids, reason) + + # Keep-one-enrolled logic + duplicated_enrolled = dup_memberships.filtered(lambda rec: rec.state == "enrolled") + if len(duplicated_enrolled) == 1: + to_mark = dup_memberships.filtered(lambda rec: rec.state != "enrolled") + else: + to_mark = dup_memberships + to_mark.filtered(lambda rec: rec.state not in ["exited", "not_eligible", "duplicated"]).write( + {"state": "duplicated"} + ) - if duplicates.state == "enrolled": - duplicates.write({"state": "duplicated"}) + all_duplicated_memberships |= dup_memberships - return individual_program_membership + return all_duplicated_memberships class IDPhoneEligibilityManager(models.Model): diff --git a/spp_programs/models/managers/program_manager.py b/spp_programs/models/managers/program_manager.py index 125331c2..eccb41a2 100644 --- a/spp_programs/models/managers/program_manager.py +++ b/spp_programs/models/managers/program_manager.py @@ -214,8 +214,10 @@ def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=Fa for el in eligibility_managers: members = el.enroll_eligible_registrants(members) # enroll the one not already enrolled: + # Exclude members that are duplicated or exited — those states + # should only be changed through their own workflows. _logger.debug("members filtered: %s", members) - not_enrolled = members.filtered(lambda m: m.state != "enrolled") + not_enrolled = members.filtered(lambda m: m.state not in ("enrolled", "duplicated", "exited")) _logger.debug("not_enrolled: %s", not_enrolled) not_enrolled.write( { @@ -226,7 +228,7 @@ def _enroll_eligible_registrants(self, states, offset=0, limit=None, do_count=Fa # dis-enroll the one not eligible anymore: enrolled_members_ids = members.ids members_to_remove = member_before.filtered( - lambda m: m.state != "not_eligible" and m.id not in enrolled_members_ids + lambda m: m.state not in ("not_eligible", "duplicated", "exited") and m.id not in enrolled_members_ids ) # _logger.debug("members_to_remove: %s", members_to_remove) members_to_remove.write( diff --git a/spp_programs/models/program_membership.py b/spp_programs/models/program_membership.py index ee4132ae..c5ee399c 100644 --- a/spp_programs/models/program_membership.py +++ b/spp_programs/models/program_membership.py @@ -52,6 +52,23 @@ class SPPProgramMembership(models.Model): registrant_id = fields.Integer(string="Registrant ID", related="partner_id.id") + duplicate_reason = fields.Char( + string="Duplicate Reason", + compute="_compute_duplicate_reason", + ) + + def _compute_duplicate_reason(self): + for rec in self: + if rec.state == "duplicated": + dup_records = self.env["spp.program.membership.duplicate"].search( + [("beneficiary_ids", "in", rec.id), ("state", "=", "duplicate")], + order="id desc", + limit=1, + ) + rec.duplicate_reason = dup_records.reason if dup_records else False + else: + rec.duplicate_reason = False + @api.constrains("partner_id", "program_id") def _check_unique_partner_per_program(self): # Prefetch partner_id and program_id to avoid N+1 queries in loop @@ -212,7 +229,13 @@ def enroll_eligible_registrants(self): member = em.enroll_eligible_registrants(member) if len(member) > 0: - if self.state != "enrolled": + if self.state in ("duplicated", "exited"): + message = _( + "Cannot enroll: beneficiary is currently %s.", + dict(self._fields["state"].selection).get(self.state, self.state), + ) + kind = "warning" + elif self.state != "enrolled": self.write( { "state": "enrolled", @@ -221,19 +244,22 @@ def enroll_eligible_registrants(self): ) message = _("%s Beneficiaries enrolled.", len(member)) kind = "success" - return { - "type": "ir.actions.client", - "tag": "display_notification", - "params": { - "title": _("Enrollment"), - "message": message, - "sticky": False, - "type": kind, - "next": { - "type": "ir.actions.act_window_close", - }, + else: + message = _("Beneficiary is already enrolled.") + kind = "info" + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": _("Enrollment"), + "message": message, + "sticky": False, + "type": kind, + "next": { + "type": "ir.actions.act_window_close", }, - } + }, + } else: self.state = "not_eligible" diff --git a/spp_programs/models/programs.py b/spp_programs/models/programs.py index a1ad8016..e086c6d2 100644 --- a/spp_programs/models/programs.py +++ b/spp_programs/models/programs.py @@ -380,17 +380,38 @@ def deduplicate_beneficiaries(self): message = None kind = "success" if len(deduplication_managers): + # Count already-flagged duplicates before running + already_duplicated = self.env["spp.program.membership"].search_count( + [("program_id", "=", rec.id), ("state", "=", "duplicated")] + ) + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] duplicates = 0 for el in deduplication_managers: duplicates += el.deduplicate_beneficiaries(states) - if duplicates > 0: - message = _("%s Instances of Beneficiaries duplicate.", duplicates) + # Count total duplicates after running + total_duplicated = self.env["spp.program.membership"].search_count( + [("program_id", "=", rec.id), ("state", "=", "duplicated")] + ) + new_duplicates = total_duplicated - already_duplicated + + if total_duplicated > 0: + parts = [] + if new_duplicates > 0: + parts.append(_("%(new)s new duplicate(s) found", new=new_duplicates)) + if already_duplicated > 0: + parts.append(_("%(existing)s already flagged", existing=already_duplicated)) + message = ", ".join(parts) + "." + kind = "warning" + elif duplicates > 0: + message = _( + "Found %(count)s duplicate beneficiaries.", + count=duplicates, + ) kind = "warning" else: message = _("No duplicates found.") - kind = "success" else: raise UserError(_("No Deduplication Manager defined.")) @@ -399,7 +420,6 @@ def deduplicate_beneficiaries(self): "type": "ir.actions.client", "tag": "display_notification", "params": { - "title": _("Deduplication"), "message": message, "sticky": False, "type": kind, @@ -701,7 +721,7 @@ def check_managers_limit(self): combined_message = ", ".join(error_messages) raise UserError( f"Only one manager can be configured under {combined_message}." - f"Please delete any new manager(s) before saving your changes." # noqa: B950 + f"Please delete any new manager(s) before saving your changes." ) def _pre_enrollment_hook(self, partner): diff --git a/spp_programs/tests/__init__.py b/spp_programs/tests/__init__.py index c56ebc92..e3e7f202 100644 --- a/spp_programs/tests/__init__.py +++ b/spp_programs/tests/__init__.py @@ -1,17 +1,20 @@ +from . import common_compliance +from . import test_compliance_cel from . import test_create_program_wiz +from . import test_create_program_wizard_cash +from . import test_create_program_wizard_cel +from . import test_cycle +from . import test_deduplication +from . import test_eligibility_cel +from . import test_eligibility_cel_integration from . import test_enrollment_wizard -from . import test_entitlement_report_wiz from . import test_entitlement +from . import test_entitlement_manager_cash +from . import test_entitlement_report_wiz +from . import test_program_enrollment +from . import test_program_membership from . import test_programs from . import test_registrant -from . import test_stock_rule -from . import test_cycle -from . import common_compliance from . import test_spp_cycle_compliance from . import test_spp_program_create_wizard_compliance -from . import test_entitlement_manager_cash -from . import test_create_program_wizard_cash -from . import test_eligibility_cel -from . import test_eligibility_cel_integration -from . import test_compliance_cel -from . import test_create_program_wizard_cel +from . import test_stock_rule diff --git a/spp_programs/tests/test_deduplication.py b/spp_programs/tests/test_deduplication.py new file mode 100644 index 00000000..f7242ebe --- /dev/null +++ b/spp_programs/tests/test_deduplication.py @@ -0,0 +1,477 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +import uuid + +from odoo.tests import TransactionCase + + +class TestDeduplicationCommon(TransactionCase): + """Common setup for deduplication tests.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.program = cls.env["spp.program"].create( + {"name": f"Dedup Program {uuid.uuid4().hex[:8]}", "target_type": "group"} + ) + + # Get existing ID type vocabulary and use existing codes, or create test ones + cls.id_vocab = cls.env["spp.vocabulary"].search([("namespace_uri", "=", "urn:openspp:vocab:id-type")], limit=1) + if cls.id_vocab: + # Use existing codes from the system vocabulary + existing_codes = cls.env["spp.vocabulary.code"].search([("vocabulary_id", "=", cls.id_vocab.id)], limit=2) + if len(existing_codes) >= 2: + cls.id_type_national = existing_codes[0] + cls.id_type_passport = existing_codes[1] + else: + # Create local extensions + cls.id_type_national = cls.env["spp.vocabulary.code"].create( + { + "vocabulary_id": cls.id_vocab.id, + "code": f"NID-{uuid.uuid4().hex[:6]}", + "display": "National ID", + "is_local": True, + } + ) + cls.id_type_passport = cls.env["spp.vocabulary.code"].create( + { + "vocabulary_id": cls.id_vocab.id, + "code": f"PP-{uuid.uuid4().hex[:6]}", + "display": "Passport", + "is_local": True, + } + ) + else: + cls.id_vocab = cls.env["spp.vocabulary"].create( + { + "name": "ID Types", + "namespace_uri": "urn:openspp:vocab:id-type", + } + ) + cls.id_type_national = cls.env["spp.vocabulary.code"].create( + { + "vocabulary_id": cls.id_vocab.id, + "code": "NID", + "display": "National ID", + } + ) + cls.id_type_passport = cls.env["spp.vocabulary.code"].create( + { + "vocabulary_id": cls.id_vocab.id, + "code": "PASSPORT", + "display": "Passport", + } + ) + + # Create default deduplication manager + cls.dedup_default = cls.env["spp.deduplication.manager.default"].create( + {"name": "Default Dedup", "program_id": cls.program.id} + ) + cls.dedup_manager = cls.env["spp.deduplication.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{cls.dedup_default._name},{cls.dedup_default.id}", + } + ) + cls.program.write({"deduplication_manager_ids": [(4, cls.dedup_manager.id)]}) + + # Create ID deduplication manager + cls.id_dedup = cls.env["spp.deduplication.manager.id_dedup"].create( + { + "name": "ID Dedup", + "program_id": cls.program.id, + "supported_id_document_type_ids": [(6, 0, [cls.id_type_national.id, cls.id_type_passport.id])], + } + ) + + # Create phone deduplication manager + cls.phone_dedup = cls.env["spp.deduplication.manager.phone_number"].create( + {"name": "Phone Dedup", "program_id": cls.program.id} + ) + + def _create_group(self, name, individuals=None): + """Helper to create a group with individuals.""" + group = self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": True}) + if individuals: + for individual in individuals: + self.env["spp.group.membership"].create({"group": group.id, "individual": individual.id}) + return group + + def _create_individual(self, name, reg_ids=None, phone_numbers=None): + """Helper to create an individual with optional IDs and phone numbers.""" + individual = self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": False}) + if reg_ids: + for id_type, value in reg_ids: + self.env["spp.registry.id"].create( + { + "partner_id": individual.id, + "id_type_id": id_type.id, + "value": value, + } + ) + if phone_numbers: + for phone in phone_numbers: + self.env["spp.phone.number"].create({"partner_id": individual.id, "phone_no": phone}) + return individual + + def _enroll_in_program(self, partner, state="draft"): + """Helper to create a program membership.""" + return self.env["spp.program.membership"].create( + { + "partner_id": partner.id, + "program_id": self.program.id, + "state": state, + } + ) + + +class TestDefaultDeduplication(TestDeduplicationCommon): + """Tests for default deduplication (shared individuals across groups).""" + + def test_no_duplicates_found(self): + """When groups don't share members, no duplicates are found.""" + ind1 = self._create_individual("Individual 1") + ind2 = self._create_individual("Individual 2") + group1 = self._create_group("Group A", [ind1]) + group2 = self._create_group("Group B", [ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + count = self.dedup_default.deduplicate_beneficiaries(states) + self.assertEqual(count, 0) + + def test_shared_individual_detected(self): + """When two groups share a member, duplicates are detected.""" + shared = self._create_individual("Shared Individual") + ind1 = self._create_individual("Unique 1") + ind2 = self._create_individual("Unique 2") + group1 = self._create_group("Group A", [shared, ind1]) + group2 = self._create_group("Group B", [shared, ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + count = self.dedup_default.deduplicate_beneficiaries(states) + self.assertGreater(count, 0) + + def test_detailed_reason_includes_member_name(self): + """Duplicate reason includes the shared member's name.""" + shared = self._create_individual("John Shared") + group1 = self._create_group("Group X", [shared]) + group2 = self._create_group("Group Y", [shared]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.dedup_default.deduplicate_beneficiaries(states) + + dup_record = self.env["spp.program.membership.duplicate"].search( + [("deduplication_manager_id", "=", self.dedup_default.id)], limit=1 + ) + self.assertIn("John Shared", dup_record.reason) + self.assertIn("Shared member", dup_record.reason) + + def test_keep_one_enrolled(self): + """When one enrolled beneficiary is duplicated, it stays enrolled.""" + shared = self._create_individual("Shared") + group1 = self._create_group("Group Enrolled", [shared]) + group2 = self._create_group("Group Draft", [shared]) + m1 = self._enroll_in_program(group1, "enrolled") + m2 = self._enroll_in_program(group2, "draft") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.dedup_default.deduplicate_beneficiaries(states) + + m1.invalidate_recordset() + m2.invalidate_recordset() + self.assertEqual(m1.state, "enrolled") + self.assertEqual(m2.state, "duplicated") + + def test_exited_not_overwritten(self): + """Exited state is preserved during deduplication.""" + shared = self._create_individual("Shared") + group1 = self._create_group("Group A", [shared]) + group2 = self._create_group("Group B", [shared]) + self._enroll_in_program(group1, "enrolled") + m2 = self._enroll_in_program(group2, "exited") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.dedup_default.deduplicate_beneficiaries(states) + + m2.invalidate_recordset() + self.assertEqual(m2.state, "exited") + + def test_individual_target_type_skips_dedup(self): + """Default dedup only works for group target type.""" + self.program.target_type = "individual" + ind = self._create_individual("Ind") + self._enroll_in_program(ind, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + count = self.dedup_default.deduplicate_beneficiaries(states) + self.assertEqual(count, 0) + + +class TestIDDocumentDeduplication(TestDeduplicationCommon): + """Tests for ID document deduplication.""" + + def test_group_duplicate_id_docs(self): + """Detect duplicate ID documents across groups.""" + ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "12345")]) + ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "12345")]) + group1 = self._create_group("Group A", [ind1]) + group2 = self._create_group("Group B", [ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + self.program.target_type = "group" + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + count = self.id_dedup.deduplicate_beneficiaries(states) + self.assertGreater(count, 0) + + def test_group_duplicate_reason_includes_doc_info(self): + """Duplicate reason includes the document type and value.""" + ind1 = self._create_individual("Alice", reg_ids=[(self.id_type_national, "NID-999")]) + ind2 = self._create_individual("Bob", reg_ids=[(self.id_type_national, "NID-999")]) + group1 = self._create_group("Group A", [ind1]) + group2 = self._create_group("Group B", [ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + self.program.target_type = "group" + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.id_dedup.deduplicate_beneficiaries(states) + + dup_record = self.env["spp.program.membership.duplicate"].search( + [("deduplication_manager_id", "=", self.id_dedup.id)], limit=1 + ) + self.assertIn("National ID", dup_record.reason) + + def test_individual_duplicate_id_docs(self): + """Detect duplicate ID documents for individual target type.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "SAME-ID")]) + ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "SAME-ID")]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.id_dedup.deduplicate_beneficiaries(states) + self.assertGreater(result, 0) + + def test_individual_duplicate_reason_includes_names(self): + """Individual duplicate reason shows shared names.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Alice Ind", reg_ids=[(self.id_type_passport, "PP-001")]) + ind2 = self._create_individual("Bob Ind", reg_ids=[(self.id_type_passport, "PP-001")]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.id_dedup.deduplicate_beneficiaries(states) + + dup_record = self.env["spp.program.membership.duplicate"].search( + [("deduplication_manager_id", "=", self.id_dedup.id)], limit=1 + ) + self.assertIn("Passport", dup_record.reason) + self.assertIn("shared by", dup_record.reason) + + def test_individual_keep_one_enrolled(self): + """Keep-one-enrolled logic for individual dedup.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Enrolled One", reg_ids=[(self.id_type_national, "DUP-100")]) + ind2 = self._create_individual("Draft One", reg_ids=[(self.id_type_national, "DUP-100")]) + m1 = self._enroll_in_program(ind1, "enrolled") + m2 = self._enroll_in_program(ind2, "draft") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.id_dedup.deduplicate_beneficiaries(states) + + m1.invalidate_recordset() + m2.invalidate_recordset() + # One should remain enrolled or both marked if both enrolled + self.assertIn(m2.state, ["duplicated", "draft"]) + + def test_no_duplicate_when_ids_differ(self): + """No duplicates when ID values are different.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Person A", reg_ids=[(self.id_type_national, "UNIQUE-1")]) + ind2 = self._create_individual("Person B", reg_ids=[(self.id_type_national, "UNIQUE-2")]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.id_dedup.deduplicate_beneficiaries(states) + self.assertEqual(result, 0) + + def test_expired_ids_ignored(self): + """Expired ID documents are not considered for deduplication.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Person A") + ind2 = self._create_individual("Person B") + # Create expired IDs with same value + self.env["spp.registry.id"].create( + { + "partner_id": ind1.id, + "id_type_id": self.id_type_national.id, + "value": "EXPIRED-ID", + "expiry_date": "2020-01-01", + } + ) + self.env["spp.registry.id"].create( + { + "partner_id": ind2.id, + "id_type_id": self.id_type_national.id, + "value": "EXPIRED-ID", + "expiry_date": "2020-01-01", + } + ) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.id_dedup.deduplicate_beneficiaries(states) + self.assertEqual(result, 0) + + def test_unsupported_id_type_ignored(self): + """IDs of unsupported types are ignored.""" + other_vocab = self.env["spp.vocabulary"].create({"name": "Other", "namespace_uri": "urn:test:other"}) + other_type = self.env["spp.vocabulary.code"].create( + {"vocabulary_id": other_vocab.id, "code": "OTH", "display": "Other ID"} + ) + self.program.target_type = "individual" + ind1 = self._create_individual("Person A", reg_ids=[(other_type, "SAME")]) + ind2 = self._create_individual("Person B", reg_ids=[(other_type, "SAME")]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.id_dedup.deduplicate_beneficiaries(states) + self.assertEqual(result, 0) + + def test_supported_id_type_field_uses_vocabulary_code(self): + """Verify that supported_id_document_type_ids points to spp.vocabulary.code.""" + field = self.id_dedup._fields["supported_id_document_type_ids"] + self.assertEqual(field.comodel_name, "spp.vocabulary.code") + + +class TestPhoneNumberDeduplication(TestDeduplicationCommon): + """Tests for phone number deduplication.""" + + def test_group_duplicate_phone_numbers(self): + """Detect duplicate phone numbers across groups.""" + ind1 = self._create_individual("Person A", phone_numbers=["+1234567890"]) + ind2 = self._create_individual("Person B", phone_numbers=["+1234567890"]) + group1 = self._create_group("Group A", [ind1]) + group2 = self._create_group("Group B", [ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + self.program.target_type = "group" + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + count = self.phone_dedup.deduplicate_beneficiaries(states) + self.assertGreater(count, 0) + + def test_group_phone_reason_includes_phone(self): + """Phone duplicate reason includes the phone number.""" + ind1 = self._create_individual("Alice", phone_numbers=["+9876543210"]) + ind2 = self._create_individual("Bob", phone_numbers=["+9876543210"]) + group1 = self._create_group("Group A", [ind1]) + group2 = self._create_group("Group B", [ind2]) + self._enroll_in_program(group1, "enrolled") + self._enroll_in_program(group2, "enrolled") + + self.program.target_type = "group" + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + self.phone_dedup.deduplicate_beneficiaries(states) + + dup_record = self.env["spp.program.membership.duplicate"].search( + [("deduplication_manager_id", "=", self.phone_dedup.id)], limit=1 + ) + self.assertTrue(dup_record) + self.assertIn("Duplicate phone number", dup_record.reason) + + def test_individual_duplicate_phone_numbers(self): + """Detect duplicate phone numbers for individual target type.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Person A", phone_numbers=["+1111111111"]) + ind2 = self._create_individual("Person B", phone_numbers=["+1111111111"]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.phone_dedup.deduplicate_beneficiaries(states) + self.assertGreater(result, 0) + + def test_no_duplicate_different_phones(self): + """No duplicates when phone numbers differ.""" + self.program.target_type = "individual" + ind1 = self._create_individual("Person A", phone_numbers=["+1111111111"]) + ind2 = self._create_individual("Person B", phone_numbers=["+2222222222"]) + self._enroll_in_program(ind1, "enrolled") + self._enroll_in_program(ind2, "enrolled") + + states = ["draft", "enrolled", "eligible", "paused", "duplicated"] + result = self.phone_dedup.deduplicate_beneficiaries(states) + self.assertEqual(result, 0) + + +class TestRecordDuplicate(TestDeduplicationCommon): + """Tests for the _record_duplicate method.""" + + def test_record_creates_duplicate_entry(self): + """_record_duplicate creates a duplicate record.""" + group = self._create_group("Test Group") + membership = self._enroll_in_program(group) + + self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Test reason") + + dup = self.env["spp.program.membership.duplicate"].search( + [ + ("beneficiary_ids", "in", membership.id), + ("reason", "=", "Test reason"), + ] + ) + self.assertTrue(dup) + self.assertEqual(dup.state, "duplicate") + + def test_no_duplicate_entry_for_same_reason(self): + """No duplicate entry created if one already exists for same reason.""" + group = self._create_group("Test Group") + membership = self._enroll_in_program(group) + + self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Same reason") + self.dedup_default._record_duplicate(self.dedup_default, [membership.id], "Same reason") + + dups = self.env["spp.program.membership.duplicate"].search( + [ + ("beneficiary_ids", "in", membership.id), + ("reason", "=", "Same reason"), + ] + ) + self.assertEqual(len(dups), 1) + + +class TestDeduplicationManagerSelection(TestDeduplicationCommon): + """Tests for deduplication manager selection/registration.""" + + def test_selection_includes_default(self): + selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id() + names = [s[0] for s in selection] + self.assertIn("spp.deduplication.manager.default", names) + + def test_selection_includes_phone(self): + selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id() + names = [s[0] for s in selection] + self.assertIn("spp.deduplication.manager.phone_number", names) + + def test_selection_includes_id_dedup(self): + selection = self.env["spp.deduplication.manager"]._selection_manager_ref_id() + names = [s[0] for s in selection] + self.assertIn("spp.deduplication.manager.id_dedup", names) + + def test_eligibility_selection_includes_id_and_phone(self): + selection = self.env["spp.eligibility.manager"]._selection_manager_ref_id() + names = [s[0] for s in selection] + self.assertIn("spp.program.membership.manager.id_dedup", names) + self.assertIn("spp.program.membership.manager.phone_number", names) diff --git a/spp_programs/tests/test_program_enrollment.py b/spp_programs/tests/test_program_enrollment.py new file mode 100644 index 00000000..b45a4fc5 --- /dev/null +++ b/spp_programs/tests/test_program_enrollment.py @@ -0,0 +1,366 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +import uuid + +from odoo.exceptions import UserError +from odoo.tests import TransactionCase + + +class TestProgramEnrollment(TransactionCase): + """Tests for program-level enrollment and deduplication workflows.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.program = cls.env["spp.program"].create( + {"name": f"Enrollment Test {uuid.uuid4().hex[:8]}", "target_type": "group"} + ) + + # Create program manager + cls.pm_default = cls.env["spp.program.manager.default"].create( + {"name": "Default PM", "program_id": cls.program.id} + ) + pm = cls.env["spp.program.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{cls.pm_default._name},{cls.pm_default.id}", + } + ) + cls.program.write({"program_manager_ids": [(4, pm.id)]}) + + # Create eligibility manager + cls.em_default = cls.env["spp.program.membership.manager.default"].create( + {"name": "Default EM", "program_id": cls.program.id} + ) + em = cls.env["spp.eligibility.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{cls.em_default._name},{cls.em_default.id}", + } + ) + cls.program.write({"eligibility_manager_ids": [(4, em.id)]}) + + # Create deduplication manager + cls.dedup_default = cls.env["spp.deduplication.manager.default"].create( + {"name": "Default Dedup", "program_id": cls.program.id} + ) + dm = cls.env["spp.deduplication.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{cls.dedup_default._name},{cls.dedup_default.id}", + } + ) + cls.program.write({"deduplication_manager_ids": [(4, dm.id)]}) + + def _create_group(self, name): + return self.env["res.partner"].create({"name": name, "is_registrant": True, "is_group": True}) + + def _enroll(self, partner, state="draft"): + return self.env["spp.program.membership"].create( + {"partner_id": partner.id, "program_id": self.program.id, "state": state} + ) + + def test_enroll_eligible_registrants_no_beneficiaries_raises(self): + """Enrollment raises error when no beneficiaries exist.""" + program = self.env["spp.program"].create({"name": f"Empty {uuid.uuid4().hex[:8]}"}) + with self.assertRaises(UserError): + program.enroll_eligible_registrants() + + def test_enroll_eligible_registrants_no_manager_raises(self): + """Enrollment raises error when no program manager exists.""" + program = self.env["spp.program"].create({"name": f"No PM {uuid.uuid4().hex[:8]}"}) + group = self._create_group("Group") + self.env["spp.program.membership"].create({"partner_id": group.id, "program_id": program.id}) + with self.assertRaises(UserError): + program.enroll_eligible_registrants() + + def test_enrollment_skips_duplicated(self): + """Enrollment does not change duplicated state to enrolled.""" + group = self._create_group("Duplicated Group") + membership = self._enroll(group, "duplicated") + + # Run enrollment + self.pm_default._enroll_eligible_registrants(["duplicated"]) + + membership.invalidate_recordset() + self.assertEqual(membership.state, "duplicated") + + def test_enrollment_skips_exited(self): + """Enrollment does not change exited state to enrolled.""" + group = self._create_group("Exited Group") + membership = self._enroll(group, "exited") + + self.pm_default._enroll_eligible_registrants(["exited"]) + + membership.invalidate_recordset() + self.assertEqual(membership.state, "exited") + + def test_enrollment_enrolls_draft(self): + """Enrollment changes draft state to enrolled.""" + group = self._create_group("Draft Group") + membership = self._enroll(group, "draft") + + self.pm_default._enroll_eligible_registrants(["draft"], do_count=True) + + membership.invalidate_recordset() + self.assertEqual(membership.state, "enrolled") + + def test_disenrollment_skips_duplicated(self): + """Disenrollment does not change duplicated to not_eligible.""" + group = self._create_group("Dup Group") + membership = self._enroll(group, "duplicated") + + self.pm_default._enroll_eligible_registrants(["duplicated"]) + + membership.invalidate_recordset() + # Should NOT be changed to not_eligible + self.assertNotEqual(membership.state, "not_eligible") + + def test_program_deduplicate_no_duplicates(self): + """Program-level deduplication shows no duplicates message.""" + group = self._create_group("Solo Group") + self._enroll(group, "enrolled") + + result = self.program.deduplicate_beneficiaries() + self.assertEqual(result["params"]["type"], "success") + self.assertIn("No duplicates", result["params"]["message"]) + + def test_program_deduplicate_with_duplicates(self): + """Program-level deduplication shows warning with counts.""" + ind = self.env["res.partner"].create({"name": "Shared", "is_registrant": True, "is_group": False}) + group1 = self._create_group("Group 1") + group2 = self._create_group("Group 2") + self.env["spp.group.membership"].create({"group": group1.id, "individual": ind.id}) + self.env["spp.group.membership"].create({"group": group2.id, "individual": ind.id}) + self._enroll(group1, "enrolled") + self._enroll(group2, "enrolled") + + result = self.program.deduplicate_beneficiaries() + self.assertEqual(result["params"]["type"], "warning") + + def test_program_deduplicate_shows_new_vs_existing(self): + """Deduplication message distinguishes new from existing duplicates.""" + ind = self.env["res.partner"].create({"name": "Shared", "is_registrant": True, "is_group": False}) + group1 = self._create_group("Group A") + group2 = self._create_group("Group B") + self.env["spp.group.membership"].create({"group": group1.id, "individual": ind.id}) + self.env["spp.group.membership"].create({"group": group2.id, "individual": ind.id}) + m1 = self._enroll(group1, "enrolled") + m2 = self._enroll(group2, "enrolled") + + # First run + self.program.deduplicate_beneficiaries() + + # Reset states to run again + m1.write({"state": "enrolled"}) + m2.write({"state": "enrolled"}) + + # Second run - should show already flagged + result = self.program.deduplicate_beneficiaries() + self.assertEqual(result["params"]["type"], "warning") + + def test_program_deduplicate_no_manager_raises(self): + """Deduplication raises error when no manager defined.""" + program = self.env["spp.program"].create({"name": f"No Dedup {uuid.uuid4().hex[:8]}"}) + with self.assertRaises(UserError): + program.deduplicate_beneficiaries() + + def test_verify_eligibility_no_manager_raises(self): + """verify_eligibility raises error when no program manager.""" + program = self.env["spp.program"].create({"name": f"No PM V {uuid.uuid4().hex[:8]}"}) + with self.assertRaises(UserError): + program.verify_eligibility() + + +class TestProgramManagerDefault(TransactionCase): + """Tests for spp.program.manager.default.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.program = cls.env["spp.program"].create({"name": f"PM Test {uuid.uuid4().hex[:8]}", "target_type": "group"}) + cls.pm = cls.env["spp.program.manager.default"].create({"name": "Default PM", "program_id": cls.program.id}) + pm_manager = cls.env["spp.program.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{cls.pm._name},{cls.pm.id}", + } + ) + cls.program.write({"program_manager_ids": [(4, pm_manager.id)]}) + + # Eligibility manager + em_default = cls.env["spp.program.membership.manager.default"].create( + {"name": "Default EM", "program_id": cls.program.id} + ) + em = cls.env["spp.eligibility.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{em_default._name},{em_default.id}", + } + ) + cls.program.write({"eligibility_manager_ids": [(4, em.id)]}) + + def test_enroll_no_eligibility_manager_raises(self): + """Enrollment raises error when no eligibility manager.""" + program = self.env["spp.program"].create({"name": f"No EM {uuid.uuid4().hex[:8]}"}) + pm = self.env["spp.program.manager.default"].create({"name": "PM", "program_id": program.id}) + with self.assertRaises(UserError): + pm.enroll_eligible_registrants() + + def test_selection_includes_default(self): + """Program manager selection includes default.""" + selection = self.env["spp.program.manager"]._selection_manager_ref_id() + names = [s[0] for s in selection] + self.assertIn("spp.program.manager.default", names) + + +class TestProgramModel(TransactionCase): + """Tests for spp.program model methods.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.program = cls.env["spp.program"].create( + {"name": f"Program Model {uuid.uuid4().hex[:8]}", "target_type": "group"} + ) + + def test_unique_program_name(self): + """Program names must be unique.""" + with self.assertRaises(UserError): + self.env["spp.program"].create({"name": self.program.name}) + + def test_compute_has_members(self): + """has_members is True when memberships exist.""" + self.assertFalse(self.program.has_members) + group = self.env["res.partner"].create({"name": "Group", "is_registrant": True, "is_group": True}) + self.env["spp.program.membership"].create({"partner_id": group.id, "program_id": self.program.id}) + self.program.invalidate_recordset() + self.assertTrue(self.program.has_members) + + def test_count_beneficiaries(self): + """count_beneficiaries returns correct count.""" + result = self.program.count_beneficiaries() + self.assertEqual(result["value"], 0) + + def test_count_beneficiaries_by_state(self): + """count_beneficiaries filters by state.""" + group = self.env["res.partner"].create({"name": "G", "is_registrant": True, "is_group": True}) + self.env["spp.program.membership"].create( + { + "partner_id": group.id, + "program_id": self.program.id, + "state": "enrolled", + } + ) + result = self.program.count_beneficiaries(["enrolled"]) + self.assertEqual(result["value"], 1) + result_draft = self.program.count_beneficiaries(["draft"]) + self.assertEqual(result_draft["value"], 0) + + def test_get_beneficiaries(self): + """get_beneficiaries returns correct recordset.""" + group = self.env["res.partner"].create({"name": "G", "is_registrant": True, "is_group": True}) + self.env["spp.program.membership"].create( + { + "partner_id": group.id, + "program_id": self.program.id, + "state": "enrolled", + } + ) + result = self.program.get_beneficiaries("enrolled") + self.assertEqual(len(result), 1) + + def test_get_beneficiaries_count(self): + """get_beneficiaries with count=True returns integer.""" + result = self.program.get_beneficiaries(count=True) + self.assertIsInstance(result, int) + + def test_get_beneficiaries_cursor_pagination(self): + """get_beneficiaries supports cursor-based pagination.""" + result = self.program.get_beneficiaries(last_id=0, limit=10) + self.assertEqual(len(result), 0) + + def test_open_eligible_beneficiaries_form(self): + """open_eligible_beneficiaries_form returns action.""" + result = self.program.open_eligible_beneficiaries_form() + self.assertEqual(result["type"], "ir.actions.act_window") + + def test_open_duplicate_membership_form(self): + """open_duplicate_membership_form returns action.""" + result = self.program.open_duplicate_membership_form() + self.assertIn("duplicated", str(result["domain"])) + + def test_open_cycles_form(self): + """open_cycles_form returns action.""" + result = self.program.open_cycles_form() + self.assertEqual(result["type"], "ir.actions.act_window") + + def test_open_program_form(self): + """open_program_form returns action.""" + result = self.program.open_program_form() + self.assertEqual(result["res_id"], self.program.id) + + def test_refresh_page(self): + """refresh_page returns reload action.""" + result = self.program.refresh_page() + self.assertEqual(result["tag"], "reload") + + def test_end_program(self): + """end_program changes state to ended.""" + self.program.with_context(active_ids=[self.program.id]).end_program() + self.assertEqual(self.program.state, "ended") + + def test_end_program_already_ended(self): + """end_program shows error for already ended program.""" + self.program.write({"state": "ended"}) + result = self.program.with_context(active_ids=[self.program.id]).end_program() + self.assertEqual(result["params"]["type"], "danger") + + def test_reactivate_program(self): + """reactivate_program changes state back to active.""" + self.program.write({"state": "ended"}) + self.program.with_context(active_ids=[self.program.id]).reactivate_program() + self.assertEqual(self.program.state, "active") + + def test_reactivate_active_program(self): + """reactivate_program shows error for active program.""" + result = self.program.with_context(active_ids=[self.program.id]).reactivate_program() + self.assertEqual(result["params"]["type"], "danger") + + def test_check_managers_limit(self): + """Cannot have more than one program manager.""" + pm1 = self.env["spp.program.manager.default"].create({"name": "PM1", "program_id": self.program.id}) + pm2 = self.env["spp.program.manager.default"].create({"name": "PM2", "program_id": self.program.id}) + mgr1 = self.env["spp.program.manager"].create( + { + "program_id": self.program.id, + "manager_ref_id": f"{pm1._name},{pm1.id}", + } + ) + mgr2 = self.env["spp.program.manager"].create( + { + "program_id": self.program.id, + "manager_ref_id": f"{pm2._name},{pm2.id}", + } + ) + with self.assertRaises(UserError): + self.program.write({"program_manager_ids": [(6, 0, [mgr1.id, mgr2.id])]}) + + def test_unlink_program(self): + """Program can be deleted with its managers.""" + program = self.env["spp.program"].create({"name": f"Delete Me {uuid.uuid4().hex[:8]}"}) + program.unlink() + self.assertFalse(program.exists()) + + def test_compute_has_compliance_criteria(self): + """has_compliance_criteria reflects manager presence.""" + self.assertFalse(self.program.has_compliance_criteria) + + def test_get_manager_unsupported_raises(self): + """get_manager raises for unsupported manager type.""" + with self.assertRaises(NotImplementedError): + self.program.get_manager("unsupported_type") + + def test_get_managers_unsupported_raises(self): + """get_managers raises for unsupported manager type.""" + with self.assertRaises(NotImplementedError): + self.program.get_managers("unsupported_type") diff --git a/spp_programs/tests/test_program_membership.py b/spp_programs/tests/test_program_membership.py new file mode 100644 index 00000000..5415cae4 --- /dev/null +++ b/spp_programs/tests/test_program_membership.py @@ -0,0 +1,231 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +import uuid + +from odoo import fields +from odoo.exceptions import UserError, ValidationError +from odoo.tests import TransactionCase + + +class TestProgramMembership(TransactionCase): + """Tests for spp.program.membership model.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.program = cls.env["spp.program"].create( + {"name": f"Membership Test {uuid.uuid4().hex[:8]}", "target_type": "group"} + ) + + # Create eligibility manager so enrollment works + manager_default = cls.env["spp.program.membership.manager.default"].create( + {"name": "Default", "program_id": cls.program.id} + ) + eligibility_manager = cls.env["spp.eligibility.manager"].create( + { + "program_id": cls.program.id, + "manager_ref_id": f"{manager_default._name},{manager_default.id}", + } + ) + cls.program.write({"eligibility_manager_ids": [(4, eligibility_manager.id)]}) + + cls.group = cls.env["res.partner"].create({"name": "Test Group", "is_registrant": True, "is_group": True}) + + def _create_membership(self, partner=None, state="draft"): + partner = partner or self.group + return self.env["spp.program.membership"].create( + { + "partner_id": partner.id, + "program_id": self.program.id, + "state": state, + } + ) + + def test_unique_partner_per_program(self): + """Cannot have duplicate memberships for same partner+program.""" + self._create_membership() + with self.assertRaises(ValidationError): + self._create_membership() + + def test_enrollment_date_set_on_enrolled(self): + """Enrollment date is set when state becomes enrolled.""" + membership = self._create_membership(state="enrolled") + self.assertTrue(membership.enrollment_date) + + def test_duplicate_reason_computed(self): + """Duplicate reason shows the reason from the duplicate record.""" + membership = self._create_membership(state="duplicated") + self.env["spp.program.membership.duplicate"].create( + { + "beneficiary_ids": [(6, 0, [membership.id])], + "state": "duplicate", + "reason": "Test duplicate reason", + "deduplication_manager_id": 1, + } + ) + membership.invalidate_recordset() + self.assertEqual(membership.duplicate_reason, "Test duplicate reason") + + def test_duplicate_reason_empty_when_not_duplicated(self): + """Duplicate reason is False when state is not duplicated.""" + membership = self._create_membership(state="enrolled") + self.assertFalse(membership.duplicate_reason) + + def test_enroll_prevents_duplicated_state(self): + """Cannot enroll a beneficiary that is in duplicated state.""" + membership = self._create_membership(state="duplicated") + result = membership.enroll_eligible_registrants() + self.assertEqual(result["params"]["type"], "warning") + self.assertIn("duplicated", result["params"]["message"].lower()) + membership.invalidate_recordset() + self.assertEqual(membership.state, "duplicated") + + def test_enroll_prevents_exited_state(self): + """Cannot enroll a beneficiary that is in exited state.""" + membership = self._create_membership(state="exited") + result = membership.enroll_eligible_registrants() + self.assertEqual(result["params"]["type"], "warning") + membership.invalidate_recordset() + self.assertEqual(membership.state, "exited") + + def test_enroll_already_enrolled_shows_info(self): + """Enrolling an already enrolled beneficiary shows info message.""" + membership = self._create_membership(state="enrolled") + result = membership.enroll_eligible_registrants() + self.assertEqual(result["params"]["type"], "info") + + def test_verify_eligibility_marks_not_eligible(self): + """verify_eligibility marks as not_eligible when no match.""" + # Create individual (won't match group target_type eligibility) + individual = self.env["res.partner"].create( + { + "name": "Individual Wrong Type", + "is_registrant": True, + "is_group": False, + } + ) + ind_membership = self.env["spp.program.membership"].create( + { + "partner_id": individual.id, + "program_id": self.program.id, + } + ) + ind_membership.verify_eligibility() + self.assertEqual(ind_membership.state, "not_eligible") + + def test_back_to_draft(self): + """Back to draft resets state.""" + membership = self._create_membership(state="not_eligible") + membership.back_to_draft() + self.assertEqual(membership.state, "draft") + + def test_action_pause(self): + """Pausing an enrolled membership.""" + membership = self._create_membership(state="enrolled") + membership.action_pause() + self.assertEqual(membership.state, "paused") + + def test_action_pause_non_enrolled_raises(self): + """Cannot pause a non-enrolled membership.""" + membership = self._create_membership(state="draft") + with self.assertRaises(UserError): + membership.action_pause() + + def test_action_resume(self): + """Resuming a paused membership.""" + membership = self._create_membership(state="paused") + membership.action_resume() + self.assertEqual(membership.state, "enrolled") + + def test_action_resume_non_paused_raises(self): + """Cannot resume a non-paused membership.""" + membership = self._create_membership(state="enrolled") + with self.assertRaises(UserError): + membership.action_resume() + + def test_action_exit(self): + """Exiting an enrolled membership.""" + membership = self._create_membership(state="enrolled") + membership.action_exit() + self.assertEqual(membership.state, "exited") + self.assertEqual(membership.exit_date, fields.Date.today()) + + def test_action_exit_non_enrolled_raises(self): + """Cannot exit a draft membership.""" + membership = self._create_membership(state="draft") + with self.assertRaises(UserError): + membership.action_exit() + + def test_open_beneficiaries_form(self): + """open_beneficiaries_form returns an action.""" + membership = self._create_membership() + result = membership.open_beneficiaries_form() + self.assertEqual(result["res_model"], "spp.program.membership") + self.assertEqual(result["res_id"], membership.id) + + def test_open_registrant_form_group(self): + """open_registrant_form returns an action for groups.""" + membership = self._create_membership() + result = membership.open_registrant_form() + self.assertEqual(result["res_model"], "res.partner") + self.assertEqual(result["res_id"], self.group.id) + + def test_open_registrant_form_individual(self): + """open_registrant_form returns an action for individuals.""" + individual = self.env["res.partner"].create({"name": "Individual", "is_registrant": True, "is_group": False}) + ind_program = self.env["spp.program"].create( + { + "name": f"Ind Program {uuid.uuid4().hex[:8]}", + "target_type": "individual", + } + ) + membership = self.env["spp.program.membership"].create( + {"partner_id": individual.id, "program_id": ind_program.id} + ) + result = membership.open_registrant_form() + self.assertEqual(result["res_model"], "res.partner") + self.assertEqual(result["res_id"], individual.id) + + def test_display_name(self): + """display_name is set on membership.""" + membership = self._create_membership() + self.assertTrue(membership.display_name) + + def test_deduplicate_from_membership(self): + """Deduplication can be triggered from membership.""" + membership = self._create_membership() + + # Set up deduplication manager + dedup_default = self.env["spp.deduplication.manager.default"].create( + {"name": "Default Dedup", "program_id": self.program.id} + ) + dedup_manager = self.env["spp.deduplication.manager"].create( + { + "program_id": self.program.id, + "manager_ref_id": f"{dedup_default._name},{dedup_default.id}", + } + ) + self.program.write({"deduplication_manager_ids": [(4, dedup_manager.id)]}) + + result = membership.deduplicate_beneficiaries() + self.assertEqual(result["tag"], "display_notification") + + def test_deduplicate_no_manager_raises(self): + """Deduplication raises error when no manager is set.""" + program_no_dedup = self.env["spp.program"].create({"name": f"No Dedup {uuid.uuid4().hex[:8]}"}) + membership = self.env["spp.program.membership"].create( + {"partner_id": self.group.id, "program_id": program_no_dedup.id} + ) + with self.assertRaises(UserError): + membership.deduplicate_beneficiaries() + + def test_get_view_form_group_context(self): + """_get_view applies group domain in context.""" + Membership = self.env["spp.program.membership"].with_context(target_type="group") + arch, view = Membership._get_view(view_type="form") + self.assertIn("is_group", arch) + + def test_get_view_form_individual_context(self): + """_get_view applies individual domain in context.""" + Membership = self.env["spp.program.membership"].with_context(target_type="individual") + arch, view = Membership._get_view(view_type="form") + self.assertIn("is_group", arch) diff --git a/spp_programs/views/managers/deduplication_manager_view.xml b/spp_programs/views/managers/deduplication_manager_view.xml index 0e92cf6b..d1220a77 100644 --- a/spp_programs/views/managers/deduplication_manager_view.xml +++ b/spp_programs/views/managers/deduplication_manager_view.xml @@ -172,7 +172,8 @@ Part of OpenSPP. See LICENSE file for full copyright and licensing details. diff --git a/spp_programs/views/program_membership_view.xml b/spp_programs/views/program_membership_view.xml index f01e3670..ced3e3ad 100644 --- a/spp_programs/views/program_membership_view.xml +++ b/spp_programs/views/program_membership_view.xml @@ -122,6 +122,14 @@ Part of OpenSPP. See LICENSE file for full copyright and licensing details. bg_color="bg-danger" invisible="state != 'not_eligible'" /> +