import org.json4s._
import org.json4s.JsonDSL._
import spray.client.pipelining._
import spray.http._
import spray.httpx._
import spray.httpx.encoding._
import scala.concurrent._
import scala.concurrent.duration._
import akka.actor._
import scala.collection.mutable.ListBuffer
import scala.annotation.tailrec
/**
 * Actor that copies documents from one Elasticsearch index to another using a
 * scan/scroll search on the input host and bulk indexing on the output host.
 *
 * Messages handled:
 *  - `"init"`: opens a scan over `inputIndex` for documents whose `submitData`
 *    is `>= lastUpdateDateTime` and sends the resulting scroll id to itself.
 *  - any other `String`: treated as a scroll id; one page is fetched, transformed
 *    and bulk-indexed, then the next scroll id is sent to itself.
 *  - `ReceiveTimeout`: re-sends `"init"` to start the next run.
 *
 * NOTE(review): HTTP and bulk calls are awaited synchronously inside `receive`,
 * so the actor's thread is blocked for up to 60 seconds per call.
 *
 * @param esInputHost  host (and port) of the Elasticsearch instance to read from
 * @param esOutputHost host (and port) of the Elasticsearch instance to write to
 * @param inputIndex   index that is scanned
 * @param outputIndex  index named in every bulk `index` action
 * @param indexType    document type used both for the scan and the bulk actions
 * @param processData  transformation applied to each hit's `_source` before indexing
 */
class ElasticsearchReIndexerActor(esInputHost: String,
                                  esOutputHost: String,
                                  inputIndex: String,
                                  outputIndex: String,
                                  indexType: String,
                                  processData: (JValue) => JValue) extends Actor {
  import context.dispatcher

  val ouputIndexClient = new ESClient(s"http://$esOutputHost", outputIndex, indexType, context)

  // Request pipeline: ask for gzip, send, decompress, and hand back the raw response.
  val pipeline = addHeader("Accept-Encoding", "gzip") ~> sendReceive ~> decode(Gzip) ~> unmarshal[HttpResponse]

  // Lower bound for the next scan; advanced every time a scan completes.
  var lastUpdateDateTime: String = "1900-01-01"

  def receive: Receive = {
    case "init" =>
      val scanResponse = Await.result(getScanId(lastUpdateDateTime), 60.seconds)
      val scanId: String = scrollIdOf(scanResponse).getOrElse(
        throw new IllegalStateException("Scan request did not return a _scroll_id"))
      self ! scanId
    case scanId: String => iterateData(scanId)
    case ReceiveTimeout => self ! "init"
  }

  /**
   * Opens a scan search over the input index.
   *
   * @param startDate inclusive lower bound for the `submitData` range filter
   * @return the parsed JSON body of the search response
   */
  def getScanId(startDate: String): Future[JValue] = {
    println(s"Query data with date gte: $startDate")
    val esQuery = s"""{"query":{"bool":{"must":[{"range":{"submitData":{"gte":"$startDate"}}}]}}}"""
    val esURI = s"http://$esInputHost/$inputIndex/$indexType/_search?search_type=scan&scroll=5m&size=50"
    val esResponse: Future[HttpResponse] = pipeline(Post(esURI, esQuery))
    esResponse.map(r => parse(r.entity.asString))
  }

  /**
   * Fetches one scroll page, bulk-indexes its transformed documents and schedules
   * the next page. An empty page (or a response without a scroll id) ends the
   * current run and arms the receive timeout that triggers the next one.
   *
   * @param scanId scroll id returned by the previous scan/scroll response
   * @throws Exception if the response has no `hits.hits` array or it is not an array
   */
  def iterateData(scanId: String): Unit = {
    val esURI = Uri(s"http://$esInputHost/_search/scroll?scroll=5m")
    val esResponse: HttpResponse = Await.result(pipeline(Post(esURI, scanId)), 60.seconds)
    val responseData: JValue = ModelJsonHelper.toJValue(esResponse.entity.asString)

    // Match on the hits array itself: projecting `_source` through the array with `\`
    // collapses a single-element result into a bare object in some json4s versions.
    val hits: List[JValue] = responseData \ "hits" \ "hits" match {
      case JArray(found) => found
      case JNothing | JNull => throw new Exception("Result set is empty")
      case x => throw new Exception("UNKNWON TYPE: " + x)
    }

    if (hits.isEmpty) {
      // An exhausted scroll returns an empty page; that is the normal end of a run.
      pauseUntilNextRun()
    } else {
      val sources = hits.flatMap(hit => (hit \ "_source").toOption)
      if (sources.nonEmpty) {
        Await.result(ouputIndexClient.bulk(bulkActions(sources)), 60.seconds)
      }
      scrollIdOf(responseData) match {
        case Some(nextScanId) => self ! nextScanId
        case None => pauseUntilNextRun()
      }
    }
  }

  /** Reads the top-level `_scroll_id` of a search/scroll response, if present. */
  private def scrollIdOf(response: JValue): Option[String] = response \ "_scroll_id" match {
    case JString(id) => Some(id)
    case _ => None
  }

  /** Builds the bulk payload: an `index` action line followed by the transformed document, per source. */
  private def bulkActions(sources: List[JValue]): List[JValue] =
    sources.flatMap { data =>
      val id = (data \ "id").extract[String]
      val action: JValue =
        ("index" -> (("_index" -> outputIndex) ~ ("_type" -> indexType) ~ ("_id" -> id)))
      List(action, processData(data))
    }

  /** Records the completion time as the next lower bound and arms the one-minute receive timeout. */
  private def pauseUntilNextRun(): Unit = {
    lastUpdateDateTime = DateTime.now.toString
    context.setReceiveTimeout(1.minute)
    println("Paused at: " + lastUpdateDateTime)
  }
}