-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathscroller.rb
More file actions
273 lines (249 loc) · 9.14 KB
/
scroller.rb
File metadata and controls
273 lines (249 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# frozen_string_literal: true
module ElastomerClient
class Client
# Public: Build a Scroller that walks every document matched by `query`
# using the scroll API.
#
# query - The query to scroll, as a Hash or a JSON encoded String
# opts  - Options Hash (merged over the Scroller defaults)
#         :index  - the name of the index to search
#         :type   - the document type to search
#         :scroll - the keep alive time of the scrolling request ("5m" by default)
#         :size   - the `size` sent with each scroll request (50 by default)
#
# Examples
#
#   scroll = client.scroll('{"query":{"match_all":{}}}', index: 'test')
#   scroll.each_document do |document|
#     document['_id']
#     document['_source']
#   end
#
# Returns a new Scroller instance; no request is sent until it is iterated.
def scroll(query, opts = {})
  Scroller.new(self, query, opts)
end
# Public: Build a Scroller with "scan" semantics: the query is scrolled with
# a sort on `_doc` added to it by add_sort_by_doc.
#
# query - The query to scan, as a Hash, a JSON encoded String, or nil; it
#         should not already contain a sort
# opts  - Options Hash (merged over the Scroller defaults)
#         :index  - the name of the index to search
#         :type   - the document type to search
#         :scroll - the keep alive time of the scrolling request ("5m" by default)
#         :size   - the `size` sent with each scroll request (50 by default)
#
# Examples
#
#   scan = client.scan('{"query":{"match_all":{}}}', index: 'test')
#   scan.each_document do |document|
#     document['_id']
#     document['_source']
#   end
#
# Returns a new Scroller instance.
# Raises ArgumentError from add_sort_by_doc when it detects an existing sort.
def scan(query, opts = {})
  sorted_query = add_sort_by_doc(query)
  Scroller.new(self, sorted_query, opts)
end
# Public: Issue the first request of a scroll and return its response.
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
#
# opts - Options Hash, forwarded to the search request
#        :body   - the query to scroll as a Hash or JSON encoded String
#        :index  - the name of the index to search
#        :type   - the document type to search (dropped on ES 8+)
#        :scroll - the keep alive time of the scrolling request; this method
#                  applies no default, so pass it explicitly
#        :size   - the `size` sent with the search request
#
# Examples
#
#   h = client.start_scroll(body: '{"query":{"match_all":{}},"sort":{"created":"desc"}}', index: 'test', scroll: '5m')
#   scroll_id = h['_scroll_id']
#   h['hits']['hits'].each { |doc| ... }
#
#   h = client.continue_scroll(scroll_id)
#   scroll_id = h['_scroll_id']
#   h['hits']['hits'].each { |doc| ... }
#
#   # repeat until there are no more hits
#
# Returns the response body as a Hash.
def start_scroll(opts = {})
  params = opts.merge(action: "search.start_scroll", rest_api: "search")
  # version_support reports ES 8+, where :type must not be sent.
  params.delete(:type) if version_support.es_version_8_plus?
  get("{/index}{/type}/_search", params).body
end
# Public: Fetch the next batch of results for an open scroll.
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
#
# scroll_id - The current scroll ID as a String
# scroll    - The keep alive time of the scrolling request (5 minutes by default)
#
# Examples
#
#   scroll_id = client.start_scroll(body: '{"query":{"match_all":{}}}', index: 'test', scroll: '5m')['_scroll_id']
#
#   h = client.continue_scroll scroll_id   # scroll to get the next set of results
#   scroll_id = h['_scroll_id']            # and store the scroll_id to use later
#
#   h = client.continue_scroll scroll_id   # scroll again to get the next set of results
#   scroll_id = h['_scroll_id']            # and store the scroll_id to use later
#
#   # repeat until the results are empty
#
# Returns the response body as a Hash.
# Raises SearchContextMissing when the error payload's `caused_by.type` is
#   "search_context_missing_exception"; any other RequestError is re-raised
#   unchanged.
def continue_scroll(scroll_id, scroll = "5m")
  response = get "/_search/scroll", body: {scroll_id:}, scroll:, action: "search.scroll", rest_api: "scroll"
  response.body
rescue RequestError => err
  # err.error may be nil or lack a Hash-valued "caused_by"; guard both levels
  # so an unrelated failure re-raises the original error rather than a
  # NoMethodError from indexing into nil.
  caused_by = err.error["caused_by"] if err.error.is_a?(Hash)
  if caused_by.is_a?(Hash) && caused_by["type"] == "search_context_missing_exception"
    raise SearchContextMissing, "No search context found for scroll ID #{scroll_id.inspect}"
  end

  raise
end
# Public: Release the server-side search context for one or more scrolls.
# See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#_clear_scroll_api
#
# scroll_ids - A single scroll ID String or an Array of them; either form is
#              normalized to an Array before sending
#
# Returns the response body as a Hash.
def clear_scroll(scroll_ids)
  ids = Array(scroll_ids)
  delete("/_search/scroll", body: {scroll_id: ids}, action: "search.clear_scroll", rest_api: "clear_scroll").body
end
# Internal: Add a sort on `_doc` to a query (used by `scan`).
#
# query - The query as a Hash (Symbol or String keys), a JSON encoded String,
#         or nil (treated as an empty query)
#
# Raises ArgumentError if the query already contains a sort under either
#   :sort or "sort". Both are checked because a JSON String is decoded with
#   MultiJson.load, which yields String keys by default.
# Returns a new Hash with `sort: [:_doc]` merged in; a Hash argument is not
#   modified.
def add_sort_by_doc(query)
  query = {} if query.nil?
  query = MultiJson.load(query) if query.is_a?(String)

  existing_key = [:sort, "sort"].find { |key| query.key?(key) }
  if existing_key
    raise ArgumentError, "Query cannot contain a sort (found sort '#{query[existing_key]}' in query: #{query})"
  end

  query.merge(sort: [:_doc])
end
# Baseline Scroller options that caller-supplied options are merged over:
# no index or type restriction, a "5m" scroll keep-alive, and a size of 50.
DEFAULT_OPTS = { index: nil, type: nil, scroll: "5m", size: 50 }.freeze
class Scroller
  attr_reader :client, :query, :scroll_id

  # Create a new scroller that can be used to iterate over all the documents
  # returned by the `query`. The Scroller supports both the 'scan' and the
  # 'scroll' search types.
  #
  # See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
  # and https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-type.html#scan
  #
  # client - ElastomerClient::Client used for HTTP requests to the server
  # query  - The query to scroll as a Hash or a JSON encoded String; it is
  #          kept as given and exposed through the `query` reader
  # opts   - Options Hash, merged over DEFAULT_OPTS
  #          :index  - the name of the index to search
  #          :type   - the document type to search
  #          :scroll - the keep alive time of the scrolling request ("5m" by default)
  #          :size   - the `size` sent with each scroll request (50 by default)
  #
  # Examples
  #
  #   scan = Scroller.new(client, {query: {match_all: {}}}, index: 'test-1')
  #   scan.each_document { |doc|
  #     doc['_id']
  #     doc['_source']
  #   }
  #
  def initialize(client, query, opts = {})
    @client = client
    @query = query
    # A :body key in opts overrides the positional query for the request.
    @opts = DEFAULT_OPTS.merge(body: query).merge(opts)
    @scroll_id = nil
    @offset = 0
  end

  # Iterate over all the search results from the scan query.
  #
  # block - The block will be called for each set of matching documents
  #         returned from executing the scan query.
  #
  # Yields a hits Hash containing the 'total' number of hits, current
  # 'offset' into that total, and the Array of 'hits' document Hashes.
  # The scroll is cleared (see `clear!`) when iteration ends, whether it
  # finishes normally, is broken out of, or raises.
  #
  # Examples
  #
  #   scan.each do |hits|
  #     hits['total']
  #     hits['offset']
  #     hits['hits'].each { |document| ... }
  #   end
  #
  # Returns this Scroller instance, or an Enumerator when no block is given
  # (in which case no request is made yet).
  def each
    return enum_for(:each) unless block_given?

    begin
      loop do
        body = do_scroll
        hits = body["hits"]
        break if hits["hits"].empty?

        hits["offset"] = @offset
        @offset += hits["hits"].length
        yield hits
      end
    ensure
      clear!
    end
    self
  end

  # Iterate over each document from the scan query. This method is just a
  # convenience wrapper around the `each` method; it iterates the Array of
  # documents and passes them one by one to the block.
  #
  # block - The block will be called for each document returned from
  #         executing the scan query.
  #
  # Yields a document Hash.
  #
  # Examples
  #
  #   scan.each_document do |document|
  #     document['_id']
  #     document['_source']
  #   end
  #
  # Returns this Scroller instance, or an Enumerator when no block is given.
  def each_document(&block)
    return enum_for(:each_document) unless block

    each { |hits| hits["hits"].each(&block) }
  end

  # Terminate the scroll query. This will remove the search context from the
  # cluster and no further documents can be returned by this Scroller
  # instance.
  #
  # Returns nil when no scroll has been started, or when clearing raises
  # IllegalArgument; otherwise returns the clear_scroll response body.
  def clear!
    return if scroll_id.nil?

    client.clear_scroll(scroll_id)
  rescue ::ElastomerClient::Client::IllegalArgument
    nil
  end

  # Internal: Perform the actual scroll requests. This method will call out
  # to the `Client#start_scroll` and `Client#continue_scroll` methods while
  # keeping track of the `scroll_id` internally.
  #
  # Returns the response body as a Hash.
  def do_scroll
    if scroll_id.nil?
      body = client.start_scroll(@opts)
      @scroll_id = body["_scroll_id"]
      # An empty first page that carries a scroll ID (the legacy 'scan'
      # behaviour) means the real results start with the next request.
      # Without a scroll ID there is nothing to continue, so return the
      # empty page and let the caller stop instead of restarting forever.
      return body unless body["hits"]["hits"].empty? && scroll_id
    end

    body = client.continue_scroll(scroll_id, @opts[:scroll])
    @scroll_id = body["_scroll_id"]
    body
  end
end # Scroller
end # Client
end # ElastomerClient