Commit 85ce8b66 authored by Dominik Rosiek's avatar Dominik Rosiek

working adds and select all from ddb

parent 274b5ab8
......@@ -82,9 +82,20 @@ class DDBUUIDField(DDBField):
return value
class DDBStringField(DDBField):
AMAZON_TYPE = 'S'
@classmethod
def _validate(cls, value):
if isinstance(value, str):
return value
return str(value)
class DDBTable(object):
TABLE_NAME = 'measurements'
TABLE_NAME = ''
REGION_NAME = 'eu-central-1'
KEY_SCHEMA = []
LOCAL_SECONDARY_INDEXES = []
......@@ -189,56 +200,166 @@ class DDBTable(object):
val[self.FIELDS[key].AMAZON_TYPE])
return data
@gen.coroutine
def get_all(self):
message = yield gen.Task(self._dynamodb(operation='Scan').call,
TableName=self._get_table_name())
data = [self.decode_item(item=item) for item in message['Items']]
raise gen.Return(data)
class DDBstations(DDBTable):
TABLE_NAME = 'stations'
KEY_SCHEMA = [{
'AttributeName': 'station_id',
'KeyType': 'HASH',
}]
PROVISIONED_THROUGHPUT = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
FIELDS = {
'station_id': DDBUUIDField,
'city': DDBStringField,
'longitude': DDBIntField,
'latitude': DDBIntField,
'name': DDBStringField
}
@gen.coroutine
def add(self, city, longitude, latitude, name):
station_id = uuid.uuid4()
data = {
'city': city,
'longitude': longitude,
'latitude': latitude,
'name':name
}
message = yield gen.Task(self._dynamodb(operation='UpdateItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'station_id': station_id}),
AttributeUpdates=self.encode_item(data=data, update=True))
raise gen.Return(message)
@gen.coroutine
def get(self, station_id):
message = yield gen.Task(self._dynamodb(operation='GetItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'station_id': station_id})
)
data = self.decode_item(item=message['Item'])
raise gen.Return(data)
class DDBtypes(DDBTable):
TABLE_NAME = 'types'
KEY_SCHEMA = [{
'AttributeName': 'type_id',
'KeyType': 'HASH',
}]
PROVISIONED_THROUGHPUT = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
FIELDS = {
'type_id': DDBUUIDField,
'shortname': DDBStringField,
'unit': DDBStringField,
'norm': DDBIntField,
'longname': DDBStringField,
'descriptions': DDBStringField
}
@gen.coroutine
def add(self, shortname, unit, norm, longname, descriptions):
type_id = uuid.uuid4()
data = {
'shortname': shortname,
'unit': unit,
'norm': norm,
'longname': longname,
'descriptions':descriptions
}
message = yield gen.Task(self._dynamodb(operation='UpdateItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'type_id': type_id}),
AttributeUpdates=self.encode_item(data=data, update=True))
raise gen.Return(message)
@gen.coroutine
def get(self, type_id):
message = yield gen.Task(self._dynamodb(operation='GetItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'type_id': type_id})
)
data = self.decode_item(item=message['Item'])
raise gen.Return(data)
raise gen.Return(data)
class DDBmeasurements(DDBTable):
TABLE_NAME = 'measurements'
KEY_SCHEMA = [{
'AttributeName': 'sid',
'KeyType': 'Number',
},{
'AttributeName': 'tid',
'KeyType': 'Number',
'AttributeName': 'measurement_id',
'KeyType': 'HASH',
}]
PROVISIONED_THROUGHPUT = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
FIELDS = {
'sid': DDBIntField,
'tid': DDBIntField,
'measurement_id': DDBUUIDField,
'station_id': DDBUUIDField,
'type_id': DDBUUIDField,
'value': DDBIntField,
'time': DDBIntField
}
@gen.coroutine
def update(self, measurement_id, type_id, station_id):
def add(self, station_id, type_id, value, measurement_time):
measurement_id = uuid.uuid4()
data = {
'station_id': uuid.uuid4(),
'type_id': uuid.uuid4(),
'value': value,
'time': measurement_time
}
message = yield gen.Task(self._dynamodb(operation='UpdateItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'tid': type_id, 'sid': station_id}),
AttributeUpdates=self.encode_item(data={'time': time.time()}, update=True))
Key=self.encode_item(data={'measurement_id': measurement_id}),
AttributeUpdates=self.encode_item(data=data, update=True))
raise gen.Return(message)
@gen.coroutine
def get(self, tid=1, sid=1):
def get(self, measurement_id):
message = yield gen.Task(self._dynamodb(operation='GetItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'tid': tid, 'sid': sid}))
Key=self.encode_item(data={'measurement_id': measurement_id})
)
data = self.decode_item(item=message['Item'])
raise gen.Return(data)
if __name__ == '__main__':
measurement_id = uuid.uuid4()
types = DDBtypes()
types.create_table()
types.add(1, 2, 3, 4, 5)
ioloop = IOLoop.instance()
result = ioloop.run_sync(partial(types.get_all))
print(result)
measurements = DDBmeasurements()
measurements.create_table()
measurements.add(1, 2, 3, 4)
ioloop = IOLoop.instance()
result = ioloop.run_sync(partial(measurements.get_all))
print(result)
# You still can run code synchronous if required
measurements.update(measurement_id=measurement_id, station_id=1, type_id=7)
# run asynchronous with callback
measurements.get()
# You even can run methods wrapped with @coroutine synchronously
stations = DDBstations()
stations.create_table()
stations.add(1, 2, 3, 4)
ioloop = IOLoop.instance()
result = ioloop.run_sync(partial(measurements.get, sid=1, tid=7))
result = ioloop.run_sync(partial(stations.get_all))
print(result)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment