diff --git a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala index 3e833c1..11c763d 100644 --- a/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala +++ b/src/main/scala/org/embulk/output/fluentd/sender/Sender.scala @@ -1,6 +1,10 @@ package org.embulk.output.fluentd.sender +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit + import akka._ +import akka.io.Inet.SO.ReuseAddress import akka.pattern.ask import akka.stream._ import akka.stream.scaladsl._ @@ -15,7 +19,7 @@ trait Sender { def close(): Unit val instance: SourceQueueWithComplete[Seq[Map[String, AnyRef]]] def apply(value: Seq[Map[String, AnyRef]]): Future[QueueOfferResult] - def tcpHandling(size: Int, byteString: ByteString): Future[Done] + def sendCommand(size: Int, byteString: ByteString): Future[Done] def waitForComplete(): Result } @@ -87,21 +91,27 @@ case class SenderImpl private[sender] (host: String, withThrottle .mapAsync(asyncSize) { case (size, byteString) => - tcpHandling(size, byteString) + sendCommand(size, byteString) } .to(Sink.ignore) .run() } - def sendCommand(byteString: ByteString): Future[Done] = - Source - .single(byteString) - .via(senderFlow.tcpConnectionFlow(host, port)) - .runWith(Sink.ignore) + val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection( + InetSocketAddress.createUnresolved(host, port), + None, + List(ReuseAddress(false)), + halfClose = true, + Duration(3, TimeUnit.MINUTES), + Duration(3, TimeUnit.MINUTES) + ) - def tcpHandling(size: Int, byteString: ByteString): Future[Done] = { - def _tcpHandling(size: Int, byteString: ByteString, c: Int)(retried: Boolean): Future[Done] = { - val futureCommand = sendCommand(byteString) + def sendCommand(size: Int, byteString: ByteString): Future[Done] = { + val command = Source + .single(byteString) + .via(connection) + def _sendCommand(size: Int, c: Int)(retried: Boolean): Future[Done] = { + val futureCommand = command.runWith(Sink.ignore) futureCommand.onComplete { case Success(_) => actorManager.supervisor ! Complete(size) @@ -110,8 +120,8 @@ case class SenderImpl private[sender] (host: String, s"Sending fluentd ${size.toString} records was failed. - will retry ${c - 1} more times ${retryDelayIntervalSecondDuration.toSeconds} seconds later.", e) actorManager.supervisor ! Retried(size) - akka.pattern.after(retryDelayIntervalSecondDuration, actorManager.system.scheduler)( - _tcpHandling(size, byteString, c - 1)(retried = true)) + Thread.sleep(retryDelayIntervalSecondDuration.toSeconds) + _sendCommand(size, c - 1)(retried = true) case Failure(e) => actorManager.supervisor ! Failed(size) logger.error( @@ -122,7 +132,7 @@ case class SenderImpl private[sender] (host: String, } futureCommand } - _tcpHandling(size, byteString, retryCount)(retried = false).recoverWith { + _sendCommand(size, retryCount)(retried = false).recoverWith { case _: Exception => Future.successful(Done) } diff --git a/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala b/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala index 94e6d10..6c55b9e 100644 --- a/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala +++ b/src/main/scala/org/embulk/output/fluentd/sender/SenderFlow.scala @@ -11,8 +11,6 @@ import scala.concurrent.Future trait SenderFlow { val msgPackFlow: Flow[Seq[Seq[Map[String, AnyRef]]], (Int, ByteString), NotUsed] - def tcpConnectionFlow(host: String, port: Int)( - implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] } case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyOpt: Option[String]) @@ -29,8 +27,4 @@ case class SenderFlowImpl private[sender] (tag: String, unixtime: Long, timeKeyO } (packing.size, ByteString(MsgPack.pack(Seq(tag, packing)))) } - override def tcpConnectionFlow(host: String, port: Int)( - implicit s: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - Tcp().outgoingConnection(host, port) - }