kt-search

Multi platform kotlin client for Elasticsearch & Opensearch with easily extendable Kotlin DSLs for queries, mappings, bulk, and more.


Project maintained by jillesvangurp Hosted on GitHub Pages — Theme by mattgraham

Index Repository

KT Search Manual Previous: Document Manipulation Next: Efficiently Ingest Content Using Bulk Indexing
Github © Jilles van Gurp  

To cut down on the amount of copy pasting of aliases and index names, kt-search includes a useful abstraction: the IndexRepository.

An IndexRepository allows you to work with a specific index. You can perform document CRUD, query, do bulk indexing, etc. Additionally, you can configure read and write aliases and ensure the correct aliases are used.

Creating a repository

val repo = client.repository("test", TestDoc.serializer())

repo.createIndex {
  mappings {
    text(TestDoc::message)
  }
}
val id = repo.index(TestDoc("A document")).id
repo.delete(id)

// and of course you can search in your index
repo.search {
  query=matchAll()
}

Deserializing documents

The main purpose of the IndexRepository is to handle document deserialization for you. To enable this a few extension functions are provided that allow you to deserialize individual hits or lists and flows of hits.

val documents: List<TestDoc> = repo.search {
  query = match(TestDoc::message, "document")
}.parseHits<TestDoc>()

The disadvantage of extension functions is that they are not aware of the ModelSerializationStrategy. The version above is uses reified inline generics. There are also variants that take either the ModelSerializationStrategy or the kotlinx serialization strategy.

But there’s also a short hand in the IndexRepository which doesn’t have this disadvantage.

val documents: List<TestDoc> = repo.searchDocuments {
  query = match(TestDoc::message, "document")
}

This also works for search_after, which returns a flow of hits that you can turn into a flow of your document

Likewise you can get a document with getDocument:

// returns null if the document is not found
val doc: TestDoc? = repo.getDocument("42")

Or get both the document and the GetResponse by destructuring:

// throws a RestException if the document is not found
val (doc: TestDoc,resp: GetDocumentResponse) = repo.get("42")

Note. the type specifications above are of course optional because of type inference but added for readability.

Bulk Indexing

repo.bulk {
  // no need to specify the index
  index(TestDoc("test"))
  index(TestDoc("test1"))
  index(TestDoc("test2"))
  index(TestDoc("test3"))
}

Multi Get

Multi get is of course also supported.

repo.bulk {
  index(TestDoc("One"), id = "1")
  index(TestDoc("Two"), id = "2")
}
// multi get can be very convenient
repo.mGet("1","2")
// but you can also do use the full dsl
repo.mGet {
  ids = listOf("1","2")
}
// or if you insist
repo.mGet {
  doc {
    id="1"
    source=false
  }
  doc {
    id="2"
  }
}

Optimistic locking and updates

Elasticsearch is of course not intended to be a database and it does not have transactions. However, it does have a few features that allow you to (ab)use it as one.

Elasticsearch supports optimistic locking. With optimistic locking you can guarantee that you are not overwriting concurrent updates to documents. Additionally, most write operations have a refresh parameter that you can set to wait_for to ensure read consistency (read your own writes). Both features combined, make it possible to use Elasticsearch as a simple document store.

Optimistic locking works by setting the if_primary_term and if_seq_no parameters on indexing operations and handling the version conflict http response by trying again with a freshly fetched version of the document that has the current values of primary_term and seq_no.

Conflicts happen any time you have concurrent writes updating a document in between when you fetch it and when you attempt to replace it. By specifying if_primary_term and if_seq_no, the conflict is detected and you get a version conflict response.

It is called optimistic locking because instead of locking, it simply applies a cheap check that can fail that you can then act on by retrying. Since nothing gets locked, everything stays fast. And with a rare retry operation, performance should not suffer.

Dealing with this is of course a bit fiddly to do manually. To make optimistic locking really easy, the IndexRepository supports updates with retry both for single documents and with bulk operations.

val id = repo.index(TestDoc("A document")).id
repo.update(id, maxRetries = 2, block = { oldVersion ->
  oldVersion.copy(message = "An updated document")
}, retryDelay = 2.seconds)

This fetches the document and its primary_term and seq_no values, applies your update function, and then stores it. In case of a version conflict, it re-fetches the document with the latest primary_term and seq_no values, and then re-applies your update function to that version. The number of retries is configurable. If all retries fail, you will get a version conflict exception. The only time this may happen is if you have a lot of concurrent writes to the same documents.

Bulk updates and optimistic locking

You may also want to apply optimistic locking to bulk updates and it has a similar mechanism for setting if_primary_term and if_seq_no. The index repository implements an extended version of the BulkSession that includes update functions similar to the above and uses a callback based retry mechanism.

See Efficiently Ingest Content Using Bulk Indexing for more information on callbacks.

val aDoc = TestDoc("A document")
val id = repo.index(aDoc).id
repo.bulk(
  // these parameters are optional
  // and have sensible defaults
  maxRetries = 1,
  retryTimeout = 2.seconds
) {
  update(
    id = id,
    // you have to provide the original
    original = aDoc,
    // and the seq_no and primary_term
    // these values are probably wrong
    // amd will trigger a retry
    ifSeqNo = 42,
    ifPrimaryTerm = 42
  ) {
    // like before, we use a function block
    // to make the changes
    it.copy(message = "Changed")
  }
}

Digging out primary_term and seq_no numbers manually is of course a bit tedious. As an alternative, you pass in anything that implements SourceInformation. This includes document get responses, multi get responses, and search hits.

val aDoc = TestDoc("A document")
val id = repo.index(aDoc).id

val (_, getResponse) = repo.get(id)
// note, you should use a multi get if you are updating many documents
repo.index(
  value = aDoc.copy("This will be overwritten"),
  id = getResponse.id
)
repo.bulk {
  update(
    // GetResponse implements SourceInformation
    // however, our getResponse is now out of date
    // so it will retry
    getResponse
  ) {
    it.copy(message = "Changed it again")
  }
}

Using searchAfter is great for applying large amounts of updates to an index. This is how that works:

repo.bulk {
  repo.searchAfter {
    // this is needed because we need _seq_no and _primary_term
    seqNoPrimaryTerm = true
    query = matchAll()
  }.let { (firstResponse, hitFlow) ->
    // this will page through the entire index
    hitFlow.collect { hit ->
      // if somebody messes with the index while we do this
      // bulk update will just retry it
      update(
        // hit implements SourceInformation
        hit
      ) {
        it.copy(message = it.message.reversed())
      }
    }
  }
}

KT Search Manual Previous: Document Manipulation Next: Efficiently Ingest Content Using Bulk Indexing
Github © Jilles van Gurp