
Support of custom Coders #319

Closed
ElPicador opened this issue Oct 31, 2016 · 15 comments
Assignees
Labels
bug Something isn't working

Comments

@ElPicador
Contributor

It seems, with scio 0.2.6, that custom coders are not supported. I did:

val (sc, args) = ContextAndArgs(cmdlineArgs)
sc.pipeline.getCoderRegistry.registerCoder(classOf[Log], classOf[LogCoder])
sc.pubsubTopic(...).etc

And when profiling my job, I don't see LogCoder, only the default KryoAtomicCoder.

@andrewsmartin andrewsmartin added the bug Something isn't working label Oct 31, 2016
@andrewsmartin andrewsmartin self-assigned this Oct 31, 2016
@nevillelyh
Contributor

pubsubTopic returns SCollection[String], do you have a .map afterwards that converts them to Log?

Also, is it possible to provide the code for Log and LogCoder so we can look into it?
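To make the point concrete, here is a minimal, self-contained sketch of the step in question: pubsubTopic yields an SCollection[String], so a coder for Log can only come into play after a .map converts each message into a Log. (parseLog and the tab-separated message format are hypothetical, and java.time.Instant stands in for whatever Instant type the real Log uses.)

```scala
import java.time.Instant

case class Log(timestamp: Instant, ip: String)

// Hypothetical parser for a tab-separated Pub/Sub message.
// In the actual pipeline this would run inside a .map step:
//   sc.pubsubTopic(...)      // SCollection[String]
//     .map(parseLog)         // SCollection[Log] -- only here can LogCoder apply
def parseLog(line: String): Log = {
  val Array(ts, ip) = line.split('\t')
  Log(Instant.parse(ts), ip)
}
```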

@ElPicador
Contributor Author

Yes, you are right. I wanted to simplify the code example, and I removed the map.

The LogCoder is mostly a big copy/paste of KryoAtomicCoder.

Here is the code; I only removed some fields from Log:

case class Log(timestamp: Instant,
               ip: String)

The LogCoder:

class LogCoder extends AtomicCoder[Log] {

  @transient
  private lazy val kryo: ThreadLocal[Kryo] = new ThreadLocal[Kryo] {
    override def initialValue(): Kryo = {
      val k = KryoSerializer.registered.newKryo()
      k.register(classOf[Log], new LogKryoSerializer)
      k
    }
  }
  val classOfLog = classOf[Log]

  override def encode(value: Log, outStream: OutputStream, context: Context): Unit = {
    if (value == null) {
      throw new CoderException("cannot encode a null value")
    }
    if (context.isWholeStream) {
      val output = new Output(outStream)
      kryo.get().writeClassAndObject(output, value)
      output.flush()
    } else {
      val s = new ByteArrayOutputStream()
      val output = new Output(s)
      kryo.get().writeClassAndObject(output, value)
      output.flush()
      s.close()

      VarInt.encode(s.size(), outStream)
      outStream.write(s.toByteArray)
    }
  }

  override def decode(inStream: InputStream, context: Context): Log = {
    if (context.isWholeStream) {
      kryo.get().readObject(new Input(inStream), classOfLog)
    } else {
      val length = VarInt.decodeInt(inStream)
      if (length < 0) {
        throw new IOException("invalid length " + length)
      }

      val value = Array.ofDim[Byte](length)
      ByteStreams.readFully(inStream, value)
      kryo.get().readObject(new Input(value), classOfLog)
    }
  }

}

object LogAtomicCoder {
  def of: LogAtomicCoder = new LogAtomicCoder
}

final class LogKryoSerializer extends Serializer[Log] {
  setImmutable(true)

  def write(kryo: Kryo, output: Output, log: Log): Unit = {
    kryo.writeClassAndObject(output, log.timestamp)
    output.writeString(log.ip)
  }

  def read(kryo: Kryo, input: Input, `type`: Class[Log]): Log = {
    new Log(timestamp = kryo.readObject(input, classOf[Instant]),
            ip = input.readString)
  }
}
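For context on the framing in the coder above: in the nested case, encode writes a VarInt length prefix followed by the serialized bytes, and decode reads that prefix back before deserializing. A self-contained sketch of that LEB128-style varint framing follows (VarIntSketch is a stand-in for the SDK's VarInt helper, assuming unsigned base-128 encoding, which may differ in detail from the real implementation):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, InputStream, OutputStream}

// Stand-in for the SDK's VarInt: unsigned LEB128 (7 bits per byte, high bit = "more").
object VarIntSketch {
  def encode(value: Int, out: OutputStream): Unit = {
    var v = value
    while ((v & ~0x7f) != 0) {
      out.write((v & 0x7f) | 0x80) // low 7 bits, continuation bit set
      v >>>= 7
    }
    out.write(v) // final byte, continuation bit clear
  }

  def decodeInt(in: InputStream): Int = {
    var b = in.read()
    if (b < 0) throw new IOException("EOF while reading varint")
    var result = b & 0x7f
    var shift = 7
    while ((b & 0x80) != 0) {
      b = in.read()
      if (b < 0) throw new IOException("EOF while reading varint")
      result |= (b & 0x7f) << shift
      shift += 7
    }
    result
  }
}

// Length-prefix a payload the way the nested-context encode branch does.
val payload = "hello".getBytes("UTF-8")
val buf = new ByteArrayOutputStream()
VarIntSketch.encode(payload.length, buf)
buf.write(payload)

// ...and read it back the way the decode branch does.
val in = new ByteArrayInputStream(buf.toByteArray)
val n = VarIntSketch.decodeInt(in)
val back = new Array[Byte](n)
in.read(back) // ByteArrayInputStream fills the whole array here
```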

@andrewsmartin
Contributor

@ElPicador I'm having trouble reproducing this; when I use this code in a test, I can see that on an SCollection[Log] the coder is indeed a LogCoder. Are you sure that there aren't any problems when you register the coder? I'll note that I did have to rename LogAtomicCoder to LogCoder.

@ElPicador
Contributor Author

ElPicador commented Nov 2, 2016

@andrewsmartin: Sorry, my mistake. It is indeed LogAtomicCoder. I need to take a course on copy/pasting.

We haven't tried it in tests, only directly on Dataflow. And when profiling the job (for more than an hour), we do not see the class LogAtomicCoder being used, only KryoAtomicCoder. And there are no exceptions in the logs.

@ElPicador ElPicador reopened this Nov 2, 2016
@ElPicador
Contributor Author

I tried to override the fallback coder as well, and I still see KryoAtomicCoder being used 😕.

I might be doing something wrong. Is there an example of using custom coders somewhere?

I tried with this.

sc.pipeline.getCoderRegistry.setFallbackCoderProvider(new CoderProvider {
  override def getCoder[T](`type`: TypeDescriptor[T]): Coder[T] = {
    null
  }
})

@ElPicador
Contributor Author

After some testing, it works locally but not on Dataflow :/

@andrewsmartin
Contributor

andrewsmartin commented Nov 2, 2016

@ElPicador that's interesting. Perhaps there are differences in the coder inference process between the Direct/InProcessPipelineRunner and the DataflowRunner, but that seems unlikely. How are you submitting your job? And how are you checking which coders are being used on Dataflow?

@ElPicador
Contributor Author

I'm submitting my job with sbt and run-main MyClass --projectId=...

Locally, I'm checking with a breakpoint.
On Dataflow, I'm using --enableProfilingAgent and then looking at all the classes that are used.

@andrewsmartin
Contributor

@ElPicador ok, so I just tried with your "null" coder provider, and I actually get back KryoAtomicCoder as well, even when I run locally. This is expected because of this logic here: https://github.com/spotify/scio/blob/master/scio-core/src/main/scala/com/spotify/scio/Implicits.scala#L74

Above, when it tries to get the default coder, because your fallback coder provider returns null it just falls back to Kryo anyway. Can you try this with a non-null coder?

@ElPicador
Contributor Author

I did try with a non-null coder, and it works locally but not on Dataflow.

@andrewsmartin
Contributor

@ElPicador You're still talking about setting the fallback coder, right? I'm still unable to reproduce after changing the fallback coder to be your LogCoder like this:

sc.pipeline.getCoderRegistry.setFallbackCoderProvider(new CoderProvider {
  override def getCoder[T](`type`: TypeDescriptor[T]): Coder[T] = {
    LogCoder.of.asInstanceOf[Coder[T]]
  }
})

I then looked at the worker logs for the job and saw that LogCoder was indeed being used.
Do you have a minimal code sample you can share? Are you doing something different with the fallback coder provider?

@ElPicador
Contributor Author

My answers were not clear enough. I was talking about using only sc.pipeline.getCoderRegistry.registerCoder(classOf[Log], classOf[LogCoder]), without changing the fallback coder provider.

@andrewsmartin
Contributor

@ElPicador I am unable to reproduce this. What exactly is the --enableProfilingAgent option you're using? Is it something Dataflow provides (I cannot find any info), or is it a JVM tool?

I added logging statements to LogCoder.encode() and after submitting the job and checking the worker logs, the log statements I expected to see were there - so this works for me.

@ElPicador
Contributor Author

It's an official option for profiling jobs; see GoogleCloudPlatform/DataflowJavaSDK#72.

I'll continue to test on my side, then.

@ElPicador
Contributor Author

I can't reproduce my issue, so I'm closing this. Sorry for the noise.
