Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/kernelbot/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,105 @@ async def admin_update_problems(
}


@app.post("/admin/backfill")
async def admin_backfill(
    payload: dict,
    _: Annotated[None, Depends(require_admin)],
    db_context=Depends(get_db),
) -> dict:
    """Queue a backfill: re-run top submissions against the current task version.

    After an update-problems changes the eval for a leaderboard, old run scores
    become stale. This endpoint fetches the top N submissions (best per user)
    from any previous task_version and re-submits each one so that new runs are
    recorded with the current task_version.

    Payload:
        leaderboard (str): Leaderboard name (required).
        gpu (str): GPU type to backfill (required).
        top_n (int): How many top submissions to re-run (default 100).
            Must be convertible to a non-negative integer.

    Returns:
        A dict with ``status`` and ``queued``. When nothing is eligible it also
        carries a ``message``; otherwise it carries ``leaderboard``, ``gpu``,
        ``task_version``, ``queued_submission_ids`` and a per-submission
        ``errors`` list.

    Raises:
        HTTPException: 400 if ``leaderboard``/``gpu`` is missing or ``top_n`` is
            not a non-negative integer; 500 if the backend or the background
            submission manager is not available.
    """
    leaderboard_name = payload.get("leaderboard")
    gpu = payload.get("gpu")

    if not leaderboard_name or not gpu:
        raise HTTPException(status_code=400, detail="leaderboard and gpu are required")

    # top_n is forwarded to the SQL LIMIT clause; validate it here so that a
    # malformed payload is reported as a client error instead of a DB failure.
    try:
        top_n = int(payload.get("top_n", 100))
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=400,
            detail="top_n must be a non-negative integer",
        ) from None
    if top_n < 0:
        raise HTTPException(
            status_code=400,
            detail="top_n must be a non-negative integer",
        )

    if not backend_instance:
        raise HTTPException(status_code=500, detail="Backend not initialized")

    with db_context as db:
        task_version = db.get_leaderboard_task_version(leaderboard_name)
        if task_version <= 1:
            # Version 1 is the initial version, so no older runs can exist.
            return {
                "status": "ok",
                "message": "Leaderboard is still on task_version 1, nothing to backfill",
                "queued": 0,
            }

        submissions = db.get_top_submissions_for_backfill(leaderboard_name, gpu, top_n)
        lb = db.get_leaderboard(leaderboard_name)

    if not submissions:
        return {
            "status": "ok",
            "message": "No eligible submissions found from previous versions",
            "queued": 0,
        }

    if not background_submission_manager:
        raise HTTPException(
            status_code=500,
            detail="Background submission manager not available",
        )

    queued_ids = []
    errors = []
    for sub in submissions:
        # Best effort: a failure on one submission is recorded and must not
        # prevent the remaining submissions from being queued.
        try:
            req = ProcessedSubmissionRequest(
                code=sub["code"],
                file_name=sub["file_name"],
                user_id=sub["user_id"],
                user_name=sub["user_name"],
                leaderboard=leaderboard_name,
                gpus=[gpu],
                task=lb["task"],
                secret_seed=lb.get("secret_seed", 0),
                task_gpus=[gpu],
            )
            # Each re-run is stored as a brand-new submission row.
            with db_context as db:
                new_sub_id = db.create_submission(
                    leaderboard=leaderboard_name,
                    file_name=sub["file_name"],
                    code=sub["code"],
                    user_id=sub["user_id"],
                    time=datetime.datetime.now(),
                    user_name=sub["user_name"],
                )
            await background_submission_manager.enqueue(
                req, SubmissionMode.LEADERBOARD, new_sub_id
            )
            queued_ids.append(new_sub_id)
        except Exception as e:
            errors.append({
                "submission_id": sub["submission_id"],
                "user_id": sub["user_id"],
                "error": str(e),
            })

    return {
        "status": "ok",
        "leaderboard": leaderboard_name,
        "gpu": gpu,
        "task_version": task_version,
        "queued": len(queued_ids),
        "queued_submission_ids": queued_ids,
        "errors": errors,
    }


@app.get("/leaderboards")
async def get_leaderboards(db_context=Depends(get_db)):
"""An endpoint that returns all leaderboards.
Expand Down
139 changes: 124 additions & 15 deletions src/libkernelbot/leaderboard_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,45 @@ def update_leaderboard(
task = definition.task
try:
lb_id = self.get_leaderboard_id(name)

# Check if the task actually changed; if so, bump task_version
self.cursor.execute(
"""
UPDATE leaderboard.leaderboard
SET deadline = %s, task = %s, description = %s
WHERE id = %s;
""",
(
deadline.astimezone(datetime.timezone.utc),
task.to_str(),
definition.description,
lb_id,
),
"SELECT task FROM leaderboard.leaderboard WHERE id = %s",
(lb_id,),
)
old_task_json = self.cursor.fetchone()[0]
old_task = LeaderboardTask.from_dict(old_task_json)
task_changed = old_task != task

if task_changed:
self.cursor.execute(
"""
UPDATE leaderboard.leaderboard
SET deadline = %s, task = %s, description = %s,
task_version = task_version + 1
WHERE id = %s;
""",
(
deadline.astimezone(datetime.timezone.utc),
task.to_str(),
definition.description,
lb_id,
),
)
else:
self.cursor.execute(
"""
UPDATE leaderboard.leaderboard
SET deadline = %s, task = %s, description = %s
WHERE id = %s;
""",
(
deadline.astimezone(datetime.timezone.utc),
task.to_str(),
definition.description,
lb_id,
),
)

# replace templates
self.cursor.execute(
Expand Down Expand Up @@ -429,6 +455,7 @@ def create_submission_run(
compilation: Optional[CompileResult],
result: RunResult,
system: SystemInfo,
task_version: Optional[int] = None,
):
try:
if compilation is not None:
Expand All @@ -437,11 +464,15 @@ def create_submission_run(
# check validity
self.cursor.execute(
"""
SELECT done FROM leaderboard.submission WHERE id = %s
SELECT s.done, l.task_version
FROM leaderboard.submission s
JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id
WHERE s.id = %s
""",
(submission,),
)
if self.cursor.fetchone()[0]:
row = self.cursor.fetchone()
if row[0]:
logger.error(
"Submission '%s' is already marked as done when trying to add %s run.",
submission,
Expand All @@ -452,16 +483,20 @@ def create_submission_run(
"but submission was already marked as done."
)

# Use provided task_version or fall back to leaderboard's current version
run_task_version = task_version if task_version is not None else row[1]

meta = {
k: result.__dict__[k]
for k in ["stdout", "stderr", "success", "exit_code", "command", "duration"]
}
self.cursor.execute(
"""
INSERT INTO leaderboard.runs (submission_id, start_time, end_time, mode,
secret, runner, score, passed, compilation, meta, result, system_info
secret, runner, score, passed, compilation, meta, result, system_info,
task_version
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
submission,
Expand All @@ -476,6 +511,7 @@ def create_submission_run(
json.dumps(meta),
json.dumps(result.result),
json.dumps(dataclasses.asdict(system)),
run_task_version,
),
)
self.connection.commit()
Expand Down Expand Up @@ -643,6 +679,7 @@ def get_leaderboard_submissions(
AND NOT r.secret
AND r.score IS NOT NULL
AND r.passed
AND r.task_version = l.task_version
AND s.user_id = %s
ORDER BY r.score ASC
LIMIT %s OFFSET %s
Expand All @@ -665,6 +702,7 @@ def get_leaderboard_submissions(
JOIN leaderboard.user_info ui ON s.user_id = ui.id
WHERE l.name = %s AND r.runner = %s AND NOT r.secret
AND r.score IS NOT NULL AND r.passed
AND r.task_version = l.task_version
ORDER BY s.user_id, r.score ASC
)
SELECT
Expand Down Expand Up @@ -713,6 +751,75 @@ def get_leaderboard_submissions(

return result

def get_top_submissions_for_backfill(
    self,
    leaderboard_name: str,
    gpu_name: str,
    top_n: int = 100,
) -> list[dict]:
    """Fetch backfill candidates: each user's best run from an older task_version.

    The query keeps one row per user (lowest score among passing, non-secret,
    scored runs on ``gpu_name`` whose task_version is lower than the
    leaderboard's current one), then returns the ``top_n`` lowest-scoring rows.

    Each returned dict has the keys: submission_id, user_id, file_name, code,
    score, task_version, user_name.
    """
    # Key names, in the same order as the columns of the outer SELECT below.
    columns = (
        "submission_id",
        "user_id",
        "file_name",
        "code",
        "score",
        "task_version",
        "user_name",
    )
    query = """
        WITH best_submissions AS (
        SELECT DISTINCT ON (s.user_id)
        s.id as submission_id,
        s.user_id,
        s.file_name,
        r.score,
        r.task_version
        FROM leaderboard.runs r
        JOIN leaderboard.submission s ON r.submission_id = s.id
        JOIN leaderboard.leaderboard l ON s.leaderboard_id = l.id
        WHERE l.name = %s AND r.runner = %s AND NOT r.secret
        AND r.score IS NOT NULL AND r.passed
        AND r.task_version < l.task_version
        ORDER BY s.user_id, r.score ASC
        )
        SELECT
        bs.submission_id,
        bs.user_id,
        bs.file_name,
        convert_from(cf.code, 'UTF8') as code,
        bs.score,
        bs.task_version,
        ui.user_name
        FROM best_submissions bs
        JOIN leaderboard.submission s ON bs.submission_id = s.id
        JOIN leaderboard.code_files cf ON s.code_id = cf.id
        JOIN leaderboard.user_info ui ON bs.user_id = ui.id
        ORDER BY bs.score ASC
        LIMIT %s
        """
    self.cursor.execute(query, (leaderboard_name, gpu_name, top_n))

    fetched = self.cursor.fetchall()
    return [dict(zip(columns, record)) for record in fetched]

def get_leaderboard_task_version(self, leaderboard_name: str) -> int:
    """Return the task_version currently stored for the named leaderboard.

    Raises:
        LeaderboardDoesNotExist: if no leaderboard row has this name.
    """
    query = "SELECT task_version FROM leaderboard.leaderboard WHERE name = %s"
    self.cursor.execute(query, (leaderboard_name,))
    record = self.cursor.fetchone()
    if record is not None:
        return record[0]
    raise LeaderboardDoesNotExist(leaderboard_name)

def generate_stats(self, last_day: bool, leaderboard_name: Optional[str] = None):
try:
return self._generate_stats(last_day, leaderboard_name)
Expand Down Expand Up @@ -1041,6 +1148,7 @@ def get_leaderboard_submission_count(
AND NOT r.secret
AND r.score IS NOT NULL
AND r.passed
AND r.task_version = l.task_version
AND s.user_id = %s
"""
args = (leaderboard_name, gpu_name, user_id)
Expand All @@ -1055,6 +1163,7 @@ def get_leaderboard_submission_count(
AND NOT r.secret
AND r.score IS NOT NULL
AND r.passed
AND r.task_version = l.task_version
"""
args = (leaderboard_name, gpu_name)

Expand Down
46 changes: 46 additions & 0 deletions src/migrations/20260313_01_backfill-add-task-version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
add-task-version
"""

from yoyo import step

__depends__ = {'20260226_01_WgYAV-queryindex'}


# Apply/rollback SQL for the new leaderboard.task_version column.
_ADD_LEADERBOARD_TASK_VERSION = """
ALTER TABLE leaderboard.leaderboard
ADD COLUMN task_version INT NOT NULL DEFAULT 1;
"""
_DROP_LEADERBOARD_TASK_VERSION = """
ALTER TABLE leaderboard.leaderboard
DROP COLUMN task_version;
"""

# Apply/rollback SQL for the new runs.task_version column.
_ADD_RUNS_TASK_VERSION = """
ALTER TABLE leaderboard.runs
ADD COLUMN task_version INT NOT NULL DEFAULT 1;
"""
_DROP_RUNS_TASK_VERSION = """
ALTER TABLE leaderboard.runs
DROP COLUMN task_version;
"""

# Update the partial index to include task_version for efficient filtering;
# the rollback recreates it without that column.
_INDEX_WITH_TASK_VERSION = """
DROP INDEX IF EXISTS leaderboard.idx_runs_valid_scores;
CREATE INDEX idx_runs_valid_scores
ON leaderboard.runs (submission_id, runner, score, task_version)
WHERE NOT secret AND score IS NOT NULL AND passed;
"""
_INDEX_WITHOUT_TASK_VERSION = """
DROP INDEX IF EXISTS leaderboard.idx_runs_valid_scores;
CREATE INDEX idx_runs_valid_scores
ON leaderboard.runs (submission_id, runner, score)
WHERE NOT secret AND score IS NOT NULL AND passed;
"""

# Each step pairs the forward statement with its rollback, in execution order.
steps = [
    step(_ADD_LEADERBOARD_TASK_VERSION, _DROP_LEADERBOARD_TASK_VERSION),
    step(_ADD_RUNS_TASK_VERSION, _DROP_RUNS_TASK_VERSION),
    step(_INDEX_WITH_TASK_VERSION, _INDEX_WITHOUT_TASK_VERSION),
]