Multi platform kotlin client for Elasticsearch & Opensearch with easily extendable Kotlin DSLs for queries, mappings, bulk, and more.
KT Search Manual | Previous: Specialized Queries | Next: Deep Paging Using search_after and scroll |
Github | © Jilles van Gurp |
The aggregations DSL in Elasticsearch is both very complicated and vast. This is an area where using Kotlin can vastly simplify things for programmers.
The search dsl provides several levels of convenience here:
First lets create some sample documents to aggregate on:
@Serializable
data class MockDoc(
val name: String,
val tags: List<String>? = null,
val color: String? = null,
val timestamp: Instant? = null
)
val indexName = "docs-aggs-demo"
client.createIndex(indexName) {
mappings {
text(MockDoc::name)
keyword(MockDoc::color)
keyword(MockDoc::tags)
date(MockDoc::timestamp)
}
}
val repo = client.repository(indexName, MockDoc.serializer())
repo.bulk {
val now = Clock.System.now()
index(
MockDoc(
name = "1",
tags = listOf("bar"),
color = "green",
timestamp = now
)
)
index(
MockDoc(
name = "2",
tags = listOf("foo"),
color = "red",
timestamp = now - 1.days
)
)
index(
MockDoc(
name = "3",
tags = listOf("foo", "bar"),
color = "red",
timestamp = now - 5.days
)
)
index(
MockDoc(
name = "4",
tags = listOf("foobar"),
color = "green",
timestamp = now - 10.days
)
)
}
Probably the most used aggregation is the terms
aggregation:
val response = client.search(indexName) {
// we don't care about the results here
resultSize = 0
agg(MyAggNames.BY_TAG, TermsAgg(MockDoc::tags)) {
// aggregations can be nested
agg(MyAggNames.BY_COLOR, TermsAgg(MockDoc::color) {
minDocCount = 1
aggSize = 3
})
}
}
// a pretty printing Json configuration that comes with kt-search
println(DEFAULT_PRETTY_JSON.encodeToString(response))
This prints:
{
"took": 3,
"_shards": {
"total": 1,
"successful": 1,
"failed": 0,
"skipped": 0
},
"timed_out": false,
"hits": {
"total": {
"value": 4,
"relation": "eq"
},
"hits": []
},
"aggregations": {
"BY_TAG": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "bar",
"doc_count": 2,
"BY_COLOR": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "green",
"doc_count": 1
},
{
"key": "red",
"doc_count": 1
}
]
}
},
{
"key": "foo",
"doc_count": 2,
"BY_COLOR": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "red",
"doc_count": 2
}
]
}
},
{
"key": "foobar",
"doc_count": 1,
"BY_COLOR": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "green",
"doc_count": 1
}
]
}
}
]
}
}
}
Note that we are using enum values for the aggregation names. Here is the enum we are using:
enum class MyAggNames {
BY_COLOR,
BY_TAG,
BY_DATE,
MIN_TIME,
MAX_TIME,
TIME_SPAN,
SPAN_STATS,
TAG_CARDINALITY,
TOP_RESULTS,
}
You can also use string literals of course.
As you can see from the captured output, parsing this to a type safe structure is a bit of a challenge because the response mixes aggregation names that we specified with aggregation query specific objects.
So, coming up with a model class that captures that is a challenge. The solution for this has to
be a bit more complicated than that.
However, kotlinx.serialization
gives us a way out in the form of the schema less JsonObject
and the
ability to deserialize those into custom model classes. In this case we have TermsAggregationResult
and
TermsBucket
classes that we can use for picking apart a terms aggregation using extension functions.
// response.aggregations is a JsonObject?
// termsResult(name) extracts a TermsAggregationResult from there
val tags = response.aggregations.termsResult(MyAggNames.BY_TAG)
println("Number of buckets: " + tags.buckets.size)
// since buckets can contain sub aggregations, those too are JsonObjects
// buckets is a List<JsonObject>
tags.buckets.forEach { jsonObject ->
// but we can parse those to a TermsBucket
// with another extension function
val tb = jsonObject.parse<TermsBucket>()
println("${tb.key}: ${tb.docCount}")
// and we can get to the named sub aggregations from jsonObject
val colors = jsonObject.termsResult(MyAggNames.BY_COLOR)
// you can also use parsedBuckets to the type safe TermsBucket
colors.buckets.forEach { colorBucketObject ->
val tb = colorBucketObject.parse<TermsBucket>()
println(" ${tb.key}: ${tb.docCount}")
}
}
This prints:
Number of buckets: 3
bar: 2
green: 1
red: 1
foo: 2
red: 2
foobar: 1
green: 1
With some more extension function magic we can make this a bit nicer.
val tags = response.aggregations.termsResult(MyAggNames.BY_TAG)
// use parsedBucket to get a Bucket<TermsBucket>
// this allows us to get to the TermsBucket and the aggregations
tags.parsedBuckets.forEach { tagBucket ->
println("${tagBucket.parsed.key}: ${tagBucket.parsed.docCount}")
tagBucket.aggregations
.termsResult(MyAggNames.BY_COLOR)
.parsedBuckets.forEach { colorBucket ->
println(" ${colorBucket.parsed.key}: ${colorBucket.parsed.docCount}")
}
}
This prints:
bar: 2
green: 1
red: 1
foo: 2
red: 2
foobar: 1
green: 1
Here is a more complicated example where we use various other aggregations.
Note, we do not support all aggregations currently but it’s easy to add support for more as needed. Pull requests for this are welcome.
val response = repo.search {
resultSize = 0 // we only care about the aggs
agg(MyAggNames.BY_DATE, DateHistogramAgg(MockDoc::timestamp) {
calendarInterval = "1d"
})
agg(MyAggNames.BY_COLOR, TermsAgg(MockDoc::color)) {
agg(MyAggNames.MIN_TIME, MinAgg(MockDoc::timestamp))
agg(MyAggNames.MAX_TIME, MaxAgg(MockDoc::timestamp))
// this is a cool way to calculate duration
agg(MyAggNames.TIME_SPAN, BucketScriptAgg {
script = "params.max - params.min"
bucketsPath = BucketsPath {
this["min"] = MyAggNames.MIN_TIME
this["max"] = MyAggNames.MAX_TIME
}
})
// throw in a top_hits aggregation as well
agg(MyAggNames.TOP_RESULTS, TopHitsAgg())
}
// we can do some stats on the calculated duration!
agg(MyAggNames.SPAN_STATS, ExtendedStatsBucketAgg {
bucketsPath = "${MyAggNames.BY_COLOR}>${MyAggNames.TIME_SPAN}"
})
agg(MyAggNames.TAG_CARDINALITY, CardinalityAgg(MockDoc::tags))
}
// date_histogram works very similar to the terms aggregation
response.aggregations.dateHistogramResult(MyAggNames.BY_DATE)
.parsedBuckets.map { it.parsed }.forEach { db ->
println("${db.keyAsString}: ${db.docCount}")
}
// We have extension functions for picking apart
// each of the aggregation results.
response.aggregations.termsResult(MyAggNames.BY_COLOR).buckets.forEach { b ->
val tb = b.parse<TermsBucket>()
println("${tb.key}: ${tb.docCount}")
println(" Min: ${b.minResult(MyAggNames.MIN_TIME).value}")
println(" Max: ${b.maxResult(MyAggNames.MAX_TIME).value}")
println(" Time span: ${b.bucketScriptResult(MyAggNames.TIME_SPAN).value}")
// top_hits returns the hits part of a normal search response
println(
" Top: [${
b.topHitResult(MyAggNames.TOP_RESULTS)
.hits.hits.map {
it.source?.parse<MockDoc>()?.name
}.joinToString(",")
}]"
)
}
println(
"Avg time span: ${
response.aggregations
.extendedStatsBucketResult(MyAggNames.SPAN_STATS).avg
}"
)
println(
"Tag cardinality: ${
response.aggregations.cardinalityResult(MyAggNames.TAG_CARDINALITY).value
}"
)
This prints:
2024-11-29T00:00:00.000Z: 1
2024-11-30T00:00:00.000Z: 0
2024-12-01T00:00:00.000Z: 0
2024-12-02T00:00:00.000Z: 0
2024-12-03T00:00:00.000Z: 0
2024-12-04T00:00:00.000Z: 1
2024-12-05T00:00:00.000Z: 0
2024-12-06T00:00:00.000Z: 0
2024-12-07T00:00:00.000Z: 0
2024-12-08T00:00:00.000Z: 1
2024-12-09T00:00:00.000Z: 1
green: 2
Min: 1.732890746439E12
Max: 1.733754746439E12
Time span: 8.64E8
Top: [1,4]
red: 2
Min: 1.733322746439E12
Max: 1.733668346439E12
Time span: 3.456E8
Top: [2,3]
Avg time span: 6.048E8
Tag cardinality: 3
You can use the filter aggregation to narrow down the results and do sub aggregations on the filtered results.
repo.search {
resultSize = 0
agg("filtered", FilterAgg(this@search.term(MockDoc::tags, "foo"))) {
agg("colors", TermsAgg(MockDoc::color))
}
}.let {
it.aggregations.filterResult("filtered")?.let { fb ->
println("filtered: ${fb.docCount}")
fb.bucket.termsResult("colors")
.parsedBuckets
.forEach { b ->
println("${b.parsed.key}: ${b.parsed.docCount}")
}
}
}
This prints:
filtered: 2
red: 2
You can also use the filters aggregation to use multiple named filter aggregations at the same time
repo.search {
resultSize = 0
agg("filtered", FiltersAgg {
namedFilter("foo", this@search.term(MockDoc::tags, "foo"))
namedFilter("bat", this@search.term(MockDoc::tags, "bar"))
}) {
agg("colors", TermsAgg(MockDoc::color))
}
}.let {
it.aggregations
.filtersResult("filtered")
.namedBuckets.forEach { fb ->
println("${fb.name}: ${fb.docCount}")
println(fb.bucket.termsResult("colors")
.parsedBuckets.joinToString(", ") { b ->
b.parsed.key + ": " + b.parsed.docCount
})
}
}
Like with the Search DSL, we do not provide exhaustive coverage of all the features and instead provide implementations for commonly used aggregations and make it really easy to extend this.
So, if you get stuck without support for something you need, you should be able to fix it easily yourself.
You can add your own classes and extension functions to deal with aggregation results. We’ll illustrate that by showing how the Terms aggregation works:
// let's do a simple terms aggregation
val response = client.search(indexName) {
// we don't care about the results here
resultSize = 0
agg(MyAggNames.BY_TAG, TermsAgg("tags"))
}
// the termsResult function we used before is short for this:
val tags = response.aggregations
.getAggResult<TermsAggregationResult>(MyAggNames.BY_TAG)
We need two classes for picking apart terms aggregation results:
@Serializable
data class TermsBucket(
val key: String,
@SerialName("doc_count")
val docCount: Long,
)
@Serializable
data class TermsAggregationResult(
@SerialName("doc_count_error_upper_bound")
val docCountErrorUpperBound: Long,
@SerialName("sum_other_doc_count")
val sumOtherDocCount: Long,
override val buckets: List<JsonObject>
) : BucketAggregationResult<TermsBucket>
The genericBucketAggregationResult
interface that we implement looks like this:
interface BucketAggregationResult<T> {
val buckets: List<JsonObject>
}
This should be implemented on all bucket aggregations. The T
parameter allows us to deserialize the
bucket JsonObject
instances easily with either parse
or parsedBuckets
, which is an extension function
on BucketAggregationResult
.
Now all that’s left to do is add some extension functions:
val TermsAggregationResult.parsedBuckets get() = buckets.map { Bucket(it, TermsBucket.serializer()) }
parsedBuckets
is an extension property on TermsAggregationResult that returns a Bucket
Using that, we can add some more convenience:
// counts as a map of key to count
fun List<TermsBucket>.counts() =
this.associate { it.key to it.docCount }
// extract terms result by name
fun Aggregations?.termsResult(
name: String,
json: Json = DEFAULT_JSON
): TermsAggregationResult =
getAggResult(name, json)
// and by enum value
fun Aggregations?.termsResult(
name: Enum<*>,
json: Json = DEFAULT_JSON
): TermsAggregationResult =
getAggResult(name, json)
For more examples for other aggregations, refer to the source code.
KT Search Manual | Previous: Specialized Queries | Next: Deep Paging Using search_after and scroll |
Github | © Jilles van Gurp |