To cut down on the amount of copy pasting of aliases and index names, kt-search includes a useful abstraction: the IndexRepository.

An IndexRepository allows you to work with a specific index. You can perform document CRUD, query, do bulk indexing, etc. Additionally, you can configure read and write aliases and ensure the correct aliases are used.
```kotlin
val repo = client.repository("test", TestDoc.serializer())

repo.createIndex(repo.indexNameOrWriteAlias) {
    mappings {
        text(TestDoc::message)
    }
}
val id = repo.index(TestDoc("A document")).id
repo.delete(id)
// and of course you can search in your index
repo.search {
    query = matchAll()
}
```
The main purpose of the IndexRepository is to handle document deserialization for you. To enable this, a few extension functions are provided that allow you to deserialize individual hits, or lists and flows of hits.
```kotlin
val documents: List<TestDoc> = repo.search {
    query = match(TestDoc::message, "document")
}.parseHits<TestDoc>()
```
The disadvantage of extension functions is that they are not aware of the ModelSerializationStrategy. The version above uses reified inline generics, but there are also variants that take either the ModelSerializationStrategy or a kotlinx serialization strategy.
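For example, you could pass the serializer explicitly. The following is a minimal sketch; the exact overload signatures may differ slightly, so check the kt-search API docs:

```kotlin
// variant that takes a kotlinx.serialization strategy explicitly
val docs: List<TestDoc> = repo.search {
    query = match(TestDoc::message, "document")
}.parseHits(TestDoc.serializer())
```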
But there's also a shorthand in the IndexRepository that doesn't have this disadvantage.
```kotlin
val documents: List<TestDoc> = repo.searchDocuments {
    query = match(TestDoc::message, "document")
}
```
This also works for search_after, which returns a flow of hits that you can turn into a flow of your documents.
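For example, a minimal sketch that assumes the parseHit extension function and kotlinx.coroutines.flow.map:

```kotlin
val (firstResponse, hitFlow) = repo.searchAfter {
    query = matchAll()
}
// firstResponse gives you access to the initial response (e.g. total hits);
// map the flow of hits to a flow of TestDoc instances
val docFlow = hitFlow.map { it.parseHit<TestDoc>() }
```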
Likewise, you can get a document with getDocument:
```kotlin
// returns null if the document is not found
val doc: TestDoc? = repo.getDocument("42")
```
Or get both the document and the GetDocumentResponse by destructuring:
```kotlin
// throws a RestException if the document is not found
val (doc: TestDoc, resp: GetDocumentResponse) = repo.get("42")
```
Note: the type specifications above are of course optional because of type inference; they are added for readability.

Because the repository is tied to a specific index, bulk indexing with it does not require you to specify an index:
```kotlin
repo.bulk {
    // no need to specify the index
    index(TestDoc("test"))
    index(TestDoc("test1"))
    index(TestDoc("test2"))
    index(TestDoc("test3"))
}
```
Multi get is of course also supported.
```kotlin
repo.bulk {
    index(TestDoc("One"), id = "1")
    index(TestDoc("Two"), id = "2")
}
// multi get can be very convenient
repo.mGet("1", "2")
// but you can also use the full DSL
repo.mGet {
    ids = listOf("1", "2")
}
// or if you insist
repo.mGet {
    doc {
        id = "1"
        source = false
    }
    doc {
        id = "2"
    }
}
```
Elasticsearch is of course not intended to be a database; it does not have transactions, for example. However, it does have a few features that allow you to (ab)use it as one.
Elasticsearch supports optimistic locking. With optimistic locking, you can guarantee that you are not overwriting concurrent updates to documents. Additionally, most write operations have a refresh parameter that you can set to wait_for to ensure read consistency (read your own writes). Combined, these two features make it possible to use Elasticsearch as a simple document store.
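For example, to read your own writes, something like the following sketch should work. The refresh parameter and the Refresh.WaitFor value here are assumptions based on the underlying Elasticsearch API; check the client for the exact names:

```kotlin
// wait_for blocks until the indexed document is visible to searches
repo.index(TestDoc("A document"), refresh = Refresh.WaitFor)
```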
Optimistic locking works by setting the if_primary_term and if_seq_no parameters on indexing operations and handling the version conflict HTTP response by trying again with a freshly fetched version of the document that has the current values of primary_term and seq_no.
Conflicts happen whenever concurrent writes update a document between the moment you fetch it and the moment you attempt to replace it. By specifying if_primary_term and if_seq_no, the conflict is detected and you get a version conflict response.
It is called optimistic locking because, instead of locking, it simply applies a cheap check that can fail, which you can then act on by retrying. Since nothing gets locked, everything stays fast. And because retries should be rare, performance should not suffer.
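To illustrate the mechanism, a manual version might look like this sketch. The ifSeqNo and ifPrimaryTerm parameter names are borrowed from the bulk update example further below; the exact signature of index may differ:

```kotlin
val (doc, resp) = repo.get(id)
try {
    repo.index(
        value = doc.copy(message = "updated"),
        id = id,
        // fails with a version conflict if the document
        // changed concurrently since we fetched it
        ifSeqNo = resp.seqNo,
        ifPrimaryTerm = resp.primaryTerm
    )
} catch (e: RestException) {
    // version conflict (409): re-fetch the document and try again
}
```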
Dealing with this manually is of course a bit fiddly. To make optimistic locking really easy, the IndexRepository supports updates with retry, both for single documents and for bulk operations.
```kotlin
val id = repo.index(TestDoc("A document")).id

repo.update(id, maxRetries = 2, block = { oldVersion ->
    // return the object with modifications
    oldVersion.copy(message = "An updated document")
}, retryDelay = 2.seconds)
```
This fetches the document along with its primary_term and seq_no values, applies your update function, and then stores the result. In case of a version conflict, it re-fetches the document with the latest primary_term and seq_no values and re-applies your update block to that version. The number of retries is configurable. If all retries fail, you get a version conflict exception. The only time this may happen is when you have a lot of concurrent writes to the same documents.
You may also want to apply optimistic locking to bulk updates; the bulk API has a similar mechanism for setting if_primary_term and if_seq_no. The index repository implements an extended version of the BulkSession that includes update functions similar to the one above and uses a callback-based retry mechanism. See Efficiently Ingest Content Using Bulk Indexing for more information on callbacks.
```kotlin
val aDoc = TestDoc("A document")
val id = repo.index(aDoc).id

repo.bulk(
    // these parameters are optional
    // and have sensible defaults
    maxRetries = 1,
    retryTimeout = 2.seconds
) {
    update(
        id = id,
        // you have to provide the original
        original = aDoc,
        // and the seq_no and primary_term;
        // these values are probably wrong
        // and will trigger a retry
        ifSeqNo = 42,
        ifPrimaryTerm = 42
    ) {
        // like before, we use a function block
        // to make the changes
        it.copy(message = "Changed")
    }
}
```
Digging out the primary_term and seq_no values manually is of course a bit tedious. As an alternative, you can pass in anything that implements SourceInformation. This includes document get responses, multi get responses, and search hits.
```kotlin
val aDoc = TestDoc("A document")
val id = repo.index(aDoc).id
val (_, getResponse) = repo.get(id)
// note: use a multi get if you are updating many documents
repo.index(
    value = aDoc.copy("This will be overwritten"),
    id = getResponse.id
)
repo.bulk {
    update(
        // GetDocumentResponse implements SourceInformation;
        // however, our getResponse is now out of date,
        // so it will retry
        getResponse
    ) {
        it.copy(message = "Changed it again")
    }
}
```
Using searchAfter is great for applying large numbers of updates to an index. This is how that works:
```kotlin
repo.bulk {
    repo.searchAfter {
        // this is needed because we need _seq_no and _primary_term
        seqNoPrimaryTerm = true
        query = matchAll()
    }.let { (firstResponse, hitFlow) ->
        // this will page through the entire index
        hitFlow.collect { hit ->
            // if somebody messes with the index while we do this,
            // bulk update will just retry
            update(
                // hit implements SourceInformation
                hit
            ) {
                it.copy(message = it.message.reversed())
            }
        }
    }
}
```