Skip to content

Commit 2b9dac2

Browse files
authored
Feature/bulk operations support (#39)
* Adding bulk operations support. * Removed obsolete import. * Minor fixes. * Updated changelog.
1 parent a353447 commit 2b9dac2

File tree

7 files changed

+402
-7
lines changed

7 files changed

+402
-7
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Changelog
22

3+
* Adding support for bulk operations.
4+
35
## Version 1.6.1
46

57
* Adding `c8y_tk` namespace to distribution.

c8y_api/_main_api.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from c8y_api.model.inventory import Inventory, DeviceInventory, DeviceGroupInventory
1818
from c8y_api.model.measurements import Measurements
1919
from c8y_api.model.notification2 import Subscriptions, Tokens
20-
from c8y_api.model.operations import Operations
20+
from c8y_api.model.operations import Operations, BulkOperations
2121
from c8y_api.model.tenant_options import TenantOptions
2222

2323

@@ -44,6 +44,7 @@ def __init__(self, base_url: str, tenant_id: str, username: str = None, password
4444
self.__events = Events(self)
4545
self.__alarms = Alarms(self)
4646
self.__operations = Operations(self)
47+
self.__bulk_operations = BulkOperations(self)
4748
self.__tenant_options = TenantOptions(self)
4849
self.__notification2_subscriptions = Subscriptions(self)
4950
self.__notification2_tokens = Tokens(self)
@@ -118,6 +119,11 @@ def operations(self) -> Operations:
118119
"""Provide access to the Operation API."""
119120
return self.__operations
120121

122+
@property
123+
def bulk_operations(self) -> BulkOperations:
124+
"""Provide access to the BulkOperation API."""
125+
return self.__bulk_operations
126+
121127
@property
122128
def tenant_options(self) -> TenantOptions:
123129
"""Provide access to the Tenant Options API."""

c8y_api/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
'User', 'GlobalRole', 'InventoryRole', 'Users', 'GlobalRoles', 'InventoryRoles', 'InventoryRoleAssignment',
2121
'Permission', 'ReadPermission', 'WritePermission', 'AnyPermission',
2222
'Application',
23+
'Operation', 'BulkOperation', 'Operations', 'BulkOperations',
2324
'ManagedObject', 'Device', 'DeviceGroup', 'Fragment', 'NamedObject',
2425
'Inventory', 'DeviceInventory', 'DeviceGroupInventory',
2526
'Identity', 'ExternalId', 'Binary', 'Binaries',

c8y_api/model/operations.py

Lines changed: 187 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,19 @@
1111

1212
from c8y_api._base_api import CumulocityRestApi
1313

14-
from c8y_api.model._base import CumulocityResource, ComplexObject, SimpleObject
14+
from c8y_api.model._base import CumulocityResource, ComplexObject, SimpleObject, _DictWrapper
1515
from c8y_api.model._parser import ComplexObjectParser
1616
from c8y_api.model._util import _DateUtil
1717

1818

1919
class Operation(ComplexObject):
20-
""" Represents an instance of a operation object in Cumulocity.
20+
""" Represents an instance of an operation object in Cumulocity.
2121
2222
Instances of this class are returned by functions of the corresponding
2323
Operation API. Use this class to create new or update existing
2424
operation.
2525
26-
See also: https://cumulocity.com/api/10.11.0/#tag/Operations
26+
See also: https://cumulocity.com/api/core/#tag/Operations
2727
"""
2828

2929
class Status:
@@ -131,7 +131,7 @@ class Operations(CumulocityResource):
131131
This class can be used for get, search for, create, update and
132132
delete operations within the Cumulocity database.
133133
134-
See also: https://cumulocity.com/api/10.11.0/#tag/Operations
134+
See also: https://cumulocity.com/api/core/#tag/Operations
135135
"""
136136

137137
def __init__(self, c8y: CumulocityRestApi):
@@ -141,7 +141,7 @@ def get(self, operation_id: str | int) -> Operation:
141141
""" Read a specific operation from the database.
142142
143143
params:
144-
operation_id (str|int): database ID of a operation
144+
operation_id (str|int): database ID of an operation
145145
146146
Returns:
147147
Operation object
@@ -211,7 +211,7 @@ def get_all(self, agent_id: str = None, device_id: str = None, status: str = Non
211211
available results are read immediately and returned as list.
212212
213213
Returns:
214-
List of matching Measurement objects
214+
List of matching Operation objects
215215
"""
216216
return list(self.select(agent_id=agent_id, device_id=device_id, status=status, bulk_id=bulk_id,
217217
fragment=fragment, before=before, after=after, min_age=min_age, max_age=max_age,
@@ -274,3 +274,184 @@ def delete_by(self, agent_id: str = None, device_id: str = None, status: str = N
274274
# remove &page_number= from the end
275275
query = base_query[:base_query.rindex('&')]
276276
self.c8y.delete(query)
277+
278+
279+
class BulkOperation(ComplexObject):
280+
""" Represents an instance of a bulk operation object in Cumulocity.
281+
282+
Instances of this class are returned by functions of the corresponding
283+
Bulk Operation API. Use this class to create new or update existing
284+
operation.
285+
286+
See also: https://cumulocity.com/api/core/#tag/Bulk-operations
287+
"""
288+
289+
class Status:
290+
"""Bulk Operation statuses."""
291+
ACTIVE = 'ACTIVE'
292+
IN_PROGRESS = 'IN_PROGRESS'
293+
COMPLETED = 'COMPLETED'
294+
DELETED = 'DELETED'
295+
296+
class GeneralStatus:
297+
"""Bulk Operation general statuses."""
298+
SCHEDULED = 'PENDING'
299+
EXECUTING = 'EXECUTING'
300+
EXECUTING_WITH_ERRORS = 'EXECUTING_WITH_ERRORS'
301+
SUCCESSFUL = 'SUCCESSFUL'
302+
FAILED = 'FAILED'
303+
CANCELED = 'CANCELED'
304+
COMPLETED_SUCCESSFULLY = 'COMPLETED SUCCESSFULLY'
305+
COMPLETED_WITH_FAILURES = 'COMPLETED WITH FAILURES'
306+
307+
# these need to be defined like this for the abstract super functions
308+
_resource = '/devicecontrol/bulkoperations'
309+
_parser = ComplexObjectParser({
310+
'_u_group_id': 'groupId',
311+
'_u_failed_parent_id': 'failedParentId',
312+
'_u_start_time': 'startDate',
313+
'_u_creation_ramp': 'creationRamp',
314+
'status': 'status',
315+
'general_status': 'generalStatus'}, [])
316+
_accept = 'application/vnd.com.nsn.cumulocity.bulkoperation+json'
317+
318+
def __init__(self, c8y=None, group_id: str = None, failed_parent_id: str = None,
319+
start_time: str | datetime = None, creation_ramp: float = None,
320+
operation_prototype: dict = None, **kwargs):
321+
""" Create a new Operation object.
322+
323+
Params:
324+
c8y (CumulocityRestApi): Cumulocity connection reference; needs
325+
to be set for direct manipulation (create, delete)
326+
device_id (str): Device ID which this operation is for
327+
kwargs: All additional named arguments are interpreted as
328+
custom fragments e.g. for data points.
329+
330+
Returns:
331+
Operation object
332+
"""
333+
super().__init__(c8y, operationPrototype=operation_prototype, **kwargs)
334+
self._u_group_id = group_id
335+
self._u_failed_parent_id = failed_parent_id
336+
self._u_start_time = _DateUtil.ensure_timestring(start_time)
337+
self._u_creation_ramp = creation_ramp
338+
self.status: str | None = None
339+
self.general_status: str | None = None
340+
341+
group_id = SimpleObject.UpdatableProperty('_u_group_id')
342+
failed_parent_id = SimpleObject.UpdatableProperty('_u_failed_parent_id')
343+
start_time = SimpleObject.UpdatableProperty('_u_start_time')
344+
creation_ramp = SimpleObject.UpdatableProperty('_u_creation_ramp')
345+
346+
@property
347+
def operation_prototype(self) -> _DictWrapper:
348+
return self.operationPrototype
349+
350+
@operation_prototype.setter
351+
def operation_prototype(self, fragment):
352+
self.__setitem__('operationPrototype', fragment)
353+
354+
@property
355+
def start_datetime(self) -> datetime:
356+
"""Convert the operation's start time to a Python datetime object.
357+
358+
Returns:
359+
Standard Python datetime object for the operation's start time.
360+
"""
361+
return super()._to_datetime(self._u_start_time)
362+
363+
@classmethod
364+
def from_json(cls, json) -> BulkOperation:
365+
""" Build a new Operation instance from Cumulocity JSON.
366+
367+
The JSON is assumed to be in the format as it is used by the
368+
Cumulocity REST API.
369+
370+
Params:
371+
json (dict): JSON object (nested dictionary)
372+
representing an operation within Cumulocity
373+
374+
Returns:
375+
Operation object
376+
"""
377+
obj = cls._from_json(json, BulkOperation())
378+
return obj
379+
380+
def create(self) -> BulkOperation:
381+
""" Store the Bulk Operation within the database.
382+
383+
Returns: A fresh BulkOperation object representing what was
384+
created within the database (including the ID).
385+
"""
386+
return self._create()
387+
388+
def update(self) -> BulkOperation:
389+
"""Update the BulkOperation within the database.
390+
391+
Returns: A fresh BulkOperation object representing the updated
392+
object within the database (including the ID).
393+
"""
394+
return super()._update()
395+
396+
397+
class BulkOperations(CumulocityResource):
398+
""" A wrapper for the standard Bulk Operation API.
399+
400+
This class can be used for get, search for, create, update and
401+
delete bulk operations within the Cumulocity database.
402+
403+
See also: https://cumulocity.com/api/core/#tag/Bulk-operations
404+
"""
405+
406+
def __init__(self, c8y: CumulocityRestApi):
407+
super().__init__(c8y, 'devicecontrol/bulkoperations')
408+
409+
def get(self, operation_id: str | int) -> BulkOperation:
410+
""" Read a specific bulk operation from the database.
411+
412+
params:
413+
operation_id (str|int): database ID of a bulk operation
414+
415+
Returns:
416+
BulkOperation object
417+
418+
Raises:
419+
KeyError: If the ID cannot be resolved.
420+
"""
421+
operation = BulkOperation.from_json(self._get_object(operation_id))
422+
operation.c8y = self.c8y # inject c8y connection into instance
423+
return operation
424+
425+
def select(self, limit: int = None, page_size: int = 1000) -> Generator[BulkOperation]:
426+
""" Query the database for operations and iterate over the results.
427+
428+
This function is implemented in a lazy fashion - results will only be
429+
fetched from the database as long there is a consumer for them.
430+
431+
All parameters are considered to be filters, limiting the result set
432+
to objects which meet the filters' specification. Filters can be
433+
combined (within reason).
434+
435+
Params:
436+
limit (int): Limit the number of results to this number.
437+
page_size (int): Define the number of operations which are
438+
read (and parsed in one chunk). This is a performance
439+
related setting.
440+
441+
Returns:
442+
Generator[BulkOperation]: Iterable of matching BulkOperation objects
443+
"""
444+
base_query = self._build_base_query(page_size=page_size)
445+
return super()._iterate(base_query, limit, BulkOperation.from_json)
446+
447+
def get_all(self, limit: int = None, page_size: int = 1000) -> List[BulkOperation]:
448+
""" Query the database for bulk operations and return the results
449+
as list.
450+
451+
This function is a greedy version of the select function. All
452+
available results are read immediately and returned as list.
453+
454+
Returns:
455+
List of matching BulkOperation objects
456+
"""
457+
return list(self.select(limit=limit, page_size=page_size))
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright (c) 2020 Software AG,
2+
# Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA,
3+
# and/or its subsidiaries and/or its affiliates and/or their licensors.
4+
# Use, reproduction, transfer, publication or disclosure is prohibited except
5+
# as specifically provided for in your License Agreement with Software AG.
6+
7+
import time
8+
9+
from c8y_api import CumulocityApi
10+
from c8y_api.model import BulkOperation, DeviceGroup, Operation
11+
12+
13+
def test_CRU(live_c8y: CumulocityApi, sample_device): # noqa
14+
"""Verify that basic creation, lookup and update of Operations works as expected."""
15+
16+
# (1) Create a device group for the sample device
17+
group:DeviceGroup = DeviceGroup(live_c8y,
18+
root=True,
19+
name=sample_device.name + '_Group').create()
20+
group.add_child_asset(sample_device)
21+
22+
23+
# (2) create bulk operation
24+
bulk:BulkOperation = BulkOperation(live_c8y,
25+
group_id=group.id,
26+
start_time='now',
27+
creation_ramp=1,
28+
operation_prototype={
29+
'description': f"Update firmware for device group '{group.name}'.",
30+
'c8y_FirmWare': {
31+
'version': '1.0.0'
32+
}}
33+
).create()
34+
35+
# wait for the bulk operation to be processed
36+
time.sleep(5)
37+
38+
# (3) initially the status should be EXECUTING/COMPLETED as all
39+
# child operations should have been created but not completed
40+
bulk = live_c8y.bulk_operations.get(bulk.id)
41+
assert bulk.general_status == BulkOperation.GeneralStatus.EXECUTING
42+
assert bulk.status == BulkOperation.Status.COMPLETED
43+
assert bulk.progress.all == 1
44+
assert bulk.progress.pending == 1
45+
46+
# (4) find child operations
47+
op = live_c8y.operations.get_all(bulk_id=bulk.id)[0]
48+
assert op.status == Operation.Status.PENDING
49+
50+
# (5) kill child operations
51+
op.status = Operation.Status.FAILED
52+
op.update()
53+
54+
# (5) bulk operation should now be in PENDING state
55+
bulk = live_c8y.bulk_operations.get(bulk.id)
56+
assert bulk.general_status in [BulkOperation.GeneralStatus.COMPLETED_WITH_FAILURES,
57+
BulkOperation.GeneralStatus.FAILED]
58+
assert bulk.progress.all == 1
59+
assert bulk.progress.failed == 1
60+
61+
# (6) cleanup
62+
# The bulk operation cannot be deleted physically
63+
group.delete()

0 commit comments

Comments
 (0)