Streaming APIで取得したつぶやきの処理方法


かつて、Streaming APIで大量のつぶやきをリアルタイムに「保存する方法」を紹介しました(cURL編Python編)。つぶやきはJSON形式で保存していましたが、そこから何かを見出すためにはまず、JSON形式のデータを処理できなければなりません。ここではその方法を確認します。

Pythonで簡単に処理する方法を、@nokunoさんが紹介しています(Twitter Streaming APIの使い方)。cURLで取得したつぶやきは、1行に1つずつファイルに格納されるので、次のように1行ずつ取り出してJSONデータをパースすればいいようです(ここではtweet['text']、つまりつぶやきの本文だけを取り出しています)。

#!/usr/bin/env python
import sys, json
for line in sys.stdin:
    tweet = json.loads(line)
    if 'text' in tweet:
        print tweet['text'].encode('utf-8')

cURL編Python編でつぶやきを保存したファイルがresult.datだとすると、「python parse.py < ressult.dat」などとして本文だけを取り出すことができそうです。

Streaming APIのsample.jsonならこれでもいいのですが、filter.jsonを使うときには、条件に合うつぶやきが途絶えることがあります。そういうときには、30秒ごとにCR LFが送信されてきて、それがファイルに書かれることになるので、上のように1行ずつ処理しようとすると、ValueErrorが発生して処理が途中で止まってしまいます(cURLの場合)。ですから、次のようにValueErrorを無視しなければなりません(PythonのコードではJSON形式だけを出力していたので問題ありません)。追記:Streaming APIが最初にfriendsを返すようになったので、それを無視するために、StandardErrorを無視することにしました。

#!/usr/bin/env python
import sys, json
#from pprint import pprint

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        #print type(tweet['text'])
        print tweet['text'].encode('utf-8')
    except StandardError:
        pass

入力がstr型でも出力の一部はunicode型になったりするのが紛らわしい。Pythonを使うのはバージョン3が主流になってからにしたいですね、細かいことですが。

つぶやかれた日時はtweet['created_at']で取得できるのですが(このあたりのことはpprint()で確認できます)、この値は「Thu Dec 29 04:27:28 +0000 2011」のように標準時なので、日本ではちょっと使いにくいです。そこで、日本時間の年月日時分秒(20111231000000)のような表現に変換する関数を用意します。

#!/usr/bin/env python
import sys, json, time, calendar
#from pprint import pprint

def YmdHMS(created_at):
    time_utc = time.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    unix_time = calendar.timegm(time_utc)
    time_local = time.localtime(unix_time)
    return int(time.strftime("%Y%m%d%H%M%S", time_local))

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        print YmdHMS(tweet['created_at']), tweet['text'].encode('utf-8')
    except StandardError:
        pass

リツイートを除外したいときは、次のようにすればいいでしょう。

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        if 'retweeted_status' not in tweet:
            print YmdHMS(tweet['created_at']), tweet['text'].encode('utf-8')
    except StandardError:
        pass

cURL編で書いたように、こういうことをしているのは、Ustreamなどで配信された動画の整理のためなので、保存したつぶやきは、時間を指定してとりだせるようにしておきましょう。

以下のようなスクリプト(parse.py)を使えば、「python parse.py 20111230150000 20111230174600 < result.dat」などとして、日本時間の2011年12月30日15時から2011年12月30日17時46分までのつぶやきだけを取得できます(日時の指定は省略してもかまいません)。

#!/usr/bin/env python
import sys, json, time, calendar
#from pprint import pprint

def YmdHMS(created_at):
    time_utc = time.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    unix_time = calendar.timegm(time_utc)
    time_local = time.localtime(unix_time)
    return int(time.strftime("%Y%m%d%H%M%S", time_local))

argv = sys.argv
start_time = 0
end_time = 99999999999999
if 1 < len(argv):
    start_time = int(argv[1])
    end_time = int(argv[2])

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        if 'retweeted_status' not in tweet:
            tweet_time = YmdHMS(tweet['created_at'])
            if start_time <= tweet_time and tweet_time <= end_time:
                tweet_sec = tweet_time-start_time
                screen_name = tweet['user']['screen_name']
                text = tweet['text'].encode('utf-8')
                url = "https://twitter.com/#!/%s/status/%s"\
                    % (screen_name, tweet['id_str'])
                #print tweet_sec, url, text
                #print text
                t = time.strptime(str(tweet_time), "%Y%m%d%H%M%S")
                print time.strftime("%H:%M:%S", t), text
    except StandardError:
        pass

どのような形式で出力するかは、後で何に使いたいかで決まります。たとえばyouTube上でキャプションを付けたいなら、YouTubeがサポートする形式(参考)にあわせるといいでしょう。

実際に、「#iwakamiyasumi」というハッシュタグで検索し保存したつぶやきのうち、「年末特番 ジャーナリスト休業直前!上杉隆氏ラストインタビュー」の配信時間中(2011年12月30日15:00から17:45)のものを抜き出すと下のようになります(スクロールバーを表示できない環境では閲覧できないかもしれません)。

Streaming APIで大量のつぶやきをリアルタイムに保存する方法(Python編)


前提:Twitter Developersでアプリのconsumer_key等を取得しておく必要があります。詳しく知りたい人は「Twitter OAuth」などを調べるといいでしょう。

なぜこんな技術が必要なのかは、cURL編で書きました。

Streaming APIを使うときは、接続が切れてしまったときにいかに接続し直すかというのがキモなわけですが、cURLで簡単に実現する方法がわからなかったので、240秒ごとに強制的に接続し直すという方法を採用しました。この方法には、(1)再接続のときにつぶやきを取りこぼす危険と重複して取得する可能性があること、(2)接続が切れてから再接続を試みるまでに平均で120秒かかる、という問題がありました。

Streaming APIは、つぶやきが無いときには「CR LF」を30秒おきに送信してくるので、これをチェックすれば切断を検出できるのですが、検出と再接続の機能を自分で実装するのは大変なので、ライブラリに頼りましょう。

ここでは、PythonでTwitterを利用するためのライブラリ、Tweepyを使います。Ubuntuなら次のように簡単にインストールできます。

sudo apt-get install python-setuptools python-pip
sudo easy_install tweepy

CentOSなら、suで管理者になってから、

yum install python-setuptools
easy_install tweepy

Windowsでやりたいときは「Windows Python easy_install」などを調べてください。

Tweepyは切断の検出と再接続の機能を持っているので、Streaming APIで受け取ったデータを出力するプログラム(stream.py)は、サンプルを参考にして次のように簡単に書けます。StreamListenerで、つぶやきに対応するon_status()ではなくon_data()をオーバーライドしているのは、データをそのまま(JSON形式で)出力したいからです。とりあえず全部保存しておいて、必要なものをあとで取り出すというわけです。JSON以外のものがdataとして来る場合もあるので、最初が「{」のものだけをJSONデータと見なして出力するようにしています(ちょっとかっこわるい)。

# -*- coding: utf-8 -*-

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

consumer_key = ""#引用符の中にconsumer_keyの情報を記述する
consumer_secret = ""#引用符の中にconsumer_secretの情報を記述する

access_token = ""#引用符の中にaccess_tokenの情報を記述する
access_token_secret = ""#引用符の中にaccess_token_secretの情報を記述する

class StdOutListener(StreamListener):
    def on_data(self, data):
        if data.startswith("{"):
            print data
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    stream = Stream(auth, l)
    stream.filter(track = [keyword])#検索する場合
#    stream.sample()#ツイートのランダムサンプリングを取得する場合
#    stream.userstream()#タイムラインを取得する場合

consumer_keyとconsumer_secret、access_token、access_token_secret、つぶやきを絞り込むためのキーワード(例:「track = "Japan,USA"」、「track = U"日本"」。OR条件はカンマ、AND条件はスペースで連結)を入力して、「python stream.py >> result.dat」などとして実行すれば、条件に合うつぶやきがresult.datに書き込まれます。

「せっかくPython使っているんだから、この場でパースして・・・」と思うかもしれませんが、後で何が必要になるかわからないので、とりあえず生データを保存しておくのがいいでしょう(リアルタイムでアウトプットしたいという場合は別です)。ファイルではなくデータベースに保存してもいいのですが、話が複雑になるのでやめておきます(データベースを使う場合でも、後で何が必要になるかわからないので、全データを保存しておくのがいいでしょう)。

再接続できるからといって、通信の不安定な場所での利用はおすすめできません。通信の安定した環境に置いたサーバで動かしっぱなしにするのがいいでしょう。何らかの原因でプロセス自体が落ちたときのために、以下のようなスクリプトを使うといいかもしれません(通常の切断ではプロセスは落ちません)。

#!/usr/bin/env bash

while :
do
  python stream.py >> result.dat
  sleep 240
done

Streaming APIで取得したつぶやきの処理方法

ソーティングはComparatorを定義してやるくらいがちょうどいい


ソーティング(並び替え)のためにプログラムを書く人はあまりいません。プログラミングを習うとすぐに、ソーティングの例題に出会いますが、それはあくまでアルゴリズムの勉強のためのものであって、実際の問題でソートが必要なときは、言語に組み込まれた、あるいは定評のあるライブラリを使ってソートします。

数値や文字列はそのようなライブラリですぐにソートできますが、複数の属性を持つオブジェクトなどを並び替えるためには、オブジェクト同士を比較するための関数やオブジェクト(以下ではComparatorと総称)を用意してからでないとソートできません。C++のSTLにあるsort()にはパラメータを2つ持つ関数を、JavaのCollections.sort()にはインターフェースComparatorを実装するクラスを与えなければなりません(あるいは、オブジェクトにComparableインターフェースを実装させる)。(C++本ウェブアプリ本を参照)

スクリプト系の言語だと、Comparatorを用意しなくても、なんとなく並び替えてくれるので便利です。たとえば、数値と文字列の組からなる以下のような配列を、Pythonなら簡単にソートできます。

>>> data=[(1,"a"), (2, "d"), (1,"c"), (1,"b")]
>>> data.sort()
>>> data
[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'd')]

C++やJavaでは、数値と文字列の組の配列をこんなに簡単にソートすることはできないので、「やっぱスクリプト言語だよね」ということになるのは仕方ありません。

4873113644しかし、この結果をよく見ると、数値だけでなく文字列も考慮して並び替えられています。多くの場合、これはやりすぎです。たとえば、Toby Segaran『集合知プログラミング』(オライリー・ジャパン, 2008)では、映画の評価データ10万件を使った推薦システム(協調フィルタリング)の出力として、次のような例が紹介されています。

5.0 What’s Eating Gilbert Grape (1993)
5.0 Temptress Moon (Feng Yue) (1996)
5.0 Street Fighter (1994)
5.0 Spice World (1997)
5.0 Sliding Doors (1998)
5.0 Shooting Fish (1997)
5.0 Roseanna’s Grave (For Roseanna) (1997)
5.0 Rendezvous in Paris (Rendez-vous de Paris, Les) (1995)
5.0 Reluctant Debutante, The (1958)
5.0 Police Story 4: Project S (Chao ji ji hua) (1993)
5.0 Palmetto (1998)
5.0 New York Cop (1996)
5.0 Love Is All There Is (1996)
5.0 Johns (1996)
5.0 Innocents, The (1961)
5.0 Hollow Reed (1996)

これは、(スコア, タイトル)の組の配列を、ソーティングして反転したものです。反転することによって、スコアの高い物から出力されるようになっているわけですが、並び替えにはタイトルも使われているため、得られる結果はいつも、アルファベット順で後ろにあるものが優先されてしまっています。独自のComparatorを使うとかスコアに乱数を付加するなど、何らかの対策をしてからでないと、このコードを使うことはできないでしょう。

ソーティングはComparatorを定義してやるくらいがちょうどいい、というわけです。

Pythonで亀型全自動お掃除ロボットを作る


「Eclipse 上でPythonのプログラムを書くためのプラグインPyDev」から続く

辻真吾『Pythonスタートブック』(技術評論社, 2010)4774142298を読みました。

この本は、最初に学ぶプログラミング言語がPythonだという人のために書かれた入門書ですが、全10章の最終章つまり第10章はかなりハードでした。とりあえず、初心者は飛ばしていいと思うのですが、私にとっては昔遊んだLOGOを思い出させる懐かしい内容だったので、途中まで自分なりに作ってみました。本文よりはずいぶん手を抜いたつもりですが、それでもまだこの章だけは初心者お断りかもしれません。

参照:24.4 turtle (Python Documentation)

#coding:utf-8
import turtle
import random

class Kame(turtle.Turtle):
    def __init__(self):
        super(Kame, self).__init__()
        self.xmax = 200
        self.ymax = 200
        self.getscreen().setup(2 * self.xmax + 2, 2 * self.ymax + 2, 0, 0) #ウィンドウサイズ
        self.getscreen().screensize(2 * self.xmax, 2 * self.ymax, 'gray') #キャンバスサイズ
        self.pencolor('white')
        self.width(10)
        self.left(random.randint(1, 360))

    def run(self):
        while True:
            self.forward(10)
            if self.xcor() > self.xmax: #右の壁にぶつかった
                self.rotate(90)
            if self.ycor() > self.ymax: #上の壁にぶつかった
                self.rotate(180)
            if self.xcor() < -self.xmax: #左の壁にぶつかった
                self.rotate(270)
            if self.ycor() < -self.ymax: #下の壁にぶつかった
                self.rotate(360)

    def rotate(self, t): #tはheading()の基準を変換するためのパラメータ
        self.undo()
        self.left(t - self.heading() + random.randint(0, 180))

kame = Kame()
kame.run()

Eclipse上でPythonのプログラムを書くためのプラグインPyDev


「なぜPythonなのか」から続く

『Pythonスタートブック』4774142298を読んでいます。コードが単純な初めのうちは、Pythonの特徴を活用した、その場で試せるサンプルが続いていましたが、後の方ではコードが複雑になることもあって、ファイルにコードを書かなければいけなくなりました。オブジェクト指向のコードを一般的なエディタで書くのは私には無理なので、Python開発のためのEclipseプラグインであるPyDevを導入しました。コード補完機能や、エラーの即時表示機能が便利です。

Eclipse上でHelp→「Install New Software」とし、「Work with」に「http://pydev.org/updates」を入力すればインストールできます。