|
12 | 12 | import sys |
13 | 13 | from collections import defaultdict |
14 | 14 | from pathlib import Path |
| 15 | +from typing import Any |
15 | 16 |
|
16 | 17 | parser = argparse.ArgumentParser() |
17 | 18 | parser.add_argument( |
|
36 | 37 | new_db_name = legacy_db.stem + '_v3.sqlite' |
37 | 38 | new_db_path = Path(legacy_db.parent, new_db_name) |
38 | 39 |
|
39 | | -con_old = sqlite3.connect(legacy_db) |
40 | | -con_new = sqlite3.connect(new_db_path) |
41 | | -cur = con_new.cursor() |
| 40 | +con_old: sqlite3.Connection = sqlite3.connect(legacy_db) |
| 41 | +con_new: sqlite3.Connection = sqlite3.connect(new_db_path) |
| 42 | +cur: sqlite3.Cursor = con_new.cursor() |
42 | 43 |
|
43 | 44 | # bring in the new schema and execute |
44 | 45 | with open(schema_file) as src: |
|
50 | 51 |
|
51 | 52 | # table mapping for DIRECT transfers |
52 | 53 | # fmt: off |
53 | | -direct_transfer_tables = [ |
| 54 | +direct_transfer_tables: list[tuple[str, str]] = [ |
54 | 55 | ("", "CapacityCredit"), |
55 | 56 | ("", "CapacityFactorProcess"), |
56 | 57 | ("", "CapacityFactorTech"), |
|
105 | 106 | ("SegFrac", "TimeSegmentFraction"), |
106 | 107 | ] |
107 | 108 |
|
108 | | -units_added_tables = [ |
| 109 | +units_added_tables: list[tuple[str, str]] = [ |
109 | 110 | ("", "MaxActivityGroup"), |
110 | 111 | ("", "MaxCapacityGroup"), |
111 | 112 | ("", "MinCapacityGroup"), |
112 | 113 | ("", "MinActivityGroup"), |
113 | 114 | ] |
114 | 115 |
|
115 | | -sequence_added_tables = [ |
| 116 | +sequence_added_tables: list[tuple[str, str]] = [ |
116 | 117 | ("time_season", "TimeSeason"), |
117 | 118 | ("time_periods", "time_period"), |
118 | 119 | ("time_of_day", "TimeOfDay"), |
|
126 | 127 | if old_name == '': |
127 | 128 | old_name = new_name |
128 | 129 |
|
129 | | - new_columns = [c[1] for c in con_new.execute(f'PRAGMA table_info({new_name});').fetchall()] |
130 | | - old_columns = [c[1] for c in con_old.execute(f'PRAGMA table_info({old_name});').fetchall()] |
| 130 | + new_columns: list[str] = [ |
| 131 | + c[1] for c in con_new.execute(f'PRAGMA table_info({new_name});').fetchall() |
| 132 | + ] |
| 133 | + old_columns: list[str] = [ |
| 134 | + c[1] for c in con_old.execute(f'PRAGMA table_info({old_name});').fetchall() |
| 135 | + ] |
131 | 136 | cols = str(old_columns[0 : len(new_columns)])[1:-1].replace("'", '') |
132 | 137 |
|
133 | 138 | try: |
134 | | - data = con_old.execute(f'SELECT {cols} FROM {old_name}').fetchall() |
| 139 | + data: list[Any] = con_old.execute(f'SELECT {cols} FROM {old_name}').fetchall() |
135 | 140 | except sqlite3.OperationalError: |
136 | 141 | print('TABLE NOT FOUND: ' + old_name) |
137 | 142 | data = [] |
|
222 | 227 | # let's ensure all the non-global entries are consistent (same techs in each region) |
223 | 228 | skip_rps = False |
224 | 229 | try: |
225 | | - rps_entries = con_old.execute('SELECT * FROM tech_rps').fetchall() |
| 230 | + rps_entries: list[tuple[str, str, str]] = con_old.execute('SELECT * FROM tech_rps').fetchall() |
226 | 231 | except sqlite3.OperationalError: |
227 | 232 | print('source does not appear to include RPS techs...skipping') |
228 | 233 | skip_rps = True |
| 234 | + rps_entries = [] |
229 | 235 | if not skip_rps: |
230 | 236 | for region, tech, _notes in rps_entries: |
231 | 237 | groups[region].add(tech) |
|
239 | 245 | for group, techs in groups.items(): |
240 | 246 | print(f'group: {group} mismatches: {common ^ techs}') |
241 | 247 | if group != 'global': |
242 | | - techs_common &= not common ^ techs |
| 248 | + techs_common &= not (common ^ techs) |
243 | 249 | if not techs_common: |
244 | 250 | print( |
245 | 251 | 'combining RPS techs failed. Some regions are not same. Must be done ' |
|
357 | 363 | data = con_old.execute(read_qry).fetchall() |
358 | 364 | if unlim_cap_present: |
359 | 365 | # need to convert null -> 0 for unlim_cap to match new schema that does not allow null |
360 | | - new_data = [] |
| 366 | + new_data: list[Any] = [] |
361 | 367 | for row in data: |
362 | 368 | new_row = list(row) |
363 | 369 | if new_row[4] is None: |
|
0 commit comments