kt-search

Multi platform kotlin client for Elasticsearch & Opensearch


Project maintained by jillesvangurp

Deep paging using search_after and scroll

You can page through results using the from and size parameters (size is called resultSize in the DSL because the name clashes with Map.size). However, this performs poorly for large offsets, and for that reason from plus size is limited to 10000 by default. To retrieve more results than that, you need deep paging.
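
To make the limit concrete: Elasticsearch rejects a request when from plus size exceeds the index.max_result_window index setting, which defaults to 10000. The sketch below mirrors that rule on the client side. fitsResultWindow is a hypothetical helper written for this illustration and is not part of kt-search.

```kotlin
// Hypothetical helper, not part of kt-search: mirrors the server-side rule that
// from + size may not exceed index.max_result_window (10000 by default).
const val DEFAULT_MAX_RESULT_WINDOW = 10_000

fun fitsResultWindow(
    from: Int,
    size: Int,
    maxResultWindow: Int = DEFAULT_MAX_RESULT_WINDOW
): Boolean {
    require(from >= 0 && size >= 0) { "from and size must not be negative" }
    // use Long arithmetic so very large values cannot overflow
    return from.toLong() + size <= maxResultWindow
}

fun main() {
    println(fitsResultWindow(from = 9_990, size = 10)) // true
    println(fitsResultWindow(from = 9_995, size = 10)) // false
}
```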

There are two ways of doing this: search_after and scrolling searches.

Both are somewhat complex APIs to use, so kt-search provides an easy alternative built on the Kotlin Flow API. It returns a flow and handles the paging for you automatically.

This works with both scrolling and search_after. In the examples below, we use the same index with TestDoc documents.

Search after

The main principle behind search_after is to rely on the sort order of the results and to pass the sort values of the last hit you processed as the search_after value. Because you can sort on multiple fields, this has to be an array. To guarantee consistency, search_after is generally used in combination with the Point in Time API.

val (resp,hitsFlow) = client.searchAfter(indexName,1.minutes) {
  // 1 result per page
  // use something higher obviously, 500 would be good
  resultSize = 1
  query = matchAll()
  // note: if you want to add sorting, you should also sort on _shard_doc
}
println("reported result set size ${resp.total}")
println("results in the hits flow: ${hitsFlow.count()}")

Captured Output:

reported result set size 3
results in the hits flow: 3

This orchestrates creating a point in time and paging through results using sort values returned in the response.

The hitsFlow is a flow that lets you process all the results. Collecting it fetches page after page of results, so you can process very large indices this way.
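
Conceptually, such a flow fetches a page, emits its hits, and uses the sort value of the last hit to request the next page, until a page comes back empty. The sketch below imitates that with an in-memory list and a standard-library Sequence standing in for the Flow, so it runs without Elasticsearch or coroutines. Hit, fetchPage, and pagedHits are hypothetical names for this illustration. They are not kt-search APIs, and the library's actual implementation may differ.

```kotlin
// Hypothetical stand-in for a search hit; sortValue plays the role of the hit's sort values.
data class Hit(val sortValue: Int, val name: String)

// Fake "index", already in sort order.
val index = (1..7).map { Hit(it, "doc-$it") }

// Counts how many pages were requested, to make the laziness visible.
var pagesFetched = 0

// Fake search: returns up to pageSize hits that sort strictly after searchAfter.
fun fetchPage(searchAfter: Int?, pageSize: Int): List<Hit> {
    pagesFetched++
    return index
        .filter { searchAfter == null || it.sortValue > searchAfter }
        .take(pageSize)
}

// Lazily pages through all hits; the next page is only requested
// when the consumer has used up the current one.
fun pagedHits(pageSize: Int): Sequence<Hit> = sequence {
    var searchAfter: Int? = null
    while (true) {
        val page = fetchPage(searchAfter, pageSize)
        if (page.isEmpty()) break
        yieldAll(page)
        searchAfter = page.last().sortValue
    }
}

fun main() {
    println(pagedHits(pageSize = 3).count()) // 7
}
```

Because the sequence is lazy, a consumer that stops early never triggers the remaining page requests, which is the same property that makes the flow suitable for very large indices.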

Reindexing using search_after and bulk

A key use case for deep paging is processing all documents in an index. Because indices in Elasticsearch can be extremely large, it is important to do this incrementally.

A good use case is reindexing documents. This is easy with kt-search!

// we will rely on dynamic mapping
val newIndex="${indexName}-v2"
// WaitFor enables us to search right after
client.bulk(refresh = Refresh.WaitFor) {
  val (_,flow) = client.searchAfter(
    target = indexName,
    keepAlive = 1.minutes
  )
  // for each document ...
  flow.onEach { hit ->
    val doc = hit.parseHit<TestDoc>()
    // modify the name and put it in our new index
    index(doc?.copy(name= "${doc.name} v2"), index=newIndex)
  }.collect()
}
println("$newIndex has ${client.search(newIndex).total} documents")

Captured Output:

docs-search-demo-v2 has 3 documents

Scrolling searches

Even though search_after is the recommended way to do this, you can still use scrolling searches. Scrolling searches work in a very similar way to search_after but are a bit easier to use.

The main difference is that search_after gives you more control over the consistency of the results if the index changes while you page through them.

For a scrolling search, search as you normally would but set the scroll parameter:

val response = client.search(indexName, scroll = "1m")
// the response has a scrollId that we can use to scroll all the results
val hitsFlow = client.scroll(response)
// the flow contains all the hits
println("Found ${hitsFlow.count()} results")

Captured Output:

Found 3 results

Doing it the hard way

If you want to do deep paging manually, that is of course possible. The example below illustrates why you might prefer the easy way above:

// create a point in time
val pit = client.createPointInTime(indexName,1.minutes)
val q = SearchDSL().apply {
  resultSize=2

  query = matchAll()

  // this is not part of the DSL
  this["pit"] = JsonDsl().apply {
    this["id"] = pit
  }
  // it's recommended to sort on _shard_doc
  sort {
    add("_shard_doc", SortOrder.ASC)
  }
}
// don't specify the index
// the pit id implies the index
var resp = client.search(null, q)

var results=resp.hits?.hits?.size ?:0
// specify from where to continue searching
resp.hits?.hits?.lastOrNull()?.sort?.let { sort ->
  q["search_after"] = sort
}
// the response will include a pit id
resp.pitId?.let { pid ->
  q["pit"] = JsonDsl().apply {
    this["id"] = pid
    this["keep_alive"] = "60s"
  }
}
// get the next page of results
resp = client.search(null, q)
results += resp.hits?.hits?.size ?: 0

// repeat ...
println("found $results results")

Captured Output:

found 3 results
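
The example stops after two pages. A complete version repeats the search until a page comes back empty. The sketch below shows that loop against an in-memory list and also illustrates why a unique tie breaker such as _shard_doc matters: when the sort values are not unique, a hit that ties with the last hit of a page can be skipped. Doc, fakeIndex, and countAll are hypothetical names for this simulation and not kt-search APIs.

```kotlin
// Hypothetical document type for this simulation.
data class Doc(val name: String, val id: Int)

// Two documents share the name "bob", so sorting on name alone is ambiguous.
val fakeIndex = listOf(Doc("alice", 1), Doc("bob", 2), Doc("bob", 3), Doc("carol", 4))

// Pages through fakeIndex with search_after semantics: each page holds the hits
// that sort strictly after the last hit of the previous page.
fun countAll(order: Comparator<Doc>, pageSize: Int): Int {
    val sorted = fakeIndex.sortedWith(order)
    var results = 0
    var searchAfter: Doc? = null
    while (true) {
        val last = searchAfter
        val page = sorted
            .filter { last == null || order.compare(it, last) > 0 }
            .take(pageSize)
        if (page.isEmpty()) break   // an empty page means there is nothing left
        results += page.size
        searchAfter = page.last()   // continue after the last hit of this page
    }
    return results
}

fun main() {
    val byName = compareBy<Doc> { it.name }
    val byNameThenId = compareBy<Doc>({ it.name }, { it.id })
    println(countAll(byName, pageSize = 2))       // 3: the second "bob" is skipped
    println(countAll(byNameThenId, pageSize = 2)) // 4: the tie breaker keeps every hit
}
```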

As with search_after, you can also handle scroll ids manually. If you do this, be sure to delete your scrolls after you are done searching.

var resp = client.search(indexName, scroll = "1m") {
  resultSize=2
  query=matchAll()
}
var results=resp.hits?.hits?.size ?:0
// fetch the next page with the scroll API
resp = client.scroll(
  scrollId = resp.scrollId?:error("no scrollId"),
  scroll = 1.minutes
)
results+=resp.hits?.hits?.size ?:0

// .. repeat this until done

// finally release the scrollId
client.deleteScroll(resp.scrollId)
println("found $results results")

Captured Output:

found 3 results
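
The "repeat this until done" step and the cleanup can be combined in a loop with try/finally, so the scroll is released even when processing a page fails. The sketch below runs against FakeScrollService, a hypothetical in-memory stand-in written for this illustration. It is not a kt-search type, and it keeps one scroll id for the whole scroll, whereas a real cluster may return a different id with each response. Against a real cluster, pass on the id from the latest response, as the example above does.

```kotlin
// Hypothetical in-memory stand-in for the scroll API; not a kt-search type.
class FakeScrollService(private val docs: List<String>, private val pageSize: Int) {
    private val cursors = mutableMapOf<String, Int>()
    private var nextId = 1

    val openScrolls: Int get() = cursors.size

    // Starts a scroll and returns the scroll id together with the first page.
    fun search(): Pair<String, List<String>> {
        val id = "scroll-${nextId++}"
        cursors[id] = 0
        return id to next(id)
    }

    // Returns the next page for an open scroll; empty when everything was returned.
    fun next(scrollId: String): List<String> {
        val offset = cursors[scrollId] ?: error("unknown or expired scroll $scrollId")
        val page = docs.drop(offset).take(pageSize)
        cursors[scrollId] = offset + page.size
        return page
    }

    fun deleteScroll(scrollId: String) {
        cursors.remove(scrollId)
    }
}

fun scrollAll(service: FakeScrollService): List<String> {
    val (scrollId, firstPage) = service.search()
    val results = mutableListOf<String>()
    try {
        var page = firstPage
        while (page.isNotEmpty()) {
            results += page
            page = service.next(scrollId)
        }
    } finally {
        // release the scroll even if processing fails half way
        service.deleteScroll(scrollId)
    }
    return results
}

fun main() {
    val service = FakeScrollService(listOf("a", "b", "c"), pageSize = 2)
    println("found ${scrollAll(service).size} results, open scrolls: ${service.openScrolls}")
    // found 3 results, open scrolls: 0
}
```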