- # -*- coding: utf-8 -*-
- import collections
- import gzip
- import urllib
- import urllib2
- from urlparse import urlparse
- try:
- from cStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
- try:
- import json
- except ImportError:
- import simplejson as json
- __author__ = 'myth'
- _HTTP_GET = 1
- _HTTP_POST = 2
- # 超时时间(秒)
- TIMEOUT = 45
- RETURN_TYPE = {"json": 0, "xml": 1, "html": 2, "text": 3}
- _METHOD_MAP = {'GET': _HTTP_GET, 'POST': _HTTP_POST}
- class APIError(StandardError):
- """
- raise APIError if receiving json message indicating failure.
- """
- def __init__(self, error_code, error, request):
- self.error_code = error_code
- self.error = error
- self.request = request
- StandardError.__init__(self, error)
- def __str__(self):
- return 'APIError: %s: %s, request: %s' % (self.error_code, self.error, self.request)
- # def callback_type(return_type='json'):
- #
- # default_type = "json"
- # default_value = RETURN_TYPE.get(default_type)
- # if return_type:
- # if isinstance(return_type, (str, unicode)):
- # default_value = RETURN_TYPE.get(return_type.lower(), default_value)
- # return default_value
- def _format_params(_pk=None, encode=None, **kw):
- """
- :param kw:
- :type kw:
- :param encode:
- :type encode:
- :return:
- :rtype:
- """
- __pk = '%s[%%s]' % _pk if _pk else '%s'
- encode = encode if encode else urllib.quote
- for k, v in kw.iteritems():
- _k = __pk % k
- if isinstance(v, basestring):
- qv = v.encode('utf-8') if isinstance(v, unicode) else v
- _v = encode(qv)
- yield _k, _v
- elif isinstance(v, collections.Iterable):
- if isinstance(v, dict):
- for _ck, _cv in _format_params(_pk=_k, encode=encode, **v):
- yield _ck, _cv
- else:
- for i in v:
- qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
- _v = encode(qv)
- yield _k, _v
- else:
- qv = str(v)
- _v = encode(qv)
- yield _k, _v
- def encode_params(**kw):
- """
- do url-encode parameters
- >>> encode_params(a=1, b='R&D')
- 'a=1&b=R%26D'
- >>> encode_params(a=u'\\u4e2d\\u6587', b=['A', 'B', 123])
- 'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'
- >>> encode_params(**{
- 'a1': {'aa1': 1, 'aa2': {'aaa1': 11}},
- 'b1': [1, 2, 3, 4],
- 'c1': {'cc1': 'C', 'cc2': ['Q', 1, '@'], 'cc3': {'ccc1': ['s', 2]}}
- })
- 'a1[aa1]=1&a1[aa2][aaa1]=11&c1[cc1]=C&c1[cc3][ccc1]=s&c1[cc3][ccc1]=2&c1[cc2]=Q&c1[cc2]=1&c1[cc2]=%40&b1=1&b1=2&b1=3&b1=4'
- """
- # args = []
- # for k, v in kw.iteritems():
- # if isinstance(v, basestring):
- # qv = v.encode('utf-8') if isinstance(v, unicode) else v
- # args.append('%s=%s' % (k, urllib.quote(qv)))
- # elif isinstance(v, collections.Iterable):
- # for i in v:
- # qv = i.encode('utf-8') if isinstance(i, unicode) else str(i)
- # args.append('%s=%s' % (k, urllib.quote(qv)))
- # else:
- # qv = str(v)
- # args.append('%s=%s' % (k, urllib.quote(qv)))
- # return '&'.join(args)
- args = []
- _urlencode = kw.pop('_urlencode', urllib.quote)
- for k, v in _format_params(_pk=None, encode=_urlencode, **kw):
- args.append('%s=%s' % (k, v))
- args = sorted(args, key=lambda s: s.split("=")[0])
- return '&'.join(args)
- def _read_body(obj):
- using_gzip = obj.headers.get('Content-Encoding', '') == 'gzip'
- body = obj.read()
- if using_gzip:
- gzipper = gzip.GzipFile(fileobj=StringIO(body))
- fcontent = gzipper.read()
- gzipper.close()
- return fcontent
- return body
- class JsonDict(dict):
- """
- general json object that allows attributes to be bound to and also behaves like a dict
- """
- def __getattr__(self, attr):
- try:
- return self[attr]
- except KeyError:
- raise AttributeError(r"'JsonDict' object has no attribute '%s'" % attr)
- def __setattr__(self, attr, value):
- self[attr] = value
- def _parse_json(s):
- """
- parse str into JsonDict
- """
- def _obj_hook(pairs):
- """
- convert json object to python object
- """
- o = JsonDict()
- for k, v in pairs.iteritems():
- o[str(k)] = v
- return o
- return json.loads(s, object_hook=_obj_hook)
- def _parse_xml(s):
- """
- parse str into xml
- """
- raise NotImplementedError()
- def _parse_html(s):
- """
- parse str into html
- """
- raise NotImplementedError()
- def _parse_text(s):
- """
- parse str into text
- """
- raise NotImplementedError()
- def _http_call(the_url, method, return_type="json", request_type=None, request_suffix=None, headers={}, _timeout=30, **kwargs):
- """
- the_url: 请求地址
- method 请求方法(get,post)
- return_type: 返回格式解析
- request_suffix: 请求地址的后缀,如jsp,net
- _timeout: 超时时间
- kwargs: 请求参数
- """
- http_url = "%s.%s" (the_url, request_suffix) if request_suffix else the_url
- if request_type == 'json':
- headers['Content-Type'] = 'application/json'
- # method = _HTTP_POST
- # json_data = json.dumps(kwargs)
- # # convert str to bytes (ensure encoding is OK)
- # params = json_data.encode('utf-8')
- params = json.dumps(kwargs)
- else:
- params = encode_params(**kwargs)
- http_url = '%s?%s' % (http_url, params) if method == _HTTP_GET else http_url
- print http_url
- # u = urlparse(http_url)
- # headers.setdefault("host", u.hostname)
- http_body = None if method == _HTTP_GET else params
- req = urllib2.Request(http_url, data=http_body, headers=headers)
- callback = globals().get('_parse_{0}'.format(return_type))
- if not hasattr(callback, '__call__'):
- print "return '%s' unable to resolve" % return_type
- callback = _parse_json
- try:
- resp = urllib2.urlopen(req, timeout=_timeout if _timeout else TIMEOUT)
- body = _read_body(resp)
- r = callback(body)
- # if hasattr(r, 'error_code'):
- # raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
- return r
- except urllib2.HTTPError, e:
- try:
- body = _read_body(e)
- r = callback(body)
- return r
- except:
- r = None
- # if hasattr(r, 'error_code'):
- # raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
- raise e
- class HttpObject(object):
- def __init__(self, client, method):
- self.client = client
- self.method = method
- def __getattr__(self, attr):
- def wrap(**kw):
- if attr:
- the_url = '%s/%s' % (self.client.api_url, attr.replace('__', '/'))
- else:
- the_url = self.client.api_url
- return _http_call(the_url, self.method, **kw)
- return wrap
- def __call__(self, **kw):
- return _http_call(self.client.api_url, self.method, **kw)
- class APIClient(object):
- """
- 使用方法:
- 比如:api 请求地址为:http://api.open.zbjdev.com/kuaiyinserv/kuaiyin/billaddress
- 请求方式为: GET
- 需要的参数为:user_id 用户的UID
- is_all 是否查询所有数据,0为默认邮寄地址 1为全部邮寄地址
- access_token 平台认证
- 返回数据为:json
- 那么此时使用如下:
- domain = "api.open.zbjdev.com"
- #如果是https请求,需要将is_https设置为True
- client = APIClient(domain)
- data = {"user_id": "14035462", "is_all": 1, "access_token": "XXXXXXXXXX"}
- # 如果是post请求,请将get方法改为post方法
- result = client.kuaiyinserv.kuaiyin.billaddress.get(return_type="json", **data)
- #等同于
- # result = client.kuaiyinserv__kuaiyin__billaddress__get(return_type="json", **data)
- # result = client.kuaiyinserv__kuaiyin__billaddress(return_type="json", **data)
- """
- def __init__(self, domain, is_https=False):
- http = "http"
- if domain.startswith("http://") or domain.startswith("https://"):
- http, domain = domain.split("://")
- # else:
- if is_https:
- http = "https"
- self.api_url = ('%s://%s' % (http, domain)).rstrip("/")
- self.get = HttpObject(self, _HTTP_GET)
- self.post = HttpObject(self, _HTTP_POST)
- def __getattr__(self, attr):
- if '__' in attr:
- method = self.get
- if attr[-6:] == "__post":
- attr = attr[:-6]
- method = self.post
- elif attr[-5:] == "__get":
- attr = attr[:-5]
- if attr[:2] == '__':
- attr = attr[2:]
- return getattr(method, attr)
- return _Callable(self, attr)
- class _Executable(object):
- def __init__(self, client, method, path):
- self._client = client
- self._method = method
- self._path = path
- def __call__(self, **kw):
- method = _METHOD_MAP[self._method]
- return _http_call('%s/%s' % (self._client.api_url, self._path), method, **kw)
- def __str__(self):
- return '_Executable (%s %s)' % (self._method, self._path)
- __repr__ = __str__
- class _Callable(object):
- def __init__(self, client, name):
- self._client = client
- self._name = name
- def __getattr__(self, attr):
- if attr == 'get':
- return _Executable(self._client, 'GET', self._name)
- if attr == 'post':
- return _Executable(self._client, 'POST', self._name)
- name = '%s/%s' % (self._name, attr)
- return _Callable(self._client, name)
- def __str__(self):
- return '_Callable (%s)' % self._name
- __repr__ = __str__
- def test_logistics():
- domain = "https://api.kuaidi100.com/api"
- #如果是https请求,需要将is_https设置为True
- client = APIClient(domain)
- data = {"id": "45f2d1f2sds", "com": "yunda", "nu": "1500066330925"}
- result = client.__get(_timeout=2, **data)
- print result
- print result["message"]
- print result.get("message")
- print result.message
- if __name__ == '__main__':
- # test_logistics()
- data = {
- "data":{
- "id": 1,
- "pid": 3,
- "name": u'中的阿斯顿'
- },
- "sign": 'asdfdsdsfsdf',
- }
- domain = "kuaiyin.zhubajie.com"
- client = APIClient(domain)
- # headers = {'Content-Type': 'application/json'}
- headers = {"host": domain}
- result = client.api.zhubajie.fz.info.post(return_type="json", request_type="json", headers=headers, **data)
- print result
- print result['data']['msg']
- c = APIClient('task.6.zbj.cn')
- r = getattr(c.api, 'kuaiyin-action-delcache').post(request_type="json",
- headers={},
- **{"sign": "",
- "data": {
- "product": "pvc",
- "category": "card",
- "req_type": "pack_list"
- }})
- print r
- print r['data']['msg']
- #该片段来自于http://www.codesnippet.cn/detail/250420149354.html
来源: http://www.codesnippet.cn/detail/250420149354.html