WYSIWYG

http://kufli.blogspot.com
http://github.com/karthik20522

Tuesday, April 14, 2015

ReIndexing Elasticsearch in Scala

The following Scala script reads from one index and writes to another index using the scan-and-scroll method. The script also takes in a function through which the values from one index can be manipulated before being saved into the other index. The script assumes you have a field called "id" and a field called "submitDate" so it can continually perform scan and scroll once the preliminary index copy is done, to keep the indexes in sync.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import org.json4s._
import org.json4s.JsonDSL._
import spray.client.pipelining._
import spray.http._
import spray.httpx._
import spray.httpx.encoding._
import scala.concurrent._
import scala.concurrent.duration._
import akka.actor._
import scala.collection.mutable.ListBuffer
import scala.annotation.tailrec
 
class ElasticsearchReIndexerActor(esInputHost: String,
  esOutputHost: String,
  inputIndex: String,
  outputIndex: String,
  indexType: String,
  processData: (JValue) => JValue) extends Actor {

  import context.dispatcher

  // Required by the json4s `.extract[T]` calls below; shadows any outer definition.
  implicit val formats: Formats = DefaultFormats

  // NOTE(review): name intentionally kept as-is (misspelled "ouput") so existing callers compile.
  val ouputIndexClient = new ESClient(s"http://$esOutputHost", outputIndex, indexType, context)
  val pipeline = addHeader("Accept-Encoding", "gzip") ~> sendReceive ~> decode(Gzip) ~> unmarshal[HttpResponse]

  // Sync watermark: only documents with submitDate >= this value are copied on each pass.
  var lastUpdateDateTime: String = "1900-01-01"

  def receive = {
    case "init" =>
      // Clear any pending idle timeout so ReceiveTimeout cannot re-trigger "init" mid-scroll.
      context.setReceiveTimeout(Duration.Undefined)
      val scanId: String = (Await.result(getScanId(lastUpdateDateTime), 60 seconds) \\ "_scroll_id").extract[String]
      self ! scanId
    case scanId: String => iterateData(scanId)
    case ReceiveTimeout => self ! "init"
  }

  /**
   * Opens a scan+scroll search on the input index for documents whose
   * submitDate is >= `startDate`, and returns the parsed JSON response,
   * which carries the `_scroll_id` used to fetch subsequent pages.
   */
  def getScanId(startDate: String): Future[JValue] = {
    // Bug fix: the original read the mutable `lastUpdateDateTime` field instead of
    // its own `startDate` parameter (same value at the single call site, but fragile).
    println("Query data with date gte: " + startDate)

    // Bug fix: the range field was misspelled "submitData"; the documents use "submitDate".
    val esQuery = "{\"query\":{\"bool\":{\"must\":[{\"range\":{\"submitDate\":{\"gte\":\"" + startDate + "\"}}}]}}}"
    val esURI = s"http://$esInputHost/$inputIndex/$indexType/_search?search_type=scan&scroll=5m&size=50"
    pipeline(Post(esURI, esQuery)).map(r => parse(r.entity.asString))
  }

  /**
   * Fetches one scroll page, bulk-indexes its documents (after running them
   * through `processData`) into the output index, and either continues with
   * the next scroll id or pauses for a minute once the scroll is exhausted.
   */
  def iterateData(scanId: String): Unit = {
    val esURI = Uri(s"http://$esInputHost/_search/scroll?scroll=5m")
    // Legacy scroll API: the raw scroll id is POSTed as the request body.
    // NOTE(review): Await inside an actor blocks its dispatcher thread — tolerable for a
    // one-off migration tool, but prefer `pipe` (akka.pattern.pipe) in production code.
    val esResponse: HttpResponse = Await.result(pipeline(Post(esURI, scanId)), 60 seconds)
    val responseData: JValue = parse(esResponse.entity.asString)

    (responseData \ "hits" \ "hits" \ "_source") match {
      case JNothing | JNull | JArray(Nil) =>
        // Bug fix: the original threw "Result set is empty" here, which killed message
        // processing before the pause/re-sync branch below could ever run. An empty
        // page simply means the scroll is exhausted — pause and schedule the next sync.
        pauseAndScheduleResync()
      case JArray(dataList) =>
        // Bulk format: an action line ({"index": ...}) followed by the document itself.
        val bulkData: List[JValue] = dataList.flatMap { data =>
          val id = (data \ "id").extract[String]
          val action = ("index" -> (("_index" -> outputIndex) ~
            ("_type" -> indexType) ~ ("_id" -> id)))
          List(action, processData(data))
        }
        val bulkResponse: SearchQueryResponse = Await.result(ouputIndexClient.bulk(bulkData), 60 seconds)

        (responseData \\ "_scroll_id") match {
          case JNothing | JNull => pauseAndScheduleResync()
          case x => self ! x.extract[String] // continue with the next scroll page
        }
      case x => throw new Exception("UNKNOWN TYPE: " + x) // typo fixed from "UNKNWON"
    }
  }

  /** Records the current time as the sync watermark and arms a one-minute idle
   *  timeout; the resulting `ReceiveTimeout` re-sends "init" to resume syncing. */
  private def pauseAndScheduleResync(): Unit = {
    // NOTE(review): taking the watermark *after* the scroll finished can miss documents
    // submitted while the scroll ran — confirm this against the sync requirements.
    lastUpdateDateTime = DateTime.now.toString
    context.setReceiveTimeout(1.minute)
    println("Paused at: " + lastUpdateDateTime)
  }
}
Notes:
  • The ESClient is an extension of the wabisabi library for Elasticsearch
  • The Actor initially performs a scan-scroll with submit date gte 1900
  • Once the initial scan-scroll is done, it pauses for a minute and performs a scan-scroll again with the submitDate of previous endTime (dateTime.now minus 1 minute)
  • This way every minute after the previous run it will continually keep the index in sync
  • The function "processData" provides a way to manipulate the original data before saving it to the new index
  • Bulk-indexing is used for saving to the new index, hence the "id" field is required to determine the "id" of the new document
Usage:
1
2
3
4
5
6
7
8
9
10
11
12
// Wire up the re-indexer: source and destination cluster hosts, the index to copy
// from and to, the document type, and the per-document transform (here a no-op).
val esReindexActor = system.actorOf(Props(new ElasticsearchReIndexerActor(
   "localhost:9200",
   "localhost:9200",
   "inputIndex",
   "outputIndex",
   "someType",
   doNothing)), name = "esReIndexer")
 
// Kick off the initial full scan-and-scroll copy.
esReindexActor ! "init"
 }
 
  def doNothing(data: JValue): JValue = data

Labels: , ,