node.jsでTwitter Streaming APIを利用する方法


ntwitterを使って簡単に実現できます(他にもいろいろあるかもしれませんが、最初に試したのがntwitterでした)。

取得した情報をコンソールに出力するだけの簡単な例を作ってみましょう。

まず、Twitter Developerでアプリを登録し、consumer_keyconsumer_secretaccess_token_keyaccess_token_secretを取得します。

node.jsをインストールします。

sudo apt-get install git npm # Ubuntu 12.04
sudo apt-get install git npm nodejs-legacy # Ubuntu 12.10

ntwitterをインストールします。

npm install ntwitter

次のようなJavaScriptファイルを作ります(server.jsとします)。consumer_keyconsumer_secretaccess_token_keyaccess_token_secretの部分は適切なもので埋めます。

var twitter = require('ntwitter');
var tw = new twitter({
  consumer_key: '',
  consumer_secret: '',
  access_token_key: '',
  access_token_secret: ''
});

tw.stream('statuses/sample', function(stream) {
  stream.on('data', function (data) {
    console.log(data);
  });
});

実行します。

node server.js

つぶやきを受信すると、「console.log(data)」を実行し、JSON形式で表示します。

続き:Twitter Streaming APIで取得したツイートをブラウザにプッシュする方法

Twitter4Jでつぶやく練習


拙著『Webアプリケーション構築入門』では、「Webアプリを作れるようになる前に、Webアプリを使えるようになるのが大事」という方針のもと、比較的早い段階で、JavaとPHP、JavaScriptでウェブAPIを利用する方法を紹介しています。

そこで採用したウェブAPIの一つがTwitter APIですが、入門書であることと、紙面が限られていることを理由に、OAuth認証なしでも使えるPublic Timelineだけしか紹介しませんでした。

OAuth認証を使ってつぶやく方法は、このブログで以前紹介しましたが(該当記事)、その後、Mavenを紹介したり(該当記事)、Twitterのサイト上で自分用のトークンを作れるようになったりしたので、「プログラムからつぶやく方法」を改めて書いておきましょう。

Twitterアプリの準備

  1. まず、Twitter DevelopersにTwitterアカウントでログインし、「Create a new app」をクリック、アプリケーションを登録します(コールバックURLはあとで変えられるので適当に入れておけばいいでしょう。今回は使いませんが)。
  2. つぶやくためにはWrite権限が必要なので、SettingsタブでApplication Typeを「Read & Write」に変更します。
  3. Detailsタブに戻って、「Create my access token」をクリックし、Access tokenとAccess token secretを生成します。このページに書かれているConsumer keyとConsumer secretも使います。

Mavenプロジェクトの作成

Mavenプロジェクトを作成し、pom.xmlのdependencies要素の中に次のように記述します(TwitterのためのライブラリであるTwitter4Jを使います)。(参考:『Webアプリケーション構築入門』のウェブアプリをMavenで管理する方法

<dependency>
  <groupId>org.twitter4j</groupId>
  <artifactId>twitter4j-core</artifactId>
  <version>[2.2,)</version>
</dependency>

つぶやくためのコード

つぎのようなコードでつぶやきます。

import twitter4j.*;
import twitter4j.conf.*;

public class Tweet {

  public static void main(String[] args) throws Exception {
    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true)
            .setOAuthConsumerKey(**Consumer key**)
            .setOAuthConsumerSecret(**Consumer secret**)
            .setOAuthAccessToken(**Access token**)
            .setOAuthAccessTokenSecret(**Access token secret**);
    TwitterFactory tf = new TwitterFactory(cb.build());
    Twitter twitter = tf.getInstance();
    twitter.updateStatus("テスト at " + (new java.util.Date()));
  }
}

Streaming APIで取得したつぶやきの処理方法


かつて、Streaming APIで大量のつぶやきをリアルタイムに「保存する方法」を紹介しました(cURL編Python編)。つぶやきはJSON形式で保存していましたが、そこから何かを見出すためにはまず、JSON形式のデータを処理できなければなりません。ここではその方法を確認します。

Pythonで簡単に処理する方法を、@nokunoさんが紹介しています(Twitter Streaming APIの使い方)。cURLで取得したつぶやきは、1行に1つずつファイルに格納されるので、次のように1行ずつ取り出してJSONデータをパースすればいいようです(ここではtweet['text']、つまりつぶやきの本文だけを取り出しています)。

#!/usr/bin/env python
import sys, json
for line in sys.stdin:
    tweet = json.loads(line)
    if 'text' in tweet:
        print tweet['text'].encode('utf-8')

cURL編Python編でつぶやきを保存したファイルがresult.datだとすると、「python parse.py < ressult.dat」などとして本文だけを取り出すことができそうです。

Streaming APIのsample.jsonならこれでもいいのですが、filter.jsonを使うときには、条件に合うつぶやきが途絶えることがあります。そういうときには、30秒ごとにCR LFが送信されてきて、それがファイルに書かれることになるので、上のように1行ずつ処理しようとすると、ValueErrorが発生して処理が途中で止まってしまいます(cURLの場合)。ですから、次のようにValueErrorを無視しなければなりません(PythonのコードではJSON形式だけを出力していたので問題ありません)。追記:Streaming APIが最初にfriendsを返すようになったので、それを無視するために、StandardErrorを無視することにしました。

#!/usr/bin/env python
import sys, json
#from pprint import pprint

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        #print type(tweet['text'])
        print tweet['text'].encode('utf-8')
    except StandardError:
        pass

入力がstr型でも出力の一部はunicode型になったりするのが紛らわしい。Pythonを使うのはバージョン3が主流になってからにしたいですね、細かいことですが。

つぶやかれた日時はtweet['created_at']で取得できるのですが(このあたりのことはpprint()で確認できます)、この値は「Thu Dec 29 04:27:28 +0000 2011」のように標準時なので、日本ではちょっと使いにくいです。そこで、日本時間の年月日時分秒(20111231000000)のような表現に変換する関数を用意します。

#!/usr/bin/env python
import sys, json, time, calendar
#from pprint import pprint

def YmdHMS(created_at):
    time_utc = time.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    unix_time = calendar.timegm(time_utc)
    time_local = time.localtime(unix_time)
    return int(time.strftime("%Y%m%d%H%M%S", time_local))

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        print YmdHMS(tweet['created_at']), tweet['text'].encode('utf-8')
    except StandardError:
        pass

リツイートを除外したいときは、次のようにすればいいでしょう。

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        if 'retweeted_status' not in tweet:
            print YmdHMS(tweet['created_at']), tweet['text'].encode('utf-8')
    except StandardError:
        pass

cURL編で書いたように、こういうことをしているのは、Ustreamなどで配信された動画の整理のためなので、保存したつぶやきは、時間を指定してとりだせるようにしておきましょう。

以下のようなスクリプト(parse.py)を使えば、「python parse.py 20111230150000 20111230174600 < result.dat」などとして、日本時間の2011年12月30日15時から2011年12月30日17時46分までのつぶやきだけを取得できます(日時の指定は省略してもかまいません)。

#!/usr/bin/env python
import sys, json, time, calendar
#from pprint import pprint

def YmdHMS(created_at):
    time_utc = time.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    unix_time = calendar.timegm(time_utc)
    time_local = time.localtime(unix_time)
    return int(time.strftime("%Y%m%d%H%M%S", time_local))

argv = sys.argv
start_time = 0
end_time = 99999999999999
if 1 < len(argv):
    start_time = int(argv[1])
    end_time = int(argv[2])

for line in sys.stdin:
    try:
        tweet = json.loads(line)
        #pprint(tweet)
        if 'retweeted_status' not in tweet:
            tweet_time = YmdHMS(tweet['created_at'])
            if start_time <= tweet_time and tweet_time <= end_time:
                tweet_sec = tweet_time-start_time
                screen_name = tweet['user']['screen_name']
                text = tweet['text'].encode('utf-8')
                url = "https://twitter.com/#!/%s/status/%s"\
                    % (screen_name, tweet['id_str'])
                #print tweet_sec, url, text
                #print text
                t = time.strptime(str(tweet_time), "%Y%m%d%H%M%S")
                print time.strftime("%H:%M:%S", t), text
    except StandardError:
        pass

どのような形式で出力するかは、後で何に使いたいかで決まります。たとえばyouTube上でキャプションを付けたいなら、YouTubeがサポートする形式(参考)にあわせるといいでしょう。

実際に、「#iwakamiyasumi」というハッシュタグで検索し保存したつぶやきのうち、「年末特番 ジャーナリスト休業直前!上杉隆氏ラストインタビュー」の配信時間中(2011年12月30日15:00から17:45)のものを抜き出すと下のようになります(スクロールバーを表示できない環境では閲覧できないかもしれません)。

Streaming APIで大量のつぶやきをリアルタイムに保存する方法(Python編)


前提:Twitter Developersでアプリのconsumer_key等を取得しておく必要があります。詳しく知りたい人は「Twitter OAuth」などを調べるといいでしょう。

なぜこんな技術が必要なのかは、cURL編で書きました。

Streaming APIを使うときは、接続が切れてしまったときにいかに接続し直すかというのがキモなわけですが、cURLで簡単に実現する方法がわからなかったので、240秒ごとに強制的に接続し直すという方法を採用しました。この方法には、(1)再接続のときにつぶやきを取りこぼす危険と重複して取得する可能性があること、(2)接続が切れてから再接続を試みるまでに平均で120秒かかる、という問題がありました。

Streaming APIは、つぶやきが無いときには「CR LF」を30秒おきに送信してくるので、これをチェックすれば切断を検出できるのですが、検出と再接続の機能を自分で実装するのは大変なので、ライブラリに頼りましょう。

ここでは、PythonでTwitterを利用するためのライブラリ、Tweepyを使います。Ubuntuなら次のように簡単にインストールできます。

sudo apt-get install python-setuptools python-pip
sudo easy_install tweepy

CentOSなら、suで管理者になってから、

yum install python-setuptools
easy_install tweepy

Windowsでやりたいときは「Windows Python easy_install」などを調べてください。

Tweepyは切断の検出と再接続の機能を持っているので、Streaming APIで受け取ったデータを出力するプログラム(stream.py)は、サンプルを参考にして次のように簡単に書けます。StreamListenerで、つぶやきに対応するon_status()ではなくon_data()をオーバーライドしているのは、データをそのまま(JSON形式で)出力したいからです。とりあえず全部保存しておいて、必要なものをあとで取り出すというわけです。JSON以外のものがdataとして来る場合もあるので、最初が「{」のものだけをJSONデータと見なして出力するようにしています(ちょっとかっこわるい)。

# -*- coding: utf-8 -*-

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

consumer_key = ""#引用符の中にconsumer_keyの情報を記述する
consumer_secret = ""#引用符の中にconsumer_secretの情報を記述する

access_token = ""#引用符の中にaccess_tokenの情報を記述する
access_token_secret = ""#引用符の中にaccess_token_secretの情報を記述する

class StdOutListener(StreamListener):
    def on_data(self, data):
        if data.startswith("{"):
            print data
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    stream = Stream(auth, l)
    stream.filter(track = [keyword])#検索する場合
#    stream.sample()#ツイートのランダムサンプリングを取得する場合
#    stream.userstream()#タイムラインを取得する場合

consumer_keyとconsumer_secret、access_token、access_token_secret、つぶやきを絞り込むためのキーワード(例:「track = "Japan,USA"」、「track = U"日本"」。OR条件はカンマ、AND条件はスペースで連結)を入力して、「python stream.py >> result.dat」などとして実行すれば、条件に合うつぶやきがresult.datに書き込まれます。

「せっかくPython使っているんだから、この場でパースして・・・」と思うかもしれませんが、後で何が必要になるかわからないので、とりあえず生データを保存しておくのがいいでしょう(リアルタイムでアウトプットしたいという場合は別です)。ファイルではなくデータベースに保存してもいいのですが、話が複雑になるのでやめておきます(データベースを使う場合でも、後で何が必要になるかわからないので、全データを保存しておくのがいいでしょう)。

再接続できるからといって、通信の不安定な場所での利用はおすすめできません。通信の安定した環境に置いたサーバで動かしっぱなしにするのがいいでしょう。何らかの原因でプロセス自体が落ちたときのために、以下のようなスクリプトを使うといいかもしれません(通常の切断ではプロセスは落ちません)。

#!/usr/bin/env bash

while :
do
  python stream.py >> result.dat
  sleep 240
done

Streaming APIで取得したつぶやきの処理方法

Streaming APIで大量のつぶやきをリアルタイムに保存する方法(cURL編)


つぶやきを大量に取得したいときには、TwitterのふつうのAPIではなく、Streaming APIを使います。ふつうのAPIは1時間あたり350回しか使えないので、対象となるつぶやきが、APIを目一杯使って取得できる数を超えるとどうしようもありません。Streaming APIならこのような心配は無用で、条件を設定して一度接続すれば、設定した条件に合うつぶやきを大量に取得できます。厳密に言えば、これは程度の問題でしかなく、Streaming APIでも、すべてのつぶやきを取得できるわけではないのですが、ここでは、「ふつうのAPIに比べれば遙かにたくさん取得できる」と考えてください。

cURLを使うのが簡単です。次のようなシェルスクリプト(stream.sh)を作成し、「bash stream.sh >> result.dat」などとして実行すれば、指定したキーワードを持つつぶやきを、result.datに保存できます(「sudo apt-get install curl」などとしてcURLをインストールする必要があるかもしれません)。

キーワードはURLエンコーディングが必要です(ハッシュは「%23」、OR検索のためのカンマは「%2C」、AND検索のためのスペースは「%20」とすればいいのですが、日本語は現時点では難しそうです)。#iwakamiyasumiと#iwakamiyasumi2のどちらかを含むつぶやきを取得したいときは「%23iwakamiyasumi%2C%23iwakamiyasumi2」となります。

#!/usr/bin/env bash

username=Twitterのユーザ名
password=Twitterのパスワード
keyword=つぶやきをしぼりこむためのキーワード(URLエンコーディングが必要)

url=https://stream.twitter.com/1/statuses/filter.json
while :
do
  curl -d track=$keyword -m 240 -u $username:$password $url
done

240秒ごとにcURLの接続をリセットしています。この方法には、途中で接続が切れても最大240秒待てば自動的に再接続されるというメリットがありますが、リセット時につぶやきを取りこぼす危険や、つぶやきを重複して取得する可能性があります。この間隔(240)を小さくすると、Twitterから接続を断られるかもしれないので注意して下さい。この数字は再接続の最大待ち時間です(参考:Streaming API Concepts)。つぶやきがないときは、30秒ごとに「CR LF」が送信されてくるので、これをチェックすれば切断を検出できるのですが、自分で実装するのは大変なので、あとで別の方法を紹介しましょう。(30秒で2バイトなので、cURLの「-y」や「-Y」では対応できないのが残念です。)

取りこぼしを少なくするためにこのスクリプトは、インターネット環境が不安定な出先ではなく、インターネット環境が安定したサーバで動かしたままにしておくといいでしょう。とりあえず全部取っておいて、あとで必要な部分だけ抜き出すというわけです(抜き出し方はあとで紹介しましょう)。

以下の2点については後で書きます。

  • 接続が切れたときにスマートに再接続する方法Python編で紹介しました)
  • 保存したデータから必要な部分だけを取り出す方法書きました

なぜこういう技術が必要なのでしょう。

Ustreamのような動画配信サービスが登場し、それを活用するジャーナリストの活躍を見ると、「記者クラブメディアが諸悪の根源だ」という状況も少しずつ改善するのではないか、と期待してしまいます(最近の面白かった映像は「読売新聞記者vs上杉隆氏と岩上安身さん」)。たとえば、岩上安身(@iwakamiyasumi)さんが率いるIndependent Web Journal (IWJ)のおかげで私たちは、これまでなら記者クラブメディアの能力不足とフィルタリングのせいで人目に触れずに消えていたであろう映像を見ることができます。

しかし、IWJのUstreamスケジュールを見てもわかるように、その量は膨大で、全部見るのはほとんど不可能です。自分にとって重要な映像がすぐに見つかるようになっていればいいのですが、現在の映像検索技術では難しいでしょう。よくやられるのは映像に付与されたメタ情報(タイトルやタグ)などを使って検索することですが、タイトルはともかく、タグを適切に付けるのもかなりの労力だと思います(Togetterもしかり)。ソーシャルメディア上で推薦システムを動かして・・・ということも考えられますが、自分が見た映像の記録がすべて保存され、活用されるのをいやがる人もいるかもしれません(話がそれました)。

幸運なことに、著名なジャーナリストが配信する映像は、一般的な映像とは違います。特に記者会見の映像には、「tsudaる」というボランティア行為による膨大な文字情報が付随します。これを活用するために、まずはその文字情報を保存しよう、というのが今日のお話。

Ustreamには、動画に対する複数のソーシャルメディアからのコメントをまとめて表示する「ソーシャルコメント」という機能があるのですが、とりあえず、Twitterのつぶやきだけを保存することにします。こういうことの大切さが理解され、Ustreamが保存してあるソーシャルコメント()を使いやすい形で提供してくれることを願います。