MergeContent resulting in corrupted JSON

Jason Iannone
Hi all,

Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent. The processor is being fed many FlowFiles, each containing an individual JSON record. The records have various field types, including a hex-encoded byte[]. We are not trying to merge the JSON records themselves, but rather to consolidate many FlowFiles into fewer FlowFiles.

What we're seeing is that a random FlowFile is split, causing the merged file to be invalid JSON. When running multiple bins, we saw a FlowFile split across bins.

Example
Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
Flowfile 2:  {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" } 
Flowfile 3:  {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" } 

Merged Result:
{"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" } 
xbytes": A10F15D14B11", timestamp: "123456790" }  
{"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" } 
{"name": "3", "h  

MergeContent Configuration:
Concurrent Tasks: 4
Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Attribute Strategy: Keep Only Common Attributes
Minimum Number of Entries: 1000
Maximum Number of Entries: 20000
Minimum Group Size: 10 KB
Maximum Number of Bins: 5
Header, Footer, and Demarcator are not set.

We then backed off the above configuration, reducing the minimum and maximum entries and setting bins to 1 and concurrent tasks to 1, and we still see the same issue.

Any insights?

Thanks,
Jason

Re: MergeContent resulting in corrupted JSON

Mark Payne
Hey Jason,

Thanks for reaching out. That is definitely odd and not something that I’ve seen or heard about before.

Are you certain that the data is not being corrupted upstream of the processor? I ask because the code in the processor that handles writing out the content is pretty straightforward and hasn't been modified in over 3 years, so I would expect this to happen often if it were a bug in MergeContent itself. Any chance that you can create a flow template/sample data that recreates the issue? Anything particularly unique about your flow?

Thanks
-Mark



Re: MergeContent resulting in corrupted JSON

Andy LoPresto
It may just be a copy/paste or retyping issue, but in the example you provided, I see unpaired double quotes (the hexBytes values have trailing quotes but not leading ones), which could be causing issues in parsing…


Andy LoPresto
[hidden email]
[hidden email]
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69


Re: MergeContent resulting in corrupted JSON

Jason Iannone
Andy, it was a retyping issue; the actual data is correct JSON.

Mark, the flow is ConsumeKafka_2.0 -> Custom Processor -> MergeContent -> PutDatabaseRecord. ConsumeKafka is consuming encrypted payloads, and the custom processor decrypts each payload, hex-encodes the decrypted bytes, and puts the result in a JSON structure. The JSON is built with Jackson's ObjectMapper, which returns a byte[] that we then write out via ProcessSession.write. The output from the custom processor appears to be correct, properly formed JSON (verified in the content repository via the local file system). We're working on recreating a sample that can be shared.
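
For reference, a minimal sketch of the shape of that write path (names like DecryptToJson, buildRecord, and REL_SUCCESS are illustrative placeholders, not our actual code):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class DecryptToJson extends AbstractProcessor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // buildRecord() stands in for the decrypt + hex-encode + assemble logic
            final byte[] json = MAPPER.writeValueAsBytes(buildRecord(session, flowFile));
            flowFile = session.write(flowFile, out -> out.write(json));
            session.transfer(flowFile, REL_SUCCESS);
        } catch (IOException e) {
            throw new ProcessException("Failed to build JSON record", e);
        }
    }
}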

Thanks,
Jason


Re: MergeContent resulting in corrupted JSON

Jason Iannone
In reply to this post by Mark Payne
Hey Mark,

I was thinking this over more: despite no complaints from Jackson's ObjectMapper, is it possible that hidden and/or control characters are present in the JSON values, and that these cause MergeContent to behave this way? I looked over the code and nothing jumped out, but there is something we had to do because of how the publisher sets Kafka header attributes. Some headers are raw bytes rather than strings converted to bytes, and ConsumeKafka seems to assume that they can always be converted to a String. We had to change the encoding to ISO-8859-1 after running into issues with the bytes getting corrupted.
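
For reference, the reason ISO-8859-1 helps is that it maps every byte value 0x00-0xFF to a character, so a bytes-to-String-to-bytes round trip is lossless, whereas UTF-8 replaces invalid sequences. A tiny standalone demonstration (plain Java, not NiFi code):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderEncodingDemo {
    public static void main(String[] args) {
        byte[] raw = {(byte) 0xA1, 0x0F, 0x15, (byte) 0xB1}; // arbitrary header bytes
        // UTF-8 replaces invalid byte sequences with U+FFFD, so the round trip is lossy
        byte[] viaUtf8 = new String(raw, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
        // ISO-8859-1 maps every byte to a code point, so the round trip is exact
        byte[] viaLatin1 = new String(raw, StandardCharsets.ISO_8859_1).getBytes(StandardCharsets.ISO_8859_1);
        System.out.println("UTF-8 lossless?   " + Arrays.equals(raw, viaUtf8));   // false
        System.out.println("Latin-1 lossless? " + Arrays.equals(raw, viaLatin1)); // true
    }
}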

I'm also trying to better understand how the content is being stored in the content repository, and whether something is going wrong when writing it out.

Thanks,
Jason


Re: MergeContent resulting in corrupted JSON

Mark Payne
Jason,

Control characters should not cause any problem with MergeContent. MergeContent just copies bytes from one stream to another. It’s also worth noting that attributes don’t really come into play here. MergeContent is combining the FlowFile content, so even if it has some weird attributes, those won’t cause a problem in the output content. NiFi stores attributes as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the processor is assuming that if you want to convert a message header to an attribute, that header must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at a time has the opportunity to write to a file in the content repository. When the processor finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which file its content was written to, the byte offset where its content starts, and the length of the content. The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.

My recommendation to track this down would be to find a FlowFile that is corrupt, and then use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined together by MergeContent and see if any of those is corrupt.

Thanks
-Mark

[1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance


Re: MergeContent resulting in corrupted JSON

Jason Iannone
Excellent advice, thank you! When writing via ProcessSession.write(FlowFile, OutputStream), is it advised to flush and/or call session.commit()? I noticed we aren't doing either, but we are invoking session.transfer.

Thanks,
Jason



Re: MergeContent resulting in corrupted JSON

Mark Payne
I don’t think flushing should matter, if you’re writing directly to the provided OutputStream. If you wrap it in a BufferedOutputStream or something like that, then of course you’ll want to flush that. Assuming that you are extending AbstractProcessor, it will call session.commit() for you automatically when onTrigger() returns.

I did just notice that you said you're merging 1,000+ FlowFiles. That would make it kind of difficult to follow the provenance. I would recommend, at least for debugging purposes, that you try making smaller batches, maybe 25 FlowFiles or so. That would make it a lot easier to find the culprit.
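
In other words, something like the following is fine (a rough sketch of the write path you described; payload and REL_SUCCESS are placeholders):

// Inside onTrigger(): closing the buffered wrapper flushes it,
// and the framework manages the underlying raw stream.
flowFile = session.write(flowFile, rawOut -> {
    try (java.io.BufferedOutputStream out = new java.io.BufferedOutputStream(rawOut)) {
        out.write(payload);
    }
});
session.transfer(flowFile, REL_SUCCESS);
// No explicit session.commit() needed: AbstractProcessor commits when onTrigger() returns.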


Re: MergeContent resulting in corrupted JSON

Jason Iannone
I confirmed what you mentioned as well. 

I also looked over many custom processor examples, and I'm looking for clarification on a few things that I didn't see explicitly called out in the developer's guide:
  • Are there guidelines on when one should modify the original FlowFile vs. clone it vs. create a net-new FlowFile?
  • Should heavier lifting such as decryption, formatting, etc. be done in a callback?

Thanks,
Jason


Re: MergeContent resulting in corrupted JSON

Mark Payne
Jason,

Modify vs. clone vs. create new:

You would clone a FlowFile if you want an exact copy of it (with the exception that the clone will have a unique UUID, Entry Date, etc.). It is very rare that a Processor will actually do this.

Modify vs. create a “child” FlowFile (i.e., `session.create(original);`) - This is really a judgment call. Do you think it will be necessary to have both the original FlowFile and a modified version of it? If so, you may want to create a child FlowFile and route the incoming FlowFile to an “original” relationship. In reality, you shouldn't need this often. In most cases, if the user wants both the original and the modified version, they can just add two connections: one going to this processor and one going to wherever else they want the FlowFile. This will cause NiFi to implicitly clone the FlowFile. The “create a child and send out the original” pattern only matters when there's a feasible use case in which the user wants both versions and also doesn't want to process the original until after the modified version has been created. This is not common. However, over the years it has become common practice to create “original” relationships when they are not needed, likely because a few developers saw the pattern and duplicated it to many other processors without really understanding the difference.

“Net new” - there are two ways to create a FlowFile: `session.create()` and `session.create(original);`. The first creates a FlowFile with no parent and should be used only when there is no inbound FlowFile to create it from, i.e., when this is a “source” processor. In 100% of all other cases, use `session.create(original);`. Providing the original FlowFile does two important things: firstly, it creates a linkage in provenance between the two FlowFiles; secondly, it causes the newly created FlowFile to inherit all attributes from the parent.
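
To make that concrete, a rough sketch (the relationship names here are illustrative):

FlowFile original = session.get();
if (original == null) {
    return;
}
// session.create(original) links provenance and inherits the parent's attributes
FlowFile child = session.create(original);
child = session.putAttribute(child, "variant", "modified");
session.transfer(child, REL_MODIFIED);    // the derived version
session.transfer(original, REL_ORIGINAL); // the untouched input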

Callback vs. non-callback: It doesn't matter. The callback was originally the only way to read or write the content of a FlowFile. It was done this way because it was a straightforward way to ensure that the framework could properly manage the InputStream, OutputStream, etc. But there were use cases that didn't fit the callback mechanism well, so we eventually added the ability to get the InputStreams and OutputStreams directly, and callers can just use try-with-resources. That is probably preferred now for most cases, just because it results in cleaner code.
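
Side by side, the two styles look roughly like this (a sketch; error handling trimmed):

// Callback style: the framework opens and closes the stream around the lambda.
session.read(flowFile, in -> {
    byte[] content = in.readAllBytes();
    // ... process content ...
});

// Stream style: you manage the stream yourself with try-with-resources.
try (java.io.InputStream in = session.read(flowFile)) {
    byte[] content = in.readAllBytes();
    // ... process content ...
} catch (java.io.IOException e) {
    throw new ProcessException("Failed to read content", e);
}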

Thanks
-Mark


Re: MergeContent resulting in corrupted JSON

Andy LoPresto
To give another perspective on the “callback vs. non”, I’d say “heavy” or “messy” operations (like encryption, for example) should be contained in encapsulated code (other classes which provide a service) and then invoked from the callback or TWR. This allows for much more testable business logic, separation of concerns (a service which implements the behavior and then a component effectively calling the API), and composability/flexibility. If I want to build a processor which exposes a property allowing the user to select different encryption algorithms, I can either detect which one and delegate that to an implementation, or I could have a giant switch statement and the raw crypto primitive code all in a giant spaghetti method/callback definition. I know I would prefer the former. 
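
Roughly like this (every name here is hypothetical):

// A small service interface keeps the crypto logic testable outside the processor.
public interface PayloadDecryptor {
    byte[] decrypt(byte[] ciphertext) throws java.security.GeneralSecurityException;
}

// In onTrigger(), select an implementation from a processor property and delegate;
// ALGORITHM and selectDecryptor() are hypothetical.
final PayloadDecryptor decryptor = selectDecryptor(context.getProperty(ALGORITHM).getValue());
flowFile = session.write(flowFile, (in, out) -> {
    try {
        out.write(decryptor.decrypt(in.readAllBytes()));
    } catch (java.security.GeneralSecurityException e) {
        throw new java.io.IOException("Decryption failed", e);
    }
});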

Andy LoPresto
[hidden email]
[hidden email]
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69


Re: MergeContent resulting in corrupted JSON

Jason Iannone
We currently have it encapsulated in code that allows proper isolation and testing, as this is the same methodology we apply in standard development. What I wasn't sure about is whether NiFi is opinionated and actually prefers and/or performs better with callbacks. There are a lot of older NiFi examples out there, including NiFi core processors, and it's hard to discern what's recommended.

What's TWR?

Appreciate the replies, you both have been immensely helpful!

Thanks,
Jason

On Thu, Jun 11, 2020 at 8:40 PM Andy LoPresto <[hidden email]> wrote:
To give another perspective on the “callback vs. non”, I’d say “heavy” or “messy” operations (like encryption, for example) should be contained in encapsulated code (other classes which provide a service) and then invoked from the callback or TWR. This allows for much more testable business logic, separation of concerns (a service which implements the behavior and then a component effectively calling the API), and composability/flexibility. If I want to build a processor which exposes a property allowing the user to select different encryption algorithms, I can either detect which one and delegate that to an implementation, or I could have a giant switch statement and the raw crypto primitive code all in a giant spaghetti method/callback definition. I know I would prefer the former. 

Andy LoPresto
[hidden email]
[hidden email]
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

On Jun 11, 2020, at 8:14 AM, Mark Payne <[hidden email]> wrote:

Jason,

Modify vs. clone vs. create new:

You would clone a FlowFile if you want an exact copy of the FlowFile (with the exception that the clone will have a unique UUID, Entry Date, etc.). Very rare that a Processor will actually do this.

Modify vs. create a “Child” FlowFiles (i.e., `session.create(original);` ) - This is a judgment call really. Do you think it will be necessary to have a copy of the original FlowFile and a modified version of it? If so, you may want to create a child FlowFile and send the original FlowFile to original. In reality, you shouldn’t need this often. In most cases, if the user wants both the original and the modified version, they can just add two connections, one going to this processor and one going to wherever else they want the FlowFile. This will cause NiFi to implicitly clone the FlowFile. Where the “create a child and send out the original” matters is just when there’s a feasible use case in which the user would want to have a modified version of the FlowFile and the original version of the FlowFile and also not want to process the original version until after the modified version has been created. This is not common. However, over the years, it has become a common practice to create “original” relationships when they are not needed, likely because a few developers saw a pattern of creating an original relationship and duplicated this to many other processors without really understanding the difference.

“Net New” - there are two ways to create a FlowFile: `session.create()` and `session.create(original);` - the first creates a FlowFile with no parent FlowFile. This should be done only if there is no inbound FlowFile to create it from, i.e., when this is a “source” processor. In 100% of all other cases, it should be done as `session.create(original);` Providing the original FlowFile does 2 important things. Firstly, it creates a linkage in provenance between them. Secondly, it causes the newly created FlowFile to inherit all attributes from the original (the parent).
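In code, that distinction looks roughly like this (a minimal sketch of the two calls, inside onTrigger()):

// Source processor: nothing inbound, so the FlowFile has no parent.
FlowFile fresh = session.create();

// Everywhere else: derive a child, so provenance is linked and the child
// inherits the parent's attributes.
FlowFile original = session.get();
FlowFile child = session.create(original);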

Callback vs. non-callback: It doesn’t matter. The callback was originally the only way to read or write the content of FlowFiles. It was done this way because it was a straightforward way to ensure that the framework was able to properly manage the InputStream, OutputStream, etc. But there were use cases that didn’t fit the callback mechanism well, so we eventually added the ability to get the InputStreams and OutputStreams directly, and callers can just use try-with-resources. This is probably preferred now for most cases just because it results in cleaner code.
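Side by side, the two styles look roughly like this (a sketch, assuming the stream-returning read overload described above):

// Callback style: the framework opens and closes the stream for you.
session.read(flowFile, in -> {
    // consume the InputStream here
});

// Try-with-resources style: obtain the InputStream directly and manage it yourself.
try (final InputStream in = session.read(flowFile)) {
    // consume the InputStream here
} catch (final IOException e) {
    throw new ProcessException("Failed to read FlowFile content", e);
}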

Thanks
-Mark

On Jun 11, 2020, at 10:43 AM, Jason Iannone <[hidden email]> wrote:

I confirmed what you mentioned as well. 

I also looked over many custom processor examples and am looking for clarification on a few things which I didn't see explicitly called out in the developer's guide.
  • Are there guidelines on when one should modify the original flowfile vs. when you should clone vs. when you should create net new?
  • Should heavier lifting such as decryption, formatting, etc. be done in a callback?

Thanks,
Jason

On Wed, Jun 10, 2020 at 4:32 PM Mark Payne <[hidden email]> wrote:
I don’t think flushing should matter if you’re writing directly to the provided OutputStream. If you wrap it in a BufferedOutputStream or something like that, then of course you’ll want to flush that. Assuming you are extending AbstractProcessor, it will call session.commit() for you automatically when onTrigger() returns.
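In other words, something like this (a sketch; `payload` is a placeholder byte[]):

// The provided OutputStream needs no explicit flush, but a wrapper does:
flowFile = session.write(flowFile, out -> {
    final BufferedOutputStream buffered = new BufferedOutputStream(out);
    buffered.write(payload);
    buffered.flush(); // push buffered bytes down to the framework's stream before returning
});
// No session.commit() needed here: AbstractProcessor commits when onTrigger() returns.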

I did just notice that you said you’re merging 1,000+ FlowFiles. That would make it kind of difficult to follow the provenance. I’d recommend, for debugging purposes at least, that you try making small batches, maybe 25 FlowFiles or something like that. That would make it a lot easier to find the culprit.

On Jun 10, 2020, at 4:28 PM, Jason Iannone <[hidden email]> wrote:

Excellent advice, thank you! When writing via ProcessSession.write(FlowFile, OutputStreamCallback), is it advised to flush and/or call session.commit()? I noticed we aren't doing either, but we are invoking session.transfer().

Thanks,
Jason


On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <[hidden email]> wrote:
Jason,

Control characters should not cause any problem with MergeContent. MergeContent just copies bytes from one stream to another. It’s also worth noting that attributes don’t really come into play here. MergeContent is combining the FlowFile content, so even if it has some weird attributes, those won’t cause a problem in the output content. NiFi stores attributes as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the processor is assuming that if you want to convert a message header to an attribute, that header must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at a time has the opportunity to write to a file in the content repository. When the processor finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which file its content was written to, the byte offset where its content starts, and the length of the content. The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.
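Conceptually, that bookkeeping looks something like this (purely illustrative; these are not NiFi's actual internal classes):

// Each FlowFile points at a slice of a shared, append-only repository file.
final class ContentPointer {
    final String repositoryFile; // which "slab" file holds the content
    final long offset;           // where this FlowFile's content begins
    final long length;           // how many bytes belong to this FlowFile

    ContentPointer(final String repositoryFile, final long offset, final long length) {
        this.repositoryFile = repositoryFile;
        this.offset = offset;
        this.length = length;
    }
}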

My recommendation to track this down would be to find a FlowFile that is corrupt, and then use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined together by MergeContent and see if any of those is corrupt.

Thanks
-Mark

[1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance

On Jun 10, 2020, at 2:07 PM, Jason Iannone <[hidden email]> wrote:

Hey Mark,

I was thinking over this more, and despite no complaints from Jackson's ObjectMapper, is it possible that hidden and/or control characters are present in the JSON values which would then cause MergeContent to behave this way? I looked over the code and nothing jumped out, but there is something we had to do because of how the publisher is setting Kafka header attributes. Some attributes are bytes and not strings converted to bytes, and ConsumeKafka seems to assume that these can always be converted to a String. We had to change the encoding to ISO8859 due to running into issues with the bytes getting corrupted.
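For what it's worth, ISO-8859-1 is the one common charset where that round trip is lossless, because every byte value maps to exactly one char (a sketch):

final byte[] raw = {(byte) 0xA1, 0x0F, 0x15, (byte) 0xB1}; // arbitrary header bytes
final String attribute = new String(raw, java.nio.charset.StandardCharsets.ISO_8859_1);
final byte[] back = attribute.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1);
// `back` is byte-for-byte identical to `raw`; the same trip through UTF-8
// would mangle any sequence that isn't valid UTF-8.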

I'm also trying to better understand how the content is being stored in the content repository, and whether something is going wrong when writing it out.

Thanks,
Jason







Re: MergeContent resulting in corrupted JSON

Andy LoPresto-2
Sorry, TWR = try-with-resources. 

Definitely a lot of old code that “still works” but is brittle. We should do better about highlighting modern implementations and paying down tech debt, but the project just moves so quickly. 

Not a perfect rule, but if I see code from one of the core contributors with a date in the last couple years, I trust it much more than even code by those same people from 5 years ago (which was likely written even longer ago than that; time “starts” from the initial import in 2014). 

Andy LoPresto
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69








Re: MergeContent resulting in corrupted JSON

Jason Iannone
In reply to this post by Mark Payne
Hey Mark,

We hit the issue again, and when digging into the lineage we can see the content is fine coming into MergeContent but corrupt on the output of the Join event. Any other suggestions?

Thanks,
Jason




Re: MergeContent resulting in corrupted JSON

Jason Iannone
I spoke too soon; it must be the magic of sending an email! We found what appears to be corrupted content and captured the binary, hoping to replay it through the code and see what's going on.

Thanks,
Jason




Re: MergeContent resulting in corrupted JSON

Mark Payne
Jason,

Glad to hear it. This is where data provenance becomes absolutely invaluable. So now you should be able to trace the lineage of that FlowFile back to the start using data provenance. You can see exactly what it looked like when it was received. If it looks wrong there, the provenance shows exactly where it was received from, so you know where to look next. If it looks good on receipt, you can trace the data through the flow and see exactly what it looked like before and after each processor. And when you see which processor resulted in corruption, you can download the data exactly as it looked going into that processor, which makes it easy to re-ingest and test.

Thanks
-Mark






Re: MergeContent resulting in corrupted JSON

Jason Iannone
I'm now thinking it's due to how we handled reading the FlowFile content into a buffer.

Previous:
// Copies the entire content stream into an AtomicReference<byte[]> (Guava ByteStreams)
session.read(flowFile, in -> {
  atomicVessel.set(ByteStreams.toByteArray(in));
});

Current:
// Pre-sizes the buffer from the FlowFile's reported size, then fills it exactly
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

Making this change reduced the occurrences of the data corruption, but we still saw it happen. What I'm now wondering is whether sizing the byte array based on flowFile.getSize() is safe. The contents of the FlowFile are raw bytes coming from ConsumeKafka_2_0.
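If it helps, a slightly more defensive variant of the same read (a sketch; it guards the int cast and keeps the buffer local to the invocation):

final long size = flowFile.getSize();
if (size > Integer.MAX_VALUE) {
    throw new ProcessException("FlowFile too large to buffer in memory: " + size);
}
final byte[] buffer = new byte[(int) size]; // local; never shared across threads
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true)); // throws if the stream is shorter than the buffer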

Thanks,
Jason





Re: MergeContent resulting in corrupted JSON

Mark Payne
It should be okay to create a buffer like that, assuming the FlowFile is small. Typically we try to avoid buffering the content of a FlowFile into memory, but if it’s a reasonably small FlowFile, that’s probably fine.

To be honest, if the issue is intermittent and doesn’t always happen on the same input, it sounds like a threading/concurrency bug. Do you have a buffer or anything like that as a member variable?
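The classic shape of that bug, for comparison (illustrative only; REL_SUCCESS stands in for the processor's usual success relationship):

// Hazardous: one buffer shared by every concurrent onTrigger() invocation.
private byte[] sharedBuffer; // member variable + Concurrent Tasks > 1 = data race

// Safe: each invocation allocates its own buffer.
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
    final FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    final byte[] local = new byte[(int) flowFile.getSize()];
    session.read(flowFile, in -> StreamUtils.fillBuffer(in, local, true));
    // ... use local ...
    session.transfer(flowFile, REL_SUCCESS);
}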






Re: MergeContent resulting in corrupted JSON

Jason Iannone
Exactly my thought, and we've been combing through the code, but nothing significant has jumped out. Something that does jump out is a set of NiFi JIRAs: NIFI-6923, NIFI-6924, and NIFI-6846. Considering we're on 1.10.0, I've requested upgrading to 1.11.4.

Thanks,
Jason






Re: MergeContent resulting in corrupted JSON

Darren Govoni
Just lurking this thread. You wrote earlier.

The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.

All of these activities, including determining when an append has completed, should be fully synchronized with one another.

Tracking the current offset and appending to the file will not be thread-safe unless synchronized, so that all threads have a fully consistent view of a given file's current offset after all appends to that file have completed.
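
To illustrate the concern, a minimal sketch of that kind of offset bookkeeping (hypothetical code, not NiFi's actual content-repository implementation):

import java.util.concurrent.atomic.AtomicLong;

class SlabFile {
    // BROKEN: a plain read-modify-write. Two threads can both observe the
    // same start offset and then interleave their appends over one region,
    // which would produce exactly the half-record output seen above.
    private long unsafeOffset = 0;

    long unsafeReserve(int length) {
        long start = unsafeOffset;      // thread B may read the same value here
        unsafeOffset = start + length;
        return start;
    }

    // SAFE: the reservation is a single atomic operation, so every writer
    // gets a disjoint [start, start + length) region of the slab.
    private final AtomicLong offset = new AtomicLong(0);

    long reserve(int length) {
        return offset.getAndAdd(length);
    }
}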

Just something to double check.


From: Jason Iannone <[hidden email]>
Sent: Wednesday, June 24, 2020 2:54:26 PM
To: [hidden email] <[hidden email]>
Subject: Re: MergeContent resulting in corrupted JSON
 
Exactly my thought, and we've been combing through the code, but nothing significant has jumped out. What did jump out are a few NiFi JIRAs: NIFI-6923, NIFI-6924, and NIFI-6846. Since we're on 1.10.0, I've requested an upgrade to 1.11.4.

Thanks,
Jason

On Tue, Jun 23, 2020 at 9:05 AM Mark Payne <[hidden email]> wrote:
It should be okay to create a buffer like that, assuming the FlowFile is small. Typically we try to avoid buffering the content of a FlowFile into memory, but if it's a reasonably small FlowFile, that's probably fine.

To be honest, if the issue is intermittent and doesn't always happen on the same input, it sounds like a threading/concurrency bug. Do you have a buffer or anything like that as a member variable?
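
To make that concrete, a sketch of the failure mode being described (hypothetical processor code with a simplified method signature, not the actual flow's code):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.stream.io.StreamUtils;

class BufferSketch {
    // BROKEN: shared by all concurrent tasks. Thread B can reassign or
    // overwrite this while thread A is still filling or consuming it,
    // yielding content that is a mix of two FlowFiles.
    private byte[] sharedBuffer;

    void readUnsafe(ProcessSession session, FlowFile flowFile) {
        sharedBuffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, sharedBuffer, true));
    }

    // SAFE: a fresh local buffer per invocation; no state escapes the thread.
    void readSafe(ProcessSession session, FlowFile flowFile) {
        final byte[] buffer = new byte[(int) flowFile.getSize()];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));
    }
}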

On Jun 22, 2020, at 10:02 PM, Jason Iannone <[hidden email]> wrote:

I'm now thinking it's due to how we handled reading the flowfile content into a buffer.

Previous:
// Guava's ByteStreams reads the whole stream into a new array; atomicVessel
// is held outside the callback (apparently as shared state).
session.read(flowFile, in -> {
  atomicVessel.set(ByteStreams.toByteArray(in));
});

Current:
// Pre-size the buffer from the FlowFile's recorded content length; with the
// last argument true, StreamUtils.fillBuffer throws EOFException if the
// stream ends before the buffer is completely filled.
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

Making this change reduced the occurrences of the data corruption, but we still saw it happen. What I'm now wondering is whether sizing the byte array based on flowFile.getSize() is safe. The contents of the file are raw bytes coming from ConsumeKafka_2_0.
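
For what it's worth, a variant that avoids pre-sizing the array entirely (a sketch reusing the session/flowFile names from the snippets above; needs java.io.ByteArrayOutputStream and NiFi's StreamUtils):

// Accumulate the stream rather than trusting a pre-computed size; the
// resulting array is exactly as long as the content actually read.
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
session.read(flowFile, in -> StreamUtils.copy(in, baos));
final byte[] content = baos.toByteArray();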

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:51 PM Mark Payne <[hidden email]> wrote:
Jason,

Glad to hear it. This is where data provenance becomes absolutely invaluable. You should now be able to trace the lineage of that FlowFile back to the start using data provenance. You can see exactly what it looked like when it was received. If it looks wrong there, the provenance shows exactly where it was received from, so you know where to look next. If it looks good on receipt, you can trace the data through the flow and see what it looked like before and after each processor. And when you see which processor resulted in corruption, you can download the data exactly as it looked going into that processor, making it easy to re-ingest and test.

Thanks
-Mark


On Jun 22, 2020, at 4:46 PM, Jason Iannone <[hidden email]> wrote:

I spoke too soon; it must be the magic of sending an email! We found what appears to be corrupted content and captured the binary, hoping to play it through the code and see what's going on.

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone <[hidden email]> wrote:
Hey Mark,

We hit the issue again, and when digging into the lineage we can see the content is fine coming into MergeContent but is corrupt on the output of the Join event. Any other suggestions?

Thanks,
Jason

