3

I have a Kafka Topic wit JSON data. Now im trying to send those JSON strings to an ES topic using the new "Java API Client" (https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/index.html), but im running into a parser exception:

co.elastic.clients.elasticsearch._types.ElasticsearchException: [es/index] failed: [mapper_parsing_exception] failed to parse
at co.elastic.clients.transport.rest_client.RestClientTransport.getHighLevelResponse(RestClientTransport.java:281)
at co.elastic.clients.transport.rest_client.RestClientTransport.performRequest(RestClientTransport.java:147)
at co.elastic.clients.elasticsearch.ElasticsearchClient.index(ElasticsearchClient.java:953)

This exception occurs in the last line of the following code:

final IndexRequest<String> request =
          new IndexRequest.Builder<String>()
              .index("myIndex")
              .id(String.valueOf(UUID.randomUUID()))
              .document(consumerRecord.value()) //already serialized json data
              .build();
elasticsearchClient.index(request);

As far as I understand this exception occurs, because the ES client tries to serialize the data im providing, which is already serialized, resulting in a malformed JSON string.

Is there anyway to get around this and just send simple JSON strings? Also I believe this was possible with the earlier "Low Level Java Library", right? And yes, I know there are ways to allow communication between Kafka and ES without writing a Consumer.

Thanks for any hints.

3 Answers 3

3

If you use a JacksonJsonpMapper when creating your ElasticsearchTransport, you can use a custom PreserializedJson class to send already-serialized JSON.

ElasticsearchTransport transport = new RestClientTransport(
    createLowLevelRestClient(), // supply your own!
    new JacksonJsonpMapper()
);

ElasticsearchClient client = new ElasticsearchClient(transport);

IndexResponse response = client.index(indexReq -> indexReq
    .index("my-index")
    .id("docId")
    .document(new PreserializedJson("{\"foo\":\"bar\"}"))
);
System.out.println(response);

Here is the source for PreserializedJson:

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static java.util.Objects.requireNonNull;

@JsonSerialize(using = PreserializedJson.Serializer.class)
public class PreserializedJson {
  private final String value;

  public PreserializedJson(String value) {
    this.value = requireNonNull(value);
  }

  public PreserializedJson(byte[] value) {
    this(new String(value, StandardCharsets.UTF_8));
  }

  public static class Serializer extends StdSerializer<PreserializedJson> {
    public Serializer() {
      super(PreserializedJson.class);
    }

    @Override
    public void serialize(PreserializedJson value, JsonGenerator gen, SerializerProvider provider) throws IOException {
      gen.writeRaw(value.value);
    }
  }
}
Sign up to request clarification or add additional context in comments.

2 Comments

Thanks, even though it's been some time I dug up the project and tried your approach. It's working as advertised! :)
@wolle271 I updated this answer to use a custom PerserializedJson class instead of Jackson's SerializedString. After some more testing, it looks like using SerializedString didn't really work, since it wraps the argument in a "value" node and serializes the argument as a String.
0

I solved the problem by substituting "Java API Client" (https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/introduction.html) with "Java Low Level Rest Client" (https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/java-rest-low.html).

This Library allows sending of arbitrary JSON-Strings to ES:

  final Request request = new Request("POST", "/twitter/_doc");
  request.setJsonEntity(record.value());
  restClient.performRequest(request);

Comments

0

With the new API Client, you can natively insert raw json into it. As specified here : Using raw json data

IndexRequest<JsonData> request = IndexRequest.of(i -> i
    .index("logs")
    .withJson(input)
);

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.