Distributed Key-Value Store

KVStore is a place for data sharing. Think of it as a single object shared across different devices (GPUs and computers), where each device can push data in and pull data out.

Initialization

Let’s consider a simple example: initializing a (int, NDArray) pair into the store, and then pulling the value out:

[1]:
import mxnet as mx

kv = mx.kv.create('local') # create a local kv store.
shape = (2,3)
kv.init(3, mx.np.ones(shape)*2)
a = mx.np.zeros(shape)
kv.pull(3, out = a)
print(a.asnumpy())
[[2. 2. 2.]
 [2. 2. 2.]]
[04:51:19] /work/mxnet/src/storage/storage.cc:202: Using Pooled (Naive) StorageManager for CPU
[04:51:19] /work/mxnet/src/storage/storage.cc:202: Using Pooled (Naive) StorageManager for CPU_PINNED

[[ 2.  2.  2.],[ 2.  2.  2.]]

Push, Aggregate, and Update

For any key that has been initialized, you can push a new value with the same shape to the key:

[2]:
kv.push(3, mx.np.ones(shape)*8)
kv.pull(3, out = a) # pull out the value
print(a.asnumpy())
[[8. 8. 8.]
 [8. 8. 8.]]

[[ 8.  8.  8.],[ 8.  8.  8.]]

The data for pushing can be stored on any device. Furthermore, you can push multiple values into the same key, where KVStore will first sum all of these values and then push the aggregated value. Here we will just demonstrate pushing a list of values on CPU. Please note summation only happens if the value list is longer than one

[3]:
devices = [mx.cpu(i) for i in range(4)]
b = [mx.np.ones(shape=shape, device=device) for device in devices]
kv.push(3, b)
kv.pull(3, out = a)
print(a.asnumpy())
[[4. 4. 4.]
 [4. 4. 4.]]

[[ 4.  4.  4.],[ 4.  4.  4.]]

For each push, KVStore combines the pushed value with the value stored using an updater. The default updater is ASSIGN. You can replace the default to control how data is merged:

[4]:
def update(key, input, stored):
    print("update on key: %d" % key)
    stored += input * 2
kv._set_updater(update)
kv.pull(3, out=a)
print(a.asnumpy())
[[4. 4. 4.]
 [4. 4. 4.]]

[[ 4.  4.  4.],[ 4.  4.  4.]]

[5]:
kv.push(3, mx.np.ones(shape))
kv.pull(3, out=a)
print(a.asnumpy())
update on key: 3
[[6. 6. 6.]
 [6. 6. 6.]]

update on key: 3

[[ 6.  6.  6.],[ 6.  6.  6.]]

Pull

You’ve already seen how to pull a single key-value pair. Similarly, to push, you can pull the value onto several devices with a single call:

[6]:
b = [mx.np.ones(shape=shape, device=device) for device in devices]
kv.pull(3, out = b)
print(b[1].asnumpy())
[[6. 6. 6.]
 [6. 6. 6.]]

[ 6.  6.  6.]],[[ 6.  6.  6.]

Handle a List of Key-Value Pairs

All operations introduced so far involve a single key. KVStore also provides an interface for a list of key-value pairs.

For a single device:

[7]:
keys = [5, 7, 9]
kv.init(keys, [mx.np.ones(shape)]*len(keys))
kv.push(keys, [mx.np.ones(shape)]*len(keys))
b = [mx.np.zeros(shape)]*len(keys)
kv.pull(keys, out = b)
print(b[1].asnumpy())
update on key: 5
update on key: 7
update on key: 9
[[3. 3. 3.]
 [3. 3. 3.]]

update on key: 5

update on key: 7

update on key: 9

[[ 3.  3.  3.],[ 3.  3.  3.]]

For multiple devices:

[8]:
b = [[mx.np.ones(shape=shape, device=device) for device in devices]] * len(keys)
kv.push(keys, b)
kv.pull(keys, out = b)
print(b[1][1].asnumpy())
update on key: 5
update on key: 7
update on key: 9
[[11. 11. 11.]
 [11. 11. 11.]]

update on key: 5

update on key: 7

update on key: 9

[[ 11.  11.  11.],[ 11.  11.  11.]]

Run on Multiple Machines

Based on parameter server, the updater runs on the server nodes. When the distributed version is ready, we will update this section.