Merging two Elasticsearch indices based on common key

Martin Beranek
3 min read · Apr 23, 2023
(Image: DALL-E's idea of merging data based on a common key)

I recently stumbled upon an assignment to merge two Elasticsearch indices based on a shared key in each document. To my surprise, the answer was not simple. With heavy help from Google, ChatGPT, and Copilot, I figured out a solution that does roughly what I was looking for. Hopefully you'll find this post when struggling with the same issue, and do let me know if there is a better way to do it.

Data

Let's start with the following data in an index test1:

POST test1/_doc
{
  "a": 1,
  "b": 2
}
POST test1/_doc
{
  "a": 11,
  "b": 22
}

And a second index, test2:

POST test2/_doc
{
  "a": 1,
  "c": 3
}
POST test2/_doc
{
  "a": 11,
  "c": 33
}

Now let's create an index test5 into which we merge the data from test1 and test2. Each document will contain data from both indices. The key on which we merge is a. The correct result will therefore look like this:

[
  {
    "a": 1,
    "b": 2,
    "c": 3
  },
  {
    "a": 11,
    "b": 22,
    "c": 33
  }
]
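For readers who think in code: what we are after is an inner join on the key a. A minimal Python sketch of the goal (the helper name merge_on_key is mine, not anything Elasticsearch provides):

```python
def merge_on_key(left, right, key):
    """Inner-join two lists of documents on a shared key."""
    # Build a lookup table from the right-hand documents, keyed on the join key.
    lookup = {doc[key]: doc for doc in right}
    # For each left-hand document with a match, merge the two documents.
    return [
        {**doc, **lookup[doc[key]]}
        for doc in left
        if doc[key] in lookup
    ]

test1 = [{"a": 1, "b": 2}, {"a": 11, "b": 22}]
test2 = [{"a": 1, "c": 3}, {"a": 11, "c": 33}]

print(merge_on_key(test1, test2, "a"))
# [{'a': 1, 'b': 2, 'c': 3}, {'a': 11, 'b': 22, 'c': 33}]
```

The rest of the post is about getting Elasticsearch itself to produce this result.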

Enrich policy

Elasticsearch has a feature to enrich documents during ingestion. We will use it to match documents from both indices on the key a, which merges the data the way we want.

Let’s prepare the enrich policy:

PUT _enrich/policy/merge
{
  "match": {
    "indices": "test2",
    "match_field": "a",
    "enrich_fields": ["c"]
  }
}
POST /_enrich/policy/merge/_execute

Now let’s create an ingestion pipeline and see the result:

PUT _ingest/pipeline/enrich
{
  "processors": [
    {
      "enrich": {
        "description": "Add 'c' data based on 'a'",
        "policy_name": "merge",
        "field": "a",
        "target_field": "new",
        "max_matches": "1"
      }
    }
  ]
}
POST _reindex
{
  "source": {
    "index": "test1"
  },
  "dest": {
    "index": "test5",
    "pipeline": "enrich"
  }
}
GET /test5/_search
{
  ...
  "hits": {
    ...
    "hits": [
      {
        ...
        "_source": {
          "a": 1,
          "new": {
            "a": 1,
            "c": 3
          },
          "b": 2
        }
      },
      {
        ...
        "_source": {
          "a": 11,
          "new": {
            "a": 11,
            "c": 33
          },
          "b": 22
        }
      }
    ]
  }
}
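To see why the data ended up nested, here is roughly what the enrich processor does, sketched in Python (the names are mine and this is only an illustration, not Elasticsearch internals): it consults a lookup built from the policy's source index, keyed on the match field, and stores the whole matching document under target_field.

```python
def enrich(doc, lookup, match_field, target_field):
    """Sketch of an enrich processor with max_matches = 1:
    attach the matching source document under target_field."""
    match = lookup.get(doc[match_field])
    if match is not None:
        doc[target_field] = match
    return doc

# Lookup built from test2, keyed on the match field "a".
lookup = {1: {"a": 1, "c": 3}, 11: {"a": 11, "c": 33}}

doc = enrich({"a": 1, "b": 2}, lookup, "a", "new")
print(doc)  # {'a': 1, 'b': 2, 'new': {'a': 1, 'c': 3}}
```

The processor always writes the match under the target field; it never spreads the fields into the document root on its own.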

That's not exactly what we wanted: the merged data ended up under a new field called new. Honestly, I feel like I missed something here. The target_field attribute, which names that field, is required by Elasticsearch, so we can't just remove it and hope the pipeline merges the data at the top level.

To get rid of the new field, I had to use a small script.

A script

The script can be written in Painless, and there is no hardcore programming involved. We check whether the field new is present; if it is, we copy its contents into the root of the document and remove new afterwards. Everything happens in the same ingest pipeline.

PUT _ingest/pipeline/enrich_and_flatten_new_field
{
  "processors": [
    {
      "enrich": {
        "description": "Add 'c' data based on 'a'",
        "policy_name": "merge",
        "field": "a",
        "target_field": "new",
        "max_matches": "1"
      }
    },
    {
      "script": {
        "source": """
          // 'new' is a reserved word in Painless, hence ctx['new']
          if (ctx['new'] != null) {
            ctx.putAll(ctx['new']);
            ctx.remove('new');
          }
        """
      }
    }
  ]
}
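The script processor's logic is just a dictionary merge followed by a delete. In Python terms (an illustration of the idea, not the Painless runtime):

```python
def flatten_new_field(ctx, field="new"):
    """Copy the nested enrich result into the document root
    and drop the temporary field, like ctx.putAll + ctx.remove."""
    nested = ctx.get(field)
    if nested is not None:
        ctx.update(nested)  # ctx.putAll(ctx['new'])
        del ctx[field]      # ctx.remove('new')
    return ctx

doc = {"a": 1, "new": {"a": 1, "c": 3}, "b": 2}
print(flatten_new_field(doc))  # {'a': 1, 'b': 2, 'c': 3}
```

Duplicating the key a under new is harmless here, because the copy overwrites the root value with the identical one.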

Let’s reindex one more time and check our data:

POST _reindex
{
  "source": {
    "index": "test1"
  },
  "dest": {
    "index": "test5",
    "pipeline": "enrich_and_flatten_new_field"
  }
}
GET /test5/_search
{
  ...
  "hits": {
    ...
    "hits": [
      {
        ...
        "_source": {
          "a": 1,
          "b": 2,
          "c": 3
        }
      },
      {
        ...
        "_source": {
          "a": 11,
          "b": 22,
          "c": 33
        }
      }
    ]
  }
}

And that’s the result we were looking for.

Summary

As you can see, the final pipeline is not as simple as an example like this should be. I considered fetching data from the other index directly in the script, but that would make things even more convoluted than they already are. Anyway, if you find something easier, let me know. Maybe I just missed something.

And as all those YouTube people say: like & subscribe.

Martin Beranek

I am an Infra Team Lead at Shipmonk. My main interest is Terraform, mostly on GCP. I am also enthusiastic about backend and related topics: Golang, TypeScript, ...