-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBM25.py
More file actions
141 lines (113 loc) · 5.77 KB
/
BM25.py
File metadata and controls
141 lines (113 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import json
import argparse
from IndexEngine import TokenizeStrings
from math import log
import os
from objects import RetrievalTestingOutput, RetrievalOutput
from PorterStemmer import PorterStemmer
def bm_25(k1=1.2, b=0.75, top_retrieved=1000, use_stemming=False, testing=True, **kwargs):
    """Rank documents against one or more queries using BM25.

    The function runs in one of two modes, selected by ``testing``.

    * ``testing=True`` (batch mode): index files are loaded from disk and
      every query in the queries JSON file is ranked. Results are collected
      as ``RetrievalTestingOutput`` objects and written with
      ``write_to_txt``. Required keyword arguments: ``directory_path``,
      ``queries_path`` and ``file_output``.
    * ``testing=False`` (single-query mode): all index structures are passed
      in by the caller and a single query string is ranked. Results are
      ``RetrievalOutput`` objects built from per-document metadata JSON
      files. Required keyword arguments: ``queries`` (one query string),
      ``lexicon``, ``inverted_index``, ``mapping_to_docno`` and
      ``doc_lengths``.

    Args:
        k1: BM25 term-frequency saturation parameter.
        b: BM25 document-length normalisation parameter.
        top_retrieved: Maximum number of documents kept per query.
        use_stemming: When true, query tokens are stemmed with
            ``PorterStemmer`` before lexicon lookup.
        testing: Selects batch mode (True) or single-query mode (False).
        **kwargs: Mode-specific inputs listed above.

    Returns:
        The list of result objects, ordered by query and then by rank.

    Raises:
        ValueError: In batch mode, when ``directory_path`` or
            ``queries_path`` does not exist.
        KeyError: When a keyword argument required by the selected mode is
            missing.
    """
    if testing and (not os.path.exists(kwargs["directory_path"]) or not os.path.exists(kwargs["queries_path"])):
        raise ValueError("Please provide a valid path to the contents being retrieved")

    ps = PorterStemmer() if use_stemming else None

    if testing:
        index_dir = kwargs["directory_path"]
        queries = read_json(kwargs["queries_path"])
        lexicon = read_json(os.path.join(index_dir, "lexicon.json"))
        inverted_index = read_json(os.path.join(index_dir, "inverted-index.json"))
        # NOTE(review): mapping.json is resolved against the current working
        # directory, unlike the other index files - confirm this is intended.
        mapping_to_docno = read_json("mapping.json")["doc_nos"]
        doc_lengths = read_doc_lengths(os.path.join(index_dir, "doc-lengths.txt"))
    else:
        queries = kwargs["queries"]
        lexicon = kwargs["lexicon"]
        inverted_index = kwargs["inverted_index"]
        mapping_to_docno = kwargs["mapping_to_docno"]
        doc_lengths = kwargs["doc_lengths"]

    N = len(doc_lengths)
    # Guard the empty collection: with no documents there are no postings to
    # score, so the average length is never used and 0.0 is a safe stand-in.
    avdl = sum(doc_lengths.values()) / N if N else 0.0

    list_output = []
    if testing:
        for topic_number, query_text in queries.items():
            ranked_docs = _rank_documents(
                query_text, lexicon, inverted_index, doc_lengths,
                N, avdl, k1, b, top_retrieved, ps,
            )
            for rank, (doc_id, score) in enumerate(ranked_docs, start=1):
                list_output.append(
                    RetrievalTestingOutput(topic_number, mapping_to_docno[doc_id], rank, score)
                )
        write_to_txt(list_output, kwargs["file_output"])
    else:
        ranked_docs = _rank_documents(
            queries, lexicon, inverted_index, doc_lengths,
            N, avdl, k1, b, top_retrieved, ps,
        )
        for rank, (doc_id, _score) in enumerate(ranked_docs, start=1):
            docno = mapping_to_docno[doc_id]
            meta_data = read_json(os.path.join("IndexEngine", "MetaData", docno + ".json"))
            list_output.append(
                RetrievalOutput(rank, meta_data["headline"], meta_data["date"], docno)
            )
    return list_output


def _rank_documents(query_text, lexicon, inverted_index, doc_lengths, N, avdl, k1, b, top_retrieved, ps):
    """Return the top ``(doc_id, score)`` pairs for a single query string.

    The query is split on spaces and handed to ``TokenizeStrings`` together
    with an output list (presumably filled in place with the tokens - verify
    against IndexEngine). Tokens missing from ``lexicon`` are ignored. Each
    postings list is read as a flat sequence of alternating document id and
    term frequency, and per-document scores are summed over query tokens.
    """
    tokens = []
    TokenizeStrings(query_text.split(" "), tokens)
    if ps:
        tokens = [ps.stem(token, 0, len(token) - 1) for token in tokens]

    scores = {}
    for token in tokens:
        if token not in lexicon:
            continue
        postings = inverted_index[str(lexicon[token])]
        # Two entries per document, so half the length is the number of
        # documents containing the term.
        ni = len(postings) // 2
        for i in range(0, len(postings), 2):
            doc_id = postings[i]
            fi = postings[i + 1]
            dl = doc_lengths[str(doc_id)]
            scores[doc_id] = scores.get(doc_id, 0) + bm_25_score(fi, N, ni, dl, avdl, k1, b)

    # sorted() is stable, so tied documents keep first-seen order.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:top_retrieved]
def read_doc_lengths(file_path):
    """Read a file holding one integer document length per line.

    Returns a dict whose keys are the zero-based line positions rendered as
    strings ("0", "1", ...) and whose values are the integers parsed from
    the corresponding lines (surrounding whitespace is stripped first).
    """
    with open(file_path, 'r') as handle:
        return {
            str(position): int(raw_line.strip())
            for position, raw_line in enumerate(handle)
        }
def bm_25_score(fi, N, ni, dl, avdl, k1, b):
    """Compute the BM25 contribution of one term for one document.

    Args:
        fi: Frequency of the term in the document.
        N: Number of documents in the collection.
        ni: Number of documents that contain the term.
        dl: Length of the document being scored.
        avdl: Average document length over the collection.
        k1: Term-frequency saturation parameter.
        b: Length-normalisation parameter.

    Returns:
        ``log((N - ni + 0.5) / (ni + 0.5)) * fi / (fi + K)`` where
        ``K = k1 * ((1 - b) + b * dl / avdl)``. The value is negative when
        the term occurs in more than half of the documents.
    """
    length_norm = (1 - b) + b * (dl / avdl)
    K = k1 * length_norm
    tf_component = fi / (fi + K)
    idf_component = log((N - ni + 0.5) / (ni + 0.5))
    return idf_component * tf_component
def write_to_txt(list_output, file_output):
    """Write retrieval results to a text file, one result per line.

    Each line has the space-separated fields ``topicID Q docno rank score
    runTag``, read from the attributes of the same names on every item of
    ``list_output``.

    Args:
        list_output: Iterable of result objects exposing the attributes
            listed above.
        file_output: Destination path. ``".txt"`` is appended when the path
            does not already end with it.
    """
    # Test the suffix rather than any occurrence of ".txt": a path such as
    # "runs.txt.d/results" contains ".txt" but still needs the extension.
    if not file_output.endswith(".txt"):
        file_output += ".txt"
    with open(file_output, 'w') as f:
        f.writelines(
            f"{output.topicID} {output.Q} {output.docno} {output.rank} {output.score} {output.runTag}\n"
            for output in list_output
        )
def read_json(file_path):
    """Parse the JSON document stored at ``file_path`` and return it."""
    with open(file_path, 'r') as handle:
        return json.loads(handle.read())
# Command-line entry point: parse the three required paths plus the optional
# stemming flag, then run BM25 in batch mode with k1=1.2 and b=0.75.
if __name__ == "__main__":
    cli = argparse.ArgumentParser(
        description='Perform BM25 retrieval on an inverted index.'
    )
    # The three positional arguments share the same shape, so declare them
    # from a table instead of repeating add_argument by hand.
    for positional_name, help_text in (
        ('directory_path', 'Path to the directory containing the index files.'),
        ('queries_path', 'Path to the queries JSON file.'),
        ('file_output', 'Path to the output file where results will be stored.'),
    ):
        cli.add_argument(positional_name, type=str, help=help_text)
    cli.add_argument('--use_stemming', action='store_true',
                     help='If set, the query will be stemmed using Porter Stemmer.')
    options = cli.parse_args()

    # Reject empty-string values for any of the required paths.
    if not (options.directory_path and options.queries_path and options.file_output):
        usage_hint = (
            "Please input the path to the directory that has the output from the IndexEngine\n"
            "as well as the path for the file where the queries are stored\n"
            "and the path for the output file.\n"
            "Would look something like this:\n"
            "python bm25.py /home/smucker/latimes-index queries.txt hw2-results-WatIAMUserID.txt"
        )
        raise ValueError(usage_hint)

    bm_25(
        k1=1.2,
        b=0.75,
        use_stemming=options.use_stemming,
        directory_path=options.directory_path,
        queries_path=options.queries_path,
        file_output=options.file_output,
    )