ReIndexing Elasticsearch in Scala
The following Scala script reads from one index and writes to another index using the scan-and-scroll method. It also takes a function that can transform each document from the source index before it is saved into the target index. The script assumes every document has an "id" field and a "submitDate" field, so that once the initial copy is done it can keep running scan-and-scroll over newly submitted documents and keep the two indices in sync.
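Because the incremental sync depends on a single range filter on "submitDate", it can help to build that query as a json4s AST instead of concatenating strings: the field name lives in one place and the date value is always quoted and escaped. This is a minimal sketch assuming json4s-jackson 3.x on the classpath; ReindexQuery and submittedSince are hypothetical names, not part of the original script.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical helper (not part of the original script).
object ReindexQuery {
  // Builds {"query":{"bool":{"must":[{"range":{"submitDate":{"gte":<since>}}}]}}}
  // from json4s AST nodes, so the date string is escaped by the renderer.
  def submittedSince(since: String): String = {
    val query = JObject(
      "query" -> JObject(
        "bool" -> JObject(
          "must" -> JArray(List(
            JObject("range" -> JObject(
              "submitDate" -> JObject("gte" -> JString(since))
            ))
          ))
        )
      )
    )
    compact(render(query))
  }
}
```

ReindexQuery.submittedSince("1900-01-01") produces {"query":{"bool":{"must":[{"range":{"submitDate":{"gte":"1900-01-01"}}}]}}}.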
```scala
import akka.actor._
import org.joda.time.DateTime
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import spray.client.pipelining._
import spray.http.{HttpRequest, HttpResponse}
import spray.httpx.encoding.Gzip

import scala.concurrent._
import scala.concurrent.duration._

class ElasticsearchReIndexerActor(esInputHost: String,
                                  esOutputHost: String,
                                  inputIndex: String,
                                  outputIndex: String,
                                  indexType: String,
                                  processData: JValue => JValue) extends Actor {
  import context.dispatcher
  implicit val formats: Formats = DefaultFormats

  // Ask for gzip-compressed responses and decode them transparently.
  val pipeline: HttpRequest => Future[HttpResponse] =
    addHeader("Accept-Encoding", "gzip") ~> sendReceive ~> decode(Gzip)

  // Lower bound on submitDate for the next pass; the first pass copies everything.
  var lastUpdateDateTime: String = "1900-01-01"
  // Time the running pass started; it becomes the next lower bound when the pass ends,
  // so documents submitted while a pass is running are picked up by the following one.
  var currentPassStart: String = lastUpdateDateTime

  def receive = {
    case "init" =>
      context.setReceiveTimeout(Duration.Undefined) // no polling while a pass runs
      currentPassStart = DateTime.now.toString
      val scanResponse = Await.result(getScanId(lastUpdateDateTime), 60.seconds)
      self ! (scanResponse \ "_scroll_id").extract[String]
    case scrollId: String => iterateData(scrollId)
    case ReceiveTimeout => self ! "init"
  }

  // Opens a scan-and-scroll over documents with submitDate >= startDate.
  // search_type=scan exists in Elasticsearch 1.x/2.x and was removed in 5.0.
  def getScanId(startDate: String): Future[JValue] = {
    println("Query data with date gte: " + startDate)
    val esQuery =
      s"""{"query":{"bool":{"must":[{"range":{"submitDate":{"gte":"$startDate"}}}]}}}"""
    val esURI = s"http://$esInputHost/$inputIndex/$indexType/_search?search_type=scan&scroll=5m&size=50"
    pipeline(Post(esURI, esQuery)).map(r => parse(r.entity.asString))
  }

  // Fetches the next page, bulk-indexes it, then keeps scrolling or pauses.
  def iterateData(scrollId: String): Unit = {
    val scrollURI = s"http://$esInputHost/_search/scroll?scroll=5m"
    val esResponse = Await.result(pipeline(Post(scrollURI, scrollId)), 60.seconds)
    val responseData = parse(esResponse.entity.asString)

    val hits: List[JValue] = (responseData \ "hits" \ "hits") match {
      case JArray(list) => list
      case JNothing | JNull => Nil
      case x => throw new Exception("UNKNOWN TYPE: " + x)
    }

    if (hits.isEmpty) {
      // Scroll exhausted: save the checkpoint and poll again after a minute of inactivity.
      lastUpdateDateTime = currentPassStart
      context.setReceiveTimeout(1.minute)
      println("Paused at: " + DateTime.now)
    } else {
      // Bulk body: one action line and one source line per document, newline-terminated.
      val bulkBody = hits.flatMap { hit =>
        val data = hit \ "_source"
        val id = (data \ "id").extract[String]
        val action: JValue =
          "index" -> (("_index" -> outputIndex) ~ ("_type" -> indexType) ~ ("_id" -> id))
        List(action, processData(data))
      }.map(jv => compact(render(jv)) + "\n").mkString

      val bulkResponse =
        Await.result(pipeline(Post(s"http://$esOutputHost/_bulk", bulkBody)), 60.seconds)
      if (!bulkResponse.status.isSuccess)
        throw new Exception("Bulk request failed: " + bulkResponse.status)
      if ((parse(bulkResponse.entity.asString) \ "errors") == JBool(true))
        println("Some documents in this batch failed to index")

      self ! (responseData \ "_scroll_id").extract[String]
    }
  }
}
```
- The bulk write can go through ESClient, an extension of the wabisabi library for Elasticsearch, or through a plain HTTP POST to the "_bulk" endpoint on esOutputHost
- The actor first runs a scan-and-scroll over every document with "submitDate" on or after 1900-01-01
- Once the initial pass is done, it pauses for a minute and then runs another scan-and-scroll restricted to documents whose "submitDate" is on or after the checkpoint time saved by the previous pass
- Repeating this one minute after each pass keeps the output index in sync with the input index
- The function "processData" receives each original document and returns the version to save in the new index; pass an identity function to copy documents unchanged
- Documents are saved to the new index with bulk indexing, so each one needs an "id" field, which becomes the "_id" of the new document
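The bulk request body is newline-delimited JSON: an action line naming the target index, type and _id, followed by the document itself, with a trailing newline after the last line. The sketch below isolates that step as a pure function so it can be checked without a running cluster. It assumes json4s-jackson 3.x; BulkBody and build are hypothetical names, not part of the original script.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical helper (not part of the original script).
object BulkBody {
  implicit val formats: Formats = DefaultFormats

  // Turns a batch of _source documents into an Elasticsearch _bulk request body.
  def build(docs: List[JValue], index: String, docType: String,
            processData: JValue => JValue): String = {
    val lines = docs.flatMap { doc =>
      // The source "id" becomes the target _id, so re-copying a document
      // overwrites it instead of creating a duplicate.
      // extract throws a MappingException if "id" is missing.
      val id = (doc \ "id").extract[String]
      val action = JObject("index" -> JObject(
        "_index" -> JString(index),
        "_type"  -> JString(docType),
        "_id"    -> JString(id)))
      List(action, processData(doc)).map(jv => compact(render(jv)))
    }
    // Every line, including the last, must end with a newline.
    lines.map(_ + "\n").mkString
  }
}
```

Because the _id is taken from the document, running the same batch twice is idempotent, which is what lets overlapping sync windows re-copy documents safely.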
```scala
import akka.actor._
import org.json4s._

object ReIndexApp extends App {
  val system = ActorSystem("esReIndex")

  // Copies every document unchanged.
  def doNothing(data: JValue): JValue = data

  val esReindexActor = system.actorOf(Props(new ElasticsearchReIndexerActor(
    "localhost:9200", "localhost:9200", "inputIndex", "outputIndex", "someType", doNothing)),
    name = "esReIndexer")

  esReindexActor ! "init"
}
```
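doNothing copies documents as they are. As a hedged example of a non-trivial processData, the sketch below drops one field and normalizes another using json4s's removeField and transformField. The field names "internalNotes" and "email" and the object name ProcessDataExamples are hypothetical; adjust them to your own mapping. It assumes json4s-jackson 3.x.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical processData implementation (field names are assumptions).
object ProcessDataExamples {
  // Removes "internalNotes" and lower-cases "email".
  // Both operations are recursive, so nested fields with these names are affected too.
  def cleanup(data: JValue): JValue =
    data
      .removeField { case (name, _) => name == "internalNotes" }
      .transformField { case ("email", JString(e)) => ("email", JString(e.toLowerCase)) }

  // Convenience wrapper for trying the transformation on a raw JSON string.
  def cleanupJson(json: String): String = compact(render(cleanup(parse(json))))
}
```

Pass ProcessDataExamples.cleanup in place of doNothing when creating the actor; the "id" field is left untouched, so the bulk step can still use it as the _id.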