Browse Source

Improved remote thread fetching (#10106)

* Fetch up to 5 replies when discovering a new remote status

This is used for resolving threads downwards. The originating
server must add a “replies” attributes with such replies for it to
be useful.

* Add some tests for ActivityPub::FetchRepliesWorker

* Add specs for ActivityPub::FetchRepliesService

* Serialize up to 5 public self-replies for ActivityPub notes

* Add specs for ActivityPub::NoteSerializer

* Move exponential backoff logic to a worker concern

* Fetch first page of paginated collections when fetching thread replies

* Add specs for paginated collections in replies

* Move Note replies serialization to a first CollectionPage

The collection isn't actually paginable yet as it has no id nor
a `next` field. This may come in another PR.

* Use pluck(:uri) instead of map(&:uri) to improve performances

* Fix fetching replies when they are in a CollectionPage
pull/3/head
ThibG 4 months ago
parent
commit
9d3c6f1849

+ 10
- 0
app/lib/activitypub/activity/create.rb View File

@@ -40,6 +40,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
40 40
     end
41 41
 
42 42
     resolve_thread(@status)
43
+    fetch_replies(@status)
43 44
     distribute(@status)
44 45
     forward_for_reply if @status.public_visibility? || @status.unlisted_visibility?
45 46
   end
@@ -213,6 +214,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
213 214
     ThreadResolveWorker.perform_async(status.id, in_reply_to_uri)
214 215
   end
215 216
 
217
+  def fetch_replies(status)
218
+    collection = @object['replies']
219
+    return if collection.nil?
220
+    replies = ActivityPub::FetchRepliesService.new.call(status, collection, false)
221
+    return if replies.present?
222
+    uri = value_or_id(collection)
223
+    ActivityPub::FetchRepliesWorker.perform_async(status.id, uri) unless uri.nil?
224
+  end
225
+
216 226
   def conversation_from_uri(uri)
217 227
     return nil if uri.nil?
218 228
     return Conversation.find_by(id: OStatus::TagManager.instance.unique_tag_to_local_id(uri, 'Conversation')) if OStatus::TagManager.instance.local_id?(uri)

+ 4
- 0
app/models/concerns/status_threading_concern.rb View File

@@ -11,6 +11,10 @@ module StatusThreadingConcern
11 11
     find_statuses_from_tree_path(descendant_ids(limit, max_child_id, since_child_id, depth), account, promote: true)
12 12
   end
13 13
 
14
+  def self_replies(limit)
15
+    account.statuses.where(in_reply_to_id: id, visibility: [:public, :unlisted]).reorder(id: :asc).limit(limit)
16
+  end
17
+
14 18
   private
15 19
 
16 20
   def ancestor_ids(limit)

+ 1
- 1
app/presenters/activitypub/collection_presenter.rb View File

@@ -1,5 +1,5 @@
1 1
 # frozen_string_literal: true
2 2
 
3 3
 class ActivityPub::CollectionPresenter < ActiveModelSerializers::Model
4
-  attributes :id, :type, :size, :items, :part_of, :first, :last, :next, :prev
4
+  attributes :id, :type, :size, :items, :page, :part_of, :first, :last, :next, :prev
5 5
 end

+ 3
- 2
app/serializers/activitypub/collection_serializer.rb View File

@@ -7,7 +7,8 @@ class ActivityPub::CollectionSerializer < ActiveModel::Serializer
7 7
     super
8 8
   end
9 9
 
10
-  attributes :id, :type
10
+  attribute :id, if: -> { object.id.present? }
11
+  attribute :type
11 12
   attribute :total_items, if: -> { object.size.present? }
12 13
   attribute :next, if: -> { object.next.present? }
13 14
   attribute :prev, if: -> { object.prev.present? }
@@ -37,6 +38,6 @@ class ActivityPub::CollectionSerializer < ActiveModel::Serializer
37 38
   end
38 39
 
39 40
   def page?
40
-    object.part_of.present?
41
+    object.part_of.present? || object.page.present?
41 42
   end
42 43
 end

+ 13
- 0
app/serializers/activitypub/note_serializer.rb View File

@@ -13,6 +13,8 @@ class ActivityPub::NoteSerializer < ActiveModel::Serializer
13 13
   has_many :media_attachments, key: :attachment
14 14
   has_many :virtual_tags, key: :tag
15 15
 
16
+  has_one :replies, serializer: ActivityPub::CollectionSerializer
17
+
16 18
   def id
17 19
     ActivityPub::TagManager.instance.uri_for(object)
18 20
   end
@@ -33,6 +35,17 @@ class ActivityPub::NoteSerializer < ActiveModel::Serializer
33 35
     { object.language => Formatter.instance.format(object) }
34 36
   end
35 37
 
38
+  def replies
39
+    ActivityPub::CollectionPresenter.new(
40
+      type: :unordered,
41
+      first: ActivityPub::CollectionPresenter.new(
42
+        type: :unordered,
43
+        page: true,
44
+        items: object.self_replies(5).pluck(:uri)
45
+      )
46
+    )
47
+  end
48
+
36 49
   def language?
37 50
     object.language.present?
38 51
   end

+ 60
- 0
app/services/activitypub/fetch_replies_service.rb View File

@@ -0,0 +1,60 @@
1
+# frozen_string_literal: true
2
+
3
+class ActivityPub::FetchRepliesService < BaseService
4
+  include JsonLdHelper
5
+
6
+  def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
7
+    @account = parent_status.account
8
+    @allow_synchronous_requests = allow_synchronous_requests
9
+
10
+    @items = collection_items(collection_or_uri)
11
+    return if @items.nil?
12
+
13
+    FetchReplyWorker.push_bulk(filtered_replies)
14
+
15
+    @items
16
+  end
17
+
18
+  private
19
+
20
+  def collection_items(collection_or_uri)
21
+    collection = fetch_collection(collection_or_uri)
22
+    return unless collection.is_a?(Hash)
23
+
24
+    collection = fetch_collection(collection['first']) if collection['first'].present?
25
+    return unless collection.is_a?(Hash)
26
+
27
+    case collection['type']
28
+    when 'Collection', 'CollectionPage'
29
+      collection['items']
30
+    when 'OrderedCollection', 'OrderedCollectionPage'
31
+      collection['orderedItems']
32
+    end
33
+  end
34
+
35
+  def fetch_collection(collection_or_uri)
36
+    return collection_or_uri if collection_or_uri.is_a?(Hash)
37
+    return unless @allow_synchronous_requests
38
+    return if invalid_origin?(collection_or_uri)
39
+    collection = fetch_resource_without_id_validation(collection_or_uri)
40
+    raise Mastodon::UnexpectedResponseError if collection.nil?
41
+    collection
42
+  end
43
+
44
+  def filtered_replies
45
+    # Only fetch replies to the same server as the original status to avoid
46
+    # amplification attacks.
47
+
48
+    # Also limit to 5 fetched replies to limit potential for DoS.
49
+    @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(5)
50
+  end
51
+
52
+  def invalid_origin?(url)
53
+    return true if unsupported_uri_scheme?(url)
54
+
55
+    needle   = Addressable::URI.parse(url).host
56
+    haystack = Addressable::URI.parse(@account.uri).host
57
+
58
+    !haystack.casecmp(needle).zero?
59
+  end
60
+end

+ 12
- 0
app/workers/activitypub/fetch_replies_worker.rb View File

@@ -0,0 +1,12 @@
1
+# frozen_string_literal: true
2
+
3
+class ActivityPub::FetchRepliesWorker
4
+  include Sidekiq::Worker
5
+  include ExponentialBackoff
6
+
7
+  sidekiq_options queue: 'pull', retry: 3
8
+
9
+  def perform(parent_status_id, replies_uri)
10
+    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
11
+  end
12
+end

+ 11
- 0
app/workers/concerns/exponential_backoff.rb View File

@@ -0,0 +1,11 @@
1
+# frozen_string_literal: true
2
+
3
+module ExponentialBackoff
4
+  extend ActiveSupport::Concern
5
+
6
+  included do
7
+    sidekiq_retry_in do |count|
8
+      15 + 10 * (count**4) + rand(10 * (count**4))
9
+    end
10
+  end
11
+end

+ 12
- 0
app/workers/fetch_reply_worker.rb View File

@@ -0,0 +1,12 @@
1
+# frozen_string_literal: true
2
+
3
+class FetchReplyWorker
4
+  include Sidekiq::Worker
5
+  include ExponentialBackoff
6
+
7
+  sidekiq_options queue: 'pull', retry: 3
8
+
9
+  def perform(child_url)
10
+    FetchRemoteStatusService.new.call(child_url)
11
+  end
12
+end

+ 1
- 4
app/workers/thread_resolve_worker.rb View File

@@ -2,13 +2,10 @@
2 2
 
3 3
 class ThreadResolveWorker
4 4
   include Sidekiq::Worker
5
+  include ExponentialBackoff
5 6
 
6 7
   sidekiq_options queue: 'pull', retry: 3
7 8
 
8
-  sidekiq_retry_in do |count|
9
-    15 + 10 * (count**4) + rand(10 * (count**4))
10
-  end
11
-
12 9
   def perform(child_status_id, parent_url)
13 10
     child_status  = Status.find(child_status_id)
14 11
     parent_status = FetchRemoteStatusService.new.call(parent_url)

+ 44
- 0
spec/serializers/activitypub/note_spec.rb View File

@@ -0,0 +1,44 @@
1
+# frozen_string_literal: true
2
+
3
+require 'rails_helper'
4
+
5
+describe ActivityPub::NoteSerializer do
6
+  let!(:account) { Fabricate(:account) }
7
+  let!(:other)   { Fabricate(:account) }
8
+  let!(:parent)  { Fabricate(:status, account: account, visibility: :public) }
9
+  let!(:reply1)  { Fabricate(:status, account: account, thread: parent, visibility: :public) }
10
+  let!(:reply2)  { Fabricate(:status, account: account, thread: parent, visibility: :public) }
11
+  let!(:reply3)  { Fabricate(:status, account: other, thread: parent, visibility: :public) }
12
+  let!(:reply4)  { Fabricate(:status, account: account, thread: parent, visibility: :public) }
13
+  let!(:reply5)  { Fabricate(:status, account: account, thread: parent, visibility: :direct) }
14
+
15
+  before(:each) do
16
+    @serialization = ActiveModelSerializers::SerializableResource.new(parent, serializer: ActivityPub::NoteSerializer, adapter: ActivityPub::Adapter)
17
+  end
18
+
19
+  subject { JSON.parse(@serialization.to_json) }
20
+
21
+  it 'has a Note type' do
22
+    expect(subject['type']).to eql('Note')
23
+  end
24
+
25
+  it 'has a replies collection' do
26
+    expect(subject['replies']['type']).to eql('Collection')
27
+  end
28
+
29
+  it 'has a replies collection with a first Page' do
30
+    expect(subject['replies']['first']['type']).to eql('CollectionPage')
31
+  end
32
+
33
+  it 'includes public self-replies in its replies collection' do
34
+    expect(subject['replies']['first']['items']).to include(reply1.uri, reply2.uri, reply4.uri)
35
+  end
36
+
37
+  it 'does not include replies from others in its replies collection' do
38
+    expect(subject['replies']['first']['items']).to_not include(reply3.uri)
39
+  end
40
+
41
+  it 'does not include replies with direct visibility in its replies collection' do
42
+    expect(subject['replies']['first']['items']).to_not include(reply5.uri)
43
+  end
44
+end

+ 122
- 0
spec/services/activitypub/fetch_replies_service_spec.rb View File

@@ -0,0 +1,122 @@
1
+require 'rails_helper'
2
+
3
+RSpec.describe ActivityPub::FetchRepliesService, type: :service do
4
+  let(:actor)          { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account') }
5
+  let(:status)         { Fabricate(:status, account: actor) }
6
+  let(:collection_uri) { 'http://example.com/replies/1' }
7
+
8
+  let(:items) do
9
+    [
10
+      'http://example.com/self-reply-1',
11
+      'http://example.com/self-reply-2',
12
+      'http://example.com/self-reply-3',
13
+      'http://other.com/other-reply-1',
14
+      'http://other.com/other-reply-2',
15
+      'http://other.com/other-reply-3',
16
+      'http://example.com/self-reply-4',
17
+      'http://example.com/self-reply-5',
18
+      'http://example.com/self-reply-6',
19
+    ]
20
+  end
21
+
22
+  let(:payload) do
23
+    {
24
+      '@context': 'https://www.w3.org/ns/activitystreams',
25
+      type: 'Collection',
26
+      id: collection_uri,
27
+      items: items,
28
+    }.with_indifferent_access
29
+  end
30
+
31
+  subject { described_class.new }
32
+
33
+  describe '#call' do
34
+    context 'when the payload is a Collection with inlined replies' do
35
+      context 'when passing the collection itself' do
36
+        it 'spawns workers for up to 5 replies on the same server' do
37
+          allow(FetchReplyWorker).to receive(:push_bulk)
38
+          subject.call(status, payload)
39
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
40
+        end
41
+      end
42
+
43
+      context 'when passing the URL to the collection' do
44
+        before do
45
+          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
46
+        end
47
+
48
+        it 'spawns workers for up to 5 replies on the same server' do
49
+          allow(FetchReplyWorker).to receive(:push_bulk)
50
+          subject.call(status, collection_uri)
51
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
52
+        end
53
+      end
54
+    end
55
+
56
+    context 'when the payload is an OrderedCollection with inlined replies' do
57
+      let(:payload) do
58
+        {
59
+          '@context': 'https://www.w3.org/ns/activitystreams',
60
+          type: 'OrderedCollection',
61
+          id: collection_uri,
62
+          orderedItems: items,
63
+        }.with_indifferent_access
64
+      end
65
+
66
+      context 'when passing the collection itself' do
67
+        it 'spawns workers for up to 5 replies on the same server' do
68
+          allow(FetchReplyWorker).to receive(:push_bulk)
69
+          subject.call(status, payload)
70
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
71
+        end
72
+      end
73
+
74
+      context 'when passing the URL to the collection' do
75
+        before do
76
+          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
77
+        end
78
+
79
+        it 'spawns workers for up to 5 replies on the same server' do
80
+          allow(FetchReplyWorker).to receive(:push_bulk)
81
+          subject.call(status, collection_uri)
82
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
83
+        end
84
+      end
85
+    end
86
+
87
+    context 'when the payload is a paginated Collection with inlined replies' do
88
+      let(:payload) do
89
+        {
90
+          '@context': 'https://www.w3.org/ns/activitystreams',
91
+          type: 'Collection',
92
+          id: collection_uri,
93
+          first: {
94
+            type: 'CollectionPage',
95
+            partOf: collection_uri,
96
+            items: items,
97
+          }
98
+        }.with_indifferent_access
99
+      end
100
+
101
+      context 'when passing the collection itself' do
102
+        it 'spawns workers for up to 5 replies on the same server' do
103
+          allow(FetchReplyWorker).to receive(:push_bulk)
104
+          subject.call(status, payload)
105
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
106
+        end
107
+      end
108
+
109
+      context 'when passing the URL to the collection' do
110
+        before do
111
+          stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
112
+        end
113
+
114
+        it 'spawns workers for up to 5 replies on the same server' do
115
+          allow(FetchReplyWorker).to receive(:push_bulk)
116
+          subject.call(status, collection_uri)
117
+          expect(FetchReplyWorker).to have_received(:push_bulk).with(['http://example.com/self-reply-1', 'http://example.com/self-reply-2', 'http://example.com/self-reply-3', 'http://example.com/self-reply-4', 'http://example.com/self-reply-5'])
118
+        end
119
+      end
120
+    end
121
+  end
122
+end

+ 40
- 0
spec/workers/activitypub/fetch_replies_worker_spec.rb View File

@@ -0,0 +1,40 @@
1
+# frozen_string_literal: true
2
+
3
+require 'rails_helper'
4
+
5
+describe ActivityPub::FetchRepliesWorker do
6
+  subject { described_class.new }
7
+
8
+  let(:account) { Fabricate(:account, uri: 'https://example.com/user/1') }
9
+  let(:status)  { Fabricate(:status, account: account) }
10
+
11
+  let(:payload) do
12
+    {
13
+      '@context': 'https://www.w3.org/ns/activitystreams',
14
+      id: 'https://example.com/statuses_replies/1',
15
+      type: 'Collection',
16
+      items: [],
17
+    }
18
+  end
19
+
20
+  let(:json) { Oj.dump(payload) }
21
+
22
+  describe 'perform' do
23
+    it 'performs a request if the collection URI is from the same host' do
24
+      stub_request(:get, 'https://example.com/statuses_replies/1').to_return(status: 200, body: json)
25
+      subject.perform(status.id, 'https://example.com/statuses_replies/1')
26
+      expect(a_request(:get, 'https://example.com/statuses_replies/1')).to have_been_made.once
27
+    end
28
+
29
+    it 'does not perform a request if the collection URI is from a different host' do
30
+      stub_request(:get, 'https://other.com/statuses_replies/1').to_return(status: 200)
31
+      subject.perform(status.id, 'https://other.com/statuses_replies/1')
32
+      expect(a_request(:get, 'https://other.com/statuses_replies/1')).to_not have_been_made
33
+    end
34
+
35
+    it 'raises when request fails' do
36
+      stub_request(:get, 'https://example.com/statuses_replies/1').to_return(status: 500)
37
+      expect { subject.perform(status.id, 'https://example.com/statuses_replies/1') }.to raise_error Mastodon::UnexpectedResponseError
38
+    end
39
+  end
40
+end

Loading…
Cancel
Save