Multi platform kotlin client for Elasticsearch & Opensearch with easily extendable Kotlin DSLs for queries, mappings, bulk, and more.
KT Search Manual | Previous: Index Repository | Next: Creating Data Streams |
Github | © Jilles van Gurp |
An important part of working with Elasticsearch is adding content. While the CRUD support is useful for manipulating individual objects in an index, it is not suitable for sending large amounts of data.
For that, bulk indexing should be used. The bulk API in Elasticsearch is one of the more complex APIs in ES. Kt-search provides a few key abstractions to make bulk indexing easy, robust, and straightforward.
@Serializable
data class Foo(val foo: String)
client.createIndex("test") {
mappings {
text(Foo::foo)
}
}
// create a bulk session
client.bulk {
// inside the block we can call index, create, or delete
(0..10).forEach { index ->
index(
// pass the json source (has to be on a single line)
source = DEFAULT_JSON.encodeToString(
Foo.serializer(),
Foo("document $index")
),
index = "test"
)
// same as index but will fail if a document with the id already exists
create(
// you can also just pass a Serializable object directly
doc = Foo("another doc: $index"),
index = "test",
// specify a custom id
id = "doc-$index",
)
// delete a document
delete(id = "666", index = "test")
}
}
You can of course customize the bulk session:
// bulk several parameters that you can set
client.bulk(
// will send a bulk request every 5 bulk operations
// default is 100
bulkSize = 5,
// default index to index to
target = "test",
// default is wait_for
refresh = Refresh.False
) {
// these will all go into the test index
(0..10).forEach { index ->
index(Foo("document $index"))
}
}
Of course the IndexRepository
supports bulk sessions as well.
val repo = client.repository("test", Foo.serializer())
repo.bulk {
create(Foo("will go into the test index"))
}
val repo = client.repository("test", Foo.serializer())
repo.bulk {
index(
doc = Foo("foo"),
id = "foo-1"
)
update(
id = "foo-1",
doc = """{"foo":"bar"}""",
docAsUpsert = true,
)
}
val(doc,resp) = repo.get("foo-1")
// will print bar
println(doc.foo)
repo.bulk {
update(
id = "foo-1",
script = Script.create {
source="ctx._source.foo = params.p1"
params= mapOf(
"p1" to "foobar"
)
}
)
}
repo.get("foo-1").let { (doc,_)->
// prints foobar
println(doc.foo)
}
Prints:
```
bar foobar
```
One of the trickier things with the bulk API is error handling.
To make this easy, you can use a BulkItemCallBack
with your bulk session.
val itemCallBack = object : BulkItemCallBack {
override fun itemFailed(
operationType: OperationType,
item: BulkResponse.ItemDetails
) {
println(
"""
${operationType.name} failed
${item.id} with ${item.status}
""".trimMargin()
)
}
override fun itemOk(
operationType: OperationType,
item: BulkResponse.ItemDetails
) {
println(
"""
operation $operationType completed!
id: ${item.id}
sq_no: ${item.seqNo}
primary_term ${item.primaryTerm}
""".trimIndent()
)
}
override fun bulkRequestFailed(
e: Exception,
ops: List<Pair<String, String?>>) {
println("""
Request failure ${e.message}.
Unless you set
""".trimIndent())
}
}
client.bulk(callBack = itemCallBack) {
// invalid json would cause an error
index("{}}")
This prints:
Request failure RequestIsWrong 400: _bulk
{"index":{}}
{}}
{"error":{"root_cause":[{"type":"action_request_validation_exception","reason":"
Validation Failed: 1: index is missing;"}],"type":"action_request_validation_exc
eption","reason":"Validation Failed: 1: index is missing;"},"status":400}.
Unless you set
For successful items, you might want to know what id was assigned or use the seq_no and primary_term for optimistic locking.
Bulk failures you might want to log or retry.
data class Thing(val name: String, val amount: Long = 42)
KT Search Manual | Previous: Index Repository | Next: Creating Data Streams |
Github | © Jilles van Gurp |