Commit 90106a46 authored by Maksymilian Chodacki's avatar Maksymilian Chodacki

database

parent 9402ff8f
import logging
import re
import six
import uuid
from functools import partial
from tornado import gen
from tornado.ioloop import IOLoop
from tornado_botocore import Botocore
logger = logging.getLogger()
class AmazonException(Exception):
def __init__(self, message, code='unknown'):
self.message = message
self.code = code
def __str__(self):
return self.message
class DDBException(AmazonException):
ITEM_ENCODE_ERROR = 'ItemEncodeError'
ITEM_DECODE_ERROR = 'ItemDecodeError'
class DDBField(object):
@classmethod
def _validate(cls, value):
raise NotImplementedError('Not implemented.')
@classmethod
def decode(cls, value):
try:
return cls._validate(value)
except (TypeError, ValueError):
raise DDBException(
message='Invalid value for {cls} decode.'.format(cls=cls.__name__),
code=DDBException.ITEM_DECODE_ERROR)
@classmethod
def encode(cls, value):
try:
return str(cls._validate(value))
except (TypeError, ValueError):
raise DDBException(
message='Invalid value for {cls} encode.'.format(cls=cls.__name__),
code=DDBException.ITEM_ENCODE_ERROR)
class DDBIntField(DDBField):
AMAZON_TYPE = 'N'
@classmethod
def _validate(cls, value):
if isinstance(value, int):
return value
return int(value)
class DDBUUIDField(DDBField):
AMAZON_TYPE = 'S'
_UUID_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
@classmethod
def _validate(cls, value):
if not isinstance(value, str):
value = str(value)
if cls._UUID_REGEXP.match(value) is None:
raise ValueError('UUID required.')
return value
class DDBTable(object):
TABLE_NAME = ''
REGION_NAME = 'us-west-2'
KEY_SCHEMA = []
LOCAL_SECONDARY_INDEXES = []
GLOBAL_SECONDARY_INDEXES = []
PROVISIONED_THROUGHPUT = {}
FIELDS = {}
_AMAZON_SESSION = None
def _get_table_name(self):
return self.TABLE_NAME
def _get_table_kwargs(self):
key_fields = set()
for key in self.KEY_SCHEMA:
key_fields.add(key['AttributeName'])
for index in self.LOCAL_SECONDARY_INDEXES:
for key in index['KeySchema']:
key_fields.add(key['AttributeName'])
for index in self.GLOBAL_SECONDARY_INDEXES:
for key in index['KeySchema']:
key_fields.add(key['AttributeName'])
attribute_definitions = []
for field_name in key_fields:
attribute_definitions.append({
'AttributeName': field_name,
'AttributeType': self.FIELDS[field_name].AMAZON_TYPE
})
kwargs = {
'TableName': self._get_table_name(),
'AttributeDefinitions': attribute_definitions,
'KeySchema': self.KEY_SCHEMA,
'ProvisionedThroughput': self.PROVISIONED_THROUGHPUT,
}
if getattr(self, 'LOCAL_SECONDARY_INDEXES', None):
kwargs['LocalSecondaryIndexes'] = self.LOCAL_SECONDARY_INDEXES
if getattr(self, 'GLOBAL_SECONDARY_INDEXES', None):
kwargs['GlobalSecondaryIndexes'] = self.GLOBAL_SECONDARY_INDEXES
return kwargs
def _get_endpoint_url(self):
return None
def _dynamodb(self, operation):
if DDBTable._AMAZON_SESSION is None:
ddb_operation = Botocore(
service='dynamodb', operation=operation,
region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url())
DDBTable._AMAZON_SESSION = ddb_operation.session
else:
ddb_operation = Botocore(
service='dynamodb', operation=operation,
region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url(),
session=DDBTable._AMAZON_SESSION)
return ddb_operation
def create_table(self):
try:
message = self._dynamodb(operation='DescribeTable').call(
TableName=self._get_table_name())
except AmazonException as e:
if e.code != 'ResourceNotFoundException':
raise e
logger.warning('Creation {table_name} table ...'.format(
table_name=self._get_table_name()))
message = self._dynamodb(operation='CreateTable').call(
**self._get_table_kwargs())
else:
logger.warning('{table_name} table already exists.'.format(
table_name=self._get_table_name()))
def encode_item(self, data, keys=None, update=False):
if not data:
return {}
keys = keys or data.keys()
item = {}
for key in keys:
if key not in data:
continue
val = self.FIELDS[key].encode(value=data[key])
if update:
item[key] = {
'Value': {
self.FIELDS[key].AMAZON_TYPE: val
},
'Action': 'PUT'
}
else:
item[key] = {
self.FIELDS[key].AMAZON_TYPE: val
}
return item
def decode_item(self, item, keys=None):
data = {}
for key, val in six.iteritems(item):
if key not in self.FIELDS:
continue
if keys and key not in keys:
continue
data[key] = self.FIELDS[key].decode(
val[self.FIELDS[key].AMAZON_TYPE])
return data
class DDBmeasurements(DDBTable):
TABLE_NAME = 'measurements'
KEY_SCHEMA = [{
'AttributeName': 'measurement_id',
'KeyType': 'HASH',
}]
PROVISIONED_THROUGHPUT = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
FIELDS = {
'measurement_id': DDBUUIDField,
'type_id': DDBIntField,
'station_id': DDBIntField,
}
@gen.coroutine
def update(self, measurement_id, type_id, station_id):
message = yield gen.Task(self._dynamodb(operation='UpdateItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'measurement_id': measurement_id}),
AttributeUpdates=self.encode_item(data={'type_id': type_id, 'station_id': station_id}, update=True))
raise gen.Return(message)
@gen.coroutine
def get(self, measurement_id, type_id, station_id):
message = yield gen.Task(self._dynamodb(operation='GetItem').call,
TableName=self._get_table_name(),
Key=self.encode_item(data={'measurement_id': measurement_id}))
data = self.decode_item(item=message['Item'])
raise gen.Return(data)
if __name__ == '__main__':
measurement_id = uuid.uuid4()
measurements = DDBmeasurements()
measurements.create_table()
# You still can run code synchronous if required
measurements.update_(measurement_id=measurement_id, station_id=1, type_id=1)
# run asynchronous with callback
measurements.get_(measurement_id=measurement_id, callback=print)
# You even can run methods wrapped with @coroutine synchronously
ioloop = IOLoop.instance()
result = ioloop.run_sync(partial(measurements.get, measurement_id=measurement_id))
print(result)
from db import DDBmeasurements
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.options
class MeasurementHandler(web.RequestHandler):
@gen.coroutine
def post(self, measurement_id):
station_id = self.get_body_argument('station_id')
type_id = self.get_body_argument('type_id')
measurement = DDBUserWallet()
yield user_wallet.update(user_id=user_id, balance=int(balance))
self.write('Updated\n')
@gen.coroutine
def get(self, user_id):
measurement = DDBmeasurements()
response = yield measurement.get(measurement_id=measurement_id)
self.write('{station_id}\n'.format(station_id=response['station_id']))
self.write('{type_id}\n'.format(type_id=response['type_id']))
application = web.Application([
(r'/measurement/(?P<measurement_id>\w{8}-\w{4}-\w{4}-\w{4}-\w{12})', MeasurementHandler),
], debug=True)
if __name__ == "__main__":
DDBmeasurements().create_table()
application.listen(80)
#ioloop.IOLoop.instance().start()
tornado.ioloop.IOLoop.current().start()
tornado == 4.4.2
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment