App Engine上的后台任务
如何在App Engine上运行后台任务?
解决方案
GAE是构建可伸缩Web应用程序的非常有用的工具。许多人指出的限制中,很少有不支持后台任务,缺乏周期性任务以及对每个HTTP请求花费多少时间的严格限制,如果请求超过该时间限制,操作将被终止,这使得运行耗时的任务成为不可能。
如何运行后台任务?
在GAE中,仅当有HTTP请求时才执行代码。代码需要多长时间有严格的时间限制(我认为是10秒)。因此,如果没有请求,则不会执行代码。建议的解决方法之一是使用外部设备连续发送请求,这样就创建了后台任务。但是为此,我们需要一个外部盒子,现在我们又依赖于一个元素。另一种选择是发送302重定向响应,以便客户端重新发送请求,这也使我们依赖于作为客户端的外部元素。如果该外部设备是GAE本身怎么办?使用过不支持该语言中的循环构造的功能语言的每个人都知道替代方法,即,递归是循环的替代品。那么,如果我们完成了部分计算并在很短的超时时间内说出1秒钟对同一个URL进行HTTP GET怎么办?这会在apache上运行的php代码上创建一个循环(递归)。
<?php $i = 0; if(isset($_REQUEST["i"])){ $i= $_REQUEST["i"]; sleep(1); } $ch = curl_init("http://localhost".$_SERVER["PHP_SELF"]."?i=".($i+1)); curl_setopt($ch, CURLOPT_HEADER, 0); curl_setopt($ch, CURLOPT_TIMEOUT, 1); curl_exec($ch); print "hello world\n"; ?>
有些方法在GAE上不起作用。那么,如果我们对其他url进行HTTP GET操作,比如说url2,那么对第一个url执行HTTP GET操作呢?这似乎在GAE中有效。此代码看起来像这样。
class FirstUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url2') class SecondUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url1') application = webapp.WSGIApplication([('/url1', FirstUrl), ('/url2', SecondUrl)]) def main(): run_wsgi_app(application) if __name__ == "__main__": main()
由于我们找到了一种运行后台任务的方法,因此让我们为周期性任务(计时器)建立抽象,并为跨越许多HTTP请求(foreach)的循环构造建立结构。
计时器
现在,构建计时器非常简单。基本思想是拥有计时器列表以及应调用每个计时器的间隔。一旦达到该间隔,请调用回调函数。我们将使用内存缓存来维护计时器列表。为了找出何时调用回调,我们将密钥存储在内存缓存中,并将间隔作为到期时间。我们定期(例如5秒)检查该键是否存在,如果不存在,则调用回调并再次设置该键的间隔。
def timer(func, interval): timerlist = memcache.get('timer') if(None == timerlist): timerlist = [] timerlist.append({'func':func, 'interval':interval}) memcache.set('timer-'+func, '1', interval) memcache.set('timer', timerlist) def checktimers(): timerlist = memcache.get('timer') if(None == timerlist): return False for current in timerlist: if(None == memcache.get('timer-'+current['func'])): #reset interval memcache.set('timer-'+current['func'], '1', current['interval']) #invoke callback function try: eval(current['func']+'()') except: pass return True return False
佛瑞奇
当我们想进行长时间计算时,例如对1000个数据库行执行某些操作或者获取1000个url等,这是需要的。基本思想是维护memcache中的回调和参数列表,并每次使用参数调用回调。
def foreach(func, args): looplist = memcache.get('foreach') if(None == looplist): looplist = [] looplist.append({'func':func, 'args':args}) memcache.set('foreach', looplist) def checkloops(): looplist = memcache.get('foreach') if(None == looplist): return False if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): arg = looplist[0]['args'].pop(0) func = looplist[0]['func'] if(len(looplist[0]['args']) == 0): looplist.pop(0) if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): memcache.set('foreach', looplist) else: memcache.delete('foreach') try: eval(func+'('+repr(arg)+')') except: pass return True else: return False # instead of # foreach index in range(0, 1000): # someoperaton(index) # we will say # foreach('someoperaton', range(0, 1000))
现在,构建一个每隔一小时获取一次网址列表的程序就很简单了。这是代码。
def getone(url): try: result = urlfetch.fetch(url) if(result.status_code == 200): memcache.set(url, '1', 60*60) #process result.content except : pass def getallurl(): #list of urls to be fetched urllist = ['http://www.google.com/', 'http://www.cnn.com/', 'http://www.yahoo.com', 'http://news.google.com'] fetchlist = [] for url in urllist: if (memcache.get(url) is None): fetchlist.append(url) #this is equivalent to #for url in fetchlist: getone(url) if(len(fetchlist) > 0): foreach('getone', fetchlist) #register the timer callback timer('getallurl', 3*60)
完整的代码在这里http://groups.google.com/group/httpmr-discuss/t/1648611a54c01aa
我在appengine上运行此代码已有几天,没有太大问题。
警告:我们大量使用urlfetch。每天的urlfetch限制为160000。因此请注意不要达到该限制。
即将发布的运行时版本将具有某种定期执行引擎a'la cron。请在AppEngine组上查看此消息。
So, all the SDK pieces appear to work, but my testing indicates this isn't running on the production servers yet-- I set up an "every 1 minutes" cron that logs when it runs, and it hasn't been called yet
不过,很难说何时可用。
我们可以使用Task Queue Python API。