DistributedMapCache

DistributedMapCache

Yari Marchetti
Hello,
I'm running a 3-node cluster and I've been trying to implement a deduplication workflow using DetectDuplicate, but on my first try I noticed that there were always 3 messages marked as non-duplicates (one per node). After some investigation I tracked the issue down to the DistributedMapCache server address in my configuration, which was set to localhost: if I instead set it to the IP of one of the nodes, everything works as expected.
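To make the symptom concrete, here is a rough sketch in plain Java of what I think is happening (the classes are just stand-ins, not the actual NiFi API): with localhost every node effectively talks to its own cache, while a single shared address gives the behaviour I expected:

// Rough sketch in plain Java (stand-ins only, not the actual NiFi API) of why
// "localhost" breaks cross-node deduplication: each node talks to its own cache.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DedupSketch {

    // Stand-in for one DistributedMapCacheServer instance.
    static class CacheServer {
        private final Set<String> keys = ConcurrentHashMap.newKeySet();

        // Returns true if the key was absent, i.e. the message is "new".
        boolean putIfAbsent(String key) {
            return keys.add(key);
        }
    }

    public static void main(String[] args) {
        String messageId = "order-42";

        // With "localhost", every node talks to its own server instance,
        // so each of the three nodes reports the same message as a non-duplicate.
        CacheServer nodeA = new CacheServer();
        CacheServer nodeB = new CacheServer();
        CacheServer nodeC = new CacheServer();
        System.out.println(nodeA.putIfAbsent(messageId)); // true
        System.out.println(nodeB.putIfAbsent(messageId)); // true -> 3 "non-duplicates"
        System.out.println(nodeC.putIfAbsent(messageId)); // true

        // With a single shared server (the IP of one node), only the first check wins.
        CacheServer shared = new CacheServer();
        System.out.println(shared.putIfAbsent(messageId)); // true
        System.out.println(shared.putIfAbsent(messageId)); // false -> duplicate detected
        System.out.println(shared.putIfAbsent(messageId)); // false
    }
}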

My concern with this approach is reliability: if that specific node goes down, the workflow will stop working properly. I know I could implement deduplication using some other kind of storage, but I wanted to check first whether I understood this correctly and what the suggested approach is.

Thanks,
Yari

Re: DistributedMapCache

Koji Kawamura
Hi Yari,

Thanks for the great question. I looked at the
DistributedMapCacheClient/Server code briefly, and there is no high
availability support in a NiFi cluster. As you figured out, you need to
point the client service at one node's IP address so that all nodes in
the cluster share the same cache storage, and if that node goes down,
the client processors stop working until the node recovers or the
client service configuration is updated.

Although it has 'Distributed' in its name, the cache storage itself is
not distributed; it just supports multiple client nodes, and it does
not implement any coordination logic that would work nicely with a NiFi
cluster.

I think we might be able to use the primary node to improve availability:
add an option to DistributedCacheServer to run only on the primary
node, and an option to the client service to point at the primary node
(without specifying a specific IP address or hostname), then let
ZooKeeper and the NiFi cluster handle the fail-over, as sketched below.
All cache entries would be invalidated when a fail-over happens,
so things like DetectDuplicate won't work correctly right after a fail-over.
This idea only helps the NiFi cluster and data flow recover automatically
without human intervention.
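A very rough sketch of that idea (all of these names are hypothetical; none of this exists in NiFi today): clients ask "which node is primary right now?" instead of holding a fixed hostname, and after a fail-over they simply continue against the new, empty cache:

// Hypothetical sketch only; these classes do not exist in NiFi today.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class PrimaryFailoverSketch {

    // Stand-in for a cache server hosted on whichever node is primary.
    static class CacheServer {
        final Set<String> keys = ConcurrentHashMap.newKeySet();
        boolean putIfAbsent(String key) { return keys.add(key); }
    }

    // Stand-in for the ZooKeeper-backed "who is primary?" election.
    static class Cluster {
        volatile CacheServer primary = new CacheServer();
        void failOver() { primary = new CacheServer(); } // new primary starts with an empty cache
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster();
        Supplier<CacheServer> primaryLookup = () -> cluster.primary;

        System.out.println(primaryLookup.get().putIfAbsent("msg-1")); // true  (new)
        System.out.println(primaryLookup.get().putIfAbsent("msg-1")); // false (duplicate)

        cluster.failOver(); // the primary node goes down and another node takes over

        // The flow keeps running without human intervention, but the cache was
        // invalidated, so the same key is reported as "new" one more time.
        System.out.println(primaryLookup.get().putIfAbsent("msg-1")); // true
    }
}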

We could also implement cache replication between nodes to provide
higher availability, or hashing to utilize the resources of every node,
but I think that's overkill for NiFi. If one needs that level of
availability, then I'd recommend using another NoSQL database.

What do you think about that? I'd like to hear from others, too, to see
if it's worth trying.

Thanks,
Koji

Re: DistributedMapCache

Yari Marchetti
Hi Koji,
thanks for the explanation, that's very clear now; as you pointed out, I got
tricked by the "distributed" :)

Regarding the idea of implementing a ZK-driven failover, I think it makes sense
both to improve the overall stability/manageability of the platform and to
avoid implementing cache replication, which would definitely be overkill.

But I was also thinking that for use cases where you need to be absolutely
sure no duplicates are propagated (I already have several such use cases),
it would also make sense to create some sort of "DurableMapCacheServer"
backed by a NoSQL/SQL store. As I said, you could implement it yourself
using standard processors, but I think it's something that would be very
useful to have "off the shelf". A rough sketch of the kind of semantics I
have in mind is below.

In my view both make sense: a fast one with a weak guarantee, for when
duplicates are acceptable, and a slower, durable one with a strong
guarantee. What do you think?

Thanks,
Yari

Re: DistributedMapCache

Mark Payne
Yari,

I have written a couple of additional implementations of DistributedMapCacheClient - one for
MySQL and one for Memcached. However, I haven't yet gotten them into Apache, as they probably need
some cleanup and refactoring. Eventually I need to get them migrated over.

The design of DetectDuplicate, though, makes this work very well. The intent is that you can use a DetectDuplicate
pointing to Memcached first. If it detects a duplicate, you route that as appropriate. If it routes to 'non-duplicate',
then you send the FlowFile to a second DetectDuplicate processor that points to MySQL. This way, if you have a non-duplicate,
it will automatically be added to Memcached as well as MySQL (because the first DetectDuplicate will notice that it's not
in Memcached and add it, and the second one will either detect that it's already in MySQL or detect that it's not and add
it). So this gives you a caching layer on top of the persistence layer.
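In plain-Java terms (stand-ins only, not the actual processor code), the two chained DetectDuplicate processors behave roughly like this:

// Plain-Java stand-in (not the actual processor code) for chaining two
// DetectDuplicate processors: a fast cache in front of a durable store.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class TwoTierDedupSketch {

    static class Cache {
        final Set<String> keys = ConcurrentHashMap.newKeySet();
        // DetectDuplicate-style semantics: check and, if absent, record the key.
        boolean isDuplicate(String key) { return !keys.add(key); }
    }

    public static void main(String[] args) {
        Cache memcachedTier = new Cache(); // fast, may lose entries
        Cache mysqlTier = new Cache();     // durable, authoritative

        String key = "order-42";

        // First DetectDuplicate: cheap check against the caching layer.
        if (memcachedTier.isDuplicate(key)) {
            System.out.println("duplicate (caught by cache)");
            return;
        }
        // Second DetectDuplicate: authoritative check against the durable layer.
        // This also covers the case where the cache was wiped or evicted the key.
        if (mysqlTier.isDuplicate(key)) {
            System.out.println("duplicate (caught by durable store)");
            return;
        }
        // Non-duplicate: by now the key has been recorded in both tiers.
        System.out.println("non-duplicate");
    }
}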

Thanks
-Mark

Re: DistributedMapCache

Yari Marchetti
Mark,
that would be exactly what I'd like to see in NiFi; I hope you'll be pushing it soon :)

Thanks,
Yari
