From 1be889f73353111866c4ac321ca3e3817f251472 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Mar 2026 15:43:29 -0800 Subject: [PATCH 1/3] #198 add logic for employment based xref --- python/employment.py | 27 +++- sql/employment/edd_land_use_split.sql | 223 ++++++++++++++++++++++++++ 2 files changed, 244 insertions(+), 6 deletions(-) create mode 100644 sql/employment/edd_land_use_split.sql diff --git a/python/employment.py b/python/employment.py index 49b3394..7bd4382 100644 --- a/python/employment.py +++ b/python/employment.py @@ -96,7 +96,7 @@ def _aggregate_lodes_to_mgra( Args: combined_data: LODES data with columns: year, block, naics_code, jobs - xref: Crosswalk with columns: block, mgra, allocation_pct + xref: Crosswalk with columns: block, mgra, pct_edd, pct_area, edd_flag year: The year for which to aggregate data Returns: @@ -125,7 +125,15 @@ def _aggregate_lodes_to_mgra( .assign(year=year) .merge( combined_data.merge(xref, on="block", how="inner") - .assign(value=lambda df: df["jobs"] * df["allocation_pct"]) + .assign( + value=lambda df: df["jobs"] + * df.apply( + lambda row: ( + row["pct_edd"] if row["edd_flag"] == 1 else row["pct_area"] + ), + axis=1, + ) + ) .groupby(["year", "mgra", "naics_code"], as_index=False)["value"] .sum(), on=["year", "mgra", "naics_code"], @@ -151,15 +159,22 @@ def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]: jobs_inputs["lodes_data"] = _get_lodes_data(year) - with utils.ESTIMATES_ENGINE.connect() as con: + with utils.GIS_ENGINE.connect() as con: # get crosswalk from Census blocks to MGRAs - with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file: - jobs_inputs["xref_block_to_mgra"] = pd.read_sql_query( + with open(utils.SQL_FOLDER / "employment/edd_land_use_split.sql") as file: + jobs_inputs["xref_block_to_mgra"] = utils.read_sql_query_fallback( sql=sql.text(file.read()), con=con, - params={"mgra_version": utils.MGRA_VERSION}, + params={ + "mgra_version": utils.MGRA_VERSION, + "year": 
year, + }, ) + # Debug: print columns + # print(f"xref columns: {jobs_inputs['xref_block_to_mgra'].columns.tolist()}") + + with utils.ESTIMATES_ENGINE.connect() as con: # get regional employment control totals from QCEW with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: jobs_inputs["control_totals"] = utils.read_sql_query_fallback( diff --git a/sql/employment/edd_land_use_split.sql b/sql/employment/edd_land_use_split.sql new file mode 100644 index 0000000..c99bba5 --- /dev/null +++ b/sql/employment/edd_land_use_split.sql @@ -0,0 +1,223 @@ +/* +This query provides a many-to-many cross reference mapping 2020 Census Blocks to Series 15 MGRAs +There are two cross references for separate use cases + 1) Cross reference based on EDD point-level jobs data + 2) Cross reference based on simple land area intersection + +Notes: + 1) The land area intersection cross reference is used as a default cross + reference as there may be instances where the EDD point layer indicates no + MGRAs to allocate data to but the Census LEHD LODES contains jobs that + need to be allocated to MGRAs. + 2) Data prior to year 2017 is not present in the EDD view and must be + queried directly from the source database table. 
+ 3) This must be run on the GIS server +*/ + +SET NOCOUNT ON; +-- Initialize parameters and return table ------------------------------------ +DECLARE @year INTEGER = :year; +DECLARE @mgra_version NVARCHAR(10) = :mgra_version; +DECLARE @msg nvarchar(45) = 'EDD point-level data does not exist'; + + +-- Check for MGRA version and stop execution if not Series 15 +IF @mgra_version != 'mgra15' +BEGIN + RAISERROR('EDD xref only valid for Series 15 MGRAs',16,1) + RETURN +END + +-- Create temporary table for EDD data to support spatial index +DROP TABLE IF EXISTS [#edd]; +CREATE TABLE [#edd] ( + [id] INTEGER IDENTITY(1,1) NOT NULL, + [jobs] FLOAT NOT NULL, + [Shape] GEOMETRY NOT NULL, + CONSTRAINT [pk_tt_edd] PRIMARY KEY ([id]) +) + +-- Create spatial index for later spatial join +-- Bounding box coordinates from SANDAG GIS team +-- Identical to spatial index on LUDU point layers in GIS database +CREATE SPATIAL INDEX [sidx_tt_edd] ON [#edd] +([Shape]) USING GEOMETRY_AUTO_GRID +WITH (BOUNDING_BOX = ( + 6151635.98006938, + 1775442.36347014, + 6613401.66775663, + 2129306.52024172), + CELLS_PER_OBJECT = 8 +) + + +-- Get SANDAG GIS team EDD dataset ------------------------------------------- +DECLARE @qry NVARCHAR(max) +IF @year >= 2017 +BEGIN + INSERT INTO [#edd] + SELECT + 1.0 * [emp_total]/[emp_valid] AS [jobs], + [SHAPE] + FROM ( + SELECT + CASE WHEN [emp_m1] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m2] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m3] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m4] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m5] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m6] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m7] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m8] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 + END AS 
[emp_valid], + ISNULL([emp_m1], 0) + + COALESCE([emp_m2], 0) + + COALESCE([emp_m3], 0) + + COALESCE([emp_m4], 0) + + COALESCE([emp_m5], 0) + + COALESCE([emp_m6], 0) + + COALESCE([emp_m7], 0) + + COALESCE([emp_m8], 0) + + COALESCE([emp_m9], 0) + + COALESCE([emp_m10], 0) + + COALESCE([emp_m11], 0) + + COALESCE([emp_m12], 0) + AS [emp_total], + [SHAPE] + FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] + WHERE [year] = @year + ) AS [tt] + WHERE + [emp_valid] > 0 + AND [emp_total] > 0 +END +ELSE IF @year = 2016 OR @year BETWEEN 2010 AND 2014 +BEGIN + INSERT INTO [#edd] + SELECT + [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = [headquarters].[emp_id] + INNER JOIN ( + SELECT [year], [emp_id], [employment] + FROM [EMPCORE].[ca_edd].[employment] + WHERE + [month_id] = 14 -- adjusted employment + AND [employment] > 0 + AND [year] = @year + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] +END +ELSE IF @year = 2015 +BEGIN + INSERT INTO [#edd] + SELECT + [employment] * ISNULL([headquarters].[share], 1) AS [jobs], + ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] + FROM [EMPCORE].[ca_edd].[businesses] + LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + ON [businesses].[year] = [headquarters].[year] + AND [businesses].[emp_id] = [headquarters].[emp_id] + INNER JOIN ( + SELECT + [year], + [emp_id], + /* average of the non-NULL months 15-17; 1.0 is applied inside the numerator so the division is not truncated when [employment] is an integer (column type not visible here - confirm) */ ((1.0 * ISNULL([15], 0) + ISNULL([16], 0) + ISNULL([17], 0)) + / + (CASE WHEN [15] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [16] IS NOT NULL THEN 1 ELSE 0 END + + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END + )) + AS [employment] + FROM [EMPCORE].[ca_edd].[employment] + PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] + WHERE + [year] = 
@year AND + ([15] IS NOT NULL OR [16] IS NOT NULL OR [17] IS NOT NULL) + ) AS [employment] + ON [businesses].[year] = [employment].[year] + AND [businesses].[emp_id] = [employment].[emp_id] +END + + +-- Send error message if no data exists -------------------------------------- +IF NOT EXISTS ( + SELECT TOP (1) * + FROM [#edd] +) +SELECT @msg AS [msg] +ELSE +-- Build cross reference of Census 2020 Blocks to MGRAs ---------------------- +BEGIN + -- Calculate sum of jobs in Census 2020 Blocks by MGRAs using EDD points + -- Used later to calculate % allocation of Census 2020 Block jobs to MGRAs + WITH [xref_edd] AS ( + SELECT + [CENSUSBLOCKS].[GEOID20] AS [block], + [MGRA15].[MGRA] AS [mgra], + SUM([jobs]) AS [mgra_jobs], + SUM(SUM([jobs])) OVER (PARTITION BY [CENSUSBLOCKS].[GEOID20]) AS [block_jobs] + FROM [#edd] + INNER JOIN [GeoDepot].[sde].[CENSUSBLOCKS] + ON [#edd].[Shape].STIntersects([CENSUSBLOCKS].[Shape]) = 1 + INNER JOIN [GeoDepot].[sde].[MGRA15] + ON [#edd].[Shape].STIntersects([MGRA15].[Shape]) = 1 + GROUP BY + [CENSUSBLOCKS].[GEOID20], [MGRA15].[MGRA] + ), + -- Get % area overlap of Census 2020 Block area and MGRAs + [xref_area] AS ( + SELECT + [c].[GEOID20] AS [block], + [m].[MGRA] AS [mgra], + ([c].[Shape].STIntersection([m].[Shape]).STArea() / [c].[Shape].STArea()) AS [pct_area] + FROM [GeoDepot].[sde].[CENSUSBLOCKS] AS [c] + LEFT OUTER JOIN [GeoDepot].[sde].[MGRA15] AS [m] + ON [c].[Shape].STIntersects([m].[Shape]) = 1 + WHERE ([c].[Shape].STIntersection([m].[Shape]).STArea() / [c].[Shape].STArea()) > 0.01 + ), + -- Combine results and calculate % allocations of Census 2020 blocks to MGRAs + [results] AS ( + SELECT + COALESCE([xref_edd].[block], [xref_area].[block]) AS [block], + COALESCE([xref_edd].[mgra], [xref_area].[mgra]) AS [mgra], + SUM(CASE WHEN COALESCE([xref_edd].[block_jobs], 0) = 0 THEN 0 + ELSE 1.0 * COALESCE([xref_edd].[mgra_jobs], 0) / COALESCE([xref_edd].[block_jobs], 0) + END) AS [pct_edd], + SUM(COALESCE([xref_area].[pct_area], 0)) 
AS [pct_area] + FROM [xref_edd] + FULL OUTER JOIN [xref_area] + ON [xref_edd].[block] = [xref_area].[block] + AND [xref_edd].[mgra] = [xref_area].[mgra] + GROUP BY + COALESCE([xref_edd].[block], [xref_area].[block]), + COALESCE([xref_edd].[mgra], [xref_area].[mgra]) + ) + -- Return results ensuring % allocations add to 1 within each Census 2020 Block + SELECT + [block], + [mgra], + CASE WHEN SUM([pct_edd]) OVER (PARTITION BY [block]) > 0 + THEN [pct_edd] * 1/SUM([pct_edd]) OVER (PARTITION BY [block]) + ELSE 0 END AS [pct_edd], + [pct_area] * 1/SUM([pct_area]) OVER (PARTITION BY [block]) AS [pct_area], + SUM([pct_edd]) OVER (PARTITION BY [block]) AS edd_flag + FROM + [results] + WHERE + [mgra] IS NOT NULL + ORDER BY + [block], + [mgra] +END + +--Drop Temp table +DROP TABLE IF EXISTS [#edd]; \ No newline at end of file From 901c91ac7b4d8d3a8fe57c374400a041c50b93b1 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Thu, 5 Mar 2026 16:29:00 -0800 Subject: [PATCH 2/3] #198 updates based on copilot pr feedback --- python/employment.py | 10 +--------- sql/employment/edd_land_use_split.sql | 9 +++++---- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/python/employment.py b/python/employment.py index 7bd4382..d986a46 100644 --- a/python/employment.py +++ b/python/employment.py @@ -127,12 +127,7 @@ def _aggregate_lodes_to_mgra( combined_data.merge(xref, on="block", how="inner") .assign( value=lambda df: df["jobs"] - * df.apply( - lambda row: ( - row["pct_edd"] if row["edd_flag"] == 1 else row["pct_area"] - ), - axis=1, - ) + * np.where(df["edd_flag"] == 1, df["pct_edd"], df["pct_area"]) ) .groupby(["year", "mgra", "naics_code"], as_index=False)["value"] .sum(), @@ -171,9 +166,6 @@ def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]: }, ) - # Debug: print columns - # print(f"xref columns: {jobs_inputs['xref_block_to_mgra'].columns.tolist()}") - with utils.ESTIMATES_ENGINE.connect() as con: # get regional employment control totals from QCEW with 
open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file: diff --git a/sql/employment/edd_land_use_split.sql b/sql/employment/edd_land_use_split.sql index c99bba5..d4aa027 100644 --- a/sql/employment/edd_land_use_split.sql +++ b/sql/employment/edd_land_use_split.sql @@ -52,7 +52,6 @@ WITH (BOUNDING_BOX = ( -- Get SANDAG GIS team EDD dataset ------------------------------------------- -DECLARE @qry NVARCHAR(max) IF @year >= 2017 BEGIN INSERT INTO [#edd] @@ -72,8 +71,8 @@ BEGIN + CASE WHEN [emp_m9] IS NOT NULL THEN 1 ELSE 0 END + CASE WHEN [emp_m10] IS NOT NULL THEN 1 ELSE 0 END + CASE WHEN [emp_m11] IS NOT NULL THEN 1 ELSE 0 END - + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 - END AS [emp_valid], + + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END + AS [emp_valid], ISNULL([emp_m1], 0) + COALESCE([emp_m2], 0) + COALESCE([emp_m3], 0) @@ -209,7 +208,9 @@ BEGIN THEN [pct_edd] * 1/SUM([pct_edd]) OVER (PARTITION BY [block]) ELSE 0 END AS [pct_edd], [pct_area] * 1/SUM([pct_area]) OVER (PARTITION BY [block]) AS [pct_area], - SUM([pct_edd]) OVER (PARTITION BY [block]) AS edd_flag + CASE WHEN SUM([pct_edd]) OVER (PARTITION BY [block])> 0 + THEN 1 + ELSE 0 END AS edd_flag FROM [results] WHERE From 784d683cd0b267da9e629d117beda18d22c189d2 Mon Sep 17 00:00:00 2001 From: bryce-sandag Date: Mon, 9 Mar 2026 15:57:45 -0700 Subject: [PATCH 3/3] #198 address Eric's PR feedback --- sql/employment/edd_land_use_split.sql | 73 ++++++++++++++------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/sql/employment/edd_land_use_split.sql b/sql/employment/edd_land_use_split.sql index d4aa027..bced7bf 100644 --- a/sql/employment/edd_land_use_split.sql +++ b/sql/employment/edd_land_use_split.sql @@ -74,20 +74,20 @@ BEGIN + CASE WHEN [emp_m12] IS NOT NULL THEN 1 ELSE 0 END AS [emp_valid], ISNULL([emp_m1], 0) - + COALESCE([emp_m2], 0) - + COALESCE([emp_m3], 0) - + COALESCE([emp_m4], 0) - + COALESCE([emp_m5], 0) - + COALESCE([emp_m6], 0) - + COALESCE([emp_m7], 0) - + 
COALESCE([emp_m8], 0) - + COALESCE([emp_m9], 0) - + COALESCE([emp_m10], 0) - + COALESCE([emp_m11], 0) - + COALESCE([emp_m12], 0) + + ISNULL([emp_m2], 0) + + ISNULL([emp_m3], 0) + + ISNULL([emp_m4], 0) + + ISNULL([emp_m5], 0) + + ISNULL([emp_m6], 0) + + ISNULL([emp_m7], 0) + + ISNULL([emp_m8], 0) + + ISNULL([emp_m9], 0) + + ISNULL([emp_m10], 0) + + ISNULL([emp_m11], 0) + + ISNULL([emp_m12], 0) AS [emp_total], [SHAPE] - FROM [EMPCORE].[ca_edd].[vi_ca_edd_employment] + FROM [ca_edd].[vi_ca_edd_employment] WHERE [year] = @year ) AS [tt] WHERE @@ -100,13 +100,13 @@ BEGIN SELECT [employment] * ISNULL([headquarters].[share], 1) AS [jobs], ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] - FROM [EMPCORE].[ca_edd].[businesses] - LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + FROM [ca_edd].[businesses] + LEFT JOIN [ca_edd].[headquarters] ON [businesses].[year] = [headquarters].[year] AND [businesses].[emp_id] = [headquarters].[emp_id] INNER JOIN ( SELECT [year], [emp_id], [employment] - FROM [EMPCORE].[ca_edd].[employment] + FROM [ca_edd].[employment] WHERE [month_id] = 14 -- adjusted employment AND [employment] > 0 @@ -121,8 +121,8 @@ BEGIN SELECT [employment] * ISNULL([headquarters].[share], 1) AS [jobs], ISNULL([headquarters].[shape], [businesses].[shape]) AS [SHAPE] - FROM [EMPCORE].[ca_edd].[businesses] - LEFT JOIN [EMPCORE].[ca_edd].[headquarters] + FROM [ca_edd].[businesses] + LEFT JOIN [ca_edd].[headquarters] ON [businesses].[year] = [headquarters].[year] AND [businesses].[emp_id] = [headquarters].[emp_id] INNER JOIN ( @@ -136,7 +136,7 @@ BEGIN + CASE WHEN [17] IS NOT NULL THEN 1 ELSE 0 END )) AS [employment] - FROM [EMPCORE].[ca_edd].[employment] + FROM [ca_edd].[employment] PIVOT(SUM([employment]) FOR [month_id] IN ([15], [16], [17])) AS [pivot] WHERE [year] = @year AND @@ -163,7 +163,8 @@ BEGIN [CENSUSBLOCKS].[GEOID20] AS [block], [MGRA15].[MGRA] AS [mgra], SUM([jobs]) AS [mgra_jobs], - SUM(SUM([jobs])) OVER (PARTITION BY [CENSUSBLOCKS].[GEOID20]) AS 
[block_jobs] + SUM(SUM([jobs])) OVER (PARTITION BY [CENSUSBLOCKS].[GEOID20]) + AS [block_jobs] FROM [#edd] INNER JOIN [GeoDepot].[sde].[CENSUSBLOCKS] ON [#edd].[Shape].STIntersects([CENSUSBLOCKS].[Shape]) = 1 @@ -175,30 +176,34 @@ BEGIN -- Get % area overlap of Census 2020 Block area and MGRAs [xref_area] AS ( SELECT - [c].[GEOID20] AS [block], - [m].[MGRA] AS [mgra], - ([c].[Shape].STIntersection([m].[Shape]).STArea() / [c].[Shape].STArea()) AS [pct_area] - FROM [GeoDepot].[sde].[CENSUSBLOCKS] AS [c] - LEFT OUTER JOIN [GeoDepot].[sde].[MGRA15] AS [m] - ON [c].[Shape].STIntersects([m].[Shape]) = 1 - WHERE ([c].[Shape].STIntersection([m].[Shape]).STArea() / [c].[Shape].STArea()) > 0.01 + [CENSUSBLOCKS].[GEOID20] AS [block], + [MGRA15].[MGRA] AS [mgra], + ([CENSUSBLOCKS].[Shape].STIntersection([MGRA15].[Shape]).STArea() + / [CENSUSBLOCKS].[Shape].STArea()) + AS [pct_area] + FROM [GeoDepot].[sde].[CENSUSBLOCKS] + LEFT JOIN [GeoDepot].[sde].[MGRA15] + ON [CENSUSBLOCKS].[Shape].STIntersects([MGRA15].[Shape]) = 1 + WHERE ([CENSUSBLOCKS].[Shape].STIntersection([MGRA15].[Shape]).STArea() + / [CENSUSBLOCKS].[Shape].STArea()) > 0.01 ), -- Combine results and calculate % allocations of Census 2020 blocks to MGRAs [results] AS ( SELECT - COALESCE([xref_edd].[block], [xref_area].[block]) AS [block], - COALESCE([xref_edd].[mgra], [xref_area].[mgra]) AS [mgra], - SUM(CASE WHEN COALESCE([xref_edd].[block_jobs], 0) = 0 THEN 0 - ELSE 1.0 * COALESCE([xref_edd].[mgra_jobs], 0) / COALESCE([xref_edd].[block_jobs], 0) + ISNULL([xref_edd].[block], [xref_area].[block]) AS [block], + ISNULL([xref_edd].[mgra], [xref_area].[mgra]) AS [mgra], + SUM(CASE WHEN ISNULL([xref_edd].[block_jobs], 0) = 0 THEN 0 + ELSE 1.0 * ISNULL([xref_edd].[mgra_jobs], 0) + / ISNULL([xref_edd].[block_jobs], 0) END) AS [pct_edd], - SUM(COALESCE([xref_area].[pct_area], 0)) AS [pct_area] + SUM(ISNULL([xref_area].[pct_area], 0)) AS [pct_area] FROM [xref_edd] - FULL OUTER JOIN [xref_area] + FULL JOIN [xref_area] ON 
[xref_edd].[block] = [xref_area].[block] AND [xref_edd].[mgra] = [xref_area].[mgra] GROUP BY - COALESCE([xref_edd].[block], [xref_area].[block]), - COALESCE([xref_edd].[mgra], [xref_area].[mgra]) + ISNULL([xref_edd].[block], [xref_area].[block]), + ISNULL([xref_edd].[mgra], [xref_area].[mgra]) ) -- Return results ensuring % allocations add to 1 within each Census 2020 Block SELECT