Posting douban saying using python / 用python發送豆瓣廣播
I finally got some time to work on my own Python script for posting Douban sayings again. This time I used the library written by Leah Culver, and everything went smoothly. I just have no idea why Douban chose to use a different signing algorithm (if a thing as stupid as this can be called an algorithm) for requesting the access token.
終於又有時間再一次捯鼓豆瓣. 這次在 Leah Culver 寫的 library 的基礎上, 終於把python腳本發送豆瓣廣播搞定了. 唯一理解不能的事就是豆瓣在獲取access token的時候用了一個莫名其妙的簽名算法(如果這種也能叫做算法的話 = =).
Click on read more to view the actual code of the script.
查看腳本內容請點擊 read more
#!/usr/bin/env python
"""Post a "saying" (miniblog entry) to douban.com through its OAuth 1.0 API.

The module runs unchanged on Python 2.7 and Python 3.  Typical use:

    client = douban()
    client.first_run()          # one-off: obtain and store an access token
    client.add_saying('hello')  # post, using the stored access token
"""
import binascii
import hashlib
import hmac
import os
import random
import sys
import time
from xml.sax.saxutils import escape as xml_escape

try:  # Python 3
    from http.client import HTTPConnection
    from urllib.parse import parse_qs, quote, unquote
except ImportError:  # Python 2
    from httplib import HTTPConnection
    from urllib import quote, unquote
    from urlparse import parse_qs

try:
    read_line = raw_input  # Python 2
except NameError:
    read_line = input  # Python 3

SERVER = 'api.douban.com'
API_KEY = 'your api key'
SECRET = 'your api secret'
OAUTH_SERVER = 'www.douban.com'
REQUEST_TOKEN_URI = '/service/auth/request_token'
AUTHORIZATION_URI = '/service/auth/authorize'
ACCESS_TOKEN_URI = '/service/auth/access_token'
SAYING_URI = '/miniblog/saying'
SIG_METHOD = 'HMAC-SHA1'
OAUTH_VER = '1.0'
SCHEME = 'http'
ACCESS_TOKEN_FILE = 'douban_access_token'


def escape(s):
    """Percent-encode *s*, leaving only unreserved characters untouched."""
    return quote(s, safe='~')


def generate_timestamp():
    """Return the current Unix time, in whole seconds, as a string."""
    return str(int(time.time()))


def generate_nonce(length=8):
    """Return a string of *length* random decimal digits."""
    return ''.join(str(random.randint(0, 9)) for _ in range(length))


def normalize_params(params):
    """Return *params* as an escaped, sorted ``k=v&k=v`` string.

    Keys and values are escaped first and the pairs are sorted afterwards,
    so the ordering is that of the escaped strings.
    """
    pairs = sorted((escape(k), escape(v)) for k, v in params.items())
    return '&'.join('%s=%s' % pair for pair in pairs)


def sign(method, url, params, secret, token_secret):
    """Return the ``oauth_signature`` value for a request.

    Args:
        method: HTTP method; upper-cased before signing.
        url: host plus path, without scheme (``SCHEME`` is prepended).
        params: dict of the request parameters to sign.
        secret: consumer (API) secret.
        token_secret: token secret, or a false value when there is none yet.

    Returns:
        The base64 HMAC-SHA1 signature as text, except for the access-token
        endpoint, where the two secrets joined by ``&`` are returned as is.
    """
    if url == OAUTH_SERVER + ACCESS_TOKEN_URI:
        # The access-token request is signed with the plain concatenated
        # secrets rather than an HMAC.
        # NOTE(review): get_access_token() sends a misspelled
        # 'oauth_sgnature_method' key; presumably the server then falls back
        # to PLAINTEXT signing, which would explain why this works. Confirm
        # against the server before changing either side.
        return secret + '&' + token_secret

    base_string = '&'.join((
        escape(method.upper()),
        escape(SCHEME + '://' + url),
        escape(normalize_params(params)),
    ))
    key = escape(secret) + '&'
    if token_secret:
        key += escape(token_secret)

    # Both strings are built from escape() output and are therefore pure
    # ASCII; hmac requires bytes on Python 3.
    digest = hmac.new(key.encode('ascii'), base_string.encode('ascii'),
                      hashlib.sha1).digest()
    signature = binascii.b2a_base64(digest)[:-1]  # drop the trailing newline
    if not isinstance(signature, str):  # bytes on Python 3
        signature = signature.decode('ascii')
    return signature


def generate_header(method, url, params, key, token):
    """Build the OAuth ``Authorization`` header for a request.

    ``oauth_version`` and ``oauth_signature`` are added to a copy of
    *params*; the caller's dict is left unmodified.

    Args:
        method: HTTP method.
        url: host plus path, without scheme.
        params: dict of OAuth parameters.
        key: consumer (API) secret, passed to sign().
        token: token secret (may be None), passed to sign().

    Returns:
        A dict with a single ``Authorization`` entry.
    """
    params = dict(params)
    params['oauth_version'] = OAUTH_VER
    params['oauth_signature'] = sign(method, url, params, key, token)
    fields = ['OAuth realm=""']
    fields.extend('%s="%s"' % (k, escape(v)) for k, v in sorted(params.items()))
    return {"Authorization": ', '.join(fields)}


def create_connection(server):
    """Return a new plain-HTTP connection object for *server*."""
    return HTTPConnection(server)


def _read_body(response):
    """Read the whole response body and return it as text."""
    body = response.read()
    if not isinstance(body, str):  # bytes on Python 3
        body = body.decode('utf-8')
    return body


def _send(server, method, uri, headers, body=None):
    """Perform one HTTP request and return ``(status, reason, body_text)``.

    The connection is always closed, even when the request raises.
    """
    conn = create_connection(server)
    try:
        conn.request(method, uri, body, headers)
        response = conn.getresponse()
        return response.status, response.reason, _read_body(response)
    finally:
        conn.close()


def _parse_token_response(data):
    """Parse a form-encoded token string into a flat dict.

    Only the first value of each key is kept and blank values are dropped.
    """
    parsed = parse_qs(data, keep_blank_values=False)
    # NOTE(review): parse_qs already percent-decodes values, so the extra
    # unquote() decodes twice. Kept to preserve the original behaviour.
    return {k: unquote(v[0]) for k, v in parsed.items()}


def read_access_token_file():
    """Return the token stored in ``ACCESS_TOKEN_FILE``, or {} if absent."""
    if not os.access(ACCESS_TOKEN_FILE, os.F_OK):
        return {}
    with open(ACCESS_TOKEN_FILE, 'r') as token_file:
        return _parse_token_response(token_file.read())


def write_access_token_file(data):
    """Write *data* to ``ACCESS_TOKEN_FILE`` and report the outcome."""
    if not os.access('./', os.W_OK):
        print("Cannot write to file, please confirm you have write permission")
        return
    with open(ACCESS_TOKEN_FILE, 'w') as token_file:
        token_file.write(data)
    print("Token file successfully updated")


class douban(object):
    """Minimal douban client: OAuth token handshake plus posting sayings."""

    def __init__(self):
        self.key = API_KEY
        self.secret = SECRET
        # Token fields loaded from disk; empty until first_run() succeeds.
        self.stored_token = read_access_token_file()

    @staticmethod
    def _build_entry(content):
        """Return the Atom entry XML for a saying with text *content*."""
        return ('<?xml version=\'1.0\' encoding=\'UTF-8\'?>'
                + '<entry xmlns:ns0="http://www.w3.org/2005/Atom" xmlns:db="http://www.douban.com/xmlns/">'
                # Escape &, < and > so arbitrary text yields well-formed XML.
                + '<content>' + xml_escape(content) + '</content>'
                + '</entry>')

    def update_stored_token(self, data):
        """Merge the form-encoded token fields in *data* into stored_token."""
        self.stored_token.update(_parse_token_response(data))

    def get_request_token(self):
        """Fetch a request token, then continue with user authorization.

        On a non-200 response the status, reason and body are printed.
        """
        params = {
            'oauth_consumer_key': self.key,
            'oauth_signature_method': SIG_METHOD,
            'oauth_timestamp': generate_timestamp(),
            'oauth_nonce': generate_nonce(),
        }
        header = generate_header('GET', OAUTH_SERVER + REQUEST_TOKEN_URI,
                                 params, self.secret, None)
        status, reason, data = _send(OAUTH_SERVER, "GET", REQUEST_TOKEN_URI,
                                     header)
        if status == 200:
            self.update_stored_token(data)
            self.authorize_token()
        else:
            print("%s %s\n%s" % (status, reason, data))

    def authorize_token(self):
        """Ask the user to authorize the request token, then exchange it."""
        print("Open the link below to authorize the request token:")
        print("http://%s%s?oauth_token=%s" % (
            OAUTH_SERVER, AUTHORIZATION_URI,
            escape(self.stored_token['oauth_token'])))
        read_line("Press enter to continue")
        self.get_access_token()

    def get_access_token(self):
        """Exchange the authorized request token for an access token.

        On success the raw response is written to the token file and merged
        into stored_token, so the instance can post right away.  On a
        non-200 response the status, reason and body are printed.
        """
        params = {
            'oauth_consumer_key': self.key,
            'oauth_token': self.stored_token['oauth_token'],
            # NOTE(review): this key is misspelled ('sgnature'). It is left
            # unchanged on purpose because sign() special-cases this
            # endpoint; fix the two together after confirming server
            # behaviour.
            'oauth_sgnature_method': SIG_METHOD,
            'oauth_timestamp': generate_timestamp(),
            'oauth_nonce': generate_nonce(),
        }
        header = generate_header("GET", OAUTH_SERVER + ACCESS_TOKEN_URI,
                                 params, self.secret,
                                 self.stored_token['oauth_token_secret'])
        status, reason, data = _send(OAUTH_SERVER, "GET", ACCESS_TOKEN_URI,
                                     header)
        if status == 200:
            write_access_token_file(data)
            self.update_stored_token(data)
        else:
            print("%s %s\n%s" % (status, reason, data))

    def first_run(self):
        """Run the whole token handshake, confirming before overwriting.

        Exits the process with status 0 if the user declines to overwrite
        an existing token.
        """
        if 'douban_user_id' in self.stored_token:
            confirm = read_line('token file exists arleady, overwrite? ([y]/n):')
            # Empty input means the default answer, "yes".
            if confirm.strip().upper().startswith('N'):
                sys.exit(0)
        self.get_request_token()

    def add_saying(self, content):
        """Post *content* as a new saying.

        Prints a message and returns None when no access token is stored.
        Any response other than 201 is printed with its status, reason and
        body.
        """
        if 'douban_user_id' not in self.stored_token:
            print('Access token file does not exist')
            return None
        entry = self._build_entry(content)
        if not isinstance(entry, bytes):
            # Send UTF-8 bytes, matching the encoding the XML declares.
            entry = entry.encode('utf-8')
        params = {
            'oauth_consumer_key': self.key,
            'oauth_token': self.stored_token['oauth_token'],
            'oauth_signature_method': SIG_METHOD,
            'oauth_timestamp': generate_timestamp(),
            'oauth_nonce': generate_nonce(),
        }
        header = generate_header("POST", SERVER + SAYING_URI, params,
                                 self.secret,
                                 self.stored_token['oauth_token_secret'])
        header['Content-Type'] = 'application/atom+xml'
        status, reason, data = _send(SERVER, "POST", SAYING_URI, header, entry)
        if status != 201:
            print("%s %s\n%s" % (status, reason, data))
        return None

Thanks a lot for sharing your lib. Works like a charm!
给我围观下84大婶,忒强大了
@jimey
圍觀一下棒子天王
我是用ping.fm来发送豆瓣广播的
@于仁颇黎
ping.fm 沒有直接支持豆瓣