MergeContent resulting in corrupted JSON


Re: MergeContent resulting in corrupted JSON

Jason Iannone
Previous spotting of the issue was a red herring. We removed our custom code and are still facing a random "org.codehaus.jackson.JsonParseException: Illegal Character" during PutDatabaseRecord, due to a flowfile containing malformed JSON after MergeContent. The error never occurs immediately; it usually shows up only once we've processed several million records. We did a no-op run (ConsumeKafka -> UpdateCounter) and everything seemed OK.

Here's the current form of the flow:
  1. ConsumeKafka_2_0 - Encoding headers as ISO-8859-1 due to some containing binary data
    1. I have a fork of NiFi with changes to allow Base64 and hex encoding of select headers.
    2. The next test will be without pulling any headers.
  2. RouteOnAttribute - Validate attributes
  3. Base64EncodeContent - Content is binary; converting to a format we can store and later process
  4. ExtractText - Copy the Base64-encoded content to an attribute
  5. AttributesToJSON - Provenance shows the output as fine
  6. MergeContent - Provenance shows malformed JSON being written in the combined flowfile
  7. PutDatabaseRecord - Schema specified as Schema Text
Since we've removed all traces of custom code, what are people's thoughts on possible causes? Could this be an OS issue, or are there any known issues with specific versions of RHEL?

Logically, I think it makes sense to remove JSON from the equation entirely.

Thanks,
Jason

On Wed, Jun 24, 2020 at 2:54 PM Jason Iannone <[hidden email]> wrote:
Exactly my thought, and we've been combing through the code, but nothing significant has jumped out. Something that does stand out are the NiFi JIRAs NIFI-6923, NIFI-6924, and NIFI-6846. Considering we're on 1.10.0, I've requested upgrading to 1.11.4.

Thanks,
Jason

On Tue, Jun 23, 2020 at 9:05 AM Mark Payne <[hidden email]> wrote:
It should be okay to create a buffer like that, assuming the FlowFile is small. Typically we try to avoid buffering the content of a FlowFile into memory, but if it's a reasonably small FlowFile, that's probably fine.

To be honest, if the issue is intermittent and doesn’t always happen on the same input, it sounds like a threading/concurrency bug. Do you have a buffer or anything like that as a member variable?

On Jun 22, 2020, at 10:02 PM, Jason Iannone <[hidden email]> wrote:

I'm now thinking it's due to how we handled reading the flowfile content into a buffer.

Previous:
session.read(flowFile, in -> {
  atomicVessel.set(ByteStreams.toByteArray(in));
});

Current:
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

Making this change reduced the occurrences of the data corruption, but we still saw it occur. What I'm now wondering is whether sizing the byte array based on flowFile.getSize() is ideal. The contents of the file are raw bytes coming from ConsumeKafka_2_0.
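
As an aside, a size-agnostic way to buffer the content is to let the stream drive the allocation rather than pre-sizing from flowFile.getSize(). The following is only a minimal sketch, assuming session and flowFile are in scope exactly as in the snippets above and that the FlowFile is small enough to hold in memory:

import java.io.ByteArrayOutputStream;

// Minimal sketch: buffer the FlowFile content without trusting flowFile.getSize().
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
session.read(flowFile, in -> {
    final byte[] chunk = new byte[8192];
    int len;
    while ((len = in.read(chunk)) != -1) {  // read until the stream ends
        bos.write(chunk, 0, len);           // the buffer grows as needed
    }
});
final byte[] content = bos.toByteArray();   // exact-size copy of whatever was read

If the stream ever yields fewer bytes than getSize() reports, this still captures exactly what was read instead of leaving a partially filled, zero-padded array.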

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:51 PM Mark Payne <[hidden email]> wrote:
Jason,

Glad to hear it. This is where the data provenance becomes absolutely invaluable. So now you should be able to trace the lineage of that FlowFile back to the start using data provenance. You can see exactly what it looked like when it was received. If it looks wrong there, the provenance shows exactly where it was received from so you know where to look next. If it looks good on receipt, you can trace the data through the flow and see exactly what the data looked like before and after each processor. And when you see which processor resulted in corruption, you can easily download the data as it looks when it went into the processor to make it easy to re-ingest and test.

Thanks
-Mark


On Jun 22, 2020, at 4:46 PM, Jason Iannone <[hidden email]> wrote:

I spoke too soon; it must be the magic of sending an email! We found what appears to be corrupted content and captured the binary, hoping to play it through the code and see what's going on.

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone <[hidden email]> wrote:
Hey Mark,

We hit the issue again, and when digging into the lineage we can see the content is fine coming into MergeContent but is corrupt on the output of the Join event. Any other suggestions?

Thanks,
Jason

On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <[hidden email]> wrote:
Jason,

Control characters should not cause any problem with MergeContent. MergeContent just copies bytes from one stream to another. It’s also worth noting that attributes don’t really come into play here. MergeContent is combining the FlowFile content, so even if it has some weird attributes, those won’t cause a problem in the output content. NiFi stores attributes as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the processor is assuming that if you want to convert a message header to an attribute, that header must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at a time has the opportunity to write to a file in the content repository. When the processor finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which file its content was written to, the byte offset where its content starts, and the length of the content. The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.
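
As a rough illustration of that bookkeeping (the names below are invented for the sketch and are not NiFi's actual classes), a claim essentially records the container file, the starting offset, and the length, and reading a FlowFile's content back means reading exactly that slice:

import java.io.IOException;
import java.io.RandomAccessFile;

// Rough illustration only; NiFi's real content-claim classes differ.
class ContentClaimSketch {
    // Which repository file holds the content, where this FlowFile's bytes start,
    // and how many bytes belong to it.
    record Claim(String containerFile, long offset, long length) {}

    static byte[] readClaim(Claim claim) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(claim.containerFile(), "r")) {
            raf.seek(claim.offset());                           // jump to this FlowFile's slice
            final byte[] content = new byte[(int) claim.length()];
            raf.readFully(content);                             // read exactly 'length' bytes
            return content;
        }
    }
}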

My recommendation to track this down would be to find a FlowFile that is corrupt, and then use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined together by MergeContent and see if any of those is corrupt.

Thanks
-Mark

[1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance

On Jun 10, 2020, at 2:07 PM, Jason Iannone <[hidden email]> wrote:

Hey Mark,

I was thinking this over some more, and despite no complaints from the Jackson ObjectMapper, is it possible that hidden and/or control characters are present in the JSON values, which would then cause MergeContent to behave this way? I looked over the code and nothing jumped out, but there is something we had to do because of how the publisher is setting Kafka header attributes. Some attributes are raw bytes rather than strings converted to bytes, and ConsumeKafka seems to assume that these can always be converted to a String. We had to change the encoding to ISO-8859-1 due to running into issues with the bytes getting corrupted.
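
For what it's worth, ISO-8859-1 is the usual choice for that trick because every byte value 0x00-0xFF maps to exactly one code point, so a byte[] -> String -> byte[] round trip is lossless. A small standalone sketch of that property (example bytes only, not data from the flow):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class HeaderEncodingSketch {
    public static void main(String[] args) {
        // Raw header bytes that are not valid UTF-8.
        byte[] raw = new byte[] { (byte) 0xA1, 0x0F, 0x15, (byte) 0xB1, 0x1D, 0x14 };

        // ISO-8859-1 maps each byte to a single code point, so nothing is lost.
        String asAttribute = new String(raw, StandardCharsets.ISO_8859_1);
        byte[] roundTripped = asAttribute.getBytes(StandardCharsets.ISO_8859_1);

        System.out.println(Arrays.equals(raw, roundTripped));  // prints: true
    }
}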

I'm also trying to better understand how the content is being stored in the content repository, and whether something is going wrong when writing it out.

Thanks,
Jason

On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <[hidden email]> wrote:
Hey Jason,

Thanks for reaching out. That is definitely odd and not something that I’ve seen or heard about before.

Are you certain that the data is not being corrupted upstream of the processor? I ask because the code for the processor that handles writing out the content is pretty straightforward and hasn't been modified in over 3 years, so I would expect to see it happen often if it were a bug in the MergeContent processor itself. Any chance that you can create a flow template/sample data that recreates the issue? Anything particularly unique about your flow?

Thanks
-Mark


> On Jun 9, 2020, at 6:47 PM, Jason Iannone <[hidden email]> wrote:
>
> Hi all,
>
> Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent. The processor is being fed many flowfiles, each containing an individual JSON record. The records have various field types, including a hex-encoded byte[]. We are not trying to merge JSON records themselves, but rather to consolidate many flowfiles into fewer flowfiles.
>
> What we're seeing is that a random flowfile is split, causing the merged file to be invalid JSON. When running multiple bins we saw a flowfile split across bins.
>
> Example
> Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
> Flowfile 2:  {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> Flowfile 3:  {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>
> Merged Result:
> {"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" }
> xbytes": A10F15D14B11", timestamp: "123456790" } 
> {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> {"name": "3", "h 
>
> Mergecontent Configuration:
> Concurrent Tasks: 4
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Attribute Strategy: Keep Only Common Attributes
> Min. number of entries: 1000
> Max number of entries: 20000
> Minimum group size: 10 KB
> Maximum number of bins: 5
> Header, Footer, and Demarcator are not set.
>
> We then backed off the settings above, reducing the min and max entries, bins to 1, and threads to 1, and still see the same issue.
>
> Any insights?
>
> Thanks,
> Jason





Re: MergeContent resulting in corrupted JSON

Darren Govoni
Run the NiFi JVM in a runtime profiler/analyzer like AppDynamics and see if it detects any memory leaks or dangling unclosed file buffers/IO. I'm throwing darts, but the problem could be as deep as the Linux kernel or confined inside the JVM for your specific scenario.


Re: MergeContent resulting in corrupted JSON

Jason Iannone
What role does ZooKeeper have in the persistence of flowfile content, and does this relate to how MergeContent works or checks that the content exists? From all observations it appears that MergeContent is picking up data that is either not written or not completely written to disk.

Thanks,
Jason


Re: MergeContent resulting in corrupted JSON

Joe Witt
Jason

Content repo writing isn't related to ZK.

You mention a fork of NiFi for ConsumeKafka. Have you tried using the stock components, albeit without whatever feature you needed, so you can narrow in on the problem?

joe


Re: MergeContent resulting in corrupted JSON

Jason Iannone
Hi Joe,

We've seen the issue with stock ConsumeKafka_2_0 against NiFi 1.10 and 1.11.4.

We've seen this occur at multiple points, and all of those points modify the content (AttributesToJSON, AttributesToCSV). Really, MergeContent is doing its job and consolidating the records; it just appears to be reading a partially written buffer. With that, my theory is that the content isn't yet written to disk, and MergeContent is reading it and getting partial data, and in some cases no data.

A summary of what we're seeing is that MergeContent is outputting null entries.

Example:
some,text,goes,here
some,more,text,goes
null null null text,goes
more,test,goes,here

Regarding the fork, it's nothing crazy, and once I get approvals (from work) I'll see about contributing back.

Thanks,
Jason


Re: MergeContent resulting in corrupted JSON

Joe Witt
Jason

You mention here that you're seeing it happen with AttributesToJSON/AttributesToCSV, but in the thread earlier with Mark you mentioned all is well until MergeContent.

I want to make sure we understand precisely what you're seeing. If you can easily replicate this problem, can you do so using GenerateFlowFile as a source and then share a template with precise details on the NiFi version, etc.?

Mark's comments about using the provenance trail are critical to focus on here.

To rule out MergeContent you could also simply read from Kafka and immediately merge content (skip the other bits).

Can you replicate this issue using absolutely nothing custom in your flow?

Running scenarios like this down throughout a long thread can be really hard. You want help and are trying various things, and you might assume folks are reading the full thread each time to understand the full context. It might be best if you overshare for a while in each email, explaining the full current context, to maximize the odds that things become clearer so we can help more effectively.

thanks

On Fri, Jul 3, 2020 at 7:47 AM Jason Iannone <[hidden email]> wrote:
Hi Joe,

We've seen the issue with stock ConsumeKafka_2_0 against Nifi 1.10 and 1.11.4.

We've seen this occur at multiple points and all of those points are modifying the content (AttributesToJSON, AttributeToCSV), and really merge content is doing its job and consolidates the records...it just appears to be reading a partially written buffer. With that my theory is that the content isn't yet written to disk, merge content is reading the content and getting partial and in some cases no data.

A summary of what we're seeing is merge content is outputting null entries.

Example:
some,text,goes,here
some,more,text,goes
null null null text,goes
more,test,goes,here

Regarding the fork, its nothing crazy and once I get approvals (from work) see about contributing back.

Thanks,
Jason

On Fri, Jul 3, 2020 at 10:19 AM Joe Witt <[hidden email]> wrote:
Jason

content repo writing isnt related to zk.

you mention a fork of nifi for consume kafka.  have you tried using stock items/albeit without whatever feature you needed so you can narrow in on the problem?

joe

On Fri, Jul 3, 2020 at 7:16 AM Jason Iannone <[hidden email]> wrote:
What role does Zookeeper have in persistence of flowfile content, and does this relate to how merge content works or check of the content existing? From all observations it appears that merge content is picking up data that is either not written or not completely written to disk. 

Thanks,
Jason

On Tue, Jun 30, 2020 at 10:48 PM Darren Govoni <[hidden email]> wrote:
Run the nifi jvm in a runtime profiler/analyzer like appdynamics and see if it detects any memory leaks or dangling unclosed file buffers/io. Throwing darts but the problem could be as deep as the Linux kernel or confined inside the jvm for your specific scenario.

Sent from my Verizon, Samsung Galaxy smartphone


From: Jason Iannone <[hidden email]>
Sent: Tuesday, June 30, 2020 10:36:02 PM
To: [hidden email] <[hidden email]>
Subject: Re: MergeContent resulting in corrupted JSON
 
Previous spotting of the issue was a red herring. We removed our custom code and are still facing random "org.codehaus.jackson.JsonParseException: Illegal Character" during PutDatabaseRecord due to a flowfile containing malformed JSON post MergeContent. Error never occurs immediately and is usually once we've processed several million records. We did a NOOP run, which was ConsumeKafka -> UpdateCounter and everything seemed ok.

Here's the current form of the flow:
  1. ConsumeKafka_2_0 - Encoding headers as ISO-8859-1 due to some containing binary data
    1. I have a fork of nifi with changes to allow base64 and hex encoding of select nifi headers.
    2. Next test will be without pulling any headers
  2. RouteOnAttribute - Validate attributes
  3. Base64EncodeContent - Content is binary, converting to a format we can store to later process
  4. ExtractText - Copy Base64 encoded content to attribute
  5. AttributesToJson - Provenance shows output as being fine
  6. MergeContent - Provenance shows output of malformed JSON being written in the combined flowflle.
  7. PutDatabaseRecord - Schema specified as Schema Text
Since we've removed all traces of custom code what are peoples thoughts on possible causes? Could this be an OS issue, or are there any known issues with specific versions of RHEL?

Logically I think it makes sense to remove JSON from the equation as a whole.

Thanks,
Jason

On Wed, Jun 24, 2020 at 2:54 PM Jason Iannone <[hidden email]> wrote:
Exactly my thought, and we've been combing through the code but nothing significant has jumped out. Something that does are Nifi JIRA's, NIFI-6923, NIFI-6924, and NIFI-6846. Considering we're on 1.10.0 I've requested upgrading to 1.11.4.

Thanks,
Jason

On Tue, Jun 23, 2020 at 9:05 AM Mark Payne <[hidden email]> wrote:
It should be okay to create a buffer like that. Assuming the FlowFile is small. Typically we try to avoid buffering the content of a FlowFile into memory. But if it’s a reasonable small FlowFile, that’s probably fine.

To be honest, if the issue is intermittent and doesn’t always happen on the same input, it sounds like a threading/concurrency bug. Do you have a buffer or anything like that as a member variable?

On Jun 22, 2020, at 10:02 PM, Jason Iannone <[hidden email]> wrote:

I'm now thinking its due to how we handled reading the flowfile content into a buffer.

Previous:
session.read(flowFile, in -> {
  atomicVessel.set(ByteStreams.toByteArray(in));
});

Current:
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

Making this change reduced the occurrences of the data corruption, but we still saw it occur. What I'm now wondering is if sizing the byte array based on flowFile.getSize() is ideal? The contents of the file are raw bytes coming from ConsumeKafka_2_0.

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:51 PM Mark Payne <[hidden email]> wrote:
Jason,

Glad to hear it. This is where the data provenance becomes absolutely invaluable. So now you should be able to trace the lineage of that FlowFile back to the start using data provenance. You can see exactly what it looked like when it was received. If it looks wrong there, the provenance shows exactly where it was received from so you know where to look next. If it looks good on receipt, you can trace the data through the flow and see exactly what the data looked like before and after each processor. And when you see which processor resulted in corruption, you can easily download the data as it looks when it went into the processor to make it easy to re-ingest and test.

Thanks
-Mark


On Jun 22, 2020, at 4:46 PM, Jason Iannone <[hidden email]> wrote:

I spoke too soon, and must be the magic of sending an email! We found what appears to be corrupted content and captured the binary, hoping to play it through the code and see what's going on.

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone <[hidden email]> wrote:
Hey Mark,

We hit the issue again, and when digging into the lineage we can see the content is fine coming into MergeContent but is corrupt on the output of the JOIN event. Any other suggestions?

Thanks,
Jason

On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <[hidden email]> wrote:
Jason,

Control characters should not cause any problem with MergeContent. MergeContent just copies bytes from one stream to another. It’s also worth noting that attributes don’t really come into play here. MergeContent is combining the FlowFile content, so even if it has some weird attributes, those won’t cause a problem in the output content. NiFi stores attributes as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the processor is assuming that if you want to convert a message header to an attribute, that header must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at a time has the opportunity to write to a file in the content repository. When the processor finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which file its content was written to, the byte offset where its content starts, and the length of the content. The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.
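A rough way to picture that bookkeeping (purely illustrative; these are not NiFi's internal classes):

import java.io.IOException;
import java.io.RandomAccessFile;

// Illustrative only: each FlowFile effectively carries a pointer of
// (container file, offset, length) into a shared content-repository file.
final class ContentPointer {
    private final String containerFile;
    private final long offset;
    private final long length;

    ContentPointer(String containerFile, long offset, long length) {
        this.containerFile = containerFile;
        this.offset = offset;
        this.length = length;
    }

    byte[] read() throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(containerFile, "r")) {
            raf.seek(offset);                               // jump to where this FlowFile's content begins
            final byte[] content = new byte[(int) length];  // read exactly this FlowFile's bytes, no more
            raf.readFully(content);
            return content;
        }
    }
}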

My recommendation to track this down would be to find a FlowFile that is corrupt, and then use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined together by MergeContent and see if any of those is corrupt.

Thanks
-Mark

[1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance

On Jun 10, 2020, at 2:07 PM, Jason Iannone <[hidden email]> wrote:

Hey Mark,

I was thinking over this more, and despite no complaints from the Jackson ObjectMapper, is it possible that hidden and/or control characters are present in the JSON values, which would then cause MergeContent to behave this way? I looked over the code and nothing jumped out, but there is something we had to do because of how the publisher is setting Kafka header attributes. Some attributes are bytes rather than strings converted to bytes, and ConsumeKafka seems to assume that these can always be converted to a String. We had to change the encoding to ISO-8859-1 due to running into issues with the bytes getting corrupted.
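As a small standalone illustration of why we went with ISO-8859-1 for the header bytes (not NiFi code, just a charset round-trip check):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderEncodingCheck {
    public static void main(String[] args) {
        final byte[] raw = new byte[] { (byte) 0xA1, (byte) 0x0F, (byte) 0xFF };  // arbitrary binary header bytes

        // ISO-8859-1 maps every byte value 0-255 to a character, so the round trip is lossless.
        final String latin1 = new String(raw, StandardCharsets.ISO_8859_1);
        System.out.println(Arrays.equals(raw, latin1.getBytes(StandardCharsets.ISO_8859_1)));  // true

        // UTF-8 decoding of invalid sequences substitutes replacement characters, so the original bytes are lost.
        final String utf8 = new String(raw, StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(raw, utf8.getBytes(StandardCharsets.UTF_8)));  // false
    }
}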

I'm also trying to better understand how the content is being stored in the content repository, and whether something is going wrong when writing it out.

Thanks,
Jason

On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <[hidden email]> wrote:
Hey Jason,

Thanks for reaching out. That is definitely odd and not something that I’ve seen or heard about before.

Are you certain that the data is not being corrupted upstream of the processor? I ask because the code for the processor that handles writing out the content is pretty straightforward and hasn't been modified in over 3 years, so I would expect to see it happen often if it were a bug in the MergeContent processor itself. Any chance that you can create a flow template/sample data that recreates the issue? Anything particularly unique about your flow?

Thanks
-Mark


> On Jun 9, 2020, at 6:47 PM, Jason Iannone <[hidden email]> wrote:
>
> Hi all,
>
> Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent. The processor is being fed many flowfiles, each containing an individual JSON record. The records have various field types including a hex-encoded byte[]. We are not trying to merge JSON records themselves but rather consolidate many flowfiles into fewer flowfiles.
>
> What we're seeing is that a random flowfile is split, causing the merged file to be invalid JSON. When running multiple bins we saw the flowfile split across bins.
>
> Example
> Flowfile 1: {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
> Flowfile 2: {"name": "2", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
> Flowfile 3: {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>
> Merged Result:
> {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
> xbytes": "A10F15D14B11", "timestamp": "123456790" }
> {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
> {"name": "3", "h
>
> MergeContent Configuration:
> Concurrent Tasks: 4
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Attribute Strategy: Keep Only Common Attributes
> Min. number of entries: 1000
> Max number of entries: 20000
> Minimum group size: 10 KB
> Maximum number of bins: 5
> Header, Footer, and Demarcator are not set.
>
> We then backed off the above configuration, reducing the min and max entries, bins to 1, and threads to 1, and still see the same issue.
>
> Any insights?
>
> Thanks,
> Jason




Reply | Threaded
Open this post in threaded view
|

Re: MergeContent resulting in corrupted JSON

Jason Iannone
Good point! We are not able to create or reproduce this outside of a specific environment, it's occurring with only NiFi components, and we've only seen it with flow variations where the content is being modified and written. The provenance shows good data everywhere we view it except at MergeContent, which is what my initial subject was asking about (it's more of a red herring). The flowfiles that MergeContent correlates to (i.e. the output from AttributesToCSV) look fine.


High Level
We are consuming from Kafka, extracting certain attributes, encoding the content, and writing to Oracle. During this we're seeing PutDatabaseRecord fail on parsing the content output from MergeContent. When inspecting the content of MergeContent we see null values that in some cases represent the whole content and in other cases partial content. What's interesting is that it's only the tail end of the content that is valid, and the nulls are at the beginning.

For example:
some,text,goes,here
some,more,text,goes
null null null text,goes
more,test,goes,here

Cluster Config
  • 3 nodes, each with a RAID6 and encrypted filesystem.
  • ZooKeeper, the content repository, etc. are sharing the filesystem (which is what I'm thinking may be the issue)
  • We are currently running the flow on a single node and are now testing running it against all 3 in order to reduce the disk I/O
  • Many cores and plenty of RAM
  • The JVM is oversized at around 32 GB
  • Retaining one day of provenance, or around 100 GB (a recent change to triage this)
Data Flow
  1. ConsumeKafka_2_0 - Polling 10k messages at a time, and it is not batching flowfiles due to unique attributes in the Kafka headers. This is of concern for throughput and content writing.
  2. Base64EncodeContent
  3. UpdateAttribute - This adds some expected names and does some conversions, such as timestamp to millis
  4. AttributesToCSV
  5. MergeContent - Bin-Packing Algorithm, Binary Concatenation, Keep Only Common Attributes, 1-1000 entries per bin, max of 5 bins
  6. PutDatabaseRecord - Mapping the CSV fields to table columns
Theory
Data is not fully "persisted", resulting in an incomplete read by MergeContent.
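One way to check this theory (a standalone sketch; the path argument is wherever the merged content was downloaded from provenance) is to see whether the "null"s are literal 0x00 bytes, which would be consistent with reading a region that hasn't been written yet:

import java.nio.file.Files;
import java.nio.file.Path;

public class NulByteCheck {
    public static void main(String[] args) throws Exception {
        final byte[] content = Files.readAllBytes(Path.of(args[0]));  // merged flowfile content
        long nulCount = 0;
        long firstNul = -1;
        for (int i = 0; i < content.length; i++) {
            if (content[i] == 0x00) {
                nulCount++;
                if (firstNul < 0) {
                    firstNul = i;
                }
            }
        }
        System.out.println("NUL bytes: " + nulCount + ", first at offset: " + firstNul);
    }
}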

Thanks,
Jason


On Fri, Jul 3, 2020 at 10:57 AM Joe Witt <[hidden email]> wrote:
Jason

You mention here that you're seeing it happen with AttributesToJSON/AttributesToCSV, but in the thread earlier with Mark you mentioned all is well until MergeContent.

I want to make sure we understand precisely what you're seeing.  If you can easily replicate this problem, can you do so using GenerateFlowFile as a source and then share a template with precise details on the NiFi version, etc.?

Mark's comments about using the provenance trail are critical here to focus on.

To rule out MergeContent you could also simply read from Kafka and immediately merge the content (skip the other bits).

Can you replicate this issue using absolutely nothing custom in your flow?

Running scenarios like this down throughout a long thread can be really hard.  You want help and are trying various things, and might assume folks are reading the full thread each time to understand the full context.  It might be best if you overshare for a while in each email, explaining the full current context, to maximize the odds that things become clearer so we can help more effectively.

thanks

On Fri, Jul 3, 2020 at 7:47 AM Jason Iannone <[hidden email]> wrote:
Hi Joe,

We've seen the issue with stock ConsumeKafka_2_0 against NiFi 1.10 and 1.11.4.

We've seen this occur at multiple points, and all of those points modify the content (AttributesToJSON, AttributesToCSV). Really, MergeContent is doing its job and consolidating the records... it just appears to be reading a partially written buffer. With that, my theory is that the content isn't yet written to disk, and MergeContent is reading it and getting partial data, and in some cases no data at all.

A summary of what we're seeing is that MergeContent is outputting null entries.

Example:
some,text,goes,here
some,more,text,goes
null null null text,goes
more,test,goes,here

Regarding the fork, it's nothing crazy, and once I get approvals (from work) I'll see about contributing it back.

Thanks,
Jason

On Fri, Jul 3, 2020 at 10:19 AM Joe Witt <[hidden email]> wrote:
Jason

content repo writing isn't related to zk.

you mention a fork of nifi for ConsumeKafka.  have you tried using the stock items, albeit without whatever feature you needed, so you can narrow in on the problem?

joe

On Fri, Jul 3, 2020 at 7:16 AM Jason Iannone <[hidden email]> wrote:
What role does ZooKeeper have in the persistence of flowfile content, and does this relate to how MergeContent works or how it checks that the content exists? From all observations it appears that MergeContent is picking up data that is either not written or not completely written to disk.

Thanks,
Jason

On Tue, Jun 30, 2020 at 10:48 PM Darren Govoni <[hidden email]> wrote:
Run the NiFi JVM in a runtime profiler/analyzer like AppDynamics and see if it detects any memory leaks or dangling unclosed file buffers/IO. Throwing darts, but the problem could be as deep as the Linux kernel or confined inside the JVM for your specific scenario.

Sent from my Verizon, Samsung Galaxy smartphone






12